ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Spark Streaming 数据限流简述

2020-02-06 15:38:05  阅读:294  来源: 互联网

标签:数据源 分区 偏移量 Kafka Streaming 限流 速率 Spark


Spark Streaming对实时数据流进行分析处理,源源不断的从数据源接收数据切割成一个个时间间隔进行处理;

流处理与批处理有明显区别,批处理中的数据有明显的边界、数据规模已知;而流处理数据流并没有边界,也未知数据规模;

由于流处理的数据流特征,使之数据流具有不可预测性,而且数据处理的速率还与硬件、网络等资源有关,在这种情况下如不对源源不断进来的数据流速率进行限制,那当Spark节点故障、网络故障或数据处理吞吐量下来时还有数据不断流进来,那将有可能将出现OOM进而导致Spark Streaming程序崩溃;

在Spark Streaming中不同的数据源采用不同的限速策略,但无论是Socket数据源的限流策略还是Kafka数据源的限流策略其速率(rate)的计算都是使用PIDController算法进行计算而得来;

下面从源码的角度分别介绍 Socket数据源

Kafka数据源

的限流处理。

速率限制的计算与更新

Spark Streaming的流处理其实是基于微批处理(MicroBatch)的,也就是说将数据流按某比较小的时间间隔将数据切割成为一段段微批数据进行处理;

添加监听器

StreamingContext调用Start()启动的时候会将速率控制器(rateController)添加到StreamingListener监听器中;

当每批次处理完成时将触发监听器(RateController),使用该批处理的处理结束时间、处理延迟时间、调度延迟时间、记录行数调用PIDRateEstimator传入PID算法中(PID Controller)计算出该批次的速率(rate)并更新速率限制(rateLimit)与发布该限制速率;

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val elements = batchCompleted.batchInfo.streamIdToInputInfo

for {
  processingEnd <- batchCompleted.batchInfo.processingEndTime
  workDelay <- batchCompleted.batchInfo.processingDelay
  waitDelay <- batchCompleted.batchInfo.schedulingDelay
  elems <- elements.get(streamUID).map(_.numRecords)
 } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}

private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
  val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
  newRate.foreach { s =>
    rateLimit.set(s.toLong)
    publish(getLatestRate())
  }
}

Socket数据源限流

批次的限制速率上面已经算出,这里说的是接收Socket过来的数据时的数据限流;

SocketInputStream类receive方法接收到数据后将数据存入 BlockGenerator的Buffer中,在写入Buffer前调用限流器 (RateLimiter)对写入数据进行限流;

RateLimiter限流器使用了Google开源的 Guava中内置的RateLimiter限流器,该类只是对Guava限流器的简单封装;

 

在Spark Streaming中可通过使用两个参数配置初始速率与最大速率spark.streaming.receiver.maxRate、spark.streaming.backpressure.initialRate;亦可配置PIDController算法相关的四个参数值;

RateLimiter限流器是基于令牌桶的算法基本原理比较简单,如果你对大数据开发感兴趣,想系统学习大数据的话,可以加入大数据技术学习交流扣扣君羊:522189307,欢迎添加,了解课程介绍,获取学习资源。以一个恒定的速率生成令牌放入令牌桶中,桶满则停止,处理请求时需要从令牌桶中取出令牌,当桶中无令牌可取时阻塞等待,此算法用于确保系统不被洪峰击垮。

private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)
/**
  * Push a single data item into the buffer.
 */
 def addData(data: Any): Unit = {
  if (state == Active) {
//调用限流器等待
  waitToPush()
  synchronized {
    if (state == Active) {
      currentBuffer += data
    } else {
      throw new SparkException(
        "Cannot add data as BlockGenerator has not been started or has been stopped")
    }
  }
} else {
  throw new SparkException(
    "Cannot add data as BlockGenerator has not been started or has been stopped")
 }
}

def waitToPush() {
   //限流器申请令牌
   rateLimiter.acquire()
}

Guava库中RateLimiter限流器基本使用:

//创建限流器,每秒产生令牌数1
    RateLimiter rateLimiter=RateLimiter.create(1);
    for (int i = 0; i < 10; i++) {
        //获得一个令牌,未申请到令牌则阻塞等待
        double waitTime = rateLimiter.acquire();
        System.out.println(String.format("id:%d time:%d waitTime:%f",i,System.currentTimeMillis(),waitTime));
    }

Kafka数据源限流的实现

在Spark Streaming Kafka包拉取Kafka数据会进行如下动作:

1、取Kafka中最新偏移量、分区

2、通过rateController限制每个分区可拉取的最大消息数

3、在DirectKafkaInputDStream中创建KafkaRDD,在其中调用相关对象拉取数据

通过如上步骤也可用看出,只要限制了Kafka某个分区的偏移量(offset)范围也就可限制从Kafka拉取的消息数量,从而达到限流的目的,Spark streaming kafka也是通过此实现的;

计算每个分区速率限制,有如下步骤:

1、通过seekToEnd获取最新可用偏移量与当前偏移量对比获得当前所有分区延迟偏移量

单个分区偏移量延迟=最新偏移量记录-当前偏移量记录

2、获取配置项中每个分区最大速率 (spark.streaming.kafka.maxRatePerPartition),背压率计算,计算每个分区背压率计算公式为:

单个分区背压率=单个分区偏移量延迟/所有分区总延迟*速率限制

速率限制(rateLimit):为通过PIDController动态计算得来

如有配置 每个分区最大速率 则取配置项最大速率与 背压率 两者中的 最小值 , 未配置 则取 背压率 作为每个分区速率限制;

3、将批次间隔(batchDuration)*每个分区速率限制=每个分区最大消息数

4、取当前分区偏移量+分区最大消息数 与 最新偏移量两者当中最小的,由此来控制拉取消息速率;

如当前偏移量+分区最大消息数 大于 最新偏移量则取 最新偏移量否则取 当前偏移量+分区最大消息数作为 拉取Kafka数据的Offset范围 ;

// 限制每个分区最大消息数
protected def clamp(
offsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {

maxMessagesPerPartition(offsets).map { mmp =>
  mmp.map { case (tp, messages) =>
      val uo = offsets(tp)
      tp -> Math.min(currentOffsets(tp) + messages, uo)
  }
}.getOrElse(offsets)
}

不管是Kafka数据源还是Socket数据源Spark Streaming中都使用了 PIDController 算法用于计算其 速率限制 值,两者的差别也只是因为两种数据源的获取方式 数据特征而决定 的。Socket数据源使用了Guava RateLimiter、而Kafka数据源自己实现了 基于Offsets的限流 ;

以上

所 介绍的框架版本为: Spark Streaming 版本为2.3.2与spark-streaming-kafka-0-10_2.11;

大数据基础入门教程 发布了113 篇原创文章 · 获赞 0 · 访问量 3844 私信 关注

标签:数据源,分区,偏移量,Kafka,Streaming,限流,速率,Spark
来源: https://blog.csdn.net/mnbvxiaoxin/article/details/104196952

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有