Spark Streaming Integration with Kafka (with Examples)

There are two ways to integrate Spark Streaming with Kafka: the Receiver-based approach and the Direct approach.
This article focuses mainly on the Direct approach.
I. Receiver

(Figure: architecture of the Receiver-based integration.) A dedicated receiver consumes data from Kafka through the high-level consumer API and stores it on the executors (usually backed by a write-ahead log), while consumed offsets are tracked in ZooKeeper.

II. Direct

(Figure: architecture of the Direct approach.) The Direct approach uses no receiver: every batch queries Kafka for the offset range of each partition and reads it directly, so Kafka partitions map one-to-one onto RDD partitions and the application can manage offsets itself.

III. Code Demonstration

Complete pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jiang</groupId>
    <artifactId>spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.12.10</scala.version>
        <hadoop.version>3.2.0</hadoop.version>
        <spark.version>3.0.1</spark.version>
    </properties>

    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

         <!--https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- MySQL database access -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.37</version>
        </dependency>

        <dependency>
            <groupId>com.hankcs</groupId>
            <artifactId>hanlp</artifactId>
            <version>portable-1.7.8</version>
        </dependency>

    </dependencies>

    <build>
        <!-- Source directory -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
        <!-- Declare and configure the build plugins -->
            <!-- Compiles the Scala sources to class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Builds an uber (fat) jar; the filters strip signature files from dependencies -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

</project>
1. Automatic offset commit

Scala code:

package com.jiang.streaming_kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object Kafka_Demo01 {

  def main(args: Array[String]): Unit = {

    // TODO 0. Prepare the environment
    val conf:SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc:SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc,Seconds(5)) // one batch every 5 seconds
    // State is kept in the checkpoint directory; without it, stateful operations fail with:
    // "The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()."
    ssc.checkpoint("./ckp")

    // TODO 1. Load data from Kafka

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.89.15:9092",
      "key.deserializer" -> classOf[StringDeserializer],  // key deserializer
      "value.deserializer" -> classOf[StringDeserializer], // value deserializer
      "group.id" -> "sparkdemo",  // consumer group name
      // earliest: if a committed offset exists, resume from it; otherwise start from the earliest message
      // latest:   if a committed offset exists, resume from it; otherwise start from the latest message
      // none:     if a committed offset exists, resume from it; otherwise throw an error
      "auto.offset.reset" -> "latest",
      "auto.commit.interval.ms" -> "1000",  // auto-commit interval
      "enable.auto.commit" -> (true: java.lang.Boolean) // commit offsets automatically
    )

    val topics = Array("spark-demo")  // topics to subscribe to
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, // location strategy: use the recommended PreferConsistent
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) // consumer strategy: subscribe to the given topics
    )

    // TODO 2. Process the messages
    val infoDS: DStream[String] = kafkaDS.map(record => {
      val topic: String = record.topic()
      val partition: Int = record.partition()
      val offset: Long = record.offset()
      val key: String = record.key()
      val value: String = record.value()
      val info: String = s"""topic:${topic},partition:${partition},offset:${offset},key:${key},value:${value}"""
      info
    })

    // TODO 3. Output the results
    infoDS.print()

    // TODO 4. Start and wait for termination
    ssc.start()
    ssc.awaitTermination() // a streaming application keeps running, waiting for data, until it is stopped manually

    // TODO 5. Release resources
    ssc.stop()
  }

}
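To feed test data into the job above you need messages in the spark-demo topic. The following small test producer is not part of the original post; it is a minimal sketch that assumes the same broker address (192.168.89.15:9092) and topic name used above and relies only on the kafka-clients library that spark-streaming-kafka-0-10_2.12 already pulls in transitively.

package com.jiang.streaming_kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical helper for testing: sends a few string messages to the topic consumed above.
object TestProducer {

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.89.15:9092") // same broker as the streaming job (assumption)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try {
      for (i <- 1 to 10) {
        // key = message number, value = payload; both plain strings
        producer.send(new ProducerRecord[String, String]("spark-demo", i.toString, s"hello spark $i"))
      }
    } finally {
      producer.flush()
      producer.close()
    }
  }
}

Keep in mind that with enable.auto.commit set to true, offsets are committed in the background roughly every auto.commit.interval.ms, independently of whether the batch has actually been processed; that is the main reason the next two examples switch to manual commits.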
2. Manual offset commit

Scala code:

package com.jiang.streaming_kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object Kafka_Demo02 {

  def main(args: Array[String]): Unit = {

    // TODO 0. Prepare the environment
    val conf:SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc:SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc,Seconds(5)) // one batch every 5 seconds
    // State is kept in the checkpoint directory; without it, stateful operations fail with:
    // "The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()."
    ssc.checkpoint("./ckp")

    // TODO 1. Load data from Kafka

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.89.15:9092",
      "key.deserializer" -> classOf[StringDeserializer],  // key deserializer
      "value.deserializer" -> classOf[StringDeserializer], // value deserializer
      "group.id" -> "sparkdemo",  // consumer group name
      // earliest: if a committed offset exists, resume from it; otherwise start from the earliest message
      // latest:   if a committed offset exists, resume from it; otherwise start from the latest message
      // none:     if a committed offset exists, resume from it; otherwise throw an error
      "auto.offset.reset" -> "latest",
      //"auto.commit.interval.ms" -> "1000",  // auto-commit interval (unused with manual commits)
      "enable.auto.commit" -> (false: java.lang.Boolean) // do not commit offsets automatically
    )

    val topics = Array("spark-demo")  // topics to subscribe to
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, // location strategy: use the recommended PreferConsistent
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) // consumer strategy: subscribe to the given topics
    )

    // TODO 2. Process the messages
    // Note on commit timing: offsets should be committed once per consumed micro-batch, and in a DStream a micro-batch corresponds to an RDD
    kafkaDS.foreachRDD(rdd => {
      if(!rdd.isEmpty()){
        // Consume: print each record
        rdd.foreach(record =>{
          val topic: String = record.topic()
          val partition: Int = record.partition()
          val offset: Long = record.offset()
          val key: String = record.key()
          val value: String = record.value()
          val info: String = s"""topic:${topic},partition:${partition},offset:${offset},key:${key},value:${value}"""

          println("Consumed record details: " + info)
        })
        // Commit
        // Extract the offset ranges carried by this RDD
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // Commit them back to Kafka
        kafkaDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        println("This batch has been consumed and its offsets committed manually")
      }
    })


    // TODO 3. Output the results
//    infoDS.print()

    // TODO 4. Start and wait for termination
    ssc.start()
    ssc.awaitTermination() // a streaming application keeps running, waiting for data, until it is stopped manually

    // TODO 5. Release resources
    ssc.stop()
  }

}
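commitAsync returns immediately and the actual commit is performed later on the driver. If you want confirmation (for example to log failures), CanCommitOffsets also has an overload that accepts an org.apache.kafka.clients.consumer.OffsetCommitCallback. The fragment below is a sketch of that variant, intended to replace the commit lines inside foreachRDD in Kafka_Demo02 (it uses the same rdd and kafkaDS, plus two extra imports):

// Extra imports needed in addition to those in Kafka_Demo02:
// import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
// import org.apache.kafka.common.TopicPartition

val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
kafkaDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback {
  override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
                          exception: Exception): Unit = {
    if (exception != null) {
      println("Offset commit failed: " + exception.getMessage)
    } else {
      println("Offsets committed for " + offsets.size() + " partition(s)")
    }
  }
})

Note that offsetRanges must be taken from the RDD produced directly by createDirectStream, before any transformation that changes its type or partitioning, and commitAsync must be called on the original stream object, exactly as Kafka_Demo02 already does.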
3. Manual offset commit to MySQL

Scala code:

package com.jiang.streaming_kafka

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable


object Kafka_Demo03 {

  def main(args: Array[String]): Unit = {

    // TODO 0. Prepare the environment
    val conf:SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc:SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc,Seconds(5)) // one batch every 5 seconds
    // State is kept in the checkpoint directory; without it, stateful operations fail with:
    // "The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()."
    ssc.checkpoint("./ckp")

    // TODO 1. Load data from Kafka

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.89.15:9092",
      "key.deserializer" -> classOf[StringDeserializer],  // key deserializer
      "value.deserializer" -> classOf[StringDeserializer], // value deserializer
      "group.id" -> "sparkdemo",  // consumer group name
      // earliest: if a committed offset exists, resume from it; otherwise start from the earliest message
      // latest:   if a committed offset exists, resume from it; otherwise start from the latest message
      // none:     if a committed offset exists, resume from it; otherwise throw an error
      "auto.offset.reset" -> "latest",
      //"auto.commit.interval.ms" -> "1000",  // auto-commit interval (unused with manual commits)
      "enable.auto.commit" -> (false: java.lang.Boolean) // do not commit offsets automatically
    )

    val topics = Array("spark-demo")  // topics to subscribe to

    val offsetsMap: mutable.Map[TopicPartition,Long] = OffsetUtil.getOffsetMap("sparkdemo","spark-demo")
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = if(offsetsMap.size > 0){
      println("MySQL already stores offsets for this consumer group and topic; resuming from the recorded offsets")

      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent, // location strategy: use the recommended PreferConsistent
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams,offsetsMap) // consumer strategy: subscribe starting from the stored offsets
      )
    }else{
      println("MySQL has no stored offsets for this consumer group and topic; starting from latest")
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent, // location strategy: use the recommended PreferConsistent
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) // consumer strategy: subscribe to the given topics
      )
    }

    // TODO 2. Process the messages
    // Note on commit timing: offsets should be committed once per consumed micro-batch, and in a DStream a micro-batch corresponds to an RDD
    kafkaDS.foreachRDD(rdd => {
      if(!rdd.isEmpty()){
        // Consume: print each record
        rdd.foreach(record =>{
          val topic: String = record.topic()
          val partition: Int = record.partition()
          val offset: Long = record.offset()
          val key: String = record.key()
          val value: String = record.value()
          val info: String = s"""topic:${topic},partition:${partition},offset:${offset},key:${key},value:${value}"""
          println("Consumed record details: " + info)
        })
        // Commit
        // Extract the offset ranges carried by this RDD
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // Commit them to MySQL instead of Kafka
//        kafkaDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        OffsetUtil.saveOffsetRanges("sparkdemo",offsetRanges)
        println("This batch has been consumed and its offsets committed manually to MySQL")
      }
    })


    // TODO 3. Output the results
//    infoDS.print()

    // TODO 4. Start and wait for termination
    ssc.start()
    ssc.awaitTermination() // a streaming application keeps running, waiting for data, until it is stopped manually

    // TODO 5. Release resources
    ssc.stop(stopSparkContext = true,stopGracefully = true)

  }
  object OffsetUtil{

    val url = "jdbc:mysql://localhost:3306/bigdata_test"
    val username = "root"
    val passwd = "123456"


    // Save the offsets to the database
    def saveOffsetRanges(groupid:String,offsetRange:Array[OffsetRange]) = {
      val conn: Connection = DriverManager.getConnection(url, username, passwd)
      // REPLACE INTO updates an existing row or inserts a new one
      val ps: PreparedStatement = conn.prepareStatement("replace into offset (`topic`,`partition`,`groupid`,`offset`) values (?,?,?,?)")
      for(o <- offsetRange){
        ps.setString(1,o.topic)
        ps.setInt(2,o.partition)
        ps.setString(3,groupid)
        ps.setLong(4,o.untilOffset)
        ps.executeUpdate()
      }
      ps.close()
      conn.close()
    }

    // Read the offsets from the database as a Map(TopicPartition -> offset)
    def getOffsetMap(groupid:String,topic:String):mutable.Map[TopicPartition,Long] ={
      val conn: Connection = DriverManager.getConnection(url, username, passwd)
      val ps: PreparedStatement = conn.prepareStatement("select * from offset where groupid=? and topic=?")
      ps.setString(1,groupid)
      ps.setString(2,topic)
      val rs: ResultSet = ps.executeQuery()
      // Map(TopicPartition -> offset), one entry per (topic, partition)
      val offsetMap = mutable.Map[TopicPartition,Long]()
      while(rs.next()){
        offsetMap += new TopicPartition(rs.getString("topic"),rs.getInt("partition")) -> rs.getLong("offset")
      }

      rs.close()
      ps.close()
      conn.close()
      offsetMap
    }
  }
}
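OffsetUtil assumes that the bigdata_test database already contains an offset table whose unique key covers (topic, partition, groupid), so that REPLACE INTO behaves as an upsert. The original post does not show the table definition; the sketch below is one plausible schema (the column sizes are assumptions), created through the same JDBC settings that OffsetUtil uses.

package com.jiang.streaming_kafka

import java.sql.{Connection, DriverManager, Statement}

// Hypothetical one-off setup: creates the offset table that OffsetUtil expects.
object CreateOffsetTable {

  def main(args: Array[String]): Unit = {
    val conn: Connection = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/bigdata_test", "root", "123456")
    val stmt: Statement = conn.createStatement()
    // The unique key makes REPLACE INTO act as an upsert per (topic, partition, groupid).
    stmt.executeUpdate(
      """CREATE TABLE IF NOT EXISTS `offset` (
        |  `topic`     VARCHAR(100) NOT NULL,
        |  `partition` INT          NOT NULL,
        |  `groupid`   VARCHAR(100) NOT NULL,
        |  `offset`    BIGINT       NOT NULL,
        |  UNIQUE KEY `uk_topic_partition_groupid` (`topic`, `partition`, `groupid`)
        |) ENGINE=InnoDB DEFAULT CHARSET=utf8""".stripMargin)
    stmt.close()
    conn.close()
  }
}

MySQL Connector/J 5.1.x registers itself with DriverManager automatically, so no explicit Class.forName call is needed here or in OffsetUtil; adjust the URL, user, and password to your own environment.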

Source: https://blog.csdn.net/Joker_Jiang3/article/details/118513580
