标签:消费者 实现 props private kafka org import KAFKA consumer
1、POM文件导入
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.1.1</version> </dependency>
2、yml配置文件这样写
kafka: consumer: enable-auto-commit: true group-id: kafkaProducer auto-commit-interval: 1000 auto-offset-reset: latest bootstrap-servers: ip:prot key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer concurrency: 3
3、kafka配置类
package com.harzone.kafka; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import java.util.Arrays; import java.util.Properties; @Component public class KafkaConsumerConfig implements ApplicationRunner { @Value("${kafka.consumer.bootstrap-servers}") private String servers; @Value("${kafka.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Value("${kafka.consumer.auto-commit-interval}") private String autoCommitInterval; @Value("${kafka.consumer.group-id}") private String groupId; @Value("${kafka.consumer.auto-offset-reset}") private String autoOffsetReset; @Value("${kafka.consumer.concurrency}") private int concurrency; private final String topic = "test_producer"; Properties props; private KafkaConsumer<String, String> kafkaConsumer; private Properties initProperties() { // zookeeper 配置 props = new Properties(); props.put("bootstrap.servers", servers); // group 代表一个消费组 props.put("group.id", "kafkaProducer"); props.put("session.timeout.ms", "30000"); // 往zookeeper上写offset的频率 props.put("auto.commit.interval.ms", "1000"); // key的反序列化类型 props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); return props; } @Override public void run(ApplicationArguments args) throws Exception { //初始化kafka链接 initProperties(); //Runtime.getRuntime().availableProcessors()线程数量 KafkaConsumerThread consumerThread = new KafkaConsumerThread(props, topic, Runtime.getRuntime().availableProcessors() - 1); consumerThread.start(); } } 4、kafka多线程拉取实现
package com.harzone.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class KafkaConsumerThread extends Thread { private static final Logger log = LoggerFactory.getLogger(KafkaConsumerThread.class); private KafkaConsumer<String, String> kafkaConsumer; //线程池 private ExecutorService executorService; //线程数 private int threadNumber; static List<String> list = new ArrayList<>(); //有参构造函数,初始化数据配置 public KafkaConsumerThread(Properties properties, String topic, int availableProcessors) { kafkaConsumer = new KafkaConsumer<String, String>(properties); //subscribe从最新处消费(assign从最后处消费)----singletonList:返回一个不可变的列表 kafkaConsumer.subscribe(Collections.singletonList(topic)); this.threadNumber = availableProcessors; //ThreadPoolExecutor线程池(核心线程池大小,最大线程数,线程最大空闲时间,时间单位,线程等待队列,拒绝策略) executorService = new ThreadPoolExecutor(threadNumber, threadNumber, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy()); } @Override public void run() { try { while (true) { //poll(多长时间拉取一次ms) ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10)); if (!records.isEmpty()) { executorService.submit(new RecordsHandler(records)); } } } catch (Exception e) { e.printStackTrace(); } finally { kafkaConsumer.close(); } } }
标签:消费者,实现,props,private,kafka,org,import,KAFKA,consumer 来源: https://blog.csdn.net/u013049353/article/details/121488388
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。