标签:producer spring 192.168 kafka SpringBoot2 9092 整合 Kafka consumer
环境准备
producer端maven依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application.properties配置
## Spring整合kafka
spring.kafka.bootstrap-servers=192.168.21.107:9092,192.168.21.108:9092,192.168.21.109:9092
# kafka producer 发送消息失败时的重试次数
spring.kafka.producer.retries=3
# 批量发送数据的配置
spring.kafka.producer.batch-size=16384
# 设置kafka 生产者内存缓冲区的大小(32M)
spring.kafka.producer.buffer-memory=33554432
# kafka消息的序列化配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringDeserializer
# kafka 投递配置项
spring.kafka.producer.acks=1
生产端Service编写
kafkaProducerService.java
@Slf4j
@Component
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
public void sendMessage(String topic,Object object){
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
log.error("发送消息失败:"+throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
log.info("发送消息成功:"+result.toString());
}
});
}
}
consumer端application.xml配置
# Spring整合kafka
spring.kafka.bootstrap-servers=192.168.21.107:9092,192.168.21.108:9092,192.168.21.109:9092
# kafka consumer 消息的签收机制:手工签收
spring.kafka.consumer.enable-auto-commit=false
# 手工签收
spring.kafka.listener.ack-mode=manual
# latest[默认]:在偏移量无效的情况下,消费者从最新的记录开始读取数据
# earliest: 在偏移量无效的情况下,消费者从起始位置读取分区的进度
spring.kafka.consumer.auto-offset-reset=earliest
# 序列化配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization
#并行度
spring.kafka.listener.concurrency=5
消费端Service
KafkaConsumerService.java
@KafkaListener(groupId = "group02",topics = "topic02")
public void onMessage(ConsumerRecord<String,Object> record, Acknowledgment acknowledgment, Consumer<?,?> consumer){
log.info("消费端接收消息:{}",record.value());
record.value();
//手工签收机制
acknowledgment.acknowledge();
}
标签:producer,spring,192.168,kafka,SpringBoot2,9092,整合,Kafka,consumer 来源: https://www.cnblogs.com/shine-rainbow/p/springboot2x-zheng-hekafka.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。