ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Kafka常见问题及解决方法

2020-11-29 16:34:49  阅读:810  来源: 互联网

标签:常见问题 err sarama broker kafka topic 解决 Kafka partitions


前言

Apache Kafka是一款优秀的开源消息中间件,主要应用于活动跟踪、消息穿透、日志、流处理等场景。我们使用该产品时,首先应当需要了解该产品的特性,以及产品的说明
但是由于官方文档较多,实际在使用的过程中,quick start往往是我们接触的第一步,但是quick start的配置实在是太过简陋,从而在实际的使用过程甚至在生产环境中发生一些严重的问题。话不多说,以下内容为实践中遇到的问题及解决办法详解。

正文

1、Kafka集群无法在跨网络的环境中正常工作

kafka在实际使用的过程中,同样的配置,在不同的网络环境下,可能会导致无法正常工作,我们应该分清内网和外网的区别。

在Kafka中涉及到网络的核心配置参数主要为两个

参数名 配置参考 说明 注意
listeners PLAINTEXT://hostname:9092 主要用来定义Kafka Broker的Listener的配置项。hostname如果设置为0.0.0.0则绑定所有的网卡地址;如果hostname为空则绑定默认的网卡。如果没有配置hostname则默认为java.net.InetAddress.getCanonicalHostName() 如果只设置该参数,那么就无法进行跨网络的访问,只有内网中的服务可以用
advertised.listeners PLAINTEXT://hostname:9092 作用主要是向生产者和消费者公布主机名和端口。如果不设置该值,那么将会默认使用listeners的值 在docker中或者在虚拟机上部署kafka集群,这种情况下是需要用到 advertised.listeners,这样可以确保外部的生产者和消费者能够正确的使用kafka服务。

核心配置示例

listeners: INSIDE://:9092,OUTSIDE://0.0.0.0:9094  
advertised.listeners: INSIDE://:9092,OUTSIDE://<网卡ip或主机名>:端口   
listener.security.protocol.map: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT"  
inter.broker.listener.name: "INSIDE"

说明:listener.security.protocol.mapinter.broker.listener.name分别表示的是协议的传输方式(以上为明文)和监听的borker。

2、Kafka集群数据节点不均衡

错误的创建topic,可能会导致生产者无法均衡或随机的往kafka集群中推送数据,消费者无法均衡的消费数据,kafka集群中部分节点数据不均衡,系统不稳定,无法有效的高可用。

创建topic主要有两种方式

  • 手动创建
    bin/kafka-topics.sh --create --zookeeper {hostname:port} --replication-factor 3 --partitions 6 --topic test
  1. replication-factor表示副本数量,建议至少设置为2,一般是3,最高设置为4。合理的设置该数量可以保证系统更稳定(允许N-1个broker宕机),但是更多的副本(如果acks=all,则会造成较高的延时),系统磁盘的使用率会更高(一般若是为3,则相对于为2时,会占据更多50% 的磁盘空间)
  2. partitions表示分区,kafka通过分区策略,将不同的分区分配在一个集群中的broker上,一般会分散在不同的broker上,当只有一个broker时,所有的分区就只分配到该Broker上。一般来说 Kafka 不能有太多的 Partition,一个broker不应该承载超过2000 到 4000 个partitions(考虑此broker上所有来自不同topics的partitions)。
    同时,一个Kafka集群上brokers中所有的partitions总数最多不应超过20,000个,集群节点数量低于6的时候,我们通常设置的值为2* broker数。此准则基于的原理是:在有broker宕机后,zookeeper需要重新做选举。若是partitions数目过多,则需要执行大量的选举策略。
  • 自动创建
    需要将auto.create.topics.enable设置为true,kafka发现该topic不存在的话,会按默认配置自动创建topic。
    配置示例:
num.partitions=6 #自动创建的partitions值为6
default.replication.factor=3 #自动创建的replication_factor值为3
auto.create.topics.enable=true # 开启自动创建

具体的数值设置,可以参考手动创建的中的详细说明,然后通过测试结果进行调整。

3、Kafka集群日志数据过大堆积磁盘

生产者在推送数据后,kafka通常会在data目录下对应的topic数据目录中生成日志信息,随着时间的增长,如果没有对日志作清理动作,那么必定导致磁盘的不可用。

清理策略

  • log.cleanup.policy=delete,kafka日志的清理策略,默认是delete,就是根据配置的时间空间来清理日志;还可以配置成compact,当旧数据的回收时间或者尺寸限制到达时,会进行日志压缩。
# 需要自己根据实际情况设置
log.retention.bytes=-1
# 默认的保留时间是7天
log.retention.hours=168

值得注意的是log.retention.bytes表示的是每个topic下每个partition保存数据的总量;注意,这是每个partitions的上限,因此这个数值乘以partitions的个数就是每个topic保存的数据总量。同时注意:如果log.retention.hours和log.retention.bytes都设置了,则超过了任何一个限制都会造成删除一个段文件。这项设置可以由每个topic设置时进行覆盖。

4、Kafka客户端的注意事项

以上是kafka常见的问题及解决办法,想要用好kafka,还必须要了解相对应的客户端提供的各种参数含义。
以下以go作为示例,采用的是sarama客户端中值得关注的如下:
生产者配置Producer.Partitioner分区器的选择策略,同步异步模块的实现方式差异。

生产者: 主要分为同步模式和异步模式

  1. 同步模式:需等待返回成功后,阻塞其他逻辑执行
  config := sarama.NewConfig()  //实例化sarama的Config
  config.Producer.Return.Successes = true  //开启消息发送成功后通知
  config.Producer.Partitioner = sarama.NewRandomPartitioner //随机分区器 可选择不同的策略,如轮询等
  client,err := sarama.NewClient([]string{"127.0.0.1:9092"}, config) //初始化
  defer client.Close()
  if err != nil {panic(err)}
  producer,err := sarama.NewSyncProducerFromClient(client) // 同步
  if err!=nil {panic(err)}
  partition, offset , err := producer.SendMessage(&sarama.ProducerMessage{Topic: "test_topic", Key: nil, Value: sarama.StringEncoder("this is go sync message")})
  if err != nil {
     panic(err)
  }
  1. 异步模式:生产者不等待成功直接返回,不阻塞其他逻辑执行
   config := sarama.NewConfig() //实例化sarama的Config
   config.Producer.Return.Successes = true //开启消息发送成功后通知
   config.Producer.Partitioner = sarama.NewRandomPartitioner //随机分区器 可选择不同的策略,如轮询等
   client, err := sarama.NewClient([]{"127.0.0.1:9092"}, config)
   if err != nil {
       panic(err)
   }
   producer, err := sarama.NewAsyncProducerFromClient //异步
   if err != nil {
       panic(err)
   }
   defer producer.Close()
   producer.Input() <- &sarama.ProducerMessage{Topic: "test_topic", Key: nil, Value: sarama.StringEncoder("this is go async message")}
   select {
           case msg := <-producer.Successes(): // 如果config.Producer.Return.Successes 设置为false 那么无需获取报告状态,否则必须获取该状态
               fmt.Printf("message successes: [%s]\n",msg.Value)
           case err := <-producer.Errors():
               fmt.Println("message failure: ", err)
           default:
               fmt.Println("message default",)
   }

消费者

消费者配置Offsets.Initial偏移量的选择,从最旧还是最新的消息开始消费,只能是sarama.OffsetOldestsarama.OffsetNewest
Offsets.ResetOffsets :如果程序重启,true表示不从上次中断的位置消费,false表示从上次中断的位置消费。

总结

本文主要是记录在kafka开发过程中常见的问题解决办法,通过不断总结积累问题解决办法,在遇到同类问题时能够快速的复盘,以最快的速度解决问题。

标签:常见问题,err,sarama,broker,kafka,topic,解决,Kafka,partitions
来源: https://www.cnblogs.com/denghailei/p/14056690.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有