ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Kafka消费者

2020-03-18 16:04:22  阅读:186  来源: 互联网

标签:消费者 群组 value Kafka record props put


消费者和消费者群组

生产者往主题写入消息的速度超过了应用程序验证数据的速度。如果使用单个消费者处理消息,应用程序跟不上消息生成的速度。此时,有必要对消费者进行横向伸缩,我们可以使用多个消费者从同一个主题读取消息,对消息进行分流。

一个群组里的消费者订阅同一个主题,每个消费者接收主题的一部分分区的消息。

不同群组之间消费者互不影响

消费者编程

在读取消息之前,需要先创建一个KafkaConsumer对象。

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
订阅

创建消费者后,下一步要订阅主题,subscribe()方法接收一个主题列表作为参数

consumer.subscribe(Collections.singletonList("customerCountries"));

分区分配策略

Kafka有两种分配策略,一是RoundRobin,一是Range

RoundRobin消息轮询(轮流询问)是消费者API的核心,通过一个简单的轮询向服务器请求数据。轮询会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据。

try {
    while (true) { 
        ConsumerRecords<String, String> records = consumer.poll(100); 
    for (ConsumerRecord<String, String> record : records) 
    {
    log.debug("topic = %s, partition = %s, offset = %d, customer = %s,
    country = %s\n",
    record.topic(), record.partition(), record.offset(),
    record.key(), record.value());
    int updatedCount = 1;
    if (custCountryMap.countainsValue(record.value())) {
        updatedCount = custCountryMap.get(record.value()) + 1;
    }
    custCountryMap.put(record.value(), updatedCount)
    JSONObject json = new JSONObject(custCountryMap);
    System.out.println(json.toString(4)) 
    }
    }
} finally {
    consumer.close(); 
}

Range根据当前的消费者组来划分,一个范围一个范围来分配。会产生消费者之间订阅数量差距过大的问题。

提交和偏移量

每次调用poll()方法,会返回生产者写入kafka但还没被读取的记录,因此我们可以追踪到哪些记录是被群组里的哪个消费者读取的。

消费者默认将offset保存在kafka内置的topic中,该topic为__consumer_offset

标签:消费者,群组,value,Kafka,record,props,put
来源: https://www.cnblogs.com/chenshaowei/p/12517906.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有