ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

kafka整合sparkStreaming及优化

2020-01-28 20:08:09  阅读:337  来源: 互联网

标签:sparkStreaming 优化 kafka streaming offset spark Kafka 数据


package streaming

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaWordCount {

  def main(args: Array[String]): Unit = {

    //1、创建StreamingContext
    val ssc = new StreamingContext(new SparkContext(new SparkConf().setMaster("local[4]").setAppName("kafka")),Seconds(5))

    ssc.sparkContext.setLogLevel("warn")
    //2、消费kafka数据
    //消费topic
    val topics = Array("spark10")

    //kafka参数配置
    val kafkaParams = Map[String, Object](
    //指明kafka borkerlist
    "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
    //kafka message key反序列化器
    "key.deserializer" -> classOf[StringDeserializer],
    //kafka message value反序列化器
    "value.deserializer" -> classOf[StringDeserializer],
    //消费者组
    "group.id" -> "kafka_spark_10",
    //指明从什么位置开始消费kafka , latest 从最新的offset开始消费 , earliest 从小的offset开始消费
    "auto.offset.reset" -> "latest",
    //是否自动提交offset
    "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val source: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](topics,kafkaParams))
    //3、数据处理
    //数据处理、offset更新
    source.foreachRDD(rdd=>{
      //打印消息
      rdd.map(_.value()).foreach(println(_))
      //获取当前消费的offset
      val offset = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      //提交offset
      source.asInstanceOf[CanCommitOffsets].commitAsync(offset)
    })
    //4、启动streaming
    ssc.start()
    //5、阻塞主线程
    ssc.awaitTermination()
  }
}

kafka优化
网络和io操作线程配置优化:
# broker处理消息的最大线程数
    num.network.threads=xxx
#  broker处理磁盘IO的线程数
    num.io.threads=xxx
# 加入队列的最大请求数,超过该值,network thread阻塞
    queued.max.requests=5000
# server使用的send buffer大小。
    socket.send.buffer.bytes=1024000
# server使用的recive buffer大小。
    socket.receive.buffer.bytes=1024000
为了大幅度提高producer写入吞吐量,需要定期批量写文件
# 每当producer写入10000条消息时,刷数据到磁盘
  log.flush.interval.messages=10000
# 每间隔1秒钟时间,刷数据到磁盘
  log.flush.interval.ms=1000
  当kafka server的被写入海量消息后,会生成很多数据文件,且占用大量磁盘空间,如果不及时清理,可能磁盘空间不够用,kafka默认是保留7天。
  # 保留三天,也可以更短 
  log.retention.hours=72
# 段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,kafka启动时是单线程扫描目录(log.dir)下所有数据文件)
   log.segment.bytes=1073741824 
   
数据重复消费和顺序消费问题:
Kafka直接写入页缓存page cache, 然后在持久化到磁盘
数据不丢失问题:
Kafka宕机,Leader 切换时可能导致数据丢失:
1.必须要求至少一个 Follower 在 ISR 列表里
2.每次写入数据的时候,要求 Leader 写入成功以外,至少一个 ISR 里的 Follower 也写成功

在broker中,保证数据不丢失主要是通过副本因子(冗余),防止数据丢失
在消费者消费数据的时候,只要每个消费者记录好offset值即可,就能保证数据不丢失
spark Streaming 优化
对于数据量较小的情况,一般是不会暴露问题的,但是数据量增大后,就会暴露各种问题,这就需要进行一些调优和参数配置。
1. 合理的批处理时间(batchDuration)
sparkStreaming在不断接受数据的同时,需要处理数据的时间,所以如果设置过短的批处理时间,会造成数据堆积,即未完成batch数据越来越多,从而发生阻塞
batchDuration本身不能小于500ms,这会导致sparkStreaming频繁提交作业,造成额外开销,减少整个系统的吞吐量,如果时间过长也会性能
合理的批处理时间,需要根据应用本身,集群资源情况,以及关注和监控spark streaming系统运行情况来调整,重点关注界面中的TotalDelay
2.合理的Kafka拉取量(maxRatePerPartition即Kafka每个partition拉取的数据的上限)
这个值默认是无上限的,即Kafka有多少数据,spark streaming就会一次性全拉出,但是批处理的时间是一定的,不会动态变化,如果Kafka这个数据频率过高就会照成数据堆积,阻塞
数据总量等于Kafka拉取数据量*partition数量调整两个参数 maxRatePerPartition 和 batchDuration 使得数据的拉取和处理能够平衡,尽可能地增加整个系统的吞吐量,可以观察监控界面中的 InputRate 和 ProcessingTime
3.用cache()函数将反复使用数据流Dstream缓存.防止过度调度资源造成网络开销
关注SchedulingDelay参数
4.设置合理的GC方式
对于 Spark 而言,垃圾回收采用 G1GC,而 SparkStreaming 采用 CMS
5.设置合理的parallelism
在spark streaming+Kafka中,我们采用Direct连接方式,spark中的partition和Kafka中的partition是一一对应的,一般默认设置Kafka中partition的数量
6.设置合理的CPU核数,内存,和executor的数量
7.使用高性能算子,Kryo优化序列化
8.在流式处理系统中,我们需要兼顾数据的接收和数据处理,即消费数据的速率要赶上生产数据的速率。当发现生产数据速率过慢时,可以考虑增加并行度,使用更多的接收器(Receiver);如果处理速度过慢,可以考虑加机器、优化程序逻辑及 GC 优化等方式
9.合理的内存管理
Spark 对于内存的使用主要有两类用途:执行(execution)和存储(storage)。执行类内存主要被用于 Shuffle 类操作、join 操作及排序(sort)和聚合(aggregation)类操作,而存储类内存主要用于缓存数据(caching)和集群间内部数据的传送, 执行类内存可以剥夺存储类内存空间,但是存储类内存空间有一个最低阈值会保证保留。
	

Jeady· 发布了25 篇原创文章 · 获赞 4 · 访问量 702 私信 关注

标签:sparkStreaming,优化,kafka,streaming,offset,spark,Kafka,数据
来源: https://blog.csdn.net/qq_43149023/article/details/104102002

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有