
[Spark Streaming Kafka] Sample Code for Consuming Kafka Data with Spark Streaming

2022-06-22



Code

package test

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j.{Level, Logger}

import java.util.Date

/**
 * @Author yu
 * @Date 2022/6/8 22:45
 * @Description
 * @Ref
 */
object spark_kafka {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.WARN)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.WARN)

    // Initialize the StreamingContext
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("kafka stream") // or setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Kafka consumer configuration
    var kafkaParams = Map[String, Object](

      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node01:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
      ConsumerConfig.GROUP_ID_CONFIG -> "console-group1",
      // The explicit ascription to java.lang.Boolean is required here: the map's value
      // type is Object (AnyRef), which a bare Scala Boolean does not conform to
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
      //ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG-> 60000
      //ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG -> 60000
    )

    // If the Spark Streaming batch duration exceeds Kafka's default heartbeat session
    // timeout (30 seconds), raise the consumer parameters below accordingly to avoid
    // time-outs; the ConsumerConfig.XXX constants work just as well as raw string keys
    kafkaParams = kafkaParams.+("heartbeat.interval.ms" -> "30000")
    kafkaParams = kafkaParams.+("session.timeout.ms" -> "60000")
    // TODO For batches of 5 minutes or more, group.max.session.timeout.ms must also be raised on the brokers


    // Create the direct Kafka input stream; InputDStream[] extends DStream[]
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc, // the current streaming context
      LocationStrategies.PreferConsistent, // the right location strategy in most cases
      ConsumerStrategies.Subscribe[String, String](List("test1"), kafkaParams) // topic(s) to subscribe to, plus the consumer config
    )

    // Apply operators on top of the source stream
    val source: DStream[String] = kafkaStream.map(record => record.value())

    source.cache()

    source.print(100)



    source.foreachRDD(rdd => {
      println("rdd " + rdd.id + " ------ " + new Date().getTime())
      //      val sdf = new java.text.SimpleDateFormat("yyyyMMdd")
      //      val date = sdf.format(new Date())
      //      rdd.coalesce(1).saveAsTextFile("/tmp/kafka/test1/" + date + "/" + java.util.UUID.randomUUID())
    })

    println("source count: " + source.count().print())
    val aggrStream: DStream[(String, Int)] = source.flatMap(_.split("\\s"))
      .map((_, 1))
      .reduceByKey(_ + _)

    aggrStream.print()

    kafkaStream.foreachRDD(rdd => {
      if (rdd.count() > 0) {
        // extract the offset ranges of this batch from the underlying Kafka RDD
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        // print the offset range handled by each partition
        rdd.foreachPartition { iter =>
          val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
          println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        }
        println("=============================")
        // commit the offsets back to Kafka only after the output operations have completed
        kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    })

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
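
The example above lets Kafka itself store the consumer's progress via commitAsync, which gives at-least-once semantics. If you would rather keep offsets in your own store (for example, to update results and offsets in one transaction), you can resume from them on restart. Below is a minimal sketch, assuming a hypothetical loadOffsets() helper that is not part of the original example; it uses the Subscribe overload that accepts a map of starting offsets:

import org.apache.kafka.common.TopicPartition

// Hypothetical helper: read the last saved offsets from your own store
// (a database, ZooKeeper, a file, ...); the values below are placeholders
def loadOffsets(): Map[TopicPartition, Long] =
  Map(new TopicPartition("test1", 0) -> 0L)

val resumedStream: InputDStream[ConsumerRecord[String, String]] =
  KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    // Same Subscribe strategy as above, plus explicit starting offsets
    ConsumerStrategies.Subscribe[String, String](List("test1"), kafkaParams, loadOffsets())
  )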

POM.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>groupId</groupId>
    <artifactId>spark-streaming-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <spark.version>2.4.5</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <!-- 本地运行,注释掉此scope配置 -->
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <!-- 本地运行,注释掉此scope配置 -->
            <!-- <scope>provided</scope> -->
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <!--java打包插件-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!--scala打包插件-->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <includes>
                                <include>**/*.scala</include>
                            </includes>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!--将依赖打入jar包-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 

Result screenshot: (screenshot not reproduced here)
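
To reproduce the output, the topic test1 needs some messages to consume. Below is a minimal test-producer sketch using the plain kafka-clients API (pulled in transitively by spark-streaming-kafka-0-10); the broker address is taken from the consumer example above, and the object name kafka_test_producer is made up:

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object kafka_test_producer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Same broker as in the streaming example
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    try {
      // A few whitespace-separated messages for the word-count stream
      for (i <- 1 to 10) {
        producer.send(new ProducerRecord[String, String]("test1", s"hello spark streaming $i"))
      }
      producer.flush()
    } finally {
      producer.close()
    }
  }
}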

 

Source: https://www.cnblogs.com/144823836yj/p/16400015.html
