
Spark Streaming: Consuming Kafka Data with Manually Maintained Offsets



1. Introduction

This article shows two implementations of manually maintaining Kafka consumer offsets in Spark Streaming: one backed by Redis and one backed by MySQL. Both implementations extend a common OffsetManager trait, sketched below.
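The OffsetManager trait itself does not appear in the original post; the following is a minimal sketch inferred from the two implementations below (the method signatures are the only assumption).

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

import scala.collection.mutable

/**
  * Minimal sketch of the OffsetManager trait shared by the Redis and MySQL implementations.
  */
trait OffsetManager {
  /** Read the last committed offsets for the given topics and consumer group. */
  def getOffset(topics: Array[String], groupId: String): mutable.Map[TopicPartition, Long]

  /** Persist the offsets of the batch that has just been processed. */
  def saveOffset(groupId: String, offsetRanges: Array[OffsetRange]): Unit
}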

2. Code Implementation

2.1 Redis-based

import java.util

import com.bigdata.analysis.travel.RedisUtil
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.Jedis

import scala.collection.mutable

/**
  * @ description: Manually maintain Kafka offsets in Redis
  * @ author: spencer
  * @ date: 2021/1/8 14:10
  */
object RedisOffsetManager extends OffsetManager {
  /**
    * Read the offsets stored in Redis for the given topics and consumer group
    * @param topics
    * @param groupId
    * @return
    */
  override def getOffset(topics: Array[String], groupId: String): mutable.Map[TopicPartition, Long] = {
    val jedis: Jedis = RedisUtil.getJedis()
    val offsetMap = mutable.Map[TopicPartition, Long]()
    for (topic <- topics) {
      // Each topic is a Redis hash whose fields are encoded as "<groupId>|<partition>"
      val map: util.Map[String, String] = jedis.hgetAll(topic)

      import scala.collection.JavaConverters._
      for ((groupAndPartition, offset) <- map.asScala) {
        val group: String = groupAndPartition.split("\\|")(0)
        val partition: Int = groupAndPartition.split("\\|")(1).toInt
        if (group == groupId){
          offsetMap += new TopicPartition(topic, partition) -> offset.toLong
        }
      }

    }
    jedis.close() // return the connection once all topics have been read
    offsetMap
  }

  /**
    * Write the until-offsets of the current batch to Redis
    * @param groupId
    * @param offsetRanges
    */
  override def saveOffset(groupId: String, offsetRanges: Array[OffsetRange]): Unit = {
    val jedis: Jedis = RedisUtil.getJedis()
    for (offset <- offsetRanges) {
      val topic: String = offset.topic
      val partition: Int = offset.partition
//      val fromOffset: Long = offset.fromOffset
      val untilOffset: Long = offset.untilOffset

      // Hash field: "<groupId>|<partition>"; value: the first offset of the next batch
      val partitionId: String = groupId + "|" + partition
      jedis.hset(topic, partitionId, untilOffset.toString)
    }
    jedis.close() // return the connection after all partitions are written
  }
}
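RedisUtil, imported at the top of this object, is a project helper that is not shown in the original post. A minimal sketch of what it might look like, assuming a plain Jedis connection pool with placeholder host, port, and pool settings:

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

// Hypothetical helper: hands out connections from a Jedis pool.
// Host/port and pool sizes are placeholders, not values from the original post.
object RedisUtil {
  private val poolConfig = new JedisPoolConfig()
  poolConfig.setMaxTotal(20)
  poolConfig.setMaxIdle(10)

  private val pool = new JedisPool(poolConfig, "localhost", 6379)

  def getJedis(): Jedis = pool.getResource
}

With a pool-backed helper like this, calling jedis.close() (as done above) returns the connection to the pool instead of closing it outright.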

The main driver class:

import java.lang

import com.bigdata.analysis.travel.offset.RedisOffsetManager
import com.bigdata.conf.ConfigurationManager
import com.bigdata.constant.MyConstant
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
  * @ description: Consume data from Kafka and manually maintain offsets in Redis
  * @ author: spencer
  * @ date: 2021/1/8 15:52
  */
object TravelRealtimeAnalysis {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("TravelRealtimeAnalysis")
      .setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Set the checkpoint directory on HDFS
    ssc.checkpoint("hdfs://flink101:9000/travel/checkpoint")

    // Kafka consumer parameters
    val brokers: String = ConfigurationManager.config.getString(MyConstant.KAFKA_BROKERS)
    val topics: Array[String] = Array("travel_ods_orders")
    val kafkaParams = Map(
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "travel_consumer_id",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean) // 不自動提交偏移量,手动维护偏移量
    )

    // TODO: implementation steps
    /**
      * 1. Read the offsets previously saved in Redis/MySQL
      * 2. Create the Kafka source from those offsets
      * 3. Commit the new offsets from the first-hand InputDStream
      * 4. Run the business logic
      * 5. Start the streaming context
      *
      */
    val storedOffsets: mutable.Map[TopicPartition, Long] = RedisOffsetManager.getOffset(topics, "travel_consumer_id")
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = if (storedOffsets.isEmpty) {
      // First run: no saved offsets, fall back to auto.offset.reset
      KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      )
    } else {
      // Resume from the offsets saved in Redis
      KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, storedOffsets)
      )
    }

    // Offsets must be read from the first-hand InputDStream, otherwise the underlying RDDs are no longer KafkaRDDs.
    // Reading them after other transformations throws: spark.rdd.MapPartitionsRDD cannot be cast to streaming.kafka010.HasOffsetRanges
    kafkaDStream.foreachRDD(rdd => {
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // ... business logic on rdd goes here ...

      // Commit the offsets once per batch, on the driver, after the batch has been processed
      RedisOffsetManager.saveOffset("travel_consumer_id", offsetRanges)
    })

    ssc.start()
    ssc.awaitTermination()
  }

}
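ConfigurationManager and MyConstant are project-specific helpers that the original post does not include. A hypothetical sketch, assuming Apache Commons Configuration and a "travel.properties" file on the classpath containing a kafka.brokers entry:

import org.apache.commons.configuration.PropertiesConfiguration

// Hypothetical stand-ins for the helpers imported by the driver above.
object MyConstant {
  val KAFKA_BROKERS = "kafka.brokers" // property key; its value would be a broker list such as "host1:9092,host2:9092"
}

object ConfigurationManager {
  // PropertiesConfiguration exposes getString(key), matching the call site in the driver
  val config = new PropertiesConfiguration("travel.properties")
}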

2.2 MySQL-based


import java.sql.{Connection, PreparedStatement, ResultSet}

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

import scala.collection.mutable

// JDBCHelper is the project's JDBC connection helper; its import is omitted here, as in the original post
object MysqlOffsetManager extends OffsetManager {
  /**
    * Persist the until-offsets of the current batch to MySQL
    *
    * @param groupId
    * @param offsetRanges
    */
  override def saveOffset(groupId: String, offsetRanges: Array[OffsetRange]): Unit = {
    val helper: JDBCHelper = JDBCHelper.getInstance()
    val conn: Connection = helper.getConnection
    // REPLACE INTO upserts one row per (group_id, topic, partition_id)
    val sql =
      """
        |replace into kafka_offset(group_id, topic, partition_id, fromOffset, untilOffset) values(?, ?, ?, ?, ?)
      """.stripMargin
    val pstmt: PreparedStatement = conn.prepareStatement(sql)

    for (offset <- offsetRanges) {
      pstmt.setString(1, groupId)
      pstmt.setString(2, offset.topic)
      pstmt.setInt(3, offset.partition)
      pstmt.setLong(4, offset.fromOffset)
      pstmt.setLong(5, offset.untilOffset)
      pstmt.executeUpdate()
    }

    pstmt.close()
    conn.close()

  }

  /**
    * Read the offsets stored in MySQL
    *
    * @param topics
    * @param groupId
    * @return
    */
  override def getOffset(topics: Array[String], groupId: String): mutable.Map[TopicPartition, Long] = {
    val sql = "select * from kafka_offset where group_id = ? and topic = ?"
    val helper: JDBCHelper = JDBCHelper.getInstance()
    val conn: Connection = helper.getConnection
    val pstmt: PreparedStatement = conn.prepareStatement(sql)
    var resultSet: ResultSet = null

    val offsetMap = mutable.Map[TopicPartition, Long]()
    for (topic <- topics) {
      pstmt.setString(1, groupId)
      pstmt.setString(2, topic)
      resultSet = pstmt.executeQuery()

      while (resultSet.next()) {
        // Resume from the untilOffset saved for this (topic, partition)
        offsetMap += new TopicPartition(resultSet.getString("topic"), resultSet.getInt("partition_id")) -> resultSet.getLong("untilOffset")
      }
      resultSet.close() // close each topic's result set before reusing the statement
    }

    pstmt.close()
    conn.close()

    offsetMap
  }

}
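The kafka_offset table itself is not defined in the original post. The layout below is inferred from the REPLACE INTO and SELECT statements above; the composite primary key is what makes REPLACE INTO behave as an upsert per (group_id, topic, partition_id). A one-off setup sketch using the same JDBCHelper:

object CreateOffsetTable {
  def main(args: Array[String]): Unit = {
    // Inferred table layout; column names match the statements in MysqlOffsetManager
    val ddl =
      """
        |create table if not exists kafka_offset(
        |  group_id     varchar(128) not null,
        |  topic        varchar(128) not null,
        |  partition_id int          not null,
        |  fromOffset   bigint       not null,
        |  untilOffset  bigint       not null,
        |  primary key (group_id, topic, partition_id)
        |)
      """.stripMargin

    val conn = JDBCHelper.getInstance().getConnection
    val stmt = conn.createStatement()
    stmt.execute(ddl)
    stmt.close()
    conn.close()
  }
}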

Source: https://blog.csdn.net/weixin_43861104/article/details/112375625
