ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Spark Streaming(二)—— 创建DStream

2022-01-07 13:35:15  阅读:187  来源: 互联网

标签:String val Streaming apache org spark DStream Spark ssc


1、文件数据源案例

需求:读取hdfs上的Herry.txt文件,进行词频统计

package com.zch.spark.streaming
​
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
​
/**
 * Author: zhaoHui
 * Date: 2022/01/06
 * Time: 14:29
 * Description: 
 */
object sparkStreaming01_FileWordCount {
  def main(args: Array[String]): Unit = {
    //1.初始化 Spark 配置信息
    val sparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("StreamWordCount")
    
    //2.初始化 SparkStreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    
    //3.监控文件夹创建 DStream
    val dirStream = ssc.textFileStream("hdfs://zhaohui01:8020/Herry.txt")
    
    //4.将每一行数据做切分,形成一个个单词
    val wordStreams = dirStream.flatMap(_.split("\t"))
    
    //5.将单词映射成元组(word,1)
    val wordAndOneStreams = wordStreams.map((_, 1))
    
    //6.将相同的单词次数做统计
    val wordAndCountStreams = wordAndOneStreams.reduceByKey(_ + _)
    
    //7.打印
    wordAndCountStreams.print()
    
    //8.启动 SparkStreamingContext
    ssc.start()
    ssc.awaitTermination()
  }
​
}

2、创建DStream Queue

package com.zch.spark.streaming
​
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
​
import scala.collection.mutable
​
/**
 * Author: zhaoHui
 * Date: 2022/01/03
 * Time: 18:18
 * Description: 
 */
object sparkStreaming02_Queue {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("queue")
​
    val ssc = new StreamingContext(sparkConf, Seconds(3))
​
    // 创建RDD队列
    val rddQueue = new mutable.Queue[RDD[Int]]()
    // 创建QueueInputDStream
    val inputStream = ssc.queueStream(rddQueue, oneAtATime = false)
​
    val mappedStream = inputStream.map((_, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    reducedStream.print()
​
    ssc.start()
​
    for(i <- 1 to 10){
      rddQueue += ssc.sparkContext.makeRDD(1 to 300 , 10)
      Thread.sleep(2000)
    }
  }
}

3、自定义数据源

需要继承Receiver,并实现onStart和onStop方法来自定义数据源

class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){
​
  private var flag = true
​
  override def onStart(): Unit = {
​
    new Thread (() => while (flag){
​
      val string ="采集的数据为:"+ new Random().nextInt(10).toString
        store(string)
        Thread.sleep(500)
    }).start()
  }
​
  override def onStop(): Unit = {
​
    flag = false
  }
}

4、Kafka数据源(重点)

需求:通过SparkStreaming从Kafka读取数据

1.导入依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.1.2</version>
</dependency>

2.编写代码

package com.zch.spark.streaming
​
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
​
/**
 * Author: zhaoHui
 * Date: 2022/01/03
 * Time: 19:03
 * Description: 
 */
object sparkStreaming04_Kafka {
  def main(args: Array[String]): Unit = {
​
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("kafka")
​
    val ssc = new StreamingContext(sparkConf, Seconds(3))
​
    val kafkaParam: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
        "zhaohui01:9092,zhaohui02:9092,zhaohui03:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "__consumer",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
​
    )
​
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("first01"), kafkaParam)
    )
​
    kafkaDS.map(_.value()).print()
​
    ssc.start()
    ssc.awaitTermination()
  }
​
}

标签:String,val,Streaming,apache,org,spark,DStream,Spark,ssc
来源: https://blog.csdn.net/chaohui2638457321/article/details/122362106

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有