ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

flume 读目录下文件 ,同步到kafka

2022-06-12 23:01:49  阅读:181  来源: 互联网

标签:flume channels 同步 sinks kafka a1 sources aSource


启动和配置flume

Agent 是一个 JVM 进程,它以事件(Event)的形式将数据从源头(Source)通过渠道(Channel)送至目标端(Sink)。

Agent 主要有 3 个部分组成,Source、Channel、Sink。

# 目录->kafka
a1.sources = s1
a1.sinks = k1
a1.channels = c1

a1.sources.s1.channels = c1
a1.sinks.k1.channel = k1

a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /home/yu/access.log

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flumnTopic
a1.sinks.k1.kafka.bootstrap.servers = master:2181
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.produce.ack = 1
a1.sinks.k1.kafka.produce.linger.ms = 1
a1.sinks.k1.kafka.produce.compression.type = snapp

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transcationCapacity = 100

0.依赖 jdk

1.安装flumn

2.配置环境变量

vim /etc/profile.d/flume.sh

FLUMN_HOME=/opt/flumn
PATH=$PATH:FLUMN_HOME/bin

source profile

3.配置flume

  • cp flume-env.sh.template flume-env.sh
export JAVA_HOME=/opt/jdk1.8.0_191
  • cp flume-conf.properties.template flume-conf.properties

source 使用 spoolingDirectory(spooldir)(假脱机目录)

channel 使用memory

sink使用kafka

#1.agent a 
a.sources=aSource
a.channels=aChannel
a.sinks=aSink

# 2.连接
a.sources.aSource.channels=aChannel
a.sinks.aSink.channel=aChannel

# 3.source 
#类型 目录文件
a.sources.aSource.type=spooldir
a.sources.aSource.spoolDir=/opt/apache-zookeeper-3.7.0-bin/logs
# 反序列化器
a.sources.aSource.deserializer=LINE
a.sources.aSource.deserializer.maxLineLength=320000
# 文件正则匹配格式
a.sources.aSource.includePattern= zookeeper-root-server-app.out
# 拦截器
a.sources.aSource.interceptors=head_filter
a.sources.aSource.interceptors.head_filter.type=regex_filter
a.sources.aSource.interceptors.head_filter.regex=^user*
a.sources.aSource.interceptors.head_filter.excludeEvents=true

#4.channel
#a.channels.aChannel.type=file
#a.channels.aChannel.checkpointDir=/opt/kb15tmp/checkpoint/a
#a.channels.aChannel.dataDirs=/opt/kb15tmp/checkpoint/data/a
a.channels.aChannel.type=memory
a.channels.aChannel.capacity=100000
a.channels.aChannel.transactionCapacity=10000

#5.sink
a.sinks.aSink.type=org.apache.flume.sink.kafka.KafkaSink
a.sinks.aSink.batchSize=640
a.sinks.aSink.brokerList=app:9092
a.sinks.aSink.topic=topic01


4.启动flume

# --name a  agent的name
bin/flume-ng agent --conf conf/ --name a --conf-file conf/flume-conf.properties

标签:flume,channels,同步,sinks,kafka,a1,sources,aSource
来源: https://www.cnblogs.com/mznsndy/p/16369177.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有