标签:Map 生产者 分区 Kafka 发送 消息 key 服务端
1配置生产者参数 必须的3个参数 brokerlist:服务端地址,集群地址,至少配置2个,会自动获取其他的服务端地址; key序列化类: value序列化类: 非必须: clientid:不填 会给producer-数字 2创建生产者实例 KafkaProducer p = new KafkaProducer(prop);//prop 就是上面几个参数; 3创建消息实体 ProducerRecord r = new ();好多种构造 需要的 参数:含义 | 是否必填 | |
主题 | 是 | |
分区号 | 不是,不填则有分区器生成分区号; | |
key | 不是 | |
value | 不是,空即为墓碑消息 | |
header | 不是 | 可以添一些其他消息,来实现其他功能,如TTL |
p.send(r) | 发后即忘 | 最高效,可靠性最低 | |
p.send(r).get(); | 同步发送 | 低,可靠性高 | |
p.send(r,new CalllBack(){}); | 异步发送 | 高效, 可靠性高 |
消息先经过生产者拦截器,onsend( )方法,可以对 消息内容改变; -——》 | 序列化器,对key和value,将对象转化成字节数组; -——》 | 分区器,查看参数是否带有分区号,有,直接跳过;没有,根据进入设置的分区器方法,得到分区号a -——》 | sender线程会将 Map<分区号,Dueue<ProducerBath>>转化成 Map<Node,List<Request>>,node代表需要发送的各个节点,List<Request>代表这个节点需要发送的消息结合;当消息大小达到最小限制后,批量发送 -——》 | inFlightRequests这里还有一个Map<Node,List<Request>>,来判断每个连接,有几个未返回的响应,如果list的size超过了系统设置的inflights.size的个数,则直接返回失败; 没有超过,往当前node对应的list中add一个Request,继续 -——》 | 服务端写入消息,返回成功或者失败 kafka服务端,判断是否发送成功? 系统参数ack -1,代表需要所有ISR副本都追加了这个消息,才返回成功 1,只要leader副本追加了这个消息,就返回成功 n, 代表需要n个ISR副本都追加了这个消息,才返回成功 | | | ↓ |
拦截器的确认方法执行; 然后才是异步发送的回调函数执行; | 这里还有一个Map<Node,List<Request>>,来判断每个连接,有几个未返回的响应,这里remove一个Request 《—————— |
- 如果在发消息的时候指定了分区,则消息投递到指定的分区
- 如果没有指定分区,但是消息的key不为空,则基于key的哈希值来选择一个分区
- 如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区
标签:Map,生产者,分区,Kafka,发送,消息,key,服务端 来源: https://www.cnblogs.com/xlblog/p/15473188.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。