ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

大数据学习(29)—— Spark Streaming

2021-09-26 12:33:27  阅读:189  来源: 互联网

标签:RDD 数据源 29 Streaming 数据流 Spark DStream


Spark提供了DataFrame和DataSet API来处理批量数据,它们把数据转换成RDD,在内存中以迭代器的方式不落盘处理,所以效率很高。但它有一个弊端,就是不能准实时计算数据变化。

为了解决上述问题,Spark引入了Spark Stream来处理准流式数据。为啥说准流式呢?因为它本质上还是批处理,只不过这个批相当小,是微批,接近流式计算,以秒级的时间窗口来处理一批数据。 

接下来,老规矩,看官网Spark Streaming

概述

Spark Streaming是对核心Spark API的一个扩展,它能够实现对实时数据流的流式处理,并具有很好的可扩展性、高吞吐量和容错性。Spark Streaming支持从多种数据源提取数据,如:Kafka、Flume、Twitter、ZeroMQ、Kinesis以及TCP套接字,并且可以提供一些高级API来表达复杂的处理算法,如:map、reduce、join和window等。最后,Spark Streaming支持将处理完的数据推送到文件系统、数据库或者实时仪表盘中展示。实际上,你完全可以将Spark的机器学习(machine learning) 和 图计算(graph processing)的算法应用于Spark Streaming的数据流当中。

这个图片是对上述文字的可视化展现。从数据源获取数据,加工处理后存储起来或者展示。

下图展示了Spark Streaming的内部工作原理。Spark Streaming从实时数据流接入数据,再将其划分为一个个小批量供后续Spark engine处理,所以实际上,Spark Streaming是按一个个小批量来处理数据流的。

 

数据流经过Spark Streaming的窗口之后,就变成了微批,对微批的处理也是RDD的DAG,处理完毕变成批处理结果,再加上你的处理逻辑就可以存储或者展示了。

Spark Streaming为这种持续的数据流提供了的一个高级抽象,即:discretized stream(离散数据流)或者叫DStream。DStream既可以从输入数据源创建得来,如:Kafka、Flume或者Kinesis,也可以从其他DStream经一些算子操作得到。其实在内部,一个DStream就是包含了一系列RDD。

Spark Streaming架构

  • Master:主要负责整体集群资源的管理和应用程序调度,上图中没有画出来
  • Worker:负责单个节点的资源管理,driver 和 executor 的启动等
  • Driver:用户入口程序执行的地方,即 SparkContext 执行的地方,主要是 DAG 生成、stage 划分、task 生成及调度
  • Executor:负责执行 task,反馈执行状态和执行结果。

离散数据流 (DStreams)

离散数据流(DStream)是Spark Streaming最基本的抽象。它代表了一种连续的数据流,要么从某种数据源提取数据,要么从其他数据流映射转换而来。DStream内部是由一系列连续的RDD组成的,每个RDD都是不可变、分布式的数据集,每个RDD都包含了特定时间间隔内的一批数据,如下图所示。

 

任何作用于DStream的算子,其实都会被转化为对其内部RDD的操作。例如,在前面的例子中,我们将 lines 这个DStream转成words DStream对象,其实作用于lines上的flatMap算子,会施加于lines中的每个RDD上,并生成新的对应的RDD,而这些新生成的RDD对象就组成了words这个DStream对象。如下所示。

底层的RDD转换仍然是由Spark引擎来计算。DStream的算子将这些细节隐藏了起来,并为开发者提供了更为方便的高级API。

输入DStream和接收器

输入DStream代表从某种流式数据源流入的数据流。在之前的例子里,lines 对象就是输入DStream,它代表从netcat server收到的数据流。每个输入DStream(除文件数据流外)都和一个接收器(Receiver)相关联,而接收器则是专门从数据源拉取数据到内存中的对象。

Spark Streaming主要提供两种内建的流式数据源:

  • 基础数据源(Basic sources): 在StreamingContext API 中可直接使用的源,如:文件系统,套接字连接或者Akka actor。
  • 高级数据源(Advanced sources): 需要依赖额外工具类的源,如:Kafka、Flume、Kinesis、Twitter等数据源。这些数据源都需要增加额外的依赖。

注意,如果你需要同时从多个数据源拉取数据,那么你就需要创建多个DStream对象。多个DStream对象其实也就同时创建了多个数据流接收器。但是请注意,Spark的worker/executor 都是长期运行的,因此它们都会各自占用一个分配给Spark Streaming应用的CPU。所以,在运行Spark Streaming应用的时候,需要注意分配足够的CPU core(本地运行时,需要足够的线程)来处理接收到的数据,同时还要足够的CPU core来运行这些接收器。

注意

  • 如果本地运行Spark Streaming应用,记得不能将master设为”local” 或 “local[1]”。这两个值都只会在本地启动一个线程。而如果此时你使用一个包含接收器(如:套接字、Kafka、Flume等)的输入DStream,那么这一个线程只能用于运行这个接收器,而处理数据的逻辑就没有线程来执行了。因此,本地运行时,一定要将master设为”local[n]”,其中 n > 接收器的个数。
  • 将Spark Streaming应用置于集群中运行时,同样,分配给该应用的CPU core数必须大于接收器的总数。否则,该应用就只会接收数据,而不会处理数据。

实战

下面我将用一段Java代码向本级端口发送数据,Spark Streaming从该端口接收数据并在时间窗口内统计单词个数。

首先引入Spark Streming的maven依赖。

      <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.12</artifactId>
          <version>3.1.2</version>
      </dependency>

接着编写一段发送数据的Java程序。

public class SendMessage {
    public static void main(String[] args) {
        new Thread(new Runnable() {
            int i = 0;

            public void run() {
                try {

                    //创建一个serversocket绑定的端口:6666
                    ServerSocket s = new ServerSocket(6666);

                    Socket client = s.accept();

                    //获取输出流
                    OutputStream out = client.getOutputStream();

                    while(true) {
                        i++;
                        //向6666端口发送数据hello n
                        out.write(new String("hello " + i+"\n").getBytes());
                        sleep(1000);
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

然后再用scala写一段Spark Streaming程序来接收并处理数据。

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[3]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")  //设置输出日志级别
    val ssc = new StreamingContext(sc, Seconds(2)) //两秒一个微批

    val ds = ssc.socketTextStream("localhost",6666)
    val result = ds.flatMap(_.split(" ")).map((_,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(10),Seconds(6))  //10秒的窗口,每6秒滑动一次
    
    result.print()
    ssc.start()

    ssc.awaitTermination()
  }
}

首先启动Socket服务端,并写入数据。接着启动spark streaming程序,控制台输出如下。稳定之后,每个窗口处理5个批次的数据,有10个hello。

-------------------------------------------
Time: 1632630350000 ms
-------------------------------------------
(3,1)
(4,1)
(hello,5)
(1,1)
(5,1)
(2,1)

-------------------------------------------
Time: 1632630356000 ms
-------------------------------------------
(6,1)
(9,1)
(3,1)
(4,1)
(7,1)
(hello,10)
(10,1)
(8,1)
(5,1)
(2,1)
(11,1)

-------------------------------------------
Time: 1632630362000 ms
-------------------------------------------
(15,1)
(9,1)
(12,1)
(16,1)
(hello,10)
(13,1)
(10,1)
(8,1)
(14,1)
(17,1)
(11,1)

标签:RDD,数据源,29,Streaming,数据流,Spark,DStream
来源: https://www.cnblogs.com/burningblade/p/15134032.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有