ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

玩转Kafka—Spring&Go整合Kafka

2021-11-20 21:32:34  阅读:157  来源: 互联网

标签:err sarama Spring kafka go spring Go Kafka consumer


玩转Kafka—Spring整合Kafka

1 新建Spring Boot项目,增加依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.76</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2 项目结构

在这里插入图片描述

3 代码

3.1 配置文件和Kafka服务器所需配置

application.properties

server.port=8080
#制定kafka代理地址
spring.kafka.bootstrap-servers=8.131.57.161:9092
#消息发送失败重试次数
spring.kafka.producer.retries=0
#每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
#每次批量发送消息的缓冲区大小
spring.kafka.producer.buffer-memory=335554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# 指定默认消费者group id
spring.kafka.consumer.group-id=user-log-group
spring.kafka.consumer.bootstrap-servers=8.131.57.161:9092
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

Kafka服务器所需配置,server.properties文件

# 33行左右 0.0.0.0代表允许外部端口连接
listeners=PLAINTEXT://0.0.0.0:9092  
# 36行左右 ip代表外部代理地址
advertised.listeners=PLAINTEXT://8.131.57.161:9092   

3.2 生产者和实体类代码

Student.java

/**
 * @desc: 实体类
 * @author: YanMingXin
 * @create: 2021/11/20-12:43
 **/
@Data
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
public class Student implements Serializable {

    private String id;

    private String name;

    private String context;

}

StudentService.java

/**
 * @desc: 接口
 * @author: YanMingXin
 * @create: 2021/11/20-12:43
 **/
public interface StudentService {
    void stuSayHello(Student student);
}

StudentServiceImpl.java

/**
 * @desc: 接口实现类
 * @author: YanMingXin
 * @create: 2021/11/20-12:43
 **/
@Service
public class StudentServiceImpl implements StudentService {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * topic
     */
    private static final String STU_TOPIC = "stu.sayHello";

    @Override
    public void stuSayHello(Student student) {
        Student stu = new Student("1", "zs", "Hello Ls.");
        kafkaTemplate.send(STU_TOPIC, JSON.toJSONString(stu));
    }
}

3.3 消费者代码

MyKafkaListener.java

/**
 * @desc: 消费者监听
 * @author: YanMingXin
 * @create: 2021/11/20-12:44
 **/
@Component
public class MyKafkaListener {

    /**
     * topic
     */
    private static final String STU_TOPIC = "stu.sayHello";

    @KafkaListener(topics = {STU_TOPIC})
    public void stuTopicConsumer(ConsumerRecord consumerRecord) {
        Optional kafkaMsg = Optional.ofNullable(consumerRecord.value());
        if (kafkaMsg.isPresent()) {
            Object msg = kafkaMsg.get();
            System.err.println(msg);
        }
    }
}

3.4 测试

@SpringBootTest
class SpKafkaApplicationTests {

    @Autowired
    private StudentService studentService;

    @Test
    void contextLoads() throws Exception{
        for (int i = 0; i < 900000; i++) {
            studentService.stuSayHello(new Student());
        }
    }
}

玩转Kafka—Golang整合Kafka

几个常见的Go整合Kafka客户端工具:我们本次使用的是Shopify

  • Shopify:https://github.com/Shopify/sarama

  • Big Data Open Source Security:https://github.com/stealthly/go_kafka_client

  • OptioPay:https://github.com/optiopay/kafka

    https://github.com/nuance/kafka

    https://github.com/jdamick/kafka.go

  • Confluent:https://github.com/confluentinc/confluent-kafka-go

    Docs: http://docs.confluent.io/current/clients/index.html

  • Travis Bischel: https://pkg.go.dev/github.com/twmb/kafka-go/pkg/kgo

ps:配置go get代理(类似于Maven配置阿里云镜像)教程:

https://goproxy.io/zh/docs/getting-started.html

1 新建go modules

在这里插入图片描述

2 项目结构

在这里插入图片描述

3 生产者代码

KakaProducer.go

package main

import (
   "fmt"
   "github.com/Shopify/sarama"
   "time"
)

//消息生产者
func main() {
   //获取配置类
   config := sarama.NewConfig() //配置类实例(指针类型)
   config.Producer.RequiredAcks = sarama.WaitForAll //代理需要的确认可靠性级别(默认为WaitForLocal)
   config.Producer.Partitioner = sarama.NewRandomPartitioner  //生成用于选择要发送消息的分区的分区(默认为散列消息键)。
   config.Producer.Return.Successes = true //如果启用,成功传递的消息将在成功通道(默认禁用)。
   //获取客户端对象
   client, err := sarama.NewSyncProducer([]string{"8.131.57.161:9092"}, config)
   if err != nil {
      //获取客户端失败
      fmt.Println("producer close, err:", err)
      return
   }
   //延迟执行,类似于栈,等到其他代码都执行完毕后再执行
   defer client.Close()
   //一直循环
   for {
      //获取Message对象
      msg := &sarama.ProducerMessage{}
      //设置topic
      msg.Topic = "go_kafka"
      //设置Message值
      msg.Value = sarama.StringEncoder("this is a good test, my message is good")
      //发送消息,返回pid、片偏移
      pid, offset, err := client.SendMessage(msg)
      //发送失败
      if err != nil {
         fmt.Println("send message failed,", err)
         return
      }
      //打印返回结果
      fmt.Printf("pid:%v offset:%v\n", pid, offset)
      //线程休眠下
      time.Sleep(10 * time.Second)
   }
}

4 消费者代码

KafkaConsumer.go

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	"strings"
	"sync"
	"time"
)

var (
	wg sync.WaitGroup //同步等待组
	//在类型上,它是一个结构体。一个WaitGroup的用途是等待一个goroutine的集合执行完成。
	//主goroutine调用了Add()方法来设置要等待的goroutine的数量。
	//然后,每个goroutine都会执行并且执行完成后调用Done()这个方法。
	//与此同时,可以使用Wait()方法来阻塞,直到所有的goroutine都执行完成。
)

func main() {
	//获取消费者对象 可以设置多个IP地址和端口号,使用逗号进行分割
	consumer, err := sarama.NewConsumer(strings.Split("8.131.57.161:9092", ","), nil)
	//获取失败
	if err != nil {
		fmt.Println("Failed to start consumer: %s", err)
		return
	}
	//对该topic进行监听
	partitionList, err := consumer.Partitions("go_kafka")
	if err != nil {
		fmt.Println("Failed to get the list of partitions: ", err)
		return
	}
	//打印分区
	fmt.Println(partitionList)
	//获取分区和片偏移
	for partition := range partitionList {
		pc, err := consumer.ConsumePartition("go_kafka", int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
			return
		}
		//延迟执行
		defer pc.AsyncClose()
		//启动多线程
		go func(pc sarama.PartitionConsumer) {
			wg.Add(1)
			//获得message的信息
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
				fmt.Println()
			}
			wg.Done()
		}(pc)
	}
	//线程休眠
	time.Sleep(10 * time.Second)
	wg.Wait()
	consumer.Close()
}

5 测试

在这里插入图片描述
在这里插入图片描述
参考文章:https://www.cnblogs.com/angelyan/p/10800739.html

标签:err,sarama,Spring,kafka,go,spring,Go,Kafka,consumer
来源: https://blog.csdn.net/Mr_YanMingXin/article/details/121445891

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有