ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

kafka+zookeeper集群部署

2022-04-26 10:03:08  阅读:193  来源: 互联网

标签:-- kafka01 zookeeper iflytek 集群 kafka root


Kafka相关概念

1)producer(生产者):
消息生产者,发布消息到 kafka 集群的终端或服务。producer是能够发布消息到话题的任何对象。
2)broker(服务代理):
broker是已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群。broker是kafka 集群中包含的服务器。
3)topic(话题):
topic是特定类型的消息流。消息是字节的有效负载(Payload),话题是消息的分类名或种子(Feed)名。每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。
4)consumer(消费者):
consumer是从kafka集群中消费消息的终端或服务。可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息。
5)partition:
partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。
6)Consumer group:
high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
7)replica:
partition 的副本,保障 partition 的高可用。
8)leader:
replica 中的一个角色, producer 和 consumer 只跟 leader 交互。
9)follower:
replica 中的一个角色,从 leader 中复制数据。
10)controller:
kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。
11)zookeeper:
kafka 通过 zookeeper 来存储集群的 meta 信息。

1.集群服务器架构信息以及基础环境准备

1.服务器架构规划

ip地址            主机名          安装软件
10.0.0.2        kafka01         zookeeper、kafka
10.0.0.4        kafka02         zookeeper、kafka
10.0.0.12       kafka03         zookeeper、kafka

2.关闭防火墙和绑定hosts(集群所有节点都操作)

复制代码
#设置主机名,每个节点都做。
[root@k8s-master01 ~]# hostnamectl set-hostname kafka01
#4台机器关闭iptables和selinux
[root@kafka01 ~]# /etc/init.d/iptables stop
[root@kafka01 ~]# vim /etc/sysconfig/selinux
......
SELINUX=disabled
[root@kafka01 ~]# setenforce 0
[root@kafka01 ~]# getenforce
Permissive
#4台机器做hosts绑定,并检查,如果集群规模较大建议做个bind9
[root@kafka01 ~]# vim /etc/hosts
10.0.0.2  kafka01
10.0.0.4  kafka02
10.0.0.12  kafka03
[root@kafka01 ~]# ping kafka02
PING node1 (172.31.46.63) 56(84) bytes of data.
64 bytes from node1 (172.31.46.63): icmp_seq=1 ttl=64 time=0.302 ms
复制代码

 3.jdk安装(三台机器都要操作,安装1.7以上版本

复制代码
#将jdk-8u131-linux-x64.rpm下载到/opt目录下
下载地址:https://pan.baidu.com/s/1pLaAjPp
提取密码:x27s
  
[root@kafka01 ~]# cd /usr/local/src/
[root@kafka01 src]# ll jdk-8u131-linux-x64.rpm
-rw-r--r--. 1 root root 169983496 Sep 28  2017 jdk-8u131-linux-x64.rpm
[root@kafka01 src]# rpm -ivh jdk-8u131-linux-x64.rpm
  
[root@kafka01 src]# vim /etc/profile
......
JAVA_HOME=/usr/java/jdk1.8.0_131
JAVA_BIN=/usr/java/jdk1.8.0_131/bin
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin:/bin:/sbin/
CLASSPATH=.:/lib/dt.jar:/lib/tools.jar
export JAVA_HOME JAVA_BIN PATH CLASSPATH
  
[root@kafka01 src]# source /etc/profile
[root@kafka01 src]# java -version
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
复制代码

 

4.安装zookeeper集群(以下操作需在所有zookeeper节点操作)

复制代码
#我们先cd到我们的安装目录下(这个根据每个公司的情况,都不一样)
[root@kafka01 ~]# cd /iflytek
[root@kafka01 iflytek]# wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
[root@kafka01 iflytek]# tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz
#做软连接,方面后面升级
[root@kafka01 iflytek]# ln -sv apache-zookeeper-3.6.2-bin zookeeper [root@kafka01 iflytek]# mkdir -p zookeeper/data [root@kafka01 iflytek]# cp zookeeper/conf/zoo_sample.cfg zookeeper/conf/zoo_sample.cfg.bak [root@kafka01 iflytek]# cp zookeeper/conf/zoo_sample.cfg zookeeper/conf/zoo.cfg #修改zookeeper的配置文件,清楚之前的内容,加入新的内容如下所示
[root@kafka01 iflytek]# vim zookeeper/conf/zoo.cfg tickTime=2000 initLimit=10 syncLimit=5 dataDir=/iflytek/zookeeper/data/zookeeper dataLogDir=/iflytek/zookeeper/data/logs clientPort=2181 maxClientCnxns=60 autopurge.snapRetainCount=3 autopurge.purgeInterval=1 server.1=10.0.0.2:2888:3888 server.2=10.0.0.4:2888:3888 server.3=10.0.0.12:2888:3888 -------------------------------------
配置参数说明:
tickTime=2000 #服务器与服务器之间和客户端与服务器之间的单次心跳检测时间间隔, 单 位为毫秒, 类似于 haproxy 针对 real server 的 check inter 间隔时间 initLimit=10 #集群中 leader 服务器与 follower 服务器初始连接心跳次数, 即多少个 2000 毫 秒 syncLimit=5 # leader 与 follower 之间连接完成之后,后期检测发送和应答的心跳次数,如 果该 follower 在设置的时间内(5*2000)不能与 leader 进行通信,那么此 follower 将被视为 不可用。 clientPort=2181 #客户端连接 Zookeeper 服务器的端口, Zookeeper 会监听这个端口,接受 客户端的访问请求 autopurge.snapRetainCount=3 #设置 zookeeper 保存保留多少次客户端连接的数据 autopurge.purgeInterval=1 #设置 zookeeper 间隔多少小时清理一次保存的客户端数据 server.1=10.0.0.2:2888:3888#服务器编号=服务器 IP:LF 数据同步端口:LF 选举端口(表示了不同的zookeeper服务器的自身标识,
作为集群的一部分,每一台服务器应该知道其他服务器的信息。 用户可以从"server.id=host:port:port" 中读取到相关信息。 在服务器的data(dataDir参数所指定的目录)下创建一个文件名为myid的文件,这个文件的内容只有一行,指定的是自身的id值。 比如,服务器"1"应该在myid文件中写入"1"。这个id必须在集群环境中服务器标识中是唯一的,且大小在1~255之间。) ----------------------------------
#注意:如果想更换日志输出位置,除了在zoo.cfg加入"dataLogDir=/iflytek/zookeeper/data/logs"外,还需要修改zkServer.sh文件,
大概修改方式地方在141行左右,内容如下
[root@kafka01 iflytek]# cp zookeeper/bin/zkServer.sh zookeeper/bin/zkServer.sh.bak
[root@kafka01 iflytek]# vim zookeeper/bin/zkServer.sh
。。。。。。。。。。

 141 ZOO_LOG_DIR="$($GREP "^[[:space:]]*dataLogDir" "$ZOOCFG" | sed -e 's/.*=//')"   #添加这一行
 142 if [ ! -w "$ZOO_LOG_DIR" ] ; then
 143 mkdir -p "$ZOO_LOG_DIR"
 144 fi

#在启动zookeeper服务之前,还需要在zookeeper节点机器上创建myid,方式如下:
[root@kafka01 iflytek]# mkdir zookeeper/data/zookeeper/
#因为我是在服务器1上演示的所以这里把1写进myid这个文件。其他节点类似
[root@kafka01 iflytek]# echo 1 > zookeeper/data/zookeeper/myid
#启动zookeeper服务,并检查
[root@kafka01 iflytek]# zookeeper/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /iflytek/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@kafka01 iflytek]# ps -ef|grep zookeeper
[root@kafka01 iflytek]# lsof -i:2181
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 735986 root 56u IPv4 3794176 0t0 TCP *:eforward (LISTEN)
#查看各个节点zookeeper的角色

[root@kafka01 iflytek]# zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /iflytek/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

[root@kafka02 iflytek]# zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /iflytek/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

[root@kafka03 iflytek]# zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /iflytek/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader

 
 
复制代码

 

回到顶部

2.安装部署kafka和集群

1.安装kafaka(三个节点同样操作)

复制代码
[root@kafka01 ~]# cd /iflytek
[root@kafka01 iflytek]# wget https://mirror.bit.edu.cn/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz
[root@kafka01 iflytek]# tar -zvxf kafka_2.13-2.7.0.tgz
[root@kafka01 iflytek]# ln -sv kafka_2.13-2.7.0 kafka
#进入kafka下面的config目录,修改配置文件server.properties:
[root@kafka01 iflytek]# cp kafka/config/server.properties kafka/config/server.properties.bak
[root@kafka01 iflytek]# vim kafka/config/server.properties
broker.id=0
delete.topic.enable=true
#下面这行可以不写 #listeners=PLAINTEXT://172.31.46.28:9092
num.network.threads=3
num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/iflytek/kafka/data num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.flush.interval.messages=10000 log.flush.interval.ms=1000 log.retention.hours=168 log.retention.bytes=1073741824 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=10.0.0.2:2181,10.0.0.4:2181,10.0.0.12:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 #其他两个节点的server.properties只需要修改下面两行,其他配置都一样 [root@kafka02 iflytek]# vim kafka/config/server.properties broker.id=1 ......
#下面这行可以不写 #listeners=PLAINTEXT://10.0.0.2:9092 ....... [root@kafka03 iflytek]# vim kafka/config/server.properties broker.id=2 ......
#下面这行可以不写 listeners=PLAINTEXT://172.31.46.67:9092 ...... #启动kafka服务 [root@kafka01 iflytek]# nohup kafka/bin/kafka-server-start.sh kafka/config/server.properties >/dev/null 2>&1 & [root@kafka01 iflytek]# lsof -i:9092 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 876461 root 122u IPv4 4531968 0t0 TCP ceph-deploy:XmlIpcRegSvc (LISTEN) java 876461 root 141u IPv4 4531973 0t0 TCP ceph-deploy:59560->ceph-deploy:XmlIpcRegSvc (ESTABLISHED) java 876461 root 142u IPv4 4532804 0t0 TCP ceph-deploy:XmlIpcRegSvc->ceph-deploy:59560 (ESTABLISHED) #验证服务,随便在其中一台节点主机执行 [root@kafka01 iflytek]# kafka/bin/kafka-topics.sh --create --zookeeper 172.31.46.28:2181,172.31.46.63:2181,172.31.46.67:2181 --replication-factor 1 --partitions 1 --topic test 出现下面信息说明创建成功 Created topic test. #然后再在其他主机查看上面创建的topic [root@kafka02 iflytek]# kafka/bin/kafka-topics.sh --list --zookeeper 172.31.46.28:2181,172.31.46.63:2181,172.31.46.67:2181 test 到此,kafka集群环境已部署完成!
复制代码

 2.Kafka命令行操作

复制代码
#查看当前服务器中的所有topic
[root@kafka01 iflytek]# kafka/bin/kafka-topics.sh --zookeeper 172.31.46.28:2181,172.31.46.63:2181,172.31.46.67:2181 --list
__consumer_offsets
test
test1
#创建topic
[root@kafka01 iflytek]# kafka/bin/kafka-topics.sh --zookeeper 172.31.46.28:2181,172.31.46.63:2181,172.31.46.67:2181 --create --replication-factor 3 --partitions 1 --topic test2
选项说明:
--topic 定义 topic 名
--replication-factor 定义副本数
--partitions 定义分区数
#删除topic(需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除)
[root@kafka01 iflytek]# kafka/bin/kafka-topics.sh --zookeeper 172.31.46.28:2181,172.31.46.63:2181,172.31.46.67:2181 --delete --topic test
#查看某个Topic的详情
[root@kafka01 iflytek]# kafka/bin/kafka-topics.sh --zookeeper 172.31.46.28:2181,172.31.46.63:2181,172.31.46.67:2181 --describe --topic test1
Topic: test1    PartitionCount: 1    ReplicationFactor: 3    Configs: 
    Topic: test1    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
#修改分区数
[root@kafka01 iflytek]# kafka/bin/kafka-topics.sh --zookeeper 172.31.46.28:2181,172.31.46.63:2181,172.31.46.67:2181 --alter --topic test1 --partitions 6
#测试生产消息,这个下面的地址加端口填kafka集群中的节点就行了
[root@kafka01 iflytek]# kafka/bin/kafka-console-producer.sh --broker-list 172.31.46.28:9092 --topic test1
>Hello
#测试消费信息
[root@kafka02 iflytek]# kafka/bin/kafka-console-consumer.sh  --bootstrap-server 172.31.46.63:9092 --topic test1
Hello
#查看指定topic中以往所有的数据都读出来(这个前提是消息数据还在默认的保留周期内)--from-beginning: 会把主题中以往所有的数据都读取出来。
[root@kafka03 iflytek]# kafka/bin/kafka-console-consumer.sh  --bootstrap-server 172.31.46.67:9092 --from-beginning --topic test1
复制代码

 

标签:--,kafka01,zookeeper,iflytek,集群,kafka,root
来源: https://www.cnblogs.com/zrxuexi/p/16193500.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有