标签:spring boot kafka props put new CONFIG public
一、引入kafka依赖:
<!-- kafka 依赖 开始 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> <!-- kafka 依赖 结束 -->
二、设置kafka配置参数
kafka: # 指定kafka server的地址,集群配多个,中间,逗号隔开 bootstrap-servers: 127.0.0.1:9092 # 生产者 producer: # 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败, # 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。 retries: 0 # 每次批量发送消息的数量,produce积累到一定数据,一次发送 batch-size: 16384 # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据 buffer-memory: 33554432 # 指定消息key和消息体的编解码方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer properties: linger.ms: 1 acks: 1 # 消费者 consumer: enable-auto-commit: false auto-commit-interval: 100ms key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonSerializer properties: session.timeout.ms: 15000 group-id: myGroup max-poll-records: 2 template: default-topic: testTopic streams: replication-factor: 3
三、编写KafkaProducerConfig配置类:
@Configuration public class KafkaProducerConfig { @Value(value = "localhost:9092") private String bootstrapAddress; //1. Send string to Kafka @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(props); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } //2. Send User objects to Kafka @Bean public ProducerFactory<String, User> userProducerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, User> userKafkaTemplate() { return new KafkaTemplate<>(userProducerFactory()); } }
四、编写KafkaConsumerConfig配置类:
@Configuration public class KafkaConsumerConfig { @Value(value = "localhost:9092") private String bootstrapAddress; @Value(value = "group_id") private String groupId; @Value(value = "group_id") private String userGroupId; // 1. Consume string data from Kafka @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } // 2. Consume user objects from Kafka public ConsumerFactory<String, User> userConsumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put(ConsumerConfig.GROUP_ID_CONFIG, userGroupId); props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(User.class)); } @Bean public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(userConsumerFactory()); return factory; } }
标签:spring,boot,kafka,props,put,new,CONFIG,public 来源: https://www.cnblogs.com/jamstack/p/16220426.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。