spark-streaming


import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
//Running it a second time re-reads from the beginning, because the offsets are only read, never committed
object DemoOffset01 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g00003",
      "auto.offset.reset" -> "earliest" ,
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics: Array[String] =Array("test02")
    val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    kafkaDstream.foreachRDD(rdd=>{
      //KafkaRDD implements the HasOffsetRanges trait; only the KafkaRDD carries the offset ranges
      val offsetRange: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (range <- offsetRange) {
         println(s"topic:{range.topic},partition:{range.partition},fromoffset:{range.fromoffset},utiloffset:{range.utiloffset}")
      }
      val res: RDD[String] = rdd.map(_.value())
      res.foreach(println)

    })
    ssc.start()
    ssc.awaitTermination()

    //Programming Spark Streaming is essentially programming against RDDs
  }

}
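
These examples assume the spark-streaming-kafka-0-10 integration is on the classpath. A minimal build.sbt sketch (the artifact coordinates are the standard ones; the versions shown are illustrative, not taken from the original post):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.4.8" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.8"
)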

Upgraded version 2.0:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
//Version 2.0: the offsets are committed back to Kafka after each batch, so a restart resumes from the last committed offsets instead of re-reading from the beginning
object DemoOffset01 {
  def main(args: Array[String]): Unit = {
    //val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val conf: SparkConf = new SparkConf().setAppName("AggregateOperator").setMaster("local").set("spark.testing.memory", "512000000")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g00003",
      "auto.offset.reset" -> "earliest" ,
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics: Array[String] =Array("helloword")
    val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    //The function passed to foreachRDD runs periodically on the driver, once per batch
    kafkaDstream.foreachRDD(rdd=>{
      if(!rdd.isEmpty()){
        //KafkaRDD implements the HasOffsetRanges trait; only the KafkaRDD carries the offset ranges
        val offsetRange: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (range <- offsetRange) {
          println(s"topic:${range.topic},partition:${range.partition},fromoffset:${range.fromOffset},utiloffset:${range.untilOffset}")
        }
        //Process the data
        val res: RDD[String] = rdd.map(_.value())
        res.foreach(println)
        //Commit the offsets to Kafka's internal topic __consumer_offsets
        kafkaDstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRange)


      }

    })
    ssc.start()
    ssc.awaitTermination()

    //Programming Spark Streaming is essentially programming against RDDs
  }

}
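
commitAsync above is fire-and-forget: the driver never learns whether the commit actually reached Kafka. CanCommitOffsets also accepts an OffsetCommitCallback, so the outcome can at least be logged. A minimal sketch of such a helper (commitWithLogging is an illustrative name, not part of the original code):

import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, OffsetRange}

object OffsetCommitLogging {
  //Commit the given ranges on the direct stream and log the result on the driver
  def commitWithLogging(stream: InputDStream[ConsumerRecord[String, String]],
                        ranges: Array[OffsetRange]): Unit = {
    stream.asInstanceOf[CanCommitOffsets].commitAsync(ranges, new OffsetCommitCallback {
      override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
                              exception: Exception): Unit = {
        if (exception != null) println(s"offset commit failed: ${exception.getMessage}")
        else println(s"offset commit succeeded: $offsets")
      }
    })
  }
}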

Supplement:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}

object CommitOffsetDemo02 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("AggregateOperator").setMaster("local").set("spark.testing.memory", "512000000")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g00004",
      "auto.offset.reset" -> "earliest" ,
      "enable.auto.commit" -> (false: java.lang.Boolean)//在executor端提交
    )
    val topics: Array[String] =Array("helloword")
    val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    //Calling a transformation on a DStream produces a new DStream; calling an action does not
    //A transformation on kafkaDstream yields a MapPartitionsRDD, so the offsets can only be obtained first-hand from the DirectKafkaInputDStream (its KafkaRDD)
    //The Kafka client on the driver fetches the offsets for each batch (which decides how much data the batch reads); the generated tasks are serialized to the executors' thread pools, so the actual records are read from the Kafka cluster on the executors, and the offsets are committed to the __consumer_offsets topic
    val lines: DStream[String] = kafkaDstream.map(_.value())
    lines.foreachRDD(rdd=>{
      //rdd here is a MapPartitionsRDD produced by map, not the KafkaRDD, so this cast fails at runtime with a ClassCastException; that is the pitfall this example illustrates
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (range <- offsetRanges) {
        println(s"topic:${range.topic},partition:${range.partition},fromoffset:${range.fromOffset},utiloffset:${range.untilOffset}")
      }
      rdd.foreach(x=>println(x))
    })
    ssc.start()
    ssc.awaitTermination()
  }

}
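
If further transformations are needed on the DStream, a common workaround is to capture the offset ranges in a transform on the direct stream itself, before any map or shuffle, since transform still sees the KafkaRDD. A sketch that reuses the kafkaDstream from the example above (variable names are illustrative):

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

//Driver-side holder for the ranges of the current batch
var offsetRanges = Array.empty[OffsetRange]

val lines: DStream[String] = kafkaDstream.transform { rdd =>
  //rdd here is still the KafkaRDD, so the cast succeeds
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map(_.value())

lines.foreachRDD { rdd =>
  for (range <- offsetRanges) {
    println(s"topic:${range.topic},partition:${range.partition},fromOffset:${range.fromOffset},untilOffset:${range.untilOffset}")
  }
  rdd.foreach(println)
}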
Supplement:

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}

//Read data from Kafka, perform aggregation, and write the offsets together with the aggregated results to MySQL; MySQL supports transactions, which guarantees the aggregated results and the offsets are written successfully together (or not at all)
object CommitOffsetDemo03 {
  def main(args: Array[String]): Unit = {
    val AppName: String =args(0)
    val groupid: String =args(1)
    //val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val conf: SparkConf = new SparkConf().setAppName(AppName).setMaster("local[*]").set("spark.testing.memory", "512000000")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest" ,
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics: Array[String] =Array("helloword")
    val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    //The function passed to foreachRDD runs periodically on the driver, once per batch
    kafkaDstream.foreachRDD(rdd=>{
      if(!rdd.isEmpty()){
        //KafkaRDD implements the HasOffsetRanges trait; only the KafkaRDD carries the offset ranges
        val offsetRange: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        //Process the data: the RDD transformations and actions are invoked on the driver, while the functions passed to them run on the executors
        val res: RDD[(String, Int)] = rdd.map(_.value()).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
        //Collect the aggregated result back to the driver
        val result: Array[(String, Int)] = res.collect()
        //Declared outside the try block so that catch/finally can roll back and close them
        var connection: Connection = null
        var pstm1: PreparedStatement = null
        var pstm2: PreparedStatement = null
          try {
            //Create a JDBC connection (requires the MySQL JDBC driver dependency); assign to the outer var so catch/finally can see it
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mq01?characterEncoding=utf8", "root", "123456")
            //Start a transaction by disabling auto-commit
            connection.setAutoCommit(false)
            //The offsets (computed on the driver) and the aggregated results (collected to the driver) are written to MySQL together; both live in the same process, so one connection and one transaction can cover them both
            //Create the PreparedStatements
            pstm1 = connection.prepareStatement("INSERT INTO t_wordcount(word,count) VALUES (?,?) ON DUPLICATE KEY UPDATE COUNT=COUNT +?;")
            for (tp <- result) {
              pstm1.setString(1,tp._1)
              pstm1.setInt(2,tp._2)
              pstm1.setInt(3,tp._2)
              //Data that has not been committed yet is dirty data
              pstm1.executeUpdate()
            }
            pstm2 = connection.prepareStatement("INSERT INTO t_kafuka_offset VALUES(?,?,?) ON DUPLICATE KEY UPDATE OFFSET=?;")
            for (range <- offsetRange) {
              val topic: String = range.topic
              val partition: Int = range.partition
              //fromOffset is not needed here; only untilOffset is recorded
              val offset: Long = range.untilOffset
              pstm2.setString(1,AppName+"_"+groupid)
              pstm2.setString(2,topic+"_"+partition)
              pstm2.setLong(3,offset)
              pstm2.setLong(4,offset)
              pstm2.executeUpdate()
            }
            //Commit the transaction; the results and the offsets become visible atomically
            connection.commit()
          } catch {
            case e: Exception => {
              //If the writes fail, roll back the transaction so neither the results nor the offsets are persisted
              if (connection != null) connection.rollback()
              //Rethrow so the batch job fails and awaitTermination() surfaces the error, stopping the application
              throw e
            }
          }
          finally {
            if(pstm1!=null){
              pstm1.close()
            }
            if(pstm2!=null){
              pstm2.close()
            }
            if(connection!=null){
              connection.close()
            }

          }
      }
    })
    ssc.start()
    ssc.awaitTermination()
    //Programming Spark Streaming is essentially programming against RDDs

  }

}
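
Note that CommitOffsetDemo03 only writes offsets to MySQL; on restart they also have to be read back and handed to the consumer, otherwise the stream still starts from the committed Kafka offsets or from "earliest". A sketch of that missing piece, assuming the t_kafuka_offset table written above stores (app_group, topic_partition, offset) in that column order (the column names here are illustrative, since the INSERT above does not name them):

import java.sql.DriverManager
import org.apache.kafka.common.TopicPartition

//Load the offsets saved by CommitOffsetDemo03 for the given "AppName_groupid" key
def loadOffsets(appGid: String): Map[TopicPartition, Long] = {
  val connection = DriverManager.getConnection(
    "jdbc:mysql://localhost:3306/mq01?characterEncoding=utf8", "root", "123456")
  try {
    val pstm = connection.prepareStatement(
      "SELECT topic_partition, `offset` FROM t_kafuka_offset WHERE app_group = ?")
    pstm.setString(1, appGid)
    val rs = pstm.executeQuery()
    var offsets = Map.empty[TopicPartition, Long]
    while (rs.next()) {
      val tp = rs.getString(1)
      //The table stores topic and partition joined by "_"
      val idx = tp.lastIndexOf("_")
      offsets += new TopicPartition(tp.substring(0, idx), tp.substring(idx + 1).toInt) -> rs.getLong(2)
    }
    offsets
  } finally {
    connection.close()
  }
}

//Then subscribe with the saved offsets so a restart resumes where the last transaction ended:
//ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, loadOffsets(AppName + "_" + groupid))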

Source: https://blog.csdn.net/DearNingning/article/details/118274253
