ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

[ Project ] Editing Flume.conf

2021-05-05 23:00:09  阅读:261  来源: 互联网

标签:Flume a1 sinks kafka Project sources k1 Editing channels


文件编写

简单了解

一、介绍

  这一步主要是利用 flume 采集 HDFS 上的源数据并流向 kafka

二、简单要点

  1.Flume 是什么?
  简单了解一下。

  2.为什么 flumeKafka 要联合使用 ?
  简单了解一下。

  3.建议用谷歌浏览器打开 Flume 官方网站(地址:flume.apache.org),如果需要可在站内将网页转为中文。

  4.首先是查看用户指南,找到 Spooling Directory Source ,这主要是说把 文件夹 放到 目录 中来索取数据,说明这边监控的是文件夹,不监控文件,所以在编辑脚本时,目录写到文件夹即可,不需要写到文件。

三、操作步骤

(一)按图操作

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

(二)修改 sources

找到 Spooling Directory Source

  • 把这一段代码考过来
a1.channels = ch-1										# 设置名称
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir =/var/log/apache/flumeSpool	# 需要flume处理的文件目录
a1.sources.src-1.fileHeader = true						# 是否添加存储绝对路径文件名的标头
  • 进行简单修改:
a1.channels = c1
a1.sources = s1

a1.sources.s1.type = spooldir
a1.sources.s1.channels = ch-1
a1.sources.s1.spoolDir =/opt/data/event_attendees
  • 最大长度进行修改:
    注:因查询文件中最长的一行达10w个字符,故将下面最长行限制设置为12W
    查询代码:wc -L 文件名
a1.sources.s1.deserializer.maxLineLength=120000

(三)使用拦截器去除表头

  1. 先找到正则拦截器

在这里插入图片描述

  1. 编写代码
    注:因为选择了正则拦截器,那 type 势必为 regex_filter

在这里插入图片描述

a1.sources.s1.interceptors=i1
a1.sources.s1.interceptors.i1.type = regex_filter
a1.sources.s1.interceptors.i1.regex = \s*event.*
a1.sources.s1.interceptors.i1.excludeEvents=true
  • regex 后面跟的是正则表达式。

  • regex 后面跟的是正则表达式。

  • true 代表符合正则表达式的都不要,都过滤掉。

(四)修改 channels

找到 File Channels

  • 把这一段代码考过来
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint	# 这是检查点文件夹
a1.channels.c1.dataDirs = /mnt/flume/data				# 这是数据通道
  • 这里的 checkpointDirdataDirs 文件夹在代码运行的时候系统都会自动创建
  • 进行简单修改:
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/flumekfk/checkpoint
a1.channels.c1.dataDirs = /opt/flumekfk/data

如果担心在传输的过程中日志会丢掉,就要加上一个备份检查点。

在这里插入图片描述

(五)修改 sinks

找到 Kafka Sink

  • 把这一段代码考过来
a1.sinks.k1.channel  =  c1 
a1.sinks.k1.type  =  org.apache.flume.sink.kafka.KafkaSink 
a1.sinks.k1.kafka.topic  =  mytopic 
a1.sinks.k1.kafka.bootstrap.servers  = 本地主机:9092 
a1.sinks.k1.kafka.flumeBatchSize  =  20 					# 一批的尺寸是20条
a1.sinks.k1.kafka.producer.acks  =  1 
a1.sinks.k1.kafka.producer.linger.ms  =  1 					# 保留缓存时间
a1.sinks.k1.kafka .producer.compression.type  = snappy		# 压缩方式
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = event_attendees_raw
a1.sinks.k1.kafka.bootstrap.servers = 192.168.59.200:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 10
a1.sinks.k1.kafka.producer.batch.size=524288

四、完整配置代码

a1.channels = c1
a1.sources = s1
a1.sinks = k1

a1.sources.s1.type = spooldir
a1.sources.s1.channels = c1
a1.sources.s1.spoolDir = /opt/data/event_attendees
a1.sources.s1.deserializer.maxLineLength=120000

a1.sources.s1.interceptors=i1
a1.sources.s1.interceptors.i1.type = regex_filter
a1.sources.s1.interceptors.i1.regex = \s*event.*
a1.sources.s1.interceptors.i1.excludeEvents=true

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/flumekfk/checkpoint
a1.channels.c1.dataDirs = /opt/flumekfk/data


a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = event_attendees_raw
a1.sinks.k1.kafka.bootstrap.servers = 192.168.59.200:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 10
a1.sinks.k1.kafka.producer.batch.size=524288
  • kafka 参数调优
  • 刚刚上边查出一行最长有 107513 个字符,所以 batch.size 要设置成 524288 ,也就是 512kb,才能保证实现批数据处理
  • kafka.topic 这里要先在 kafka 里创建同名 topic

五、执行命令

  • 执行
flume-ng agent -n a1 -c /opt/software/hadoop/flume160/conf -f /opt/flumecfg/event_attendees.conf -Dflume.root.logger=info,console
  • 查询行数
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.59.200:9092 --topic event_attendees_raw --time -1
  • 后台监控
kafka-console-consumer.sh --bootstrap-server 192.168.59.200:9092 --topic event_attendees_raw --from-beginning

标签:Flume,a1,sinks,kafka,Project,sources,k1,Editing,channels
来源: https://blog.csdn.net/weixin_51184877/article/details/116310683

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有