ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flume实例分析

2022-09-15 14:30:19  阅读:278  来源: 互联网

标签:Flume 分析 sinks memory sources k1 实例 conf flume


需求1:从指定网络端口(44444)采集数据输出到控制台
需求2:监控一个文件实时采集新增的数据输出到控制台
需求3:将A服务器上的日志实时采集到B服务器


一、需求1:从指定网络端口(44444)采集数据输出到控制台

1.建立一个test.conf(简单的节点flume的配置)

(1)使用flume的关键在于写配置文件

a)配置source
b)配置 channel
c)配置 Sink
d)把以上三个组件串起来

a1:agent的名称
r1:数据源的名称
k1:sink的名称
c1:channel 的名称


(2)在/kbb/install/flume/conf目录下建立test.conf文件
vim test.conf

(3)test.conf内容如下:
#name the compents on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#describe/configure the source 配置source
a1.sources.r1.type=netcat
a1.sources.r1.bind=node01
a1.sources.r1.port=44444

#describe the sink 配置sink
a1.sinks.k1.type=logger

#use a channel which buffers events in memory 存储到memory
a1.channels.c1.type=memory

#bind the source and sink to channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

 

 

 

2.启动agent

 /kbb/install/flume/bin 目录下启动下列命令
./flume-ng agent --name a1 --conf /kbb/install/flume/conf --conf-file /kbb/install/flume/conf/test.conf -Dflume.root.logger=INFO,console

克隆窗口
使用telnet进行测试:
telnet node01 44444

传递消息时窗口中出现下列格式的传递消息
Event: { headers:{} body: 68 65 6C 6C 6F 0D hello. }
Event是flume的数据传输基本单元
Event=可选的header+byte arry

 


二、需求2:监控一个文件实时采集新增的数据输出到控制台

1(输出到控制台)

Agent选型:exec source+ memory channel +logger sink

1.)创建一个文件 exec-memory-logger.conf
exec-memory-logger.conf 配置文件如下:

#name the compents on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#describe/configure the source 配置source
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /kbb/install/flume/data/data.log #监控文件路径
a1.sources.r1.shell=/bin/sh -c

#describe the sink 配置sink
a1.sinks.k1.type=logger

#use a channel which buffers events in memory 存储到memory
a1.channels.c1.type=memory

#bind the source and sink to channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

 

 

2).启动agent
/kbb/install/flume/bin 目录下启动下列命令
./flume-ng agent --name a1 --conf /kbb/install/flume/conf --conf-file /kbb/install/flume/conf/exec-memory-logger.conf -Dflume.root.logger=INFO,console

克隆窗口
echo welcome >>data.log (向/kbb/install/flume/data/data.log文件中写入welcome等内容)
往监控文件data.log中输入内容,控制台上会显示输入的内容,实现了对某个文件的实时监控

 

3(将内容输出到hdfs:离线)

hdfs中新建文件夹 hadoop fs -mkdir /filename
hadoop fs -mkdir /user/flume/test

3.配置文件file-flume-hdfs.conf

#name the compents on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#describe/configure the source 配置source

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir=/home/hadoop/flume

#describe the sink 配置sink
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://node01:9870/user/flume/test/%y-%m-%d/%H%M/

a1.sinks.k1.hdfs.filePrefix = Data
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true

#use a channel which buffers events in memory 存储到memory
a1.channels.c1.type=memory

#bind the source and sink to channel

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1


bin目录下启动agent
./flume-ng agent --name a1 --conf /kbb/install/flume/conf/test --conf-file /kbb/install/flume/conf/test/file-flume-hdfs.conf -Dflume.root.logger=INFO,console

 

三、需求3:将A服务器上的日志实时采集到B服务器

1.分析


技术选型:exec source +memory channel +avro sink
avro source +memory channel +logger sink

A服务器:
Agent:
source:type=exec
sink:type=avro

B服务器:
Agent:
source:type=avro
sink:type=logger

完成该需求应该写两份配置文件:(配置文件1和2中不能都是a1)

2.配置文件1:exec-memory-avro.conf

#name the compents on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#describe/configure the source 配置source
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /kbb/install/flume/data/data.log #监控文件路径
a1.sources.r1.shell=/bin/sh -c

#describe the sink 配置sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=node01
a1.sinks.k1.port=44444

#use a channel which buffers events in memory 存储到memory
a1.channels.c1.type=memory

#bind the source and sink to channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

 

 

 

3.配置文件2:avro-memory-logger.conf

a2.sources = r2
a2.channels = c2
a2.sinks = k2

#describe/configure the source 配置source
a2.sources.r2.type=avro
a2.sources.r2.bind=node01
a2.sources.r2.port=44444

#describe the sink 配置sink
a2.sinks.k2.type=logger

#use a channel which buffers events in memory 存储到memory
a2.channels.c2.type=memory

#bind the source and sink to channel
a2.sources.r2.channels=c2
a2.sinks.k2.channel=c2

 

 

4.启动agent

1)一定先启动avro-memory-logger.conf(监听)
./flume-ng agent --name a2 --conf /kbb/install/flume/conf/test --conf-file /kbb/install/flume/conf/test/avro-memory-logger.conf -Dflume.root.logger=INFO,console

2)后启动exec-memory-avro.conf
./flume-ng agent --name a1 --conf /kbb/install/flume/conf/test --conf-file /kbb/install/flume/conf/test/exec-memory-avro.conf -Dflume.root.logger=INFO,console

 

 

总结:日志收集过程:

1)机器A(exec source+memory channel+avro sink)上监控一个文件,当我们访问主站时会有用户行为日志记录到access.log中输入内容,控制台上会显示输入的内容,实现了对某个文件的实时监控
2)avro sink把新产生的日志输出到对应的avro source(机器B的source)指定的hostname和port上
3)通过avro source 对应的agent(机器B的logger sink)将日志输出到控制台(以后该位置对接kafka)

 结果:

 

 

标签:Flume,分析,sinks,memory,sources,k1,实例,conf,flume
来源: https://www.cnblogs.com/Lizhichengweidashen/p/16696428.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有