ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

基于SpringBoot整合Kafaka

2022-04-10 15:31:06  阅读:217  来源: 互联网

标签:SpringBoot kafaka spring kafka Kafaka 整合 org message public


一、安装及配置kafaka (版本:2.11)

1、安装配置过程

//下载解压
wget http://mirror.bit.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz
或下载地址:https://kafka.apache.org/downloads.html
解压

//配置
vi server.properties
添加端口port=9092
添加host.name=实际Ip
修改配置项listeners=PLAINTEXT://实际ip:9092,取消原注释

//启动运行
./zookeeper-server-start.sh /home/likangwen/kafaka/kafka_2.11-1.1.0/config/zookeeper.properties &
./kafka-server-start.sh /home/likangwen/kafaka/kafka_2.11-1.1.0/config/server.properties &

 

2、成果

(1)启动zookeeper

 

 

(2)启动kafaka

 

二、springboot整合kafaka实现一个简单的发布/订阅消息系统

1、整合过程

//添加kafaka依赖
<!--kafka支持-->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.2.9.RELEASE</version><!--$NO-MVN-MAN-VER$-->
    </dependency>
    
//在application.properties添加kafka配置
#kafka相关配置
spring.kafka.bootstrap-servers=192.168.245.100:9092
#设置一个默认组
spring.kafka.consumer.group-id=0
#key-value序列化反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#每次批量发送消息的数量
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288

//创建kafaka生产者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * producer:kafaka生产者(发布消息)
 */
@Component
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    /**
     * 发送消息到kafka
     */
    public void sendChannelMess(String channel, String message){
        kafkaTemplate.send(channel,message);
    }

}

//创建Kafaka消费者
/**
 * consumer:kafaka消费者(订阅消息)
 */
@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"seckill"})
    public void receiveMessage(String message){
        //收到通道的消息之后执行秒杀操作
        System.out.println("监听topic:"+"seckill"+",接收到该主题消息message:"+message);
    }

}

//创建测试控制器
@RestController
public class MessageController {

    @Autowired
    private KafkaSender kafkaSender;

    //@Autowired
    //private KafkaTemplate<String,Object> kafkaTemplate;


//    @RequestMapping("/message/send")
//    public boolean send(@RequestParam String message){
//
//        System.out.println("receive a request: /message/send");
//        kafkaTemplate.send("seckill",message);
//        return true;
//    }

    @RequestMapping("/sendMessageByKafaka")
    public boolean sendMessageByKafaka(){
        String message="这是一条关于kafaka消息订阅发布的测试消息!";
        String channel="seckill";

        System.out.println("receive a request: /sendMessageByKafaka");
        kafkaSender.sendChannelMess(channel,message);

        return true;
    }

}

//启动springboot项目即可

//在浏览器访问指定url,观察日志输出

 

2、成果

(1)发送消息:"This is kafaka"

 

 

(2)接收消息

 

 

(3)其它

 

三、FAQ问题集锦

1、启动kafaka项目出现“Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration': Unexpected exception during bean creation; ”?

原因:

java.lang.ClassNotFoundException: org.springframework.kafka.transaction.KafkaAwareTransactionManager

解决方法:

更换Kafaka依赖的版本2.x.x

 

2、启动kafaka项目出现“Consumer clientId=consumer-1, groupId=0] Connection to node -1 co”?

原因:服务器的kafaka配置有误,无法建立连接

解决方法:

修改配置项host.name=实际Ip,例如192.168.245.100(不能为127.0.0.1)

修改配置项listeners=PLAINTEXT://实际ip:9092,取消原注释

建立起连接后,kafaka项目打印日志及服务器打印日志,正确如下所示:

 

 

 

 

 

 

 

标签:SpringBoot,kafaka,spring,kafka,Kafaka,整合,org,message,public
来源: https://www.cnblogs.com/lkw-cnblogs/p/16126053.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有