ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

spark streaming整合kafka中非聚合类运算如何和kafka保持exactly once一致性语义(幂等性方式)

2022-04-07 12:31:05  阅读:165  来源: 互联网

标签:String val exactly Bytes 偏移量 kafka streaming toBytes put


object KafkaToHbase {

  def main(args: Array[String]): Unit = {


    //true a1 g1 ta,tb
    val Array(isLocal, appName, groupId, allTopics) = args

    val conf = new SparkConf()
      .setAppName(appName)

    if (isLocal.toBoolean) {
      conf.setMaster("local[*]")
    }

    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc: StreamingContext = new StreamingContext(sc, Milliseconds(5000))

    val topics = allTopics.split(",")

    //SparkSteaming跟kafka整合的参数
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092",
      "key.deserializer" -> classOf[StringDeserializer].getName,
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest", //如果没有记录偏移量,第一次从最开始读,有偏移量,接着偏移量读
      "enable.auto.commit" -> (false: java.lang.Boolean) //消费者不自动提交偏移量
    )

    //查询历史偏移量【上一次成功写入到数据库的偏移量】
    val historyOffsets: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromHbase("myorder", groupId)

    //跟Kafka进行整合,需要引入跟Kafka整合的依赖
    //createDirectStream更加高效,使用的是Kafka底层的消费API,消费者直接连接到Kafka的Leader分区进行消费
    //直连方式,RDD的分区数量和Kafka的分区数量是一一对应的【数目一样】
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, //调度task到Kafka所在的节点
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, historyOffsets) //指定订阅Topic的规则, 从历史偏移量接着读取数据
    )

    kafkaDStream.foreachRDD(rdd => {

      if (!rdd.isEmpty()) {

        //获取KakfaRDD的偏移量
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        //获取KafkaRDD中的数据
        val lines: RDD[String] = rdd.map(_.value())

        val orderRDD: RDD[Order] = lines.map(line => {
          var order: Order = null
          try {
            order = JSON.parseObject(line, classOf[Order])
          } catch {
            case e: JSONException => {
              //TODO
            }
          }
          order
        })
        //过滤问题数据
        val filtered: RDD[Order] = orderRDD.filter(_ != null)

        filtered.foreachPartition(iter => {
          if (iter.nonEmpty) {
            //先获取当前Task的分区编号,然后根据Task分区编号再获取当前分区的偏移量
            val offsetRange = offsetRanges(TaskContext.get.partitionId)
            //获取一个Hbase的Connection【在Executor端获取的】
            val connection: Connection = HBaseUtil.getConnection("node-1.51doit.cn,node-2.51doit.cn,node-3.51doit.cn", 2181)
            val t_orders: Table = connection.getTable(TableName.valueOf("myorder"))

            //定义一个集合,将数据先缓存到集合中
            val puts = new util.ArrayList[Put]()
            //迭代分区中的每一条数据
            iter.foreach(o => {
              // new 了一个put,就是hbase一行数据
              val put = new Put(Bytes.toBytes(o.oid))

              //put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("order_id"), Bytes.toBytes(o.oid))
              put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("total_money"), Bytes.toBytes(o.totalMoney))

              //如果是一个批次中的最后一条数据,将偏移量和数据同时写入Hbase的同一行中
              if (!iter.hasNext) {
                val topic = offsetRange.topic
                val partition = offsetRange.partition
                val untilOffset = offsetRange.untilOffset
                put.addColumn(Bytes.toBytes("offset"), Bytes.toBytes("groupid"), Bytes.toBytes(groupId))
                put.addColumn(Bytes.toBytes("offset"), Bytes.toBytes("topic_partition"), Bytes.toBytes(topic + "_" + partition))
                put.addColumn(Bytes.toBytes("offset"), Bytes.toBytes("offset"), Bytes.toBytes(untilOffset))
              }

              puts.add(put)
              //            if (puts.size() % 5 == 0) {
              //              t_orders.put(puts)
              //              puts.clear()
              //            }

            })
            //批量写入
            t_orders.put(puts)
            //关闭Hbase的table
            t_orders.close()
            //关闭Hbase连接
            connection.close()

          }
        })

      }

    })

    ssc.start()

    ssc.awaitTermination()

  }
}

 

标签:String,val,exactly,Bytes,偏移量,kafka,streaming,toBytes,put
来源: https://www.cnblogs.com/xstCoding/p/16111676.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有