
(14) Monitoring and Managing Streaming Queries




Managing Queries

Streaming queries are managed primarily through the StreamingQueryManager class. An instance can be obtained from a SparkSession, and its main operations are shown below.
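A minimal sketch of those operations, using method names from the public Spark API (the listener here is a deliberately empty placeholder):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener

val spark: SparkSession = SparkSession.builder().getOrCreate()

// A no-op listener, just so there is something to register and deregister
val listener = new StreamingQueryListener {
  override def onQueryStarted(e: StreamingQueryListener.QueryStartedEvent): Unit = ()
  override def onQueryProgress(e: StreamingQueryListener.QueryProgressEvent): Unit = ()
  override def onQueryTerminated(e: StreamingQueryListener.QueryTerminatedEvent): Unit = ()
}

spark.streams.active                    // all currently active queries
spark.streams.addListener(listener)     // register a StreamingQueryListener
spark.streams.removeListener(listener)  // deregister it
spark.streams.resetTerminated()         // forget queries that have already terminated
spark.streams.awaitAnyTermination()     // block until any active query terminates
// spark.streams.get(id) looks a query up by its UUID (or the UUID's string form)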

 

The most important of these are addListener and removeListener, which let us receive the detailed processing information for every batch. The information available through a listener is the content of StreamingQueryProgress, shown in full in the listener example later in this article.

 

 

A StreamingQuery object is created when a query is started and can be used to monitor and manage that query. As described above, it can also be retrieved later through StreamingQueryManager's get method, provided the query's id was saved beforehand (get accepts either the UUID or its string form).

The StreamingQuery trait has four implementation classes; the one you will usually encounter is StreamingQueryWrapper.

 

The operations available on a StreamingQuery object are as follows:

val query = df.writeStream.format("console").start()   // get the query object

query.id          // get the unique identifier of the running query that persists across restarts from checkpoint data
query.runId       // get the unique id of this run of the query, which will be generated at every start/restart
query.name        // get the auto-generated or user-specified name of the query
query.explain()   // print detailed explanations of the query
query.stop()      // stop the query
query.awaitTermination()   // block until query is terminated, with stop() or with error
query.exception       // the exception if the query has been terminated with error
query.recentProgress  // an array of the most recent progress updates for this query
query.lastProgress    // the most recent progress update of this streaming query
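Note that awaitTermination() blocks the calling thread, so stop() is normally invoked from another thread. One minimal pattern (a sketch; the shutdown hook is plain Scala, not part of the Spark API) is to stop the query when the JVM exits:

// Stop the query cleanly on JVM shutdown, then block until it terminates
sys.addShutdownHook {
  query.stop()
}
query.awaitTermination()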

Any number of queries can be started within a single SparkSession. They run concurrently, sharing the cluster's resources. sparkSession.streams returns the StreamingQueryManager, which can be used to manage the currently active queries.

val spark: SparkSession = ...

spark.streams.active    // get the list of currently active streaming queries
spark.streams.get(id)   // get a query object by its unique id
spark.streams.awaitAnyTermination()   // block until any one of them terminates
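For example, to keep the driver alive while several queries run concurrently until all of them finish, one common pattern is the sketch below (df1 and df2 stand for arbitrary streaming DataFrames):

val q1 = df1.writeStream.format("console").start()
val q2 = df2.writeStream.format("console").start()

// Loop until no active query remains; resetTerminated() clears queries
// that have already finished so awaitAnyTermination() can fire again
while (spark.streams.active.nonEmpty) {
  spark.streams.awaitAnyTermination()
  spark.streams.resetTerminated()
}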

 

Monitoring Streaming Queries

There are two kinds of APIs for monitoring and debugging queries: interactive and asynchronous.

1. Interactive APIs

You can directly fetch the current status and metrics of an active query with streamingQuery.lastProgress() and streamingQuery.status(). lastProgress() returns a StreamingQueryProgress object in Scala and Java, and a dictionary with the same fields in Python. It holds all the information about the progress made in the stream's last trigger: what data was processed, the processing rates, latencies, and so on. There is also streamingQuery.recentProgress, which returns an array of the last few progress updates.

In addition, streamingQuery.status() returns a StreamingQueryStatus object in Scala and Java, and a dictionary with the same fields in Python. It gives information about what the query is doing right now: whether a trigger is active, whether data is being processed, and so on.

Here are a few examples.

val query: StreamingQuery = ...

println(query.lastProgress)

/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/

println(query.status)
/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
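recentProgress can be inspected the same way; the number of updates retained is controlled by the spark.sql.streaming.numRecentProgressUpdates option (100 by default). A sketch:

// Each element is a StreamingQueryProgress with the same fields as lastProgress above
query.recentProgress.foreach { p =>
  println(s"batch=${p.batchId} rows=${p.numInputRows} rows/s=${p.processedRowsPerSecond}")
}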

 

2. Asynchronous API

You can also asynchronously monitor all queries associated with a SparkSession by attaching a StreamingQueryListener (see the Scala/Java API docs). Register your custom StreamingQueryListener object with sparkSession.streams.addListener(); you will then get callbacks when queries are started and stopped and as they make progress during execution.

 

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    println("Query started ! ")
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    println("event.progress.batchId ===========> "+event.progress.batchId)
    println("event.progress.durationMs ===========> "+event.progress.durationMs)
    println("event.progress.eventTime ===========> "+event.progress.eventTime)
    println("event.progress.id ===========> "+event.progress.id)
    println("event.progress.name ===========> "+event.progress.name)
    println("event.progress.sink.json ===========> "+event.progress.sink.json)
    println("event.progress.sources.length ===========> "+event.progress.sources.length)
    println("event.progress.sources(0).description ===========> "+event.progress.sources(0).description)
    println("event.progress.sources(0).inputRowsPerSecond ===========> "+event.progress.sources(0).inputRowsPerSecond)
    println("event.progress.sources(0).numInputRows ===========> "+event.progress.sources(0).numInputRows)
    println("event.progress.sources(0).startOffset ===========> "+event.progress.sources(0).startOffset)
    println("event.progress.sources(0).processedRowsPerSecond ===========> "+event.progress.sources(0).processedRowsPerSecond)
    println("event.progress.sources(0).endOffset ===========> "+event.progress.sources(0).endOffset)

    println("event.progress.processedRowsPerSecond ===========> "+event.progress.processedRowsPerSecond)
    println("event.progress.timestamp ===========> "+event.progress.timestamp)
    println("event.progress.stateOperators.size ===========> "+event.progress.stateOperators.size)
    println("event.progress.inputRowsPerSecond ===========> "+event.progress.inputRowsPerSecond)

  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    println("Query stopped ! ")
  }

})

A complete working example:

 

package bigdata.spark.StructuredStreaming.MMOperator

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener}

object SQListener {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName(this.getClass.getName)
      .setMaster("yarn")                          // "yarn-client" is no longer a valid master URL in Spark 2.x
      .set("spark.submit.deployMode", "client")
      .set("spark.hadoop.yarn.resourcemanager.hostname", "mt-mdh.local")  // Hadoop keys need the spark.hadoop. prefix
      .set("spark.executor.instances","2")
      .set("spark.default.parallelism","4")
      .set("spark.sql.shuffle.partitions","4")
      .setJars(List("/Users/meitu/Desktop/sparkjar/bigdata.jar"
        ,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
        ,"/opt/jars/kafka-clients-0.10.2.2.jar"
        ,"/opt/jars/kafka_2.11-0.10.2.2.jar"))

    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

    import spark.implicits._
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
        println("Query started ! ")
      }

      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        println("event.progress.batchId ===========> "+event.progress.batchId)
        println("event.progress.durationMs ===========> "+event.progress.durationMs)
        println("event.progress.eventTime ===========> "+event.progress.eventTime)
        println("event.progress.id ===========> "+event.progress.id)
        println("event.progress.name ===========> "+event.progress.name)
        println("event.progress.sink.json ===========> "+event.progress.sink.json)
        println("event.progress.sources.length ===========> "+event.progress.sources.length)
        println("event.progress.sources(0).description ===========> "+event.progress.sources(0).description)
        println("event.progress.sources(0).inputRowsPerSecond ===========> "+event.progress.sources(0).inputRowsPerSecond)
        println("event.progress.sources(0).numInputRows ===========> "+event.progress.sources(0).numInputRows)
        println("event.progress.sources(0).startOffset ===========> "+event.progress.sources(0).startOffset)
        println("event.progress.sources(0).processedRowsPerSecond ===========> "+event.progress.sources(0).processedRowsPerSecond)
        println("event.progress.sources(0).endOffset ===========> "+event.progress.sources(0).endOffset)

        println("event.progress.processedRowsPerSecond ===========> "+event.progress.processedRowsPerSecond)
        println("event.progress.timestamp ===========> "+event.progress.timestamp)
        println("event.progress.stateOperators.size ===========> "+event.progress.stateOperators.size)
        println("event.progress.inputRowsPerSecond ===========> "+event.progress.inputRowsPerSecond)

      }

      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
        println("Query stopped ! ")
      }

    })
    // Create DataSet representing the stream of input lines from kafka
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
      .option("subscribe", "split_test")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Generate running word count
    val wordCounts = lines
      .flatMap(_.split(" "))
      .groupBy("value")
      .count()

    // Start running the query that prints the running counts to the console
    val query = wordCounts
      .writeStream
      .outputMode(OutputMode.Update())
      .format("console")
      .start()
    println(query.id)
    println(spark.streams.get(query.id).id)
    query.awaitTermination()
  }

}

 

The recommended approach is to use a listener to monitor the state of streaming queries.

Source: https://blog.csdn.net/qq_18522601/article/details/96493834
