ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flume-day02_基础案例

2022-06-18 08:31:22  阅读:227  来源: 互联网

标签:Flume channels sinks day02 a1 案例 k1 c1 r1


1 、基础案例

案例一:

在使用之前,提供一个大致思想,使用Flume的过程是确定scource类型,channel类型和sink类型,编写conf文件并开启服务,在数据捕获端进行传入数据流入到目的地。

案例一、从控制台打入数据,在控制台显示

1、确定scource类型,channel类型和sink类型

确定的使用类型分别是,netcat source, memory channel, logger sink.

2、编写conf文件

#a代表agent的名称,r1代表source的名称。c1代表channel名称,k1代表的是sink的名称
#声明各个组件
a.sources=r1
a.channels=c1
a.sinks=k1
#定义source类型,这里是试用netcat的类型
a.sources.r1.type=netcat
a.sources.r1.bind=192.168.44.110
a.sources.r1.port=8888
#定义source发送的下游channel
a.sources.r1.channels=c1
#定义channel
a.channels.c1.type=memory
#缓存的数据条数
a.channels.c1.capacity=1000
#事务数据量
a.channels.c1.transactionCapacity=1000
#定义sink的类型,确定上游channel
a.sinks.k1.channel=c1
a.sinks.k1.type=logger

将上述的配置放到创建的文件夹中

3、开启服务,我们重新开启复制一个客户端进行开启服务

命令: 注意 -n 后面跟着的是你在conf文件中定义好的,-f 后面跟着的是编写conf文件的路径

[root@master scrips]# flume-ng agent -n a -c flume-1.9.0/conf -f /usr/local/soft/bigdata17/scrips/netcat.conf -Dflume.root.logger=DEBUG,console

4、在另一个客户端输入命令:

注意:这里的master和8888是在conf文件中设置好的ip地址和端口

在输入第二个命令的窗口中输入数据,回车,在服务端就会接收到数据。

yum install -y telnet

telnet master 8888

案例二、从本地指定路径中打入数据到HDFS

1、同样,我们需要先确定scource类型,channel类型和sink类型

我们确定使用的类型分别是,spooldir source, memory channle, hdfs sink

2、编写conf文件

a1.sources = r1
a1.sinks = k1
a1.channels = c1
#指定spooldir的属性
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/soft/atao_file/flumedata
#时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
#指定sink的类型
a1.sinks.k1.type = hdfs
#指定hdfs的集群地址和路径,路径如果没有创建会自动创建
a1.sinks.k1.hdfs.path =hdfs://master:9000/flume/bigdata17/flumeout/log_s/dt=%Y-%m-%d
#指定hdfs路径下生成的文件的前缀
a1.sinks.k1.hdfs.filePrefix = log_%Y-%m-%d
#手动指定hdfs最小备份
a1.sinks.k1.hdfs.minBlockReplicas=1
#设置数据传输类型
a1.sinks.k1.hdfs.fileType = DataStream
#如果参数为0,不按照条数生成文件。如果参数为n,就是按照n条生成一个文件
a1.sinks.k1.hdfs.rollCount = 1000000
#这个参数是hdfs下文件sink的数据size。每sink 32MB的数据,自动生成一个文件
a1.sinks.k1.hdfs.rollSize =0
#每隔n 秒 将临时文件滚动成一个目标文件。如果是0,就不按照时间进行生成目标文件。
a1.sinks.k1.hdfs.rollInterval =0
a1.sinks.k1.hdfs.idleTimeout=0
#指定channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
 #组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3、开启服务

[root@master scrips]# flume-ng agent -n a1 -c ../../flume/conf -f ./linux2hdfs.conf -Dflume.root.logger=DEBUG, console

4、将文件复制到指定的目录下

cp DIANXIN.csv /usr/local/soft/flumedata/

案例三、从java代码中进行捕获打入到HDFS

1、先确定scource类型,channel类型和sink类型

确定的三个组件的类型是,avro source, memory channel, hdfs sink

2、打开maven项目,添加依赖

            <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>1.9.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flume.flume-ng-clients</groupId>
                <artifactId>flume-ng-log4jappender</artifactId>
                <version>1.9.0</version>
            </dependency>

3、设置log4J的内容

log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n


log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.230.50
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout 
log4j.appender.flume.layout.ConversionPattern=%m%n

编写java代码(示例,可以修改logger打印的内容)

package com.shujia.log2flume;

import org.apache.log4j.Logger;

import java.text.SimpleDateFormat;
import java.util.Date;

public class LoggerToFlume {
    public static void main(String[] args) throws InterruptedException {
        //创建一个logger对象
        Logger logger = Logger.getLogger(LoggerToFlume.class.getName());

        //创建一个日期格式化对象
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        //写一个死循环
        while (true) {
            Date date = new Date();
            logger.info("dateToBigdata17: " + sdf.format(date));
            //让线程休眠一会儿
            Thread.sleep(1000);
        }

    }
}

4、编写conf文件

#定义agent名, source、channel、sink的名称
a.sources = r1
a.channels = c1
a.sinks = k1

#具体定义source
a.sources.r1.type = avro
a.sources.r1.bind = 192.168.44.110
a.sources.r1.port = 41414

#具体定义channel
a.channels.c1.type = memory
a.channels.c1.capacity = 10000
a.channels.c1.transactionCapacity = 100

#具体定义sink
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path =hdfs://master:9000/shujia/bigdata17/flumeout2/flume_hdfs_avro2
a.sinks.k1.hdfs.filePrefix = events
a.sinks.k1.hdfs.minBlockReplicas=1
a.sinks.k1.hdfs.fileType = DataStream
#a.sinks.k1.hdfs.fileType = CompressedStream
#a.sinks.k1.hdfs.codeC = gzip
#不按照条数生成文件
a.sinks.k1.hdfs.rollCount = 1000
a.sinks.k1.hdfs.rollSize =0
#每隔N s将临时文件滚动成一个目标文件
a.sinks.k1.hdfs.rollInterval =0
a.sinks.k1.hdfs.idleTimeout=0 
#组装source、channel、sink
a.sources.r1.channels = c1
a.sinks.k1.channel = c1

5、开启服务,命令:

flume-ng agent -n a -c ../conf -f ./avro2hdfs2.conf -Dflume.root.logger=DEBUG,console

6、运行Java代码

7、查看HDFS

案例四、监控HBase日志到Hbase表中(这里可以换成其他组件日志监控)

1、监控日志

提前建好表

 create 'log','cf1'

编写conf文件 hbaselog2hdfs.conf

# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1 
# 给channel组件命名为c1
a.channels = c1


#指定spooldir的属性
a.sources.r1.type = exec 
a.sources.r1.command = cat /usr/local/soft/hbase-1.4.6/logs/hbase-root-master-master.log

#指定sink的类型
a.sinks.k1.type = hbase
a.sinks.k1.table = log
a.sinks.k1.columnFamily = cf1

#指定channel
a.channels.c1.type = memory 
a.channels.c1.capacity = 10000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100

# 组装
a.sources.r1.channels = c1 
a.sinks.k1.channel = c1

运行

flume-ng agent -n a -c ../conf -f ./ hbaselog2hdfs.conf -Dflume.root.logger=DEBUG,console

2、监控自定义的文件

确保test_idoall_org表在hbase中已经存在:

hbase(main):002:0> create 'test_idoall_org','uid','name'
0 row(s) in 0.6730 seconds

=> Hbase::Table - test_idoall_org
hbase(main):003:0> put 'test_idoall_org','10086','name:idoall','idoallvalue'
0 row(s) in 0.0960 seconds

2.创建配置文件:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/soft/atao_file/data.txt
a1.sources.r1.port = 44444
a1.sources.r1.host = 192.168.44.110

# Describe the sink
a1.sinks.k1.type = hbase
a1.sinks.k1.table = test_idoall_org
a1.sinks.k1.columnFamily = name
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.启动flume agent:

flume-ng agent -n a1 -c ../../flume/conf -f ./file2hbase.conf -Dflume.root.logger=DEBUG, console

4.产生数据:

echo "hello idoall.org from flume" >> data.txt

案例五、flume监控Http source

1、先确定scource类型,channel类型和sink类型

  确定的三个组件的类型是,http source, memory channel, logger sink.

2、编写conf文件

a1.sources=r1
a1.sinks=k1
a1.channels=c1
 
a1.sources.r1.type=http
a1.sources.r1.port=50000
a1.sources.r1.channels=c1
 
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
 
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
# 表示sink每次会从channel里取多少数据
a1.channels.c1.transactionCapacity=100

3、启动服务

flume-ng agent -n a1 -f ./httpToLogger.conf -Dflume.root.logger=DEBUG,console

4、复制一个窗口进行打数据

curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"nihao wangtao"}]'  http://192.168.44.110:50000

标签:Flume,channels,sinks,day02,a1,案例,k1,c1,r1
来源: https://www.cnblogs.com/atao-BigData/p/16387610.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有