标签:Flink 192.168 kafka connector item SQLClinet user id
1.通过自建kafka的生产者来产生数据
/bin/kafka-console-producter.sh --broker-list 192.168.58.177:9092 --topic my_topic
数据
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "662868", "item_id":"1784", "category_id": "54123654", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "662854", "item_id":"1456", "category_id": "12345678", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "662858", "item_id":"1457", "category_id": "12345679", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
2.在kafka进行消费
/bin/kafka-console-consumer.sh --bootstrap-server 192.168.58.177:9092 --topic my_topic --partition 0 --offset 0
3.在Flink的sqlclient 创建表
CREATE TABLE user_log1 ( user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts VARCHAR ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'my-topic-one', 'connector.startup-mode' = 'earliest-offset', 'connector.properties.group.id' = 'testGroup', 'connector.properties.zookeeper.connect' = '192.168.58.171:2181,192.168.58.177:2181,192.168.58.178:2181', 'connector.properties.bootstrap.servers' = '192.168.58.177:9092', 'format.type' = 'json' );
实时计算
select item_id,count(*) from user_log1 group by item_id;
标签:Flink,192.168,kafka,connector,item,SQLClinet,user,id 来源: https://www.cnblogs.com/yaowentao/p/12668885.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。