ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

大数据技术Flume框架详解

2022-08-29 23:01:14  阅读:171  来源: 互联网

标签:Flume channels sinks 框架 a1 sources 详解 c1 a2


Flume的概述

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日 志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。

  • 高可用(HA) flume框架(故障转移机制)
  • 高可靠 数据采集的可靠性
  • 分布式 分布式集群搭建

Flume的作用

最主要的作用:实时读取服务器本地磁盘的数据,将数据写到HDFS、Kafka

Flume的优点

可以和任意存储进程集成。

  • 支持不同的采集源
  • 支持多类型的目标源

输入的的数据速率大于写入目的存储的速率,flume会进行缓冲,减小 hdfs的压力

flume中的事务基于channel,使用了两个事务模型(sender + receiver),确保消息被可靠发送

Flume使用两个独立的事务分别负责从soucrce到channel,以及从 channel到sink的事件传递。一旦事务中所有的数据全部成功提交到 channel,那么source才认为该数据读取完成。同理,只有成功被sink 写出去的数据,才会从channel中移除。

Flume的组成结构

1、Flume组成架构

2、Agent

a、简介

Agent是一个JVM进程,它以事件的形式将数据从源头送至目的。Agent 主要有3个部分组成,Source、Channel、Sink。

b、Source

Source是负责接收数据到Flume Agent的组件。Source组件可以处理 各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、 spooling directory、netcat、sequence generator、syslog、 http、legacy。

c、Channel

Channel是位于Source和Sink之间的缓冲区。因此,Channel允许 Source和Sink运作在不同的速率上。Channel是线程安全的,可以同 时处理几个Source的写入操作和几个Sink的读取操作。 Flume自带两种Channel:Memory Channel和File Channel。 Memory Channel是内存中的队列。Memory Channel在不需要关心 数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数 据丢失。File Channel将所有事件写到磁盘。因此在程序关闭或机器宕 机的情况下不会丢失数据。

d、Sink

Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批 量写入到存储或索引系统、或者被发送到另一个Flume Agent。Sink 是完全事务性的。在从Channel批量删除数据之前,每个Sink用 Channel启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink就利用Channel提交事务。事务一旦被提交,该 Channel从自己的内部缓冲区删除事件。Sink组件目的地包括hdfs、 logger、avro、thrift、ipc、file、null、HBase、solr、自定义。

e、Event

传输单元,Flume数据传输的基本单元,以事件的形式将数据从源头送 至目的地。 Event由可选的header和载有数据的一个byte array 构成。Header是容纳了key-value字符串对的HashMap。

Flume agent的配置文件

单数据源单出口案例

这种模式是将多个flume给顺序连接起来了,从最初的source开始到最 终sink传送的目的存储系统。此模式不建议桥接过多的flume数量, flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume宕机,会影响整个传输系统。

flume实现监控端口数据案例:

用netcat工具向本机端口号:44444发送消息,flume监听

# Name the components on this agent
# r1:表示a1的输入源	a1:表示agent的名称
a1.sources = r1
# k1:表示a1的输出目的地
a1.sinks = k1
# c1:表示a1的缓冲区
a1.channels = c1

# Describe/configure the source
# 表示a1的输入源类型为netcat端口类型
a1.sources.r1.type = netcat
# 表示a1的监听主机
a1.soucres.r1.bind = localhost
# 表示a1的监听的端口号
a1.sources.r1.port = 44444

# Describe the sink
# 表示a1的输出目的地是控制台logger类型
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
# 表示a1的channel类型是memory内存型
a1.channels.c1.type = memory
# 表示a1的channel总容量1000个event
a1.channels.c1.capacity = 1000
# 表示a1的channel传输时收集到100条event以后再去提交事务
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 表示讲r1和c1链接起来
a1.sources.r1.channels = c1
# 表示将k1和c1链接起来
a1.sinks.k1.channel = c1

启动flume

  • 方法一:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf - Dflume.root.logger=INFO,console

  • 方法二:bin/flume-ng agent -c conf/ -n a1 –f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

参数说明

  • --conf conf/ :表示(conf)配置文件存储在conf/目录
  • --name a1 :表示给agent起名为a1
  • --conf-file job/flume-netcat.conf :flume本次启动读取的配置 文件是在job文件夹下的flume-telnet.conf文件。
  • -Dflume.root.logger==INFO,console :-D表示flume运行时动 态修改flume.root.logger参数属性值,并将控制台日志打印级别设 置为INFO级别。日志级别包括:log、info、warn、error。

实时采集文件到HDFS上案例

用flume实时监听某文件,当该文件的内容变化时,上传该数据到HDFS上。

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
# 定义数据源文件的类型
a2.sources.r2.type = exec
# 监听该目录下的access.log文件
a2.sources.r2.command = tail -F /home/hadoop/nginx/logs/access.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
# 上传文件的路径 %Y%m%d为时间戳,自动生成对应时间 年月日
a2.sinks.k2.hdfs.path = hdfs://192.168.137.128:9000/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

实时读取目录文件到HDFS上案例

使用flume实时监听整个目录文件,当该目录文件新增时,上传该文件到HDFS上。

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
# 定义source类型为目录
a3.sources.r3.type = spooldir
# 定义监控目录
a3.sources.r3.spoolDir = /home/hadoop/bigdatasoftware/flume/upload
# 定义文件上传完的后缀名
a3.sources.r3.fileSuffix = .COMPLETED
# 是否有五年间头
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://192.168.137.128:9000/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

单数据源多出口案例(选择器)

Flume支持将事件流向一个或者多个目的地。这种模式将数据源复制到 多个channel中,每个channel都有相同的数据,sink可以选择传送的 不同的目的地。

flume1监控文件的变动,并将变动的内容传递给flume2和flume3。

flume2负责输出到HDFS上

flume3负责输出到本地上

三个flume在同一台设备上

flume1:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有channel
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/bigdatasoftware/nginx/logs/access.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# sink端的avro是一个数据发送者
a1.sinks.k1.type = avro
# 设置其中一个flume接收的地址
a1.sinks.k1.hostname = 192.168.137.128
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
# 设置另一个flume的接收地址
a1.sinks.k2.hostname = 192.168.137.128
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

flume2:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# source端的avro是一个数据接收服务
a2.sources.r1.type = avro
# 设置本机地址,注意端口号
a2.sources.r1.bind = 192.168.137.128
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path =
hdfs://192.168.137.128:9000/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume3:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = 192.168.137.128
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

注意:接收方与发送方的地址和端口号要对应

单数据源多出口案例(Sink组)

Flume支持使用将多个sink逻辑上分到一个sink组,flume将数据发送 到不同的sink,主要解决负载均衡和故障转移问题

配置1个接收日志文件的source和1个channel、两个sink,分别输送给flume-flume-console1和flume-flume-console2。

flume1:

a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 22222

#定义一个sink组
#一个channel对应多个sink时要设置一个sinkgroups
a1.sinkgroups = g1
#指明sink组中的sink实例
a1.sinkgroups.g1.sinks = k1 k2
#设置sinkProcessor的类型(负载均衡)
a1.sinkgroups.g1.processor.type = load_balance
#①random-随机分配  ②round_robin-轮循
a1.sinkgroups.g1.processor.selector = random


a1.channels.c1.type = memory


a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.137.128
a1.sinks.k1.port = 33333

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.137.129
a1.sinks.k2.port = 44444


a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

flume2:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.137.128
a1.sources.r1.port = 33333

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume3:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.137.129
a1.sources.r1.port = 44444

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

a1.sinks.k1.type = logger


a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

多数据源汇总

这种模式是我们最常见的,也非常实用,日常web应用通常分布在上百 个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也 非常麻烦。用flume的这种组合方式能很好的解决这一问题,每台服务 器部署一个flume采集日志,传送到一个集中收集日志的flume,再由 此flume上传到hdfs、hive、hbase、jms等,进行日志分析

flume1监控一个文件的变动

flume2监控一个端口的数据

flume1和flume2将数据发送给flume3,flume3最终将数据打印到控制台。

flume1:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/nginx/logs/access.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.137.129
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume2:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = 198.168.137.128
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = 192.168.137.129
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume3:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = 192.168.137.129
a3.sources.r1.port = 4141

# Describe the sink
# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

标签:Flume,channels,sinks,框架,a1,sources,详解,c1,a2
来源: https://www.cnblogs.com/Mr-Sponge/p/16637703.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有