ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

数据源:flume采集到的端口

2022-08-31 20:03:51  阅读:244  来源: 互联网

标签:flume sinks 数据源 端口 a1 sources node1 spark


推送式

  • 将flume采集的数据主动推送给Spark程序,容易导致Spark程序接受数据出问题,推送式整合是基于avro端口下沉地方式完成
  • 引入SparkStreaming和Flume整合的依赖
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
</dependencies>
  • 定义Flume采集数据进程脚本,把sink下沉地指定为avro类型的端口下沉底
[root@node1 data]# vi portToSpark.conf 
#指定agent的sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置sources属性
a1.sources.s1.type = netcat
a1.sources.s1.bind = node1
a1.sources.s1.port = 44444
#配置sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
#配置channel类型
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#整合flume进程中source channel sink
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
  • 通过FileUtils.createStream方法从avro的端口中获取flume采集到avro端口的实时数据
package SparkStreaming.flume

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ByFlumePush {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("hdfs")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))
    val ds = FlumeUtils.createStream(ssc, "node1", 8888, StorageLevel.MEMORY_ONLY)
    ds.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 启动
1. 启动flume
flume-ng agent -n a1 -f portToSpark.conf -Dflume.root.logger=INFO
2. 运行主类,将java代码打包上传到node1上
spark-submit --class flume.Demo01 ssc.jar
3. 开启监听的端口号
[root@node1 ~]# telnet node1 44444
  • 注意:
    必须保证Spark Streaming运行程序和Flume采集进程在同一个节点上,保证Spark Streaming打包的jar包必须把spark-streaming-flume_2.11:2.3.1版本的依赖包全部打包到jar包中
    (这里用的别人打的包,保存在G://shixun//ssc.jar路径下了)

拉取式

  • 将Flume采集的数据发送给sink了,sink并不是直接把数据立马给了Spark,而是先把数据缓冲,Spark接收器可以按照我的需求主动去sink中拉取数据.
    拉取式整合方式是基于Spark下沉地完成----建议使用
  • 引入依赖:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
</dependencies>
  • 定义flume脚本文件,和上面的方式同,但把sink的下沉地改为SparkSink
[root@node1 data]# vi portToSpark2.conf 
#指定agent的sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置sources属性
a1.sources.s1.type = netcat
a1.sources.s1.bind = node1
a1.sources.s1.port = 44444
#配置sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
#配置channel类型
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#整合flume进程中source channel sink
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
  • 定义读取方法
package SparkStreaming.flume

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ByFlumePush {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("hdfs")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))
    val ds = FlumeUtils.createPollingStream(ssc, "node1", 8888, StorageLevel.MEMORY_ONLY)
    ds.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
  • [注意]:
    SparkStreaming的依赖jar包复制到flume软件的lib目录下,把spark-streaming-flume的依赖jar包放到flume软件的lib目录下
[root@node1 jars]# pwd
/opt/app/spark-2.3.1/jars
[root@node1 jars]# cp spark-streaming_2.11-2.3.1.jar /opt/app/flume-1.8.0/lib/
[root@node1 data]# pwd
/opt/data
[root@node1 data]# cp ssc.jar /opt/app/flume-1.8.0/lib/

(ssc.jar为别人打的包,保存在G://shixun//ssc.jar路径下了)

  • 启动
[root@node1 data]# flume-ng agent -n a1 -f portToSpark2.conf -Dflume.root.logger=INFO,console
[root@node1 data]# spark-submit --class flume.ByFlumePush ssc2.jar
[root@node1 data]# telnet node1 44444

标签:flume,sinks,数据源,端口,a1,sources,node1,spark
来源: https://www.cnblogs.com/jsqup/p/16643991.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有