
Kafka in Practice (ELK + Kafka)




Preparation

System     IP               Applications                                               myid  broker.id
CentOS 7   192.168.100.20   Elasticsearch + Kibana + Kafka + ZooKeeper                 1     1
CentOS 7   192.168.100.30   Elasticsearch + Kafka + ZooKeeper + Filebeat + Logstash    2     2
CentOS 7   192.168.100.40   Elasticsearch + Kafka + ZooKeeper + Filebeat               3     3

First set up Elasticsearch and Kibana by following this tutorial: ELK 日志分析系统 - 花花de代码生活 - 博客园 (cnblogs.com). You only need to get as far as the Kibana step.

一、zookeeper

1、Download

wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz --no-check-certificate

2、Install and configure ZooKeeper

[root@elk-1 ~]# tar -zxf apache-zookeeper-3.5.9-bin.tar.gz -C /usr/local/
[root@elk-1 ~]# mv /usr/local/apache-zookeeper-3.5.9-bin /usr/local/zookeeper
[root@elk-1 ~]# cd /usr/local/zookeeper
[root@elk-1 zookeeper]# mkdir  {data,logs}
[root@elk-1 zookeeper]# echo 1 > data/myid
[root@elk-1 zookeeper]# cp conf/zoo_sample.cfg conf/zoo.cfg

# Configuration file
[root@elk-1 zookeeper]# vim conf/zoo.cfg
# The number of milliseconds of each tick
tickTime=2000  # heartbeat interval between ZooKeeper nodes (2 seconds)
# The number of ticks that the initial
# synchronization phase can take
initLimit=10   # time limit (in ticks) for followers to initially connect and sync with the leader
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5   # time limit (in ticks) for a follower to sync with the leader
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/local/zookeeper/data  # directory where ZooKeeper stores its data (must contain the myid file created above)
dataLogDir=/usr/local/zookeeper/logs   # directory where ZooKeeper stores its transaction logs
# the port at which the clients will connect
clientPort=2181  # port clients use to connect to the ZooKeeper server
admin.serverPort=8888   # the admin server listens on 8080 by default; moved here to avoid conflicts
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
##http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1
server.1=elk-1:2888:3888
server.2=elk-2:2888:3888
server.3=elk-3:2888:3888
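
The server.N entries above refer to the nodes by hostname, so elk-1, elk-2 and elk-3 must resolve on every machine. If they are not already in DNS, a sketch of the /etc/hosts entries (using the IPs from the preparation table) would be:

cat >> /etc/hosts <<'EOF'
192.168.100.20 elk-1
192.168.100.30 elk-2
192.168.100.40 elk-3
EOF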

# Set the environment variables
echo 'export ZOOKEEPER_HOME=/usr/local/zookeeper' >> /etc/profile
echo 'export PATH=$ZOOKEEPER_HOME/bin:$PATH' >> /etc/profile
# Reload the profile
source /etc/profile
# Verify the variable is set
echo $ZOOKEEPER_HOME

# Start ZooKeeper
[root@elk-1 zookeeper]# bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED    // started successfully

[root@elk-1 ~]# scp /usr/local/zookeeper/conf/zoo.cfg elk-2:/usr/local/zookeeper/conf/zoo.cfg

If ZooKeeper fails to start and the configuration file looks fine, check whether data/myid is correct. Note that the ZooKeeper installation steps above must also be carried out on elk-2 and elk-3; their zoo.cfg needs no changes.

[root@elk-1 ~]# scp /usr/local/zookeeper/conf/zoo.cfg elk-3:/usr/local/zookeeper/conf/zoo.cfg

elk-2:
[root@elk-2 zookeeper]# mkdir {data,logs}
[root@elk-2 zookeeper]# echo 2 > data/myid

elk-3:
[root@elk-3 zookeeper]# mkdir {data,logs}
[root@elk-3 zookeeper]# echo 3 > data/myid
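
ZooKeeper then has to be started on elk-2 and elk-3 as well, exactly as on elk-1:

[root@elk-2 zookeeper]# bin/zkServer.sh start
[root@elk-3 zookeeper]# bin/zkServer.sh start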

3、Test

# Check node status
[root@elk-1 zookeeper]# /usr/local/zookeeper/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

[root@elk-1 zookeeper]# bin/zkCli.sh -server elk-2:2181
[zk: elk-2:2181(CONNECTED) 0]
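
From the zkCli prompt a quick sanity check can be run; this is only a sketch (the znode name /zk-test is arbitrary and the output may differ slightly by version):

[zk: elk-2:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: elk-2:2181(CONNECTED) 1] create /zk-test "hello"
Created /zk-test
[zk: elk-2:2181(CONNECTED) 2] get /zk-test
hello
[zk: elk-2:2181(CONNECTED) 3] delete /zk-test
[zk: elk-2:2181(CONNECTED) 4] quit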

二、kafka

1、Download Kafka

wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz --no-check-certificate

2、Install and configure

[root@elk-1 ~]# tar -zxf kafka_2.12-2.8.1.tgz -C /usr/local/
[root@elk-1 ~]# mv /usr/local/kafka_2.12-2.8.1 /usr/local/kafka
[root@elk-1 ~]# cd /usr/local/kafka/
[root@elk-1 kafka]# mkdir logs
[root@elk-1 kafka]# cp config/server.properties{,.bak}

# Configuration file
[root@elk-1 kafka]# vim config/server.properties
# broker id; must match this node's ZooKeeper myid
broker.id=1
# listener address and port
listeners=PLAINTEXT://192.168.100.20:9092
# number of threads handling network requests; normally left at the default
num.network.threads=3
# number of threads the broker uses for disk I/O
num.io.threads=8
# socket send buffer size
socket.send.buffer.bytes=102400
# socket receive buffer size
socket.receive.buffer.bytes=102400
# maximum size of a request the socket server will accept
socket.request.max.bytes=104857600
# directory where Kafka stores its log segments (the logs directory created above)
log.dirs=/usr/local/kafka/logs
# default number of partitions per topic
num.partitions=3
# number of threads per data directory used for log recovery at startup
num.recovery.threads.per.data.dir=1
# replication factor for the internal offsets topic
offsets.topic.replication.factor=1
# replication factor for the transaction state log
transaction.state.log.replication.factor=1
# minimum ISR for the transaction state log
transaction.state.log.min.isr=1
# maximum time log data is retained, in hours
log.retention.hours=168
# partitions are stored as segment files; this sets the segment size and can be overridden per topic at creation time
log.segment.bytes=1073741824
# interval at which log segments are checked against the retention policy
log.retention.check.interval.ms=300000
# ZooKeeper connection string used to maintain cluster state
zookeeper.connect=192.168.100.20:2181,192.168.100.30:2181,192.168.100.40:2181
# ZooKeeper connection timeout
zookeeper.connection.timeout.ms=18000
# delay before the first consumer group rebalance; normally left at the default
group.initial.rebalance.delay.ms=0

# Set the environment variables
echo 'export KAFKA_HOME=/usr/local/kafka' >> /etc/profile
echo 'export PATH=$KAFKA_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
echo $KAFKA_HOME

# Start Kafka (either of the following two ways works)
[root@elk-1 kafka]# bin/kafka-server-start.sh -daemon config/server.properties
[root@elk-1 kafka]# nohup bin/kafka-server-start.sh config/server.properties > logs/kafka.log 2>&1 &
[1] 14653
# Check whether Kafka started successfully
[root@elk-1 kafka]# jps
14481 QuorumPeerMain
13538 Elasticsearch
15061 Jps
13239 -- process information unavailable
14653 Kafka
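
Broker registration can also be checked in ZooKeeper, since each broker registers its broker.id under /brokers/ids. A quick sketch (ids 2 and 3 will only appear once the brokers on elk-2 and elk-3 are started):

[root@elk-1 kafka]# /usr/local/zookeeper/bin/zkCli.sh -server elk-1:2181
[zk: elk-1:2181(CONNECTED) 0] ls /brokers/ids
[1]
[zk: elk-1:2181(CONNECTED) 1] quit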

On elk-2 and elk-3, the main changes to the Kafka configuration file are:

[root@elk-1 ~]# scp /usr/local/kafka/config/server.properties elk-2:/usr/local/kafka/config/server.properties
[root@elk-1 ~]# scp /usr/local/kafka/config/server.properties elk-3:/usr/local/kafka/config/server.properties
# On the second node
broker.id=2
listeners=PLAINTEXT://192.168.100.30:9092
# On the third node
broker.id=3
listeners=PLAINTEXT://192.168.100.40:9092
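
After editing the file on each node, Kafka is started there the same way as on elk-1 (a sketch; this assumes the install and environment-variable steps above were repeated on elk-2 and elk-3):

[root@elk-2 kafka]# bin/kafka-server-start.sh -daemon config/server.properties
[root@elk-3 kafka]# bin/kafka-server-start.sh -daemon config/server.properties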

3、Test Kafka

[root@elk-1 zookeeper]# yum install -y lsof
[root@elk-1 kafka]# lsof -i:2181
COMMAND   PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    13527 root   48u  IPv6  50879      0t0  TCP *:eforward (LISTEN)
java    13527 root   64u  IPv6  53903      0t0  TCP elk-1:eforward->elk-1:36448 (ESTABLISHED)
java    13527 root   66u  IPv6  54636      0t0  TCP elk-1:eforward->elk-3:43152 (ESTABLISHED)
java    13601 root  122u  IPv6  54546      0t0  TCP elk-1:36448->elk-1:eforward (ESTABLISHED)

# Create a topic named test-ken
[root@elk-1 kafka]# bin/kafka-topics.sh --create --bootstrap-server elk-1:9092 --replication-factor 3 --partitions 2 --topic test-ken
# Check from another node whether the topic was created
[root@elk-3 kafka]# bin/kafka-topics.sh --list --bootstrap-server 192.168.100.30:9092
test-ken

# View topic details
[root@elk-1 kafka]# bin/kafka-topics.sh --describe --bootstrap-server elk-1:9092 --topic test-ken
Topic: test-ken TopicId: n_OMjCKNQ7SsabGsfen_cA PartitionCount: 2       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: test-ken Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: test-ken Partition: 1    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1

# Produce messages on elk-1; elk-2 should receive them
[root@elk-1 kafka]# bin/kafka-console-producer.sh --broker-list elk-1:9092 --topic test-ken
>test
>ceshicgo
>
# On elk-2 the messages arrive right away
[root@elk-2 kafka]# bin/kafka-console-consumer.sh --bootstrap-server elk-2:9092 --topic test-ken
test
ceshicgo
-------- Extra notes --------
# Delete a topic:
[root@elk-1 kafka]# bin/kafka-topics.sh --delete --bootstrap-server elk-1:9092 --topic test-ken
# Confirm the deletion:
[root@elk-1 kafka]# bin/kafka-topics.sh --list --bootstrap-server elk-2:9092
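
Another handy check is listing the consumer groups known to the cluster; a sketch (the logstash group only shows up once Logstash, configured later, is actually consuming):

[root@elk-1 kafka]# bin/kafka-consumer-groups.sh --bootstrap-server elk-1:9092 --list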

三、filebeat

1、Download

[root@elk-2 ~]# wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.0.0-x86_64.rpm

2、Install and configure

(1.)elk-1

[root@elk-1 ~]# rpm -ivh filebeat-6.0.0-x86_64.rpm

[root@elk-1 ~]# vim /etc/filebeat/filebeat.yml
filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /var/log/es_access.log   # change this to whichever log file you want to collect
output.kafka:
  enabled: true
  hosts: ["elk-1:9092","elk-2:9092","elk-3:9092"]
  topic: es_access             # the Kafka topic the events are published to
  keep_alive: 10s
The default output.elasticsearch section in filebeat.yml must be commented out (Filebeat allows only one output), and the output.kafka block can simply be appended at the bottom of the file.
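
For reference, the stock Elasticsearch output stanza would end up looking roughly like this after being commented out (a sketch based on the default filebeat.yml):

#output.elasticsearch:
  # Array of hosts to connect to.
#  hosts: ["localhost:9200"]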

 

 

 [root@elk-1 ~]# systemctl start filebeat
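
To confirm Filebeat is running and shipping events, a quick check (a sketch; the log path shown is the default for the rpm install and may differ):

[root@elk-1 ~]# systemctl status filebeat
[root@elk-1 ~]# tail -f /var/log/filebeat/filebeat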

(2.)elk-2

Same as the previous step, except the log file being watched is different.

[root@elk-2 ~]# rpm -ivh filebeat-6.0.0-x86_64.rpm

Edit the configuration file:
[root@elk-2 ~]# vim /etc/filebeat/filebeat.yml
filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /var/log/vmware-network.log   # change this to whichever log file you want to collect
output.kafka:
  enabled: true
  hosts: ["elk-1:9092","elk-2:9092","elk-3:9092"]
  topic: vmware-network            # the Kafka topic the events are published to
  keep_alive: 10s
[root@elk-2 ~]# systemctl start filebeat

(3.)elk-3

[root@elk-3 ~]# rpm -ivh filebeat-6.0.0-x86_64.rpm

Edit the configuration file:
[root@elk-3 ~]# vim /etc/filebeat/filebeat.yml
filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /var/log/access.log   # change this to whichever log file you want to collect
output.kafka:
  enabled: true
  hosts: ["elk-1:9092","elk-2:9092","elk-3:9092"]
  topic: access            # the Kafka topic the events are published to
  keep_alive: 10s
[root@elk-3 ~]# systemctl start filebeat
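
Once Filebeat is running on all three nodes and the watched files receive data, the corresponding topics are created on the brokers automatically (a sketch; it assumes topic auto-creation is enabled, which is the Kafka default, and the exact list depends on which topics already exist):

[root@elk-1 kafka]# bin/kafka-topics.sh --list --bootstrap-server elk-1:9092
access
es_access
vmware-network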

四、Logstash

1、Download

[root@elk-2 ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.0.0.rpm

2、Install and deploy

[root@elk-2 ~]# rpm -ivh logstash-6.0.0.rpm

[root@elk-2 ~]# vi /etc/logstash/logstash.yml
http.host: "192.168.100.30"

# Configure Logstash to collect the es_access logs:
[root@elk-2 ~]# cat /etc/logstash/conf.d/es_access.conf 
# Logstash pipeline configuration
input {
    kafka {
        bootstrap_servers => "192.168.100.20:9092,192.168.100.30:9092,192.168.100.40:9092"
        group_id => "logstash"
        auto_offset_reset => "earliest"
        decorate_events => true
        topics => ["es_access"]
        type => "messages"
    }
}

output {
   if [type] == "messages" {
        elasticsearch {
            hosts => ["192.168.100.20:9200","192.168.100.30:9200","192.168.100.40:9200"]
            index => "es_access-%{+YYYY-MM-dd}"
        }
   }
}

# Configure collection of the vmware logs
[root@elk-2 ~]# cat /etc/logstash/conf.d/vmware.conf
input {
    kafka {
        bootstrap_servers => "192.168.100.20:9092,192.168.100.30:9092,192.168.100.40:9092"
        group_id => "logstash"
        auto_offset_reset => "earliest"
        decorate_events => true
        topics => ["vmware-network"]
        type => "messages"
    }
}

output {
   if [type] == "messages" {
        elasticsearch {
            hosts => ["192.168.100.20:9200","192.168.100.30:9200","192.168.100.40:9200"]
            index => "vmware-%{+YYYY-MM-dd}"
        }
   }
}

# Configure collection of the nginx logs
[root@elk-2 ~]# cat /etc/logstash/conf.d/nginx.conf
input {
    kafka {
        bootstrap_servers => "192.168.100.20:9092,192.168.100.30:9092,192.168.100.40:9092"
        group_id => "logstash"
        auto_offset_reset => "earliest"
        decorate_events => true
        topics => ["access"]
        type => "messages"
    }
}

output {
   if [type] == "messages" {
        elasticsearch {
            hosts => ["192.168.100.20:9200","192.168.100.30:9200","192.168.100.40:9200"]
            index => "nginx-%{+YYYY-MM-dd}"
        }
   }
}

[root@elk-2 ~]# chmod 755 /var/log/messages
[root@elk-2 ~]# chown -R logstash /var/lib/logstash/
[root@elk-2 ~]# ln -s /usr/share/logstash/bin/logstash /usr/bin

# Test whether the pipeline configuration is valid; "Configuration OK" means it can be used
[root@elk-2 ~]# logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/es_access.conf --config.test_and_exit
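
The article does not show it explicitly, but once the configuration test passes the Logstash service still has to be started; a sketch using the systemd unit installed by the rpm:

[root@elk-2 ~]# systemctl start logstash
[root@elk-2 ~]# systemctl enable logstash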

3、Verify that it works

Step 1: (no indices are created for nginx and vmware because those log files do not exist)

# Check whether the log indices were created
[root@elk-1 ~]# curl '192.168.100.20:9200/_cat/indices?v'
health status index                uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   es_access-2022-04-18 PfGB-dZUTWC8goIxhHgUlA   5   1         11            0       36kb           36kb
yellow open   .kibana              lp_TV8zKTWay24UMkEPtqQ   1   1          2            0      6.8kb          6.8kb

Step 2:

Step 3:

[root@elk-1 ~]# echo 111 > /var/log/es_access.log 
[root@elk-1 ~]# cat /var/log/es_access.log
111

Step 4:
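
The result can also be verified from the command line instead of Kibana; a sketch that searches the index created by the Logstash output above for the test line written in Step 3:

[root@elk-1 ~]# curl '192.168.100.20:9200/es_access-*/_search?q=111&pretty'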

Source: https://www.cnblogs.com/huahuadebk/p/16162599.html
