ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

kafka学习笔记

2022-07-01 16:02:35  阅读:117  来源: 互联网

标签:bin topic -- IP zookeeper 笔记 kafka 学习


前言

kafka官网:https://kafka.apache.org/

Kafka是最初由Linkedin公司开发,是一个分布式的、支持分区的(Partition)、多副本的(Replica),基于zookeeper协调的分布式消息系统。它最大的特性就是可以实时处理大量数据以满足各种场景,比如:基于Hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎、web/niginx日志、访问日志、消息服务等。kafka由scala语言编写,Linkedin在2010年把kafka贡献给了Apache基金会并成为顶级开源项目。

kafka使用SSL认证会降低至少20%~30%的性能,生产环境一般都通过配置防火墙来保证安全性。

 

关键术语

Broker:消息中间件的处理节点,一个kafka节点就是一个broker,一个或者多个Broker可以组成一个kafka集群;

Topic:kafka根据topic对消息进行归类,发布到kafka集群的每条消息都需要指定一个topic;

Producer:消息生产者,向Broker发送消息的客户端;

Consumer:消息消费者,从Broker读取消息的客户端;

ConsumerGroup:每个Consumer都属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer可以消费该消息;

Partition:是物理上的概念,表示分区,一个topic可以分为多个partition,每个partition的内部消息都是有序的。

 

kafka结构示意图如下:

 

 

  

 kafka使用场景

①、日志收集:可以用kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,比如:Hadoop、HBASE、Solr等;

②、消息系统:解耦和生产者、消费者、缓存消息等;

③、用户活动跟踪:kafka经常被用来记录web用户或者app用户的各种活动,比如浏览网页、搜索、点击等操作,这些操作被各服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和信息挖掘;

④、运营指标:kafka也经常用来记录运营监控数据,包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

 

 

kafka基本使用

安装前的环境准备

由于Kafka是用Scala语言开发的,运行在JVM上,所以在安装Kafka之前需要先安装JDK,执行命令:yum install java-1.8.0-openjdk* -y

Kafka依赖zookeeper,所以需要先安装zookeeper,执行命令如下:

wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz

tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz

cd apache-zookeeper-3.5.8-bin

cp conf/zoo_sample.cfg conf/zoo.cfg

 

#启动zookeeper

bin/zkServer.sh start

bin/zkCli.sh

#查看zk的根目录相关节点

ls /

 

下载kafka安装包

下载2.4.1 release版本并解压:

#2.11是scala的版本,2.4.1是kafka的版本

wget https://mirror.bit.edu.cn/apache/kafka/2.4.1/kafka_2.11-2.4.1.tgz

tar -xzf kafka_2.11-2.4.1.tgz

cd kafka_2.11-2.4.1

 

修改配置

修改配置文件config/server.properties:

#broker.id属性在kafka集群中必须唯一,且必须是非负整数

broker.id=0

#kafka部署的机器ip和提供服务的端口号 也就是生产者和消费者访问的地址

listeners=PLAINTEXT://IP:PORT

#kafka存放数据的路径,这个路径可以是多个,用逗号分隔;每当创建新的partition时,都会选择在包含partition最少的路径下创建。

log.dirs=/usr/local/data/kafka.logs

#kafka连接zookeeper的地址 如果zookeeper是集群,用逗号隔开,如:IP:PORT,IP:PORT

zookeeper.connect=IP:PORT

#每个日志文件保存的时间,默认数据保存时间对所有topic都一样

log.retention.hours=168

#是否允许删除主题

delete.topic.enable=false

 

启动服务

bin/kafka-server-start.sh -daemon config/server.properties

-daemon表示在后台进程运行,否则ssh客户端退出后就会停止服务。

需要注意:在启动kafka时会使用Linux主机名关联的ip地址,所以需要把主机名和Linux的IP映射配置到本地hosts里,用vim /etc/hosts

或者使用启动命令bin/kafka-server-start.sh config/server.properties & 这个命令启动只要后台有日志变化就会打印出来

 

进入zookeeper目录通过zookeeper客户端查看zookeeper的目录树

bin/zkCli.sh

#查看zk的根目录kafka相关节点

ls /

#查看kafka节点

ls /brokers/ids

 

停止kafka服务

bin/kafka-server-stop.sh

 

创建主题

创建一个名字为test的topic,这个topic只有一个partition,并且备份银子也设置为1:bin/kafka-topics.sh --create --zookeeper IP:PORT --replication-factor 1 --partitions 1 --topic test

查看目前存在的主题

bin/kafka-topics.sh --list --zookeeper IP:PORT

除了手动创建topic,当producer发布一个消息到某个指定的topic时,如果这个topic不存在,就会自动创建。

 

删除主题

bin/kafka-topics.sh --delete --topic test --zookeeper IP:PORT

 

发送消息

kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以在命令行中输入内容,并且把这些信息发送到kafka集群中。默认情况下,每一行会被当做一条独立的消息。

首先运行发布消息的脚本,然后再命令中输入要发送的消息内容:

bin/kafka-console-producer.sh --broker-list IP:PORT --topic test

 

消费消息

kafka的消息消费完不会删除,因为是存在文件中的(消息默认保存在磁盘一周,时间可配置);kafka的消息消费是通过偏移量消费的,每个消费者都会维护自己的消费偏移量。

对于consumer,kafka同样也自带一个命令行客户端,会把获取到的消息在命令中输出,默认是消费最新的消息:

bin/kafka-console-consumer.sh --bootstrap-server IP:PORT --topic test

如果想消费之前的消息,可以通过--from-beginning参数指定:

bin/kafka-console-consumer.sh --bootstrap-server IP:PORT --from-beginning --topic test

消费多主题

bin/kafka-console-consumer.sh --bootstrap-server IP:PORT --whitelist "test|test-2"

单播消费

一条消息只能被一个消费者消费的模式,类似queue模式,只需让所有消费者在同一个消费组里即可。分别在两个客户端执行下方消费命令,然后发送消息,结果只有一个客户端能收到消息:

bin/kafka-console-consumer.sh --bootstrap-server IP:PORT --consumer-property group.id=testGroup --topic test

多播消费

一条消息能被多个消费者消费的模式,类似publish-subscribe模式,针对kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播消费只要保证这些消费者属于不同的消费组即可。我们再增加一个消费者,该消费者属于testGroup-2消费组,结果两个客户端都能收到消息:

 bin/kafka-console-consumer.sh  --bootstrap-server IP:PORT --consumer-property group.id=testGroup-2 --topic test

查看消费组名

bin/kafka-consumer-groups.sh --bootstrap-server IP:PORT --list

查看消费组的消费偏移量

bin/kafka-consumer-groups.sh --bootstrap-server IP:PORT --describe --group testGroup

回车后展示的信息中名词解释:

current-offset:当前消费组的已消费偏移量(不同的消费者只影响所属消费组的已消费偏移量)

log-end-offset:主题对应分区消息的结束偏移量(有新接收到的消息结束偏移量会增加)

lag:当前消费组未消费的消息数

 

主题Topic和消息日志Log

可以把Topic理解为一个类别的名称,同类消息发送到同一个Topic下;对于每一个Topic,可以有多个分区(Partition)日志文件:

Partition是一个有序的消息序列,这些消息按顺序添加到一个叫做commit log的文件中,每个partition中的消息都有一个唯一的编号,称为offset,用来唯一标识某个分区中的消息。

每个partition都对应一个commit log文件,一个partition中消息的offset都是唯一的,但是不同的partition中消息的offset可能是相同的。

kafka一般不会删除消息,不管这些消息是否被消费,kafka只会根据配置的日志保留时间(log.retention.hours)来确定消息多久被删除,默认是保留最近一周的日志消息。kafka的性能与保留的消息数据量大小没有关系,所以保存大量的数据消息日志信息也不会有什么影响,但是具体保存多久要针对具体的业务场景而定,不然数据太大占用磁盘空间也会变大。

每个consumer都是基于自己在commit log中的消费进度(offset)来工作的,在kafka中,消费offset由消费者自己来维护;一般会按照顺序逐条消费commit log中的消息,也可以人为指定offset来重复消费某些消息或者跳过某些消息。

这意味着kafka中consumer对集群的影响是非常小的,添加一个或者减少一个consumer,对集群或其他consumer都没有影响,因为每个consumer都维护各自的消费offset。

创建topic时默认只有一个partition,但是可以通过设置创建多个partition;一个partition相当于一个queue;设置partition的目的是为了分布式存储,减轻一个queue的压力,提高消息的读写速度;消息发到某个topic的分区,这个消息就会存到这个分区对应的commit log文件中,这也是kafka高性能的原因之一。

 创建多个分区的主题

bin/kafka-topics.sh --create --zookeeper IP:PORT --replication-factor 1 --partition 2 --topic test1

查看topic的情况

bin/kafka-topics.sh --describe --zookeeper IP:PORT --topic test1

回车后展示的内容解释:

第一行是所有分区的概要信息。之后的每一行是每一个partition的信息。

leader节点负责给定partition的所有读写请求;

replicas表示某个partition在哪几个broker上存在备份,不管这个节点是不是leader,甚至这个节点挂了也会列出;

isr是replicas的一个子集,它只列出当前还存活的并且已经同步备份了该partition的节点。

消息日志文件主要存放在分区文件夹里以log为扩展名的日志文件中

对已创建的topic增加分区数量(目前kafka不支持减少分区):

bin/kafka-topics.sh -alter --partitions 3 --zookeeper IP:PORT --topic test

 

Kafka集群

对于kafka来说,单台机器也叫集群,多台机器也叫集群,因为kafka的集群是针对分区而言的。这里再启动2个broker实例(由于前面已经启动了一个broker,这样集群中就有三个broker了)。

首先建立各broker的配置文件:

cp config/server.properties config/server-1.properties

cp config/server.properties config/server-2.properties

配置文件修改内容:

config/server-1.properties

#broker.id属性在kafka集群中必须唯一

broker.id=1

#kafka部署的机器IP和提供服务的端口号

listeners=PLAINTEXT://IP:PORT

#日志存储路径

log.dirs=/usr/local/data/kafka-logs-1

#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同

zookeeper.connect=IP:PORT

config/server-2.properties

#broker.id属性在kafka集群中必须唯一

broker.id=2

#kafka部署的机器IP和提供服务的端口号

listeners=PLAINTEXT://IP:PORT

#日志存储路径

log.dirs=/usr/local/data/kafka-logs-2

#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同

zookeeper.connect=IP:PORT

 

启动前面配置好的两个broker实例:

bin/kafka-server-start.sh -daemon config/server-1.properties

bin/kafka-server-start.sh -daemon config/server-2.properties

查看zookeeper确认集群节点是否都注册成功(kafka节点的元信息存储在zookeeper中):

ls /brokers/ids

 

新建一个topic,副本数设置为3,分区数设置为2(副本是针对分区的,这里3个副本2个分区就是说每个分区都有3个副本,放在不同的节点中,目的是为了容灾;副本也分leader副本和follower副本,消息都是写在leader副本中的,leader副本写完消息后会同步给follower副本;如果leader副本挂了,会从follower副本中选举出一个新的leader副本;leader和follower是针对分区的,broker没有leader和follower的概念):

bin/kafka-topics.sh --create --zookeeper IP:PORT --replication-factor 3 --partitions 2 --topic my-replicated-topic

查看topic的情况:

bin/kafka-topics.sh --describe --zookeeper IP:PORT --topic my-replicated-topic

 

 

补充中……

标签:bin,topic,--,IP,zookeeper,笔记,kafka,学习
来源: https://www.cnblogs.com/ywy8/p/16427863.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有