
Big Data: Spark Streaming in Scala and Java for Real-Time Stream Monitoring, Spark Streaming with Kafka, and a Custom Receiver



Implementing Spark Streaming in Scala and Java

Spark Streaming: Real-Time WordCount on Port Data

Scala implementation

  • Add the Maven dependencies
<dependencies>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.6.6</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <!--<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>-->
      <version>2.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
  </dependencies>
  • Write the Scala program
package nj.zb.kb09.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamDemo1 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamDemo1")

    // Batch interval: data is collected once every 3 seconds
    val streamingContext = new StreamingContext(sparkConf,Seconds(3))

    // Specify the input source: a socket text stream
    val socketLineStream: ReceiverInputDStream[String] = streamingContext.socketTextStream("192.168.136.100",7777)

    // Process the collected lines: word count

    val wordStream: DStream[String] = socketLineStream.flatMap(line=>line.split("\\s+"))

    val mapStream: DStream[(String, Int)] = wordStream.map(x=>(x,1))

    val wordcountStream: DStream[(String, Int)] = mapStream.reduceByKey(_+_)

    // Print each batch's result
    wordcountStream.print()


    // Start the receiver
    streamingContext.start()

    streamingContext.awaitTermination()
  }
}
  • Start the port service on the Linux side
[root@hadoop100 ~]# nc -lk 7777

Note: be sure to start the port service first and only then start the Scala program; otherwise the program will immediately report connection errors.

  • Start the Scala program
  • Type words into the port
hello world
hello spark
hello scala
hello scala
  • The corresponding word counts are printed in the Scala console
-------------------------------------------
Time: 1608454200000 ms
-------------------------------------------
(hello,1)
(world,1)

-------------------------------------------
Time: 1608454203000 ms
-------------------------------------------
(hello,1)
(spark,1)

-------------------------------------------
Time: 1608454206000 ms
-------------------------------------------

-------------------------------------------
Time: 1608454209000 ms
-------------------------------------------
(hello,1)
(scala,1)

-------------------------------------------
Time: 1608454212000 ms
-------------------------------------------
(hello,1)
(scala,1)

As the batch interval specifies, data is collected once every 3 seconds; under the hood, Spark Streaming is micro-batch processing.
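Each micro-batch is processed independently, and the job above runs until awaitTermination() is interrupted. As a minimal sketch (an assumed pattern, not part of the original program), the blocking call can be replaced with a bounded wait plus a graceful stop, so the in-flight batch finishes before the contexts close:

    // Wait up to 60 seconds; if the job is still running, stop it gracefully so
    // the current batch completes before the StreamingContext and SparkContext close.
    if (!streamingContext.awaitTerminationOrTimeout(60 * 1000)) {
      streamingContext.stop(stopSparkContext = true, stopGracefully = true)
    }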

Java implementation

  • Write the Java program
package nj.zb.kb09.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class SparkStreamJavaDemo1 {
	public static void main(String[] args) throws InterruptedException {
		SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamJavaDemo1");

		// Batch interval: one micro-batch every 5 seconds
		JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

		// Socket receiver reading lines from the given host and port
		JavaReceiverInputDStream<String> lines = jsc.socketTextStream("192.168.136.100", 7777);

		// Split each line into words
		JavaDStream<String> flatMap = lines.flatMap(new FlatMapFunction<String, String>() {
			@Override
			public Iterator<String> call(String s) throws Exception {
				String[] split = s.split("\\s+");
				return Arrays.asList(split).iterator();
			}
		});
		// Map each word to a (word, 1) pair
		JavaPairDStream<String, Integer> mapToPair = flatMap.mapToPair(new PairFunction<String, String, Integer>() {
			@Override
			public Tuple2<String, Integer> call(String s) throws Exception {
				return new Tuple2<String, Integer>(s, 1);

			}
		});

		// Sum the counts per word within each batch
		JavaPairDStream<String, Integer> reduceByKey = mapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
			@Override
			public Integer call(Integer integer, Integer integer2) throws Exception {
				return integer + integer2;
			}
		});
		reduceByKey.print();
		jsc.start();
		jsc.awaitTermination();
	}
}

  • Start the Java program
  • Open the port on the Linux side
[root@hadoop100 ~]# nc -lk 7777
  • Type into the port
hello world
hello scala
hello scala
hello scala
hello world
  • The corresponding word counts are printed in the Java console
-------------------------------------------
Time: 1608454655000 ms
-------------------------------------------
(hello,2)
(world,1)
(scala,1)

-------------------------------------------
Time: 1608454660000 ms
-------------------------------------------
(hello,1)
(scala,1)

-------------------------------------------
Time: 1608454665000 ms
-------------------------------------------
(hello,1)
(scala,1)

Spark Streaming: Real-Time WordCount on Directory Data

  • Write the Scala program
package nj.zb.kb09.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamFileDataSource {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamFileDataSource")

    val streamingContext = new StreamingContext(sparkConf,Seconds(5))

    val fileDStream: DStream[String] = streamingContext.textFileStream("in/test/")

    val wordStream: DStream[String] = fileDStream.flatMap(x=>x.split("\\s+"))

    val mapStream: DStream[(String, Int)] = wordStream.map((_,1))

    val sumStream: DStream[(String, Int)] = mapStream.reduceByKey(_+_)

    sumStream.print()

    streamingContext.start()

    streamingContext.awaitTermination()
  }
}

  • Start the Scala program
  • Put the file a.txt into the directory
hello world
hello java
hello scala
hello spark
hello java
hello java
  • The corresponding results are printed in the console
-------------------------------------------
Time: 1608549950000 ms
-------------------------------------------

-------------------------------------------
Time: 1608549955000 ms
-------------------------------------------

-------------------------------------------
Time: 1608549960000 ms
-------------------------------------------

-------------------------------------------
Time: 1608549965000 ms
-------------------------------------------
(hello,6)
(java,3)
(world,1)
(scala,1)
(spark,1)

-------------------------------------------
Time: 1608549970000 ms
-------------------------------------------

Note: the principle is that Spark Streaming monitors the specified directory; during monitoring, any new text file moved into that directory is detected (after all, Spark Streaming is stream processing, so you can think of it as monitoring the directory as a data stream).
Pay special attention here (I was stuck on this for a long time, and none of the people I asked knew why it failed): it does not read files that already exist in the directory. It only reads files that arrive while monitoring is running, and the file's last modify-and-save must have happened after the moment monitoring started.
In other words, to feed a file into the monitored directory, open it after monitoring has started, make a trivial edit (a few spaces, a newline, anything that does not change the content), save it, and only then move it into the directory; only then will it be detected. (Presumably the intent is to pick up only files modified after the stream started.)
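A hypothetical workaround (the paths here are examples, not from the original post) is to stage the file outside the watched directory and move it in atomically, so its timestamp falls inside the monitoring window:

import java.nio.file.{Files, Paths, StandardCopyOption}

// Stage the file outside the watched directory, then move it in atomically;
// textFileStream only picks up files that appear after the stream starts.
val src = Paths.get("/tmp/a.txt")
val dst = Paths.get("in/test/a.txt")
Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE)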

Using Spark Streaming with Kafka

Without carrying forward previous results (stateless)

  • Write the Scala program
package nj.zb.kb09.spark

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamKafkaSource {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamKafkaSource")

    val streamingContext = new StreamingContext(sparkConf,Seconds(5))

    val kafkaParams: Map[String, String] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.136.100:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup1"
    )
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("sparkKafkaDemo"), kafkaParams)
    )

    val wordStream: DStream[String] = kafkaStream.flatMap(v=>v.value().toString.split("\\s+"))

    val mapStream: DStream[(String, Int)] = wordStream.map((_,1))

    val sumStream: DStream[(String, Int)] = mapStream.reduceByKey(_+_)

    sumStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
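Two consumer settings are often added alongside the four above (an assumption for illustration; the original config omits them and relies on Kafka's defaults): where a new consumer group starts reading, and whether offsets are committed automatically:

    // Hypothetical additions, merged into kafkaParams before createDirectStream
    val extraParams: Map[String, String] = Map(
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",    // or "earliest" to replay the topic
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true"      // commit offsets automatically
    )
    val fullParams: Map[String, String] = kafkaParams ++ extraParams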
  • Create the required topic
[root@hadoop100 ~]# kafka-topics.sh --zookeeper 192.168.136.100:2181 --create --topic sparkKafkaDemo --partitions 1 --replication-factor 1
  • Start the Scala program
  • Start a console producer for sparkKafkaDemo
[root@hadoop100 ~]# kafka-console-producer.sh --topic sparkKafkaDemo --broker-list 192.168.136.100:9092
  • Enter data in the producer
hello world
hello java
hello
hello scala
hello spark
hello
hello
world


  • The corresponding results are printed in the program console
-------------------------------------------
Time: 1608551710000 ms
-------------------------------------------

-------------------------------------------
Time: 1608551715000 ms
-------------------------------------------
(hello,1)
(world,1)

-------------------------------------------
Time: 1608551720000 ms
-------------------------------------------
(hello,2)
(java,1)

-------------------------------------------
Time: 1608551725000 ms
-------------------------------------------
(hello,1)
(scala,1)

-------------------------------------------
Time: 1608551730000 ms
-------------------------------------------
(hello,3)
(spark,1)

-------------------------------------------
Time: 1608551735000 ms
-------------------------------------------
(world,1)


Carrying forward previous results (stateful)

  • Write the Scala program
package nj.zb.kb09.spark

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamKafkaSource {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamKafkaSource")

    val streamingContext = new StreamingContext(sparkConf,Seconds(5))

    val kafkaParams: Map[String, String] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.136.100:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup1"
    )
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("sparkKafkaDemo"), kafkaParams)
    )

    val wordStream: DStream[String] = kafkaStream.flatMap(v=>v.value().toString.split("\\s+"))

    val mapStream: DStream[(String, Int)] = wordStream.map((_,1))

    // Stateful streams need a checkpoint directory to store state across batches
    streamingContext.checkpoint("checkpoint")

    // Stateful: merge this batch's counts (seq) with the saved state (buffer)
    val stateSumStream: DStream[(String, Int)] = mapStream.updateStateByKey {
      case (seq, buffer) => {
        println(seq, seq.sum, buffer.getOrElse(0))
        val sum = buffer.getOrElse(0) + seq.sum
        Option(sum)
      }
    }

    stateSumStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
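The pattern-match block above can be hard to read. An equivalent update function with explicit types (a sketch for clarity, reusing the program's names) makes the contract of updateStateByKey visible: it receives this batch's values for a key plus the previously saved state, and returns the new state:

    // newValues holds this batch's 1s for a key; runningCount is the saved state
    val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
      (newValues, runningCount) => Some(runningCount.getOrElse(0) + newValues.sum)

    val stateSumStream: DStream[(String, Int)] = mapStream.updateStateByKey(updateFunc)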

  • Create the required checkpoint folder


  • Start a console producer for sparkKafkaDemo
[root@hadoop100 ~]# kafka-console-producer.sh --topic sparkKafkaDemo --broker-list 192.168.136.100:9092
  • Start the Scala program
  • Produce messages to sparkKafkaDemo
hello world
hello spark
hello scala
hello java
hello java
  • The corresponding results are printed in the console
-------------------------------------------
Time: 1608636755000 ms
-------------------------------------------

-------------------------------------------
Time: 1608636760000 ms
-------------------------------------------

(CompactBuffer(1),1,0)
(CompactBuffer(1),1,0)
-------------------------------------------
Time: 1608636765000 ms
-------------------------------------------
(hello,1)
(spark,1)

(CompactBuffer(),0,1)
(CompactBuffer(),0,1)
-------------------------------------------
Time: 1608636770000 ms
-------------------------------------------
(hello,1)
(spark,1)

(CompactBuffer(1, 1),2,1)
(CompactBuffer(1),1,0)
(CompactBuffer(),0,1)
(CompactBuffer(1),1,0)
-------------------------------------------
Time: 1608636775000 ms
-------------------------------------------
(hello,3)
(java,1)
(scala,1)
(spark,1)

(CompactBuffer(),0,1)
(CompactBuffer(),0,1)
(CompactBuffer(1),1,3)
(CompactBuffer(1),1,1)
-------------------------------------------
Time: 1608636780000 ms
-------------------------------------------
(hello,4)
(java,2)
(scala,1)
(spark,1)

(CompactBuffer(),0,4)
(CompactBuffer(),0,2)
(CompactBuffer(),0,1)
(CompactBuffer(),0,1)
-------------------------------------------
Time: 1608636785000 ms
-------------------------------------------
(hello,4)
(java,2)
(scala,1)
(spark,1)

The (CompactBuffer(...), n, m) lines interleaved with the batch output come from the println inside the update function: a key's values in the current batch, their sum, and the previously saved state for that key.

Custom receiver

  • Write the Scala program
package nj.zb.kb09.spark

import java.io.{BufferedReader, InputStreamReader}

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

class MyReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){
  var socket:java.net.Socket=null
  def receive(): Unit = {
    socket = new java.net.Socket(host, port)
    val reader: BufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8"))
    // Note: in Scala an assignment evaluates to Unit, so the Java idiom
    // while ((line = reader.readLine()) != null) would not work here
    var line: String = reader.readLine()
    while (line != null && !line.equals("end")) {
      this.store(line)
      line = reader.readLine()
    }
  }


  override def onStart(): Unit = {
    // Run receive() on its own thread so onStart() returns immediately
    new Thread(new Runnable {
      override def run(): Unit = {
        receive()
      }
    }).start()
  }

  override def onStop(): Unit = {
    if(socket!=null){
      socket.close()
      socket=null
    }
  }
}
object MyReceiverDemo{
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MyReceiverDemo")

    val streamingContext = new StreamingContext(sparkConf,Seconds(5))

    val receiverStream: ReceiverInputDStream[String] = streamingContext.receiverStream(new MyReceiver("192.168.136.100",7777))


    val lineStream: DStream[String] = receiverStream.flatMap(_.split("\\s+"))

    val mapStream: DStream[(String, Int)] = lineStream.map((_,1))

    val sumStream: DStream[(String, Int)] = mapStream.reduceByKey(_+_)

    sumStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()

  }
}
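A hedged variant of onStart (an assumption, not part of the original program): production receivers usually catch I/O failures and call restart(...), so Spark re-launches the receiver instead of letting the receive thread die silently:

  override def onStart(): Unit = {
    new Thread("MyReceiver-socket") {
      override def run(): Unit =
        try receive()
        catch {
          // Ask Spark to re-launch this receiver after a connection failure
          case e: java.io.IOException => restart("socket error, restarting", e)
        }
    }.start()
  }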

  • Start the Scala program
  • Start the port service
[root@hadoop100 ~]# nc -lk 7777
  • Enter data
hello
hello world
hello java
hello java
end
hello java
hello world
  • The corresponding results are printed in the console
-------------------------------------------
Time: 1608564875000 ms
-------------------------------------------

-------------------------------------------
Time: 1608564880000 ms
-------------------------------------------
(hello,1)

-------------------------------------------
Time: 1608564885000 ms
-------------------------------------------
(hello,1)
(world,1)

-------------------------------------------
Time: 1608564890000 ms
-------------------------------------------
(hello,1)
(java,1)

-------------------------------------------
Time: 1608564895000 ms
-------------------------------------------
(hello,1)
(java,1)

-------------------------------------------
Time: 1608564900000 ms
-------------------------------------------

-------------------------------------------
Time: 1608564905000 ms
-------------------------------------------

-------------------------------------------
Time: 1608564910000 ms
-------------------------------------------

Note: after we enter "end", nothing typed afterwards is collected. That is because we told the receive loop to stop storing once it reads "end": the streaming job keeps running, but subsequent input is never received. (Calling the receiver's restart(...) method instead of returning, as in the sketch above, would make Spark reconnect and resume collecting.)

Source: https://blog.csdn.net/dsjia2970727/article/details/111447117
