ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

spark streaming整合kafka中聚合类运算如何和kafka保持exactly once一致性语义(mysql方式,利用事务)

2022-04-05 21:34:57  阅读:188  来源: 互联网

标签:String val exactly 偏移量 kafka streaming MySQL Kafka


/**
  * 从Kafka读取数据,实现ExactlyOnce,偏移量保存到MySQL中
  * 1.将聚合好的数据,收集到Driver端,
  * 2.然后建计算好的数据和偏移量在一个事物中同时保存到MySQL中
  * 3.成功了提交事物
  * 4.失败了让这个任务重启
  *
  * MySQL数据库中有两张表:保存计算好的结果、保存偏移量
  */
object ExactlyOnceWordCountOffsetStoreInMySQL {

  def main(args: Array[String]): Unit = {

    //true a1 g1 ta,tb
    val Array(isLocal, appName, groupId, allTopics) = args


    val conf = new SparkConf()
      .setAppName(appName)

    if (isLocal.toBoolean) {
      conf.setMaster("local[*]")
    }


    //创建StreamingContext,并指定批次生成的时间
    val ssc = new StreamingContext(conf, Milliseconds(5000))
    //设置日志级别
    ssc.sparkContext.setLogLevel("WARN")

    //SparkStreaming 跟kafka进行整合
    //1.导入跟Kafka整合的依赖
    //2.跟kafka整合,创建直连的DStream【使用底层的消费API,效率更高】

    val topics = allTopics.split(",")

    //SparkSteaming跟kafka整合的参数
    //kafka的消费者默认的参数就是每5秒钟自动提交偏移量到Kafka特殊的topic中: __consumer_offsets
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest" //如果没有记录偏移量,第一次从最开始读,有偏移量,接着偏移量读
      , "enable.auto.commit" -> (false: java.lang.Boolean) //消费者不自动提交偏移量
    )

    //在创建KafkaDStream之前要先读取MySQL数据库,查询历史偏移量,没有就从头读,有就接着读
    //offsets: collection.Map[TopicPartition, Long]
    val offsets: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromMySQL(appName, groupId)

    //跟Kafka进行整合,需要引入跟Kafka整合的依赖
    //createDirectStream更加高效,使用的是Kafka底层的消费API,消费者直接连接到Kafka的Leader分区进行消费
    //直连方式,RDD的分区数量和Kafka的分区数量是一一对应的【数目一样】
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, //调度task到Kafka所在的节点
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets) //指定订阅Topic的规则
    )

    kafkaDStream.foreachRDD(rdd => {

      //判断当前批次的RDD是否有数据
      if (!rdd.isEmpty()) {

        //获取RDD所有分区的偏移量
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        //实现WordCount业务逻辑
        val words: RDD[String] = rdd.flatMap(_.value().split(" "))
        val wordsAndOne: RDD[(String, Int)] = words.map((_, 1))
        val reduced: RDD[(String, Int)] = wordsAndOne.reduceByKey(_ + _)
        //将计算好的结果收集到Driver端再写入到MySQL中【保证数据和偏移量写入在一个事物中】
        //触发Action,将数据收集到Driver段
        val res: Array[(String, Int)] = reduced.collect()

        //创建一个MySQL的连接【在Driver端创建】
        //默认MySQL自动提交事物

        var connection: Connection = null
        var ps1: PreparedStatement = null
        var ps2: PreparedStatement = null
        try {
          connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "123456")
          //不要自动提交事物
          connection.setAutoCommit(false)

          ps1 = connection.prepareStatement("INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = counts + ?")
          //将计算好的WordCount结果写入数据库表中,但是没有提交事物
          for (tp <- res) {
            ps1.setString(1, tp._1)
            ps1.setLong(2, tp._2)
            ps1.setLong(3, tp._2)
            ps1.executeUpdate() //没有提交事物,不会讲数据真正写入到MySQL
          }

          //(app1_g001, wc_0) ->  1000
          ps2 = connection.prepareStatement("INSERT INTO t_kafka_offset (app_gid, topic_partition, offset) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE offset = ?")
          //将偏移量写入到MySQL的另外一个表中,也没有提交事物
          for (offsetRange <- offsetRanges) {
            //topic名称
            val topic = offsetRange.topic
            //topic分区编号
            val partition = offsetRange.partition
            //获取结束偏移量
            val untilOffset = offsetRange.untilOffset
            //将结果写入MySQL
            ps2.setString(1, appName + "_" + groupId)
            ps2.setString(2, topic + "_" + partition)
            ps2.setLong(3, untilOffset)
            ps2.setLong(4, untilOffset)
            ps2.executeUpdate()
          }

          //提交事物
          connection.commit()

        } catch {
          case e: Exception => {
            //回滚事物
            connection.rollback()
            //让任务停掉
            ssc.stop()
          }
        } finally {
          if(ps2 != null) {
            ps2.close()
          }
          if(ps1 != null) {
            ps1.close()
          }
          if(connection != null) {
            connection.close()
          }
        }
      }
    })


    ssc.start()

    ssc.awaitTermination()


  }
}

标签:String,val,exactly,偏移量,kafka,streaming,MySQL,Kafka
来源: https://www.cnblogs.com/xstCoding/p/16103955.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有