Integrating a Kafka Data Source with Spark Streaming

2019-12-25 13:51:22



1. Spark's KafkaUtils.createStream is deprecated; it belongs to the Spark Integration for Kafka 0.8 module. Its replacement, Spark Integration for Kafka 0.10, no longer provides a createStream function and uses createDirectStream instead. The difference is that the direct stream connects straight to the Kafka brokers rather than going through ZooKeeper.
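For comparison, here is a minimal sketch of the two call shapes. The 0.8-style call is shown only to illustrate what is being replaced; the ZooKeeper quorum, group id, and topic map are placeholder values, and scc/kafkaParams refer to the objects defined in the full example below.

    // old 0.8 integration (deprecated): receiver-based, connects via ZooKeeper
    // import org.apache.spark.streaming.kafka.KafkaUtils
    val oldStream = KafkaUtils.createStream(scc, "zk-host:2181", "my-group", Map("test" -> 1))

    // new 0.10 integration: direct stream, connects straight to the Kafka brokers
    // import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    val newStream = KafkaUtils.createDirectStream[String, String](
      scc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array("test"), kafkaParams)
    )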

2. Dependency

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>
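If the project is built with sbt instead of Maven, the equivalent dependency line would be the following (assuming the same Scala 2.11 / Spark 2.4.4 versions as above). Note that the core spark-core and spark-streaming artifacts are also required; only the Kafka integration module is shown here.

    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.4"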

3. Code

package com.home.spark.streaming

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Ex_kafkaSource {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true).setMaster("local[*]").setAppName("spark streaming wordcount")
    
    conf.set("spark.streaming.stopGracefullyOnShutdown", "true")

    // streaming context with a 30-second batch interval
    val scc: StreamingContext = new StreamingContext(conf, Seconds(30))

    // Kafka consumer configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.44.10:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      // start from the latest offset when no committed offset exists
      "auto.offset.reset" -> "latest",
      // offsets are committed automatically; see the note after the code
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )

    val topics = Array("test")

    // direct stream: executors consume from the Kafka brokers, no receiver threads
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      scc,
      // spread partitions evenly across the available executors
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    kafkaStream.foreachRDD(rdd => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val mapped: RDD[(String, String)] = rdd.map(record => (record.key, record.value))
      // computation logic goes here
      mapped.foreach(println)
      // print the offset range consumed from each partition in this batch
      for (o <- offsetRanges) {
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    })

    val words: DStream[String] = kafkaStream.flatMap(record => record.value().split(" "))

    val pairs = words.map(word => (word, 1))

    val wordCounts: DStream[(String, Int)] = pairs.reduceByKey(_ + _)

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    // Start the computation: begin consuming and processing messages
    scc.start()

    // Block until the computation terminates; stop manually with scc.stop()
    scc.awaitTermination()
  }
}
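One caveat about the example above: with enable.auto.commit set to true, the consumer commits offsets on its own timer, so a failure between an auto-commit and the end of a batch can drop or replay records. A common pattern with the 0.10 API is to disable auto-commit and commit the offset ranges yourself once the batch's work has succeeded, via the stream's CanCommitOffsets handle. A minimal sketch, assuming "enable.auto.commit" -> (false: java.lang.Boolean) in kafkaParams:

    import org.apache.spark.streaming.kafka010.CanCommitOffsets

    kafkaStream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process the batch here ...
      // commit only after processing succeeds; offsets are stored in Kafka
      // itself (no ZooKeeper involved)
      kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }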

4. Kafka test environment

     https://www.cnblogs.com/asker009/p/9958240.html

Source: https://www.cnblogs.com/asker009/p/12096249.html
