ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

多张报表简单逻辑在同一任务中样例

2021-08-02 10:32:01  阅读:184  来源: 互联网

标签:逻辑 obj 报表 val StructField 中样 IntegerType getString true


package com.fengtu.sparktest.eta

import java.text.SimpleDateFormat
import java.util
import java.util.Date

import com.alibaba.fastjson.JSONObject
import com.fengtu.sparktest.utils.{JSONUtils, MD5Util, SparkUtils}
import com.fengtu.sparktest.utils2.DateUtil
import org.apache.commons.lang
import org.apache.commons.lang.StringUtils
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel

/*
* 导航SDK-ETA指标监控需求(陈晨)
*/
object NaviSdkEtaIndexMonitor extends Serializable {
  // Job name derived from this object's simple class name, with the trailing '$' that Scala
  // appends to object classes removed.
  val appName: String = this.getClass.getSimpleName.replace("$", "")
  // Logger shared by every report in this job.
  val logger: Logger = Logger.getLogger(appName)
  // Report display name -> name of the process* method that computes it; filled in by init().
  val funcMap = new util.HashMap[String,String]()

  // Connection settings of the destination MySQL database that receives the report tables.
  // NOTE(review): credentials and host are hard-coded in source; consider loading them from
  // external configuration instead.
  val descMysqlUserName = "gis_oms_pns"
  val descMysqlPassWord = "gis_oms_pns@123@"
  val descMysqlUrl = "jdbc:mysql://10.119.72.209:3306/gis_oms_lip_pns?characterEncoding=utf-8"

  /* p1 reports: source ("Sour") and destination ("Desc"/"Dest") table names */
  // Click rate / selection rate
  val clickRateStatisticsSourTable = "dm_gis.gis_navi_top3_click_route"
  // Original note: "test comment" - presumably swap with the *_test table below when testing
  val clickRateStatisticsDescTable = "ETA_INDEX_CLICK_RATE_STATISTICS"
  //val clickRateStatisticsDescTable = "ETA_INDEX_CLICK_RATE_STATISTICS_test"
  // Yaw (route deviation)
  val yawStatisticsSourTable = "dm_gis.gis_navi_result_union"
  // Original note: "test comment" - presumably swap with the *_test table below when testing
  val yawStatisticsDescTable = "ETA_INDEX_YAW_STATISTICS"
  //val yawStatisticsDescTable = "ETA_INDEX_YAW_STATISTICS_test"
  // Accuracy rate (reads the temp view registered by init())
  val accStatisticsSourTable = "gis_navi_eta_result_tmp"
  val accStatisticsDescTable = "ETA_INDEX_ACCARY_RATE_STATISTICS"
  // Consistency rate (reads the temp view registered by init())
  val consistentSourTable = "gis_navi_eta_result_tmp"
  val reqAccStatisticsDescTable = "ETA_INDEX_CONSISTENT_REQACC_STATISTICS"
  //val reqAccStatisticsDescTable = "tmp_ETA_INDEX_CONSISTENT_REQACC_STATISTICS"
  val accConStatisticsDescTable = "ETA_INDEX_CONSISTENT_ACC_STATISTICS"
  // Usage rate
  val useRateSourTable = "dm_gis.gis_navi_result_union"
  val useRateDestTable = "ETA_INDEX_USE_RATE_STATISTICS"
  // Abnormal-exit monitoring
  val aemStatisticsSourTable = "dm_gis.gis_navi_result_union"
  val aemStatisticsoDescTable = "ETA_INDEX_ABNORMAL_EXIT_MONITOR_STATISTICS"
  // Time deviation rate (reads the temp view registered by init())
  val timeDeviationRateSourTable = "gis_navi_eta_result_tmp"
  val timeDeviationRateDescTable = "ETA_INDEX_TIME_DIFF_TIME_STATISTICS"
  // Deviation rate for specific time periods (reads the temp view registered by init())
  val timePeriodDeviationRateSourTable = "gis_navi_eta_result_tmp"
  val timePeriodDeviationRateDescTable = "ETA_INDEX_TIME_PERIOD_DIFF_TIME_STATISTICS"

  /* p2 reports: source and destination table names, plus shared intermediate RDD holders */
  // Task amount (reads the temp view registered by init())
  val taskAmountStatSourTable = "gis_navi_eta_result_tmp"
  val taskAmountStatDescTable = "ETA_INDEX_TASK_AMOUNT_STATISTICS"

  // Service metrics - response time
  // Mutable holder that starts as null; it is assigned outside the visible part of this file.
  // NOTE(review): presumably shared between the two service-metric reports - confirm.
  var serviceCostTimeRdd = null:RDD[((String, String), JSONObject)]
  val serviceResponseDescTable = "ETA_INDEX_SERVICE_RESPONSE_TIME_STATISTICS"

  // Service metrics - performance statistics
  val servicePerformanceDescTable = "ETA_INDEX_SERVICE_PERFORMANCE_TIME_STATISTICS"

  // Reuse rate
  val reuseStatSourTable = "dm_gis.gis_navi_result_union"
  val reuseStatDestTable = "ETA_INDEX_REUSE_RATE_STATISTICS"

  // Questionnaire accuracy rate
  // Mutable holder that starts as null; it is assigned outside the visible part of this file.
  // NOTE(review): presumably shared between the two questionnaire reports - confirm.
  var questionnaireRdd = null:RDD[((String, String), JSONObject)]
  val questionnaireAccDestTable = "ETA_INDEX_QUESTIONNAIRE_ACC_RATE_STATISTICS"
  // Original note: "test comment" - presumably swap with the *_test table below when testing
  //val questionnaireAccDestTable = "ETA_INDEX_QUESTIONNAIRE_ACC_RATE_STATISTICS_test"

  // Questionnaire - share of driver errors
  val questionnaireErrDestTable = "ETA_INDEX_QUESTIONNAIRE_ERR_RATE_STATISTICS"

  /**
   * Builds the Hive-enabled Spark session, registers the report-name -> method-name mapping in
   * `funcMap`, and registers the temp view `gis_navi_eta_result_tmp` by inner-joining
   * `dm_gis.gis_navi_eta_result1` and `dm_gis.gis_navi_eta_result2` on `id` for one day.
   *
   * @param incDay value of the `inc_day` partition to load from both result tables
   * @return the Spark session that owns the registered temp view
   */
  def init(incDay: String): SparkSession = {
    val spark = SparkSession
      .builder()
      .appName("SparkDecode")
      .master("yarn")
      .enableHiveSupport()
      .config("hive.exec.dynamic.partition",true)
      .config("hive.exec.dynamic.partition.mode","nonstrict")
      .getOrCreate()
    //val spark = SparkSession.builder().config(new SparkConf().setMaster("local[10]").setAppName(appName)).getOrCreate()
    // Only ERROR-level messages are shown from here on, which is why this job reports its
    // progress through logger.error.
    spark.sparkContext.setLogLevel("ERROR")
    // p1 reports
    funcMap.put("点击率","processClickRateStatistics")
    funcMap.put("偏航","processYawStatistics")
    funcMap.put("准确率","processAccuracyStatistics")
    funcMap.put("一致率","processConsistentStatistics")
    funcMap.put("使用率","processUseRateStatistics")
    funcMap.put("异常退出监控","processAbnormalExitMonitor")
    funcMap.put("时间偏差率","processTimeDeviationRate")
    funcMap.put("特定时间偏差率","processTimePeriodDeviationRate")

    // p2 reports
    funcMap.put("任务量", "processTaskAmountStatistics")
    funcMap.put("复用率", "processReuseRateStatistics")
    funcMap.put("问卷调查正确率", "processQuestionnaireAccRateStatistics")
    funcMap.put("问卷调查司机错误占比", "processQuestionnaireDriverErrPerStatistics")
    funcMap.put("服务指标-响应时间", "processServiceResponseStatistics")
    funcMap.put("服务指标-性能统计", "processServicePerformanceStatistics")

    // dm_gis.gis_navi_eta_result was split into two tables, so they are joined back together
    // and registered as a temp view.
    logger.error("合并导航结果表")

    // req_order is selected exactly once: a view with two columns of the same name makes every
    // by-name reference to that column fail as ambiguous.
    val querySql =
      s"""
         |select
         |    src_province,
         |    src_citycode,
         |    src_deptcode,
         |    dest_province,
         |    dest_citycode,
         |    dest_deptcode,
         |    ft_right,
         |    tl_ft_right,
         |    src,
         |    duration,
         |    plan_date,
         |    t1.routeid as routeId,
         |    req_order,
         |    similarity1,
         |    similarity5,
         |    navi_endstatus,
         |    t1.req_type,
         |    diff_time,
         |    navi_time,
         |    t1.request_id,
         |    t1.req_time,
         |    t1.navi_endtime,
         |    t1.inc_day,
         |    req_status,
         |    distance,
         |    route_order,
         |    t1.navi_id,
         |    t1.task_id
         |from (
         |    select * from dm_gis.gis_navi_eta_result1 where inc_day='$incDay'
         |) t1
         |inner join (
         |    select * from dm_gis.gis_navi_eta_result2 where inc_day='$incDay'
         |) t2
         |on t1.id = t2.id
         |""".stripMargin

    logger.error(querySql)

    spark.sql( querySql).createOrReplaceTempView("gis_navi_eta_result_tmp")

    spark
  }

  /**
   * Job entry point: initialises Spark for the day given in `args(0)`, runs the reports through
   * `start`, and always closes the session afterwards.
   *
   * @param args command-line arguments; `args(0)` is passed to `init` as the `inc_day` partition
   *             value, and the full array is forwarded to `start`
   * @throws IllegalArgumentException if no argument is supplied
   */
  def main(args: Array[String]): Unit = {
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
    require(args != null && args.nonEmpty,
      "missing argument: the inc_day partition value is expected as the first argument")

    val spark = init( args(0) )

    // Close the session even when a report fails, so the application releases its resources.
    try {
      start(spark,args)
    } finally {
      spark.close()
    }
  }

  /*
  *p1
  */

  // Click-rate / selection-rate result schema: string dimension columns followed by integer
  // counters, all nullable. Used when saving the result of processClickRateStatistics.
  val clickRateSchema = {
    val dimensions = Seq("id", "statdate", "province", "citycode", "sitecode")
    val counters = Seq(
      "navi_amount",
      "firstWay_amount", "secondWay_amount", "thirdWay_amount",
      "traditionType_amount", "experienceType_amount", "gdType_amount"
    )
    StructType(
      dimensions.map(name => StructField(name, StringType, nullable = true)) ++
        counters.map(name => StructField(name, IntegerType, nullable = true))
    )
  }

  // Yaw (route deviation) statistics schema: string dimension columns followed by integer
  // counters, all nullable. Used when saving the result of processYawStatistics.
  val yawSchema = {
    val dimensions = Seq(
      "id", "statdate", "province", "citycode", "sitecode", "sdkversion", "system"
    )
    val counters = Seq(
      "navi_amount",
      "yawpredict_avg", "yawfinaljudgment_avg",
      "percent99_yaw_amount", "percent95_yaw_amount", "percent90_yaw_amount",
      "experienceYaw_amount", "experienceUse_amount",
      "traditionalYaw_amount", "traditionalUse_amount",
      "gdyaw_amount", "gduse_amount"
    )
    StructType(
      dimensions.map(name => StructField(name, StringType, nullable = true)) ++
        counters.map(name => StructField(name, IntegerType, nullable = true))
    )
  }

  // Accuracy-rate statistics schema: string dimension columns, six total counters, then
  // req1..req24 followed by correct1..correct24 (all integer, all nullable).
  // NOTE(review): the 1..24 slots presumably correspond to hours of the day - confirm.
  val accSchema = {
    val dimensions = Seq(
      "id", "statdate", "province", "citycode", "sitecode", "compensate", "src"
    )
    val totals = Seq(
      "reqamount", "correctamount",
      "halfhour_reqamount", "halfhour_correctamount",
      "onehour_reqamount", "onehour_correctamount"
    )
    val slots = 1 to 24
    val counters =
      totals ++ slots.map(i => s"req${i}_amount") ++ slots.map(i => s"correct${i}_amount")
    StructType(
      dimensions.map(name => StructField(name, StringType, nullable = true)) ++
        counters.map(name => StructField(name, IntegerType, nullable = true))
    )
  }

  // Consistency-rate statistics schema: string dimension columns, two total counters, then a
  // (req, acc) counter pair for each percent threshold from 100 down to 60. All nullable.
  val reqAccSchema = {
    val dimensions = Seq(
      "id", "statdate", "province", "citycode", "sitecode",
      "compensate", "src", "simtype", "distance"
    )
    val thresholds = Seq(100, 99, 98, 95, 90, 85, 80, 60)
    val counters = Seq("reqamount", "nullreq_amount") ++
      thresholds.flatMap(p => Seq(s"percent${p}_req_amount", s"percent${p}_acc_amount"))
    StructType(
      dimensions.map(name => StructField(name, StringType, nullable = true)) ++
        counters.map(name => StructField(name, IntegerType, nullable = true))
    )
  }

  // Schema with the same counters as the accuracy-rate schema (six totals, req1..req24,
  // correct1..correct24) but keyed additionally by index_type and simtype. All nullable.
  val reqWaySimSchema = {
    val dimensions = Seq(
      "id", "statdate", "province", "citycode", "sitecode",
      "compensate", "index_type", "src", "simtype"
    )
    val totals = Seq(
      "reqamount", "correctamount",
      "halfhour_reqamount", "halfhour_correctamount",
      "onehour_reqamount", "onehour_correctamount"
    )
    val slots = 1 to 24
    val counters =
      totals ++ slots.map(i => s"req${i}_amount") ++ slots.map(i => s"correct${i}_amount")
    StructType(
      dimensions.map(name => StructField(name, StringType, nullable = true)) ++
        counters.map(name => StructField(name, IntegerType, nullable = true))
    )
  }

  // Usage-rate statistics schema: string dimension columns followed by integer counters,
  // all nullable.
  val useSchema = {
    val dimensions = Seq(
      "id", "statdate", "province", "citycode", "sitecode", "sdkversion", "system"
    )
    val counters = Seq(
      "navi_task_amount", "navi_use_amount",
      "gd_use_amount", "sf_use_amount", "whole_use_amount",
      "driver_amount", "driver_use_amount"
    )
    StructType(
      dimensions.map(name => StructField(name, StringType, nullable = true)) ++
        counters.map(name => StructField(name, IntegerType, nullable = true))
    )
  }

  // Abnormal-exit monitoring schema: string dimension columns followed by integer counters,
  // all nullable. Column spellings (sdkVersion, halfway_end_mount, falshback_amount) are kept
  // exactly as the destination tables expect them.
  val aemSchema = {
    val dimensions = Seq(
      "id", "statdate", "province", "citycode", "sitecode", "sdkVersion", "system"
    )
    val counters = Seq(
      "intosdk_nonavi_amount", "navi_amount",
      "exception_amount", "halfway_end_mount", "exception_notend_amount", "falshback_amount",
      "normal_amount", "autoend_amount", "manualend_amount"
    )
    StructType(
      dimensions.map(name => StructField(name, StringType, nullable = true)) ++
        counters.map(name => StructField(name, IntegerType, nullable = true))
    )
  }

  // Time-deviation-rate schema: six key columns, then a (req, diff, diff_per) triple for the
  // overall total ("amount") and for each bucket. Every column is a nullable string.
  // NOTE(review): bucket labels such as 0_10 presumably denote minute ranges - confirm.
  val timeDeviationRateSchema = {
    val keys = Seq("id", "statdate", "province", "citycode", "sitecode", "src")
    val buckets = Seq(
      "amount",
      "0_10", "10_20", "20_40", "40_50", "50_70", "70_90", "90_120",
      "120_150", "150_180", "180_240", "240_350", "350_370", "370"
    )
    val measures = buckets.flatMap(b => Seq(s"req_$b", s"diff_$b", s"diff_per_$b"))
    StructType((keys ++ measures).map(name => StructField(name, StringType, nullable = true)))
  }

  // Specific-time-period deviation-rate schema: six key columns, then a (req, diff, navi)
  // triple for the overall total ("amount"), for "half", and for 1 through 6. Every column is
  // a nullable string.
  val timePeriodDeviationRateSchema = {
    val keys = Seq("id", "statdate", "province", "citycode", "sitecode", "src")
    val periods = Seq("amount", "half") ++ (1 to 6).map(_.toString)
    val measures = periods.flatMap(p => Seq(s"req_$p", s"diff_$p", s"navi_$p"))
    StructType((keys ++ measures).map(name => StructField(name, StringType, nullable = true)))
  }

  /*
  *p2
  */
  // Task-amount statistics schema: string dimension columns followed by integer counters,
  // all nullable.
  val taskSchema = {
    val dimensions = Seq(
      "id", "statdate", "province", "citycode", "sitecode", "req_status"
    )
    val counters = Seq(
      "task_amount", "navi_amount",
      "dist_0", "dist_50", "dist_200", "dist_500",
      "city_interior", "province_interior", "interprovincial"
    )
    StructType(
      dimensions.map(name => StructField(name, StringType, nullable = true)) ++
        counters.map(name => StructField(name, IntegerType, nullable = true))
    )
  }

  // Reuse-rate statistics schema: string dimension columns followed by integer counters,
  // all nullable.
  val reuseSchema = {
    val dimensions = Seq(
      "id", "statdate", "province", "citycode", "sitecode", "sdkversion", "system"
    )
    val counters = Seq(
      "driver_amount",
      "reuse_0", "reuse_1", "reuse_2", "reuse_5", "reuse_10",
      "last_navitask_amount", "driver_loss_amount", "driver_keep_amount",
      "last_no_navitask_amount", "driver_add_amount"
    )
    StructType(
      dimensions.map(name => StructField(name, StringType, nullable = true)) ++
        counters.map(name => StructField(name, IntegerType, nullable = true))
    )
  }

  // Questionnaire accuracy-rate schema: four string key columns, two total counters, then an
  // (amount, acc_amount) integer pair for each of questions 1 through 10. All nullable.
  // The "sysytem" spelling is kept exactly as the destination table expects it.
  val qAccSchema = {
    val keys = Seq("id", "statdate", "app_version", "sysytem")
    val counters = Seq("questionnaire_amount", "driver_amount") ++
      (1 to 10).flatMap(q => Seq(s"q${q}_amount", s"q${q}_acc_amount"))
    StructType(
      keys.map(name => StructField(name, StringType, nullable = true)) ++
        counters.map(name => StructField(name, IntegerType, nullable = true))
    )
  }

  // Questionnaire error-share schema. After the key columns and two totals, each of questions
  // 1 through 9 contributes err_amount, max_driver_amount and the driver id. Questions 3 and 8
  // spell the id column "driverid" (lowercase) and add max_err_type / max_err_type_amount.
  // All columns are nullable.
  val qErrSchema = {
    def text(name: String) = StructField(name, StringType, nullable = true)
    def count(name: String) = StructField(name, IntegerType, nullable = true)

    val header = Seq("id", "statdate", "app_version", "sysytem").map(text) ++
      Seq("questionnaire_amount", "driver_amount").map(count)

    val perQuestion = (1 to 9).flatMap { q =>
      val common = Seq(count(s"q${q}_err_amount"), count(s"q${q}_max_driver_amount"))
      if (q == 3 || q == 8)
        common ++ Seq(
          text(s"q${q}_max_driverid"),
          text(s"q${q}_max_err_type"),
          count(s"q${q}_max_err_type_amount")
        )
      else
        common :+ text(s"q${q}_max_driverId")
    }

    StructType(header ++ perQuestion)
  }

  // Service metrics - response-time schema: four string key columns followed by one integer
  // counter per response-time bucket. All nullable. The last column is named "res_3000"
  // (not "resp_"), exactly as in the destination table.
  val respTimeSchema = {
    val keys = Seq("id", "statdate", "module", "service")
    val buckets = Seq(
      "resp_0_200", "resp_200_500", "resp_500_1000",
      "resp_1000_1500", "resp_1500_2000", "resp_2000_3000",
      "res_3000"
    )
    StructType(
      keys.map(name => StructField(name, StringType, nullable = true)) ++
        buckets.map(name => StructField(name, IntegerType, nullable = true))
    )
  }

  // Service metrics - performance-statistics schema: five string key columns followed by
  // integer measures. All nullable.
  val performanceSchema = {
    val keys = Seq("id", "statdate", "module", "service", "minute")
    val measures = Seq(
      "req_peak", "minu_req_amount",
      "avg_cost_time", "minu_avg_cost_time",
      "per99_cost_time", "minu_per99_cost_time"
    )
    StructType(
      keys.map(name => StructField(name, StringType, nullable = true)) ++
        measures.map(name => StructField(name, IntegerType, nullable = true))
    )
  }

  // Maps the raw "src" value of a record to its route-source category label.
  // Any value that is not listed (including a missing "src") maps to the empty string.
  val getSrcMap = (json:JSONObject) => {
    val rawSrc = json.getString("src")
    rawSrc match {
      case "rp-my" | "sf" => "传统"
      case "rp-jy-full" | "rp-jy" | "rp-jy-art" | "rp-jy-fixed" | "jy" => "经验"
      case "gd" | "rp-gd" => "高德"
      case _ => ""
    }
  }

  /**
   * Re-keys each record with the compensate / index-type / similarity-type dimensions and keeps
   * only records whose similarity is at least 0.9.
   *
   * Input key layout: (dest_province, dest_citycode, dest_deptcode, srcMap).
   * Output key layout: (dest_province, dest_citycode, dest_deptcode, compensate, indexType,
   * srcMap, simType).
   *
   * Each record's JSON object is updated in place: "IndexType" is set to `indexType`, and
   * "IndexValue" is copied from "ft_right" when `indexType` is "未修正指标", otherwise from
   * "tl_ft_right".
   *
   * `simType` selects the similarity field: "sim1" uses similarity1, "sim5" uses similarity5,
   * and "sim1And5" keeps a record when either one reaches 0.9.
   *
   * Throws IllegalArgumentException when `simType` is not one of the three supported values.
   */
  val getReqOrWayRdd = (rdd: RDD[(Seq[String], JSONObject)],compensate:String,indexType:String,simType:String) => {
    // simType is the same for every record, so the predicate is resolved once up front. An
    // unsupported value fails here with a clear message instead of surfacing as a MatchError
    // inside an executor task.
    val similarEnough: JSONObject => Boolean = simType match {
      case "sim1" => json => 0.9 <= json.getDouble("similarity1")
      case "sim5" => json => 0.9 <= json.getDouble("similarity5")
      case "sim1And5" => json => 0.9 <= json.getDouble("similarity1") || 0.9 <= json.getDouble("similarity5")
      case other =>
        throw new IllegalArgumentException(
          s"Unsupported simType '$other'; expected one of: sim1, sim5, sim1And5")
    }

    rdd.map(obj => {
      val (dest_province,dest_citycode,dest_deptcode,srcMap) = (obj._1(0),obj._1(1),obj._1(2),obj._1(3))
      obj._2.put("IndexType",indexType)
      if ("未修正指标".equals(indexType))
        obj._2.put("IndexValue",obj._2.getString("ft_right"))
      else
        obj._2.put("IndexValue",obj._2.getString("tl_ft_right"))

      (Seq(dest_province,dest_citycode,dest_deptcode,compensate,indexType,srcMap,simType),obj._2)
    }).filter(obj => similarEnough(obj._2))
  }

  /*process*/
  /*p0*/
  /**
   * Computes the click-rate report for one day and saves it through SparkUtils.df2Hive and
   * SparkUtils.df2Mysql.
   *
   * Reads `clickRateStatisticsSourTable` for the given `inc_day`, keys every row by
   * (dest_province, dest_citycode, dest_deptcode), delegates the aggregation to
   * `doClickRateStatistics`, and passes the result to both save helpers.
   *
   * @param spark     active Spark session
   * @param incDay    `inc_day` partition value to read; also passed to both save helpers
   * @param yesterday not used by this report
   *                  NOTE(review): presumably kept so all process* methods share one signature
   */
  def processClickRateStatistics( spark:SparkSession,incDay:String,yesterday:String ):Unit={
    logger.error(">>>开始统计:ETA指标-点击率<<<")

    // Read the ETA results of the Top3 selected routes
    val querysql =
      s"""select
         | dest_province,
         |  dest_citycode,
         |  dest_deptcode,
         |  navi_id,
         |  src,
         |  route_index
         |FROM ${clickRateStatisticsSourTable}
         |where inc_day='$incDay'
         |""".stripMargin

    logger.error(querysql)

    val sourRdd = spark.sql(querysql).rdd.repartition(100).map(
      obj => {
        val jsonObj = new JSONObject()
        jsonObj.put("naviId",obj.getString(3))
        jsonObj.put("src",obj.getString(4))
        jsonObj.put("routeindex",obj.getString(5))

        ((obj.getString(0),obj.getString(1),obj.getString(2)),jsonObj)
      }
    ).persist(StorageLevel.DISK_ONLY)

    try {
      logger.error(s"共获取从Top3点选线路的ETA数据共:${sourRdd.count()}")

      // Aggregate the click/selection metrics
      val clickRateRdd = doClickRateStatistics(sourRdd,incDay)

      // Save to Hive
      SparkUtils.df2Hive(spark,clickRateRdd,clickRateSchema,"append","dm_gis."+clickRateStatisticsDescTable,"statdate",incDay,logger)
      // Save to MySQL (original note: comment this out when testing)
      SparkUtils.df2Mysql(spark,clickRateRdd,clickRateSchema,descMysqlUserName,descMysqlPassWord,
        "append",descMysqlUrl,clickRateStatisticsDescTable,incDay,logger)

      clickRateRdd.unpersist()
    } finally {
      // The source RDD was persisted to disk above; release it so the blocks do not stay
      // allocated while the remaining reports of this job run.
      sourRdd.unpersist()
    }
  }

  /**
   * Computes the yaw (route deviation) report for one day and saves it through
   * SparkUtils.df2Hive and SparkUtils.df2Mysql.
   *
   * Reads `yawStatisticsSourTable` for the given `inc_day`, keeping only rows with a non-empty
   * navi_starttime, routeid and navi_endtime. Every row is keyed by (dest_province,
   * dest_citycode, dest_deptcode, sdk_version, system), the aggregation is delegated to
   * `doYawStatistics`, and the result is passed to both save helpers.
   *
   * @param spark     active Spark session
   * @param incDay    `inc_day` partition value to read; also passed to both save helpers
   * @param yesterday not used by this report
   *                  NOTE(review): presumably kept so all process* methods share one signature
   */
  def processYawStatistics( spark:SparkSession,incDay:String,yesterday:String ):Unit={
    logger.error(">>>开始统计:ETA指标-偏航统计<<<")

    // Read the navigation table. The row_number column "num" is computed, but the filter on it
    // is commented out inside the SQL, so every route row of a navigation is kept.
    val querySql =
      s"""
         |select
         |*
         |from
         |(
         |select
         | dest_province,
         |  dest_citycode,
         |  dest_deptcode,
         |  sdk_version,
         |  system,
         |  navi_id,
         |  route_src,
         |  navi_starttime,
         |  route_order,
         |  hasYaw,
         |  route_type,
         |  route_count,
         |  row_number() over(partition by navi_id order by route_order asc) num
         |FROM ${yawStatisticsSourTable}
         |where inc_day='$incDay'
         | and navi_starttime is not null and navi_starttime <> '' and navi_starttime != 'null'
         | and routeid is not null and routeid <>'' and routeid <>'null'
         | and navi_endtime != ''
         | and navi_endtime is not null
         | ) t
         | -- where num = 1
         |""".stripMargin

    logger.error(querySql)

    val sourRdd = spark.sql(querySql).rdd.repartition(100).map(
      obj => {
        val jsonObj = new JSONObject()
        jsonObj.put("naviId",obj.getString(5))
        jsonObj.put("src",obj.getString(6))
        jsonObj.put("naviStartTime",obj.getString(7))
        jsonObj.put("route_order",obj.getString(8))
        jsonObj.put("hasYaw",obj.getString(9))
        jsonObj.put("route_type",obj.getString(10))
        jsonObj.put("route_count",obj.getString(11))

        ((obj.getString(0),obj.getString(1),obj.getString(2),obj.getString(3),obj.getString(4)),jsonObj)
      }
    ).persist(StorageLevel.DISK_ONLY)

    try {
      logger.error(s"共查询eta导航数据:${sourRdd.count()}")

      // Run the yaw aggregation
      val yawRdd = doYawStatistics(sourRdd,incDay)

      // Save to Hive
      SparkUtils.df2Hive(spark,yawRdd,yawSchema,"append","dm_gis."+yawStatisticsDescTable,"statdate",incDay,logger)
      // Save to MySQL (original note: comment this out when testing)
      SparkUtils.df2Mysql(spark,yawRdd,yawSchema,descMysqlUserName,descMysqlPassWord,
        "append",descMysqlUrl,yawStatisticsDescTable,incDay,logger)

      yawRdd.unpersist()
    } finally {
      // The source RDD was persisted to disk above; release it so the blocks do not stay
      // allocated while the remaining reports of this job run.
      sourRdd.unpersist()
    }
  }

  /**
   * Entry point of the ETA accuracy statistics for one day.
   *
   * Runs the same query twice against `accStatisticsSourTable`: once tagging rows as
   * "未修正指标" with `ft_right` as the index value, once as "修正指标" with `tl_ft_right`.
   * Both keyed RDDs go through `doAccuracyStatistics`; the union of the two results is
   * handed to `SparkUtils.df2Hive` and `SparkUtils.df2Mysql` in "append" mode.
   *
   * @param spark     Spark session used to run the query
   * @param incDay    value matched against `inc_day`; also used as the `statdate` value
   * @param yesterday not read by this method
   */
  def processAccuracyStatistics( spark:SparkSession,incDay:String,yesterday:String ):Unit = {
    logger.error(">>>开始统计:ETA指标-准确率统计<<<")

    // Rows of the ETA result summary table with req_status = '0'.
    val etaSql =
      s"""
         |select
         | dest_province,
         |  dest_citycode,
         |  dest_deptcode,
         |  ft_right,
         |  tl_ft_right,
         |  src,
         |  duration,
         |  plan_date
         |FROM $accStatisticsSourTable
         |where inc_day='$incDay'
         | and req_status = '0'
         |""".stripMargin
    logger.error(etaSql)

    // Runs the query and keys every row by (province, city, department, mapped src).
    // `valueColumn` selects which select-list column becomes "IndexValue".
    def loadIndexRdd(indexType: String, valueColumn: Int): RDD[(Seq[String], JSONObject)] =
      spark.sql(etaSql).rdd.repartition(100).map { row =>
        val payload = new JSONObject()
        payload.put("IndexType", indexType)
        payload.put("IndexValue", row.getString(valueColumn))
        payload.put("src", row.getString(5))
        payload.put("duration", row.getString(6))
        payload.put("planDate", row.getString(7))

        val srcMap = getSrcMap(payload)
        (Seq(row.getString(0), row.getString(1), row.getString(2), srcMap), payload)
      }.persist(StorageLevel.DISK_ONLY)

    // Uncorrected index: ft_right (column 3).
    val uncorrectedRdd = loadIndexRdd("未修正指标", 3)
    logger.error( s"获取未修正数据共:${ uncorrectedRdd.count() }" )

    // Corrected index: tl_ft_right (column 4).
    val correctedRdd = loadIndexRdd("修正指标", 4)
    logger.error(s"获取修正数据共:${ correctedRdd.count() }")

    // Accuracy statistics for both variants, merged into one result.
    val uncorrectedStats = doAccuracyStatistics(uncorrectedRdd, incDay)
    val correctedStats = doAccuracyStatistics(correctedRdd, incDay)
    val accuracyResult = uncorrectedStats.union(correctedStats)

    // Save to Hive.
    SparkUtils.df2Hive(spark, accuracyResult, accSchema, "append",
      "dm_gis." + accStatisticsDescTable, "statdate", incDay, logger)

    // Save to MySQL (original note: comment out for testing).
    SparkUtils.df2Mysql(spark, accuracyResult, accSchema, descMysqlUserName, descMysqlPassWord,
      "append", descMysqlUrl, accStatisticsDescTable, incDay, logger)

    accuracyResult.unpersist()
  }

  /**
   * Entry point of the ETA consistency statistics for one day.
   *
   * Two result sets are produced from the same source rows:
   *  1. request/accuracy counts per similarity range, computed over all requests
   *     ("ReqAccIndex") and over `req_type = top3` requests only ("WayAccIndex"),
   *     written to `reqAccStatisticsDescTable`;
   *  2. accuracy statistics of the similarity slices returned by `getReqOrWayRdd`,
   *     again for both request sets, written to `accConStatisticsDescTable`.
   *
   * Every RDD persisted by this method is released before it returns.
   *
   * @param spark     Spark session used to run the query
   * @param incDay    value matched against `inc_day`; also used as the `statdate` value
   * @param yesterday not read by this method
   */
  def processConsistentStatistics( spark:SparkSession,incDay:String,yesterday:String ):Unit={
    logger.error(">>>开始统计:ETA指标-一致率统计<<<")

    // Rows of the ETA result summary table; `distance` is bucketed into
    // '0' / '500' / '1000' / '5000' / '10000' by the CASE expression.
    val querySql =
      s"""
         |select
         | dest_province,
         |  dest_citycode,
         |  dest_deptcode,
         |  ft_right,
         |  tl_ft_right,
         |  src,
         |  duration,
         |  routeId,
         |  req_order,
         |  similarity1,
         |  similarity5,
         |  plan_date,
         |  navi_endstatus,
         |  req_type,
         |  case when distance >=500 and distance < 1000 then '500'
         |   when distance >= 1000 and distance < 5000  then '1000'
         |   when distance >= 5000 and distance < 10000 then '5000'
         |   when distance >= 10000 then '10000'
         |  else '0'
         |  end as distance
         |FROM $consistentSourTable
         |where inc_day='$incDay'
         | -- and req_status = '0'
         | and plan_date is not null and plan_date <>''
         | and cast (similarity1 as float) >= 0 and  cast (similarity1 as float) <= 1
         | and cast (similarity5 as float) >= 0 and  cast (similarity5 as float) <= 1
         | and routeId is not null and routeId <>'' and routeId <>'null'
         |""".stripMargin
    logger.error(querySql)

    logger.error(">>>开始统计相似度不同区间的请求量和准确量<<<")
    val sourRdd = spark.sql(querySql).rdd.repartition(100).map(obj => {
      val jsonObj = new JSONObject()
      jsonObj.put("ft_right",obj.getString(3))
      jsonObj.put("tl_ft_right",obj.getString(4))
      jsonObj.put("src",obj.getString(5))
      jsonObj.put("duration",obj.getString(6))
      jsonObj.put("routeId",obj.getString(7))
      jsonObj.put("req_order",obj.getString(8))
      jsonObj.put("similarity1",obj.getString(9))
      jsonObj.put("similarity5",obj.getString(10))
      jsonObj.put("planDate",obj.getString(11))
      jsonObj.put("navi_endstatus",obj.getString(12))
      jsonObj.put("req_type",obj.getString(13))

      // Field added on 20210709: the distance bucket.
      jsonObj.put("distance",obj.getString(14))

      val srcMap = getSrcMap(jsonObj)

      // Key: province, city, department, mapped src, distance bucket.
      (Seq(obj.getString(0),obj.getString(1),obj.getString(2),srcMap,obj.getString(14)),jsonObj)
    }).persist(StorageLevel.DISK_ONLY)
    logger.error(s"按请求获取汇总结果:${sourRdd.count()}")

    // Request and accuracy counts over all requests.
    val reqRdd = doReqAccStatistics(sourRdd,incDay,"ReqAccIndex","开始按请求统计请求量和准确率")

    // Requests of type top3 only. An earlier variant that grouped by routeId and kept the
    // record with the smallest req_order was replaced by this filter.
    val wayReqRdd = sourRdd.filter(json => {
      "top3".equals(JSONUtils.getJsonValue(json._2,"req_type",""))
    }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"按线路获取汇总结果:${wayReqRdd.count()}")

    val wayRdd = doReqAccStatistics(wayReqRdd,incDay,"WayAccIndex","开始按线路统计请求量和准确率")

    // Merge the per-request and per-route counts.
    val reqAccRdd = reqRdd.union(wayRdd).persist(StorageLevel.DISK_ONLY)

    // Save to Hive.
    SparkUtils.df2Hive(spark,reqAccRdd,reqAccSchema,"append","dm_gis."+reqAccStatisticsDescTable,"statdate",incDay,logger)

    // Save to MySQL (original note: comment out for testing).
    SparkUtils.df2Mysql(spark,reqAccRdd,reqAccSchema,descMysqlUserName,descMysqlPassWord,
      "append",descMysqlUrl,reqAccStatisticsDescTable,incDay,logger)

    reqRdd.unpersist()

    logger.error(s"共统计请求和访问:${reqAccRdd.count()}")

    // Union of the six (index type x similarity kind) slices of `rdd`, in the order
    // uncorrected sim1/sim5/sim1And5 followed by corrected sim1/sim5/sim1And5.
    def unionSimilaritySlices(rdd: RDD[(Seq[String], JSONObject)], indexName: String) =
      (for {
        indexType <- Seq("未修正指标", "修正指标")
        simKind <- Seq("sim1", "sim5", "sim1And5")
      } yield getReqOrWayRdd(rdd, indexName, indexType, simKind)).reduce(_ union _)

    // Accuracy of the similarity slices, per request.
    val reqRdd2 = unionSimilaritySlices(sourRdd, "ReqAccIndex")
    val reqSimRdd = doAccuracyStatistics(reqRdd2,incDay)

    // Accuracy of the similarity slices, per route (top3 requests).
    val wayRdd2 = unionSimilaritySlices(wayReqRdd, "WayAccIndex")
    val waySimRdd = doAccuracyStatistics(wayRdd2,incDay)

    // Merge both accuracy results.
    val reqWaySimRdd = reqSimRdd.union(waySimRdd)

    // Save to Hive.
    SparkUtils.df2Hive(spark,reqWaySimRdd,reqWaySimSchema,"append","dm_gis."+accConStatisticsDescTable,"statdate",incDay,logger)

    // Save to MySQL.
    SparkUtils.df2Mysql(spark,reqWaySimRdd,reqWaySimSchema,descMysqlUserName,descMysqlPassWord,
      "append",descMysqlUrl,accConStatisticsDescTable,incDay,logger)

    // Release cached data. wayReqRdd and reqAccRdd are persisted above and were previously
    // never released; unpersist() on an RDD that is not cached is a no-op.
    sourRdd.unpersist()
    wayReqRdd.unpersist()
    reqAccRdd.unpersist()
    reqWaySimRdd.unpersist()
    reqSimRdd.unpersist()
    waySimRdd.unpersist()
    reqRdd2.unpersist()
    wayRdd2.unpersist()
    wayRdd.unpersist()
  }

  /**
   * Entry point of the navigation usage-rate statistics for one day.
   *
   * Loads the day's navigation rows from `useRateSourTable` keyed by (task_id, dest_deptcode),
   * builds the task data set through `getSLTask` -> `getTaskPass` -> `joinDriverTask` ->
   * `joinNaviInfo`, joins both with `joinSlNavi`, aggregates with `doUseRateStatistics` and
   * hands the result to `SparkUtils.df2Hive` in "append" mode. The MySQL export is
   * commented out.
   *
   * @param spark     Spark session used to run the query
   * @param incDay    value matched against `inc_day`; also used as the `statdate` value
   * @param yesterday forwarded to the task-side loaders
   */
  def processUseRateStatistics( spark : SparkSession ,incDay:String,yesterday:String ):Unit={
    logger.error(">>>开始统计:ETA指标-导航使用率统计<<<")

    // Navigation rows of the day. dest_deptcode is selected twice (positions 2 and 7).
    val naviSql =
      s"""
         |select
         |  dest_province,
         |  dest_citycode,
         |  dest_deptcode,
         |  sdk_version,
         |  system,
         |  task_id,
         |  src_deptcode,
         |  dest_deptcode,
         |  routeid,
         |  navi_endstatus,
         |  navi_starttime,
         |  navi_id
         |from ${useRateSourTable}
         |where inc_day= '${incDay}'
         |""".stripMargin

    logger.error(naviSql)

    // JSON field name -> position in the select list above.
    // NOTE(review): "navi_endtime" is filled from position 10, which is navi_starttime;
    // the query does not select navi_endtime. Confirm whether this is intended.
    val payloadColumns = Seq(
      "dest_province" -> 0,
      "dest_citycode" -> 1,
      "dest_deptcode" -> 2,
      "sdk_version" -> 3,
      "system" -> 4,
      "task_id" -> 5,
      "src_dept_code" -> 6,
      "dest_dept_code" -> 7,
      "route_id" -> 8,
      "navi_end_status" -> 9,
      "navi_starttime" -> 10,
      "navi_id" -> 11,
      "navi_endtime" -> 10
    )

    val naviByTask = spark.sql(naviSql).rdd.repartition(100).map { row =>
      val payload = new JSONObject()
      payloadColumns.foreach { case (field, idx) => payload.put(field, row.getString(idx)) }

      // Key: (task_id, dest_deptcode).
      ((row.getString(5), row.getString(7)), payload)
    }.persist(StorageLevel.DISK_ONLY)

    logger.error(s"获取导航数据共${naviByTask.count()}")

    // Task side: SL tasks -> pass-through stops -> driver operation log -> navigation info.
    val slTasks = getSLTask(spark, incDay, yesterday)
    val tasksWithPass = getTaskPass(spark, slTasks, incDay, yesterday)
    val tasksWithDriver = joinDriverTask(spark, tasksWithPass, incDay, yesterday)
    val tasksWithNavi = joinNaviInfo(spark, tasksWithDriver, incDay, yesterday)

    // Join the task side with the navigation rows.
    val joined = joinSlNavi(tasksWithNavi, naviByTask)

    // Compute the usage metrics.
    val useRateResult = doUseRateStatistics(joined, incDay)

    // Save to Hive.
    SparkUtils.df2Hive(spark, useRateResult, useSchema, "append",
      "dm_gis." + useRateDestTable, "statdate", incDay, logger)

    // Save to MySQL (disabled).
//    SparkUtils.df2Mysql(spark,useDf,useSchema,descMysqlUserName,descMysqlPassWord,
//      "append",descMysqlUrl,useRateDestTable,incDay,logger)

    useRateResult.unpersist()
  }

  /**
   * Entry point of the abnormal-exit monitoring statistics for one day.
   *
   * Reads the day's navigation rows with a non-empty `navi_endtime` from
   * `aemStatisticsSourTable`, keys them by (dest_province, dest_citycode, dest_deptcode,
   * sdk_version, system), aggregates with `doAbnormalExitMonitor` and hands the result to
   * `SparkUtils.df2Hive` and `SparkUtils.df2Mysql` in "append" mode.
   *
   * @param spark     Spark session used to run the query
   * @param incDay    value matched against `inc_day`; also used as the `statdate` value
   * @param yesterday not read by this method
   */
  def processAbnormalExitMonitor( spark : SparkSession,incDay:String,yesterday:String ): Unit = {
    logger.error(">>>开始统计:ETA指标-异常退出监控<<<")

    // Navigation rows of the day that have an end time.
    val naviSql =
      s"""
         |select
         |  dest_province,
         |  dest_citycode,
         |  dest_deptcode,
         |  sdk_version,
         |  system,
         |  navi_id,
         |  navi_endstatus,
         |  routeid,
         |  navi_starttime
         |FROM ${aemStatisticsSourTable}
         |where inc_day='$incDay'
         |and navi_endtime <> ''
         |and navi_endtime is not null
         |""".stripMargin
    logger.error(naviSql)

    val keyedRows = spark.sql(naviSql).rdd.repartition(100).map { row =>
      val payload = new JSONObject()
      payload.put("naviId", row.getString(5))
      payload.put("naviEndStatus", row.getString(6))
      payload.put("route_id", row.getString(7))
      payload.put("naviStartTime", row.getString(8))

      // Grouping key: province, city, department, SDK version, system.
      val groupKey =
        (row.getString(0), row.getString(1), row.getString(2), row.getString(3), row.getString(4))
      (groupKey, payload)
    }.persist(StorageLevel.DISK_ONLY)

    logger.error(s"共获取导航数据:${keyedRows.count}")

    // Compute the monitoring metrics.
    val monitorResult = doAbnormalExitMonitor(keyedRows, incDay)

    // Save to Hive.
    SparkUtils.df2Hive(spark, monitorResult, aemSchema, "append",
      "dm_gis." + aemStatisticsoDescTable, "statdate", incDay, logger)

    // Save to MySQL.
    SparkUtils.df2Mysql(spark, monitorResult, aemSchema, descMysqlUserName, descMysqlPassWord,
      "append", descMysqlUrl, aemStatisticsoDescTable, incDay, logger)

    monitorResult.unpersist()
  }

  /**
   * Entry point of the time-deviation-rate statistics for one day.
   *
   * Reads rows with `req_status = '0'` and non-empty duration / diff_time / navi_time from
   * `timeDeviationRateSourTable`, keys them by (dest_province, dest_citycode, dest_deptcode,
   * mapped src), aggregates with `doTimeDeviationRate` and, when the result has more than
   * one row, hands it to `SparkUtils.df2Hive` and `SparkUtils.df2Mysql` in "append" mode.
   *
   * @param spark     Spark session used to run the query
   * @param incDay    value matched against `inc_day`; also used as the `statdate` value
   * @param yesterday not read by this method
   */
  def processTimeDeviationRate(spark : SparkSession,incDay:String,yesterday:String): Unit ={
    logger.error("开始统计eta指标-时间偏差率")

    val querySql =
      s"""
         |select
         |  dest_province,
         |  dest_citycode,
         |  dest_deptcode,
         |  src,
         |  diff_time,
         |  navi_time,
         |  duration,
         |  request_id
         |from $timeDeviationRateSourTable
         |where inc_day='$incDay'
         |  and req_status = '0'
         |  and duration is not null and duration <> '' and duration <> 'null'
         |  and diff_time is not null and diff_time <> '' and diff_time <> 'null'
         |  and navi_time is not null and navi_time <> '' and navi_time <> 'null'
         |""".stripMargin

    logger.error(querySql)
    val sourRdd = spark.sql(querySql).rdd.repartition(100)
      .map( obj => {
        val jsonObj = new JSONObject()
        jsonObj.put("src",obj.getString(3))
        jsonObj.put("diff_time",obj.getString(4))
        jsonObj.put("navi_time",obj.getString(5))
        jsonObj.put("duration",obj.getString(6))
        jsonObj.put("request_id",obj.getString(7))

        val srcMap = getSrcMap(jsonObj)

        // Key: province, city, department, mapped src.
        ((obj.getString(0),obj.getString(1),obj.getString(2),srcMap),jsonObj)
      }).persist(StorageLevel.DISK_ONLY)
    logger.error(s"获取ETA总汇总结果:${sourRdd.count()}")

    // Compute the time deviation rate.
    val timeDeviationRateRdd= doTimeDeviationRate(spark,sourRdd,incDay)

    // count() is a Spark action; run it once and reuse the value for the log and the guard.
    val resultCount = timeDeviationRateRdd.count()
    logger.error("时间偏差率总数据量为:" + resultCount)
    timeDeviationRateRdd.take(1).foreach(println(_))

    // NOTE(review): a result with exactly one row is not saved; confirm whether `> 0`
    // was intended.
    if (resultCount > 1) {

      // Save to Hive.
      SparkUtils.df2Hive(spark,timeDeviationRateRdd,timeDeviationRateSchema,"append",
        "dm_gis."+timeDeviationRateDescTable,"statdate",incDay,logger)

      // Save to MySQL.
      SparkUtils.df2Mysql(spark,timeDeviationRateRdd,timeDeviationRateSchema,descMysqlUserName,descMysqlPassWord,
        "append",descMysqlUrl,timeDeviationRateDescTable,incDay,logger)

    }

    timeDeviationRateRdd.unpersist()
  }

  /**
   * Entry point of the per-time-period deviation-rate statistics for one day.
   *
   * Reads rows with `req_status = '0'` and non-empty diff_time / navi_time / req_time /
   * navi_endtime from `timePeriodDeviationRateSourTable`, keys them by (dest_province,
   * dest_citycode, dest_deptcode, mapped src), aggregates with `doTimePeriodDeviationRate`
   * and, when the result has more than one row, hands it to `SparkUtils.df2Hive` and
   * `SparkUtils.df2Mysql` in "append" mode.
   *
   * @param spark     Spark session used to run the query
   * @param incDay    value matched against `inc_day`; also used as the `statdate` value
   * @param yesterday not read by this method
   */
  def processTimePeriodDeviationRate(spark : SparkSession,incDay:String,yesterday:String): Unit ={
    logger.error("开始统计特定时间的时间偏差率")

    val querySql =
      s"""
         |select
         |  dest_province,
         |  dest_citycode,
         |  dest_deptcode,
         |  src,
         |  diff_time,
         |  navi_time,
         |  req_time,
         |  navi_endtime
         |from $timePeriodDeviationRateSourTable
         |where inc_day='$incDay'
         |  and req_status = '0'
         |  and diff_time is not null and diff_time <> '' and diff_time <> 'null'
         |  and navi_time is not null and navi_time <> '' and navi_time <> 'null'
         |  and req_time is not null and req_time <> '' and req_time <> 'null'
         |  and navi_endtime is not null and navi_endtime <> '' and navi_endtime <> 'null'
         |""".stripMargin
    logger.error(querySql)
    val sourRdd = spark.sql(querySql).rdd.repartition(100)
      .map(obj => {
        val jsonObj = new JSONObject()
        jsonObj.put("src",obj.getString(3))
        jsonObj.put("diff_time",obj.getString(4))
        jsonObj.put("navi_time",obj.getString(5))
        jsonObj.put("req_time",obj.getString(6))
        jsonObj.put("navi_endtime",obj.getString(7))

        val srcMap = getSrcMap(jsonObj)

        // Key: province, city, department, mapped src.
        ((obj.getString(0),obj.getString(1),obj.getString(2),srcMap),jsonObj)
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"获取ETA总汇总结果:${sourRdd.count}")

    // Compute the per-period time deviation rate.
    val timePeriodDeviationRateRdd = doTimePeriodDeviationRate(spark,sourRdd,incDay)

    // count() is a Spark action; run it once and reuse the value for the log and the guard.
    val resultCount = timePeriodDeviationRateRdd.count()
    logger.error("时间偏差率总数据量为:" + resultCount)
    timePeriodDeviationRateRdd.take(1).foreach(println(_))

    // NOTE(review): a result with exactly one row is not saved; confirm whether `> 0`
    // was intended.
    if (resultCount > 1) {

      // Save to Hive.
      SparkUtils.df2Hive(spark, timePeriodDeviationRateRdd, timePeriodDeviationRateSchema, "append",
        "dm_gis." + timePeriodDeviationRateDescTable, "statdate", incDay, logger)

      // Save to MySQL.
      SparkUtils.df2Mysql(spark, timePeriodDeviationRateRdd, timePeriodDeviationRateSchema, descMysqlUserName, descMysqlPassWord,
        "append", descMysqlUrl, timePeriodDeviationRateDescTable, incDay, logger)

    }

    timePeriodDeviationRateRdd.unpersist()
  }

  /*p2*/
  /**
   * Entry point of the task-amount statistics for one day.
   *
   * Takes, per navi_id, the first `route_type='top3'` row (lowest route_order) from
   * dm_gis.gis_navi_result_union and left-joins the first row per navi_id (lowest
   * route_order, req_order) of gis_navi_eta_result_tmp to obtain `distance`. Rows are keyed
   * by (dest_province, dest_citycode, dest_deptcode, end-status bucket), aggregated with
   * `doTaskAmountStatistics` and handed to `SparkUtils.df2Hive` and `SparkUtils.df2Mysql`
   * in "append" mode.
   *
   * End-status bucket: "0" for status 0, missing, unparsable or negative; "1" for 1..9;
   * "2" for 10 and above.
   *
   * @param spark     Spark session used to run the query
   * @param incDay    value matched against `inc_day`; also used as the `statdate` value
   * @param yesterday not read by this method
   */
  def processTaskAmountStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit ={
    logger.error("开始统计任务量")

    // Query changed on 20210429: the former single-table query against
    // $taskAmountStatSourTable was replaced by the join below.
    val querySql =
      s"""
         |select
         |   src_province,
         |   src_citycode,
         |   src_deptcode,
         |   dest_province,
         |   dest_citycode,
         |   dest_deptcode,
         |   a.navi_endStatus navi_endStatus,
         |   b.distance distance,
         |   a.navi_id navi_id,
         |   task_id
         |from
         |(
         |select *
         |from
         |(
         |   select
         |      src_province,
         |      src_citycode,
         |      src_deptcode,
         |      dest_province,
         |      dest_citycode,
         |      dest_deptcode,
         |      navi_endStatus,
         |      navi_id,
         |      task_id,
         |      row_number() over(partition by navi_id order by route_order asc) num
         |    from dm_gis.gis_navi_result_union
         |    where
         |      inc_day = '$incDay'
         |    -- and
         |    --  navi_endStatus <> ''
         |    and
         |      route_type='top3'
         |) t
         |where num = 1
         |) a
         |left outer join
         |(
         |
         |select
         |  navi_id,distance
         |from
         |(
         |select
         |   navi_id,
         |   distance,
         |   row_number() over(partition by navi_id order by route_order asc,req_order asc) num
         |from
         |   gis_navi_eta_result_tmp
         |where
         |   inc_day = '$incDay'
         |and
         |   navi_endStatus <> ''
         |   )c
         |where
         |  c.num = 1
         |) b
         |on a.navi_id = b.navi_id
       """.stripMargin

    logger.error( querySql )

    val sourRdd = spark.sql( querySql ).rdd.repartition(100 ).map( obj => {
      val jsonObj = new JSONObject()
      jsonObj.put("src_province",obj.getString(0))
      jsonObj.put("src_citycode",obj.getString(1))
      jsonObj.put("src_deptcode",obj.getString(2))
      jsonObj.put("dest_province",obj.getString(3))
      jsonObj.put("dest_citycode",obj.getString(4))
      jsonObj.put("dest_deptcode",obj.getString(5))
      jsonObj.put("distance",obj.getString(7))
      jsonObj.put("navi_id",obj.getString(8))
      jsonObj.put("task_id",obj.getString(9))

      // Parse the end status from whatever type the column has. The former `getInt(6)`
      // throws on a string-typed column (the SQL compares navi_endStatus with ''), and the
      // swallowed exception turned every status into 0.
      val status = Option(obj.get(6))
        .flatMap(raw => scala.util.Try(raw.toString.trim.toInt).toOption)
        .getOrElse(0)

      // Total bucketing: the former guarded match had no case for negative values and
      // failed with a MatchError.
      val naviEndStatus =
        if (status >= 10) 2
        else if (status >= 1) 1
        else 0

      ((obj.getString(3),obj.getString(4),obj.getString(5),naviEndStatus.toString),jsonObj)
    }).persist( StorageLevel.DISK_ONLY )

    logger.error(s"获取导航任务数据共:${ sourRdd.count() }")

    // Compute the task-amount metrics.
    val taskAmountDf = doTaskAmountStatistics( sourRdd, incDay )

    // Save to Hive.
    SparkUtils.df2Hive( spark,taskAmountDf,taskSchema,"append","dm_gis."+taskAmountStatDescTable,"statdate",incDay,logger )

    // Save to MySQL (original note: comment this call out when testing).
    SparkUtils.df2Mysql( spark,taskAmountDf,taskSchema,descMysqlUserName,descMysqlPassWord,
      "append",descMysqlUrl,taskAmountStatDescTable,incDay,logger )

    taskAmountDf.unpersist()
  }

  /**
   * Entry point of the driver reuse-rate statistics for one day.
   *
   * Full-joins the day's per-driver usage counts from `reuseStatSourTable` with the
   * `statdate = yesterday` rows of dm_gis.eta_index_reuse_his_i, summing both amounts, and
   * hands the result to `SparkUtils.df2Hive` for dm_gis.eta_index_reuse_his_i in "append"
   * mode. The same data feeds `doDriverReuseStatistics`; its output is joined with
   * `doDriverChangeStatistics` through `joinDriverReuseChange` and handed to
   * `SparkUtils.df2Hive` and `SparkUtils.df2Mysql` in "append" mode.
   *
   * @param spark     Spark session used to run the query
   * @param incDay    day being processed; matched against `inc_day` and used as `statdate`
   * @param yesterday `statdate` of the history rows that are read
   */
  def processReuseRateStatistics ( spark:SparkSession,incDay:String,yesterday:String ): Unit = {
    logger.error("开始统计复用率")

    // Today's usage per driver (t1) full-joined with yesterday's history (t2).
    val reuseHistorySql =
      s"""
         |select
         |     nvl(t1.dest_province,t2.province) as province,
         |      nvl(t1.dest_citycode,t2.citycode) as citycode,
         |      nvl(t1.dest_deptcode,t2.sitecode) as sitecode,
         |      nvl(t1.sdk_version,t2.sdkversion) as sdkversion,
         |      nvl(t1.system,t2.system) as system,
         |      nvl(t1.driver_id,t2.driver_id) as driver_id,
         |      nvl(t1.use_amount,0) + nvl(t2.his_use_amount,0) as his_use_amount,
         |      '$incDay' as statdate
         |from (
         |    select
         |       dest_province,
         |        dest_citycode,
         |        dest_deptcode,
         |        sdk_version,
         |        system,
         |        driver_id,
         |        count(1) as use_amount
         |    from ${reuseStatSourTable}
         |    where inc_day='$incDay'
         |    group by dest_province,dest_citycode,dest_deptcode,sdk_version,system,driver_id
         |) t1
         |FULL JOIN
         |(
         | SELECT
         |      province,
         |      citycode,
         |      sitecode,
         |      sdkversion,
         |      system,
         |      driver_id,
         |      his_use_amount
         |    from dm_gis.eta_index_reuse_his_i
         |    where statdate='$yesterday'
         |) t2
         |on t1.dest_province = t2.province
         |   and t1.dest_citycode = t2.citycode
         |   and t1.dest_deptcode = t2.sitecode
         |   and t1.sdk_version = t2.sdkversion
         |   and t1.system = t2.system
         |   and t1.driver_id = t2.driver_id
         |""".stripMargin

    logger.error(reuseHistorySql)

    val reuseHistory =
      spark.sql(reuseHistorySql).repartition(100).persist(StorageLevel.DISK_ONLY)

    logger.error(s"获取历史复用数据共${reuseHistory.count()}")

    // Append the merged usage to the reuse history table.
    logger.error("保存复用情况到hive历史复用情况中")
    SparkUtils.df2Hive(spark, reuseHistory, "append",
      "dm_gis.eta_index_reuse_his_i", "statdate", incDay, logger)

    // Driver reuse statistics.
    val reuseByDriver = doDriverReuseStatistics(reuseHistory)

    // Driver change statistics.
    val changeByDriver = doDriverChangeStatistics(spark, incDay, yesterday)

    // Join reuse and change statistics.
    val reuseChangeResult = joinDriverReuseChange(changeByDriver, reuseByDriver, incDay)

    // Save to Hive.
    SparkUtils.df2Hive(spark, reuseChangeResult, reuseSchema, "append",
      "dm_gis." + reuseStatDestTable, "statdate", incDay, logger)

    // Save to MySQL.
    SparkUtils.df2Mysql(spark, reuseChangeResult, reuseSchema, descMysqlUserName, descMysqlPassWord,
      "append", descMysqlUrl, reuseStatDestTable, incDay, logger)

    reuseChangeResult.unpersist()
  }

  /**
   * Entry point of the questionnaire accuracy-rate statistics.
   *
   * Uses `questionnaireRdd` when it is non-null, otherwise loads the data with
   * `getQuestionnaireData`; aggregates with `doQuestionnaireAccRateStatistics` and hands the
   * result to `SparkUtils.df2Hive` and `SparkUtils.df2Mysql` in "append" mode.
   *
   * @param spark     Spark session
   * @param incDay    day being processed; used as the `statdate` value
   * @param yesterday forwarded to the loader and the aggregation
   */
  def processQuestionnaireAccRateStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit = {
    logger.error("开始统计问卷调查率")

    // Questionnaire data: shared RDD if present, otherwise loaded on demand.
    val surveyData =
      Option(questionnaireRdd).getOrElse(getQuestionnaireData(spark, incDay, yesterday))

    // Questionnaire accuracy rate.
    val accRateResult = doQuestionnaireAccRateStatistics(surveyData, incDay, yesterday)

    // Save to Hive.
    SparkUtils.df2Hive(spark, accRateResult, qAccSchema, "append",
      "dm_gis." + questionnaireAccDestTable, "statdate", incDay, logger)

    // Save to MySQL (original note: comment out for testing).
    SparkUtils.df2Mysql(spark, accRateResult, qAccSchema, descMysqlUserName, descMysqlPassWord,
      "append", descMysqlUrl, questionnaireAccDestTable, incDay, logger)

    accRateResult.unpersist()
  }

  /**
   * Entry point of the questionnaire driver-error-share statistics.
   *
   * Uses `questionnaireRdd` when it is non-null, otherwise loads the data with
   * `getQuestionnaireData`; aggregates with `doQuestionnaireErrRateStatistics` and hands the
   * result to `SparkUtils.df2Hive` and `SparkUtils.df2Mysql` in "append" mode.
   *
   * @param spark     Spark session
   * @param incDay    day being processed; used as the `statdate` value
   * @param yesterday forwarded to the loader and the aggregation
   */
  def processQuestionnaireDriverErrPerStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit = {
    logger.error("开始统计问卷调查司机错误占比")

    // Questionnaire data: shared RDD if present, otherwise loaded on demand.
    val surveyData =
      Option(questionnaireRdd).getOrElse(getQuestionnaireData(spark, incDay, yesterday))

    // Share of driver errors in the questionnaire.
    val errRateResult = doQuestionnaireErrRateStatistics(surveyData, incDay, yesterday)

    // Save to Hive.
    SparkUtils.df2Hive(spark, errRateResult, qErrSchema, "append",
      "dm_gis." + questionnaireErrDestTable, "statdate", incDay, logger)

    // Save to MySQL.
    SparkUtils.df2Mysql(spark, errRateResult, qErrSchema, descMysqlUserName, descMysqlPassWord,
      "append", descMysqlUrl, questionnaireErrDestTable, incDay, logger)

    errRateResult.unpersist()
  }

  /**
   * Entry point of the service response-time statistics.
   *
   * Uses `serviceCostTimeRdd` when it is non-null, otherwise loads the data with
   * `getServiceCostTimeData`; aggregates with `doServiceResponseStatistics` and hands the
   * result to `SparkUtils.df2Hive` and `SparkUtils.df2Mysql` in "append" mode.
   *
   * @param spark     Spark session
   * @param incDay    day being processed; used as the `statdate` value
   * @param yesterday forwarded to the loader
   */
  def processServiceResponseStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit ={
    logger.error("开始统计服务指标-响应时间")

    // Service cost-time data: shared RDD if present, otherwise loaded on demand.
    val costTimeData =
      Option(serviceCostTimeRdd).getOrElse(getServiceCostTimeData(spark, incDay, yesterday))

    // Response-time metrics.
    val responseResult = doServiceResponseStatistics(costTimeData, incDay)

    // Save to Hive.
    SparkUtils.df2Hive(spark, responseResult, respTimeSchema, "append",
      "dm_gis." + serviceResponseDescTable, "statdate", incDay, logger)

    // Save to MySQL.
    SparkUtils.df2Mysql(spark, responseResult, respTimeSchema, descMysqlUserName, descMysqlPassWord,
      "append", descMysqlUrl, serviceResponseDescTable, incDay, logger)

    responseResult.unpersist()
  }

  /**
   * Entry point of the service performance statistics.
   *
   * Takes the service cost-time data (`serviceCostTimeRdd` when non-null, otherwise
   * `getServiceCostTimeData`), drops records with a blank `req_time`, and re-keys every
   * record by (module, service, minute), where minute is
   * `DateUtil.getCurrentMinDiff(startOfIncDay, req_time)`. The keyed data is aggregated with
   * `doServicePerformanceStatistics` and handed to `SparkUtils.df2Hive` and
   * `SparkUtils.df2Mysql` in "append" mode.
   *
   * @param spark     Spark session
   * @param incDay    day being processed, parsed with the pattern yyyyMMdd
   * @param yesterday forwarded to the loader
   */
  def processServicePerformanceStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit ={
    logger.error("开始统计服务指标-性能统计")

    // Epoch milliseconds of incDay 00:00:00.
    val dayStartMillis =
      new SimpleDateFormat("yyyyMMdd HH:mm:ss").parse(incDay + " 00:00:00").getTime

    // Attach to every request the minute its timestamp belongs to.
    val costTimeByMinute = Option(serviceCostTimeRdd)
      .getOrElse(getServiceCostTimeData(spark, incDay, yesterday))
      .filter { case (_, payload) => StringUtils.isNotBlank(payload.getString("req_time")) }
      .map { case ((module, service), payload) =>
        val minute = DateUtil.getCurrentMinDiff(dayStartMillis, payload.getLong("req_time"))
        ((module, service, minute), payload)
      }
      .persist(StorageLevel.DISK_ONLY)

    logger.error(s"获取查询时间和响应时间都部位空的数据共:${costTimeByMinute.count}")

    // Performance metrics.
    val performanceResult = doServicePerformanceStatistics(spark, costTimeByMinute, incDay)

    // Save to Hive.
    SparkUtils.df2Hive(spark, performanceResult, performanceSchema, "append",
      "dm_gis." + servicePerformanceDescTable, "statdate", incDay, logger)

    // Save to MySQL.
    SparkUtils.df2Mysql(spark, performanceResult, performanceSchema, descMysqlUserName, descMysqlPassWord,
      "append", descMysqlUrl, servicePerformanceDescTable, incDay, logger)

    performanceResult.unpersist()
  }

  /*do anything*/
  /*p1*/
  /**
   * Aggregates the click-rate metrics per (dest_province, dest_citycode, dest_deptcode).
   *
   * Each output row holds: MD5 id of "incDay_province_city_dept", incDay, the three key
   * fields, the number of navigations, the number of navigations whose "routeindex" is
   * "0" / "1" / "2", and the number whose mapped src (`getSrcMap`) is "传统" / "经验" / "高德".
   *
   * The result is persisted and materialised by a count before `sourRdd` is released, so
   * the aggregation reads the cached source instead of recomputing it.
   *
   * @param sourRdd navigation records keyed by (province, city, department); unpersisted
   *                by this method
   * @param incDay  statistics day written into every row
   * @return the persisted RDD of result rows
   */
  def doClickRateStatistics( sourRdd: RDD[((String, String, String), JSONObject)],incDay: String): RDD[Row] = {

    val clickRateDf =
      sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
        .map { case ((dest_province, dest_citycode, dest_deptcode), resList) =>
          val md5Instance = MD5Util.getMD5Instance
          val id = MD5Util.getMD5(md5Instance,
            Array(incDay, dest_province, dest_citycode, dest_deptcode).mkString("_"))

          // Total number of navigations in the group.
          val naviAmount = resList.length

          // Number of records per group key, 0 when the key is absent.
          def sizeOf(groups: Map[String, List[JSONObject]], key: String): Int =
            groups.get(key).map(_.length).getOrElse(0)

          // Which of the offered routes was chosen.
          val byRouteIndex = resList.groupBy(_.getString("routeindex"))
          val firstWayAmount = sizeOf(byRouteIndex, "0")
          val secondWayAmount = sizeOf(byRouteIndex, "1")
          val thirdWayAmount = sizeOf(byRouteIndex, "2")

          // Route source category; the "高德" count was added on 20210429.
          val bySrcType = resList.groupBy(json => getSrcMap(json))
          val traditionTypeAmount = sizeOf(bySrcType, "传统")
          val experienceTypeAmount = sizeOf(bySrcType, "经验")
          val gdTypeAmount = sizeOf(bySrcType, "高德")

          Row(id,incDay,dest_province,dest_citycode,dest_deptcode,naviAmount,firstWayAmount,
            secondWayAmount,thirdWayAmount,traditionTypeAmount,experienceTypeAmount,gdTypeAmount)
        }.persist(StorageLevel.DISK_ONLY)

    // persist() is lazy: count() is the action that computes and caches the result.
    // Releasing sourRdd before it would discard the cached input and recompute it.
    logger.error(s"共统计点选率指标:${clickRateDf.count()}")
    sourRdd.unpersist()

    clickRateDf
  }

  /**
   * Aggregates the yaw (route deviation) metrics per
   * (dest_province, dest_citycode, dest_deptcode, sdk_version, system).
   *
   * Each output row holds, after the id / incDay / key fields:
   *  - number of distinct navigations (naviId);
   *  - records per navigation and hasYaw records per navigation (integer division);
   *  - the 99th / 95th / 90th percentile of "route_count" over one hasYaw record per
   *    navigation, ordered numerically (0 when there is no hasYaw record);
   *  - per source category ("经验", "传统", "高德"): yaw count (category of the record with
   *    the highest route_order per navigation, among hasYaw or top3 records with a start
   *    time) and use count (category of every hasYaw record with a start time).
   *
   * The result is persisted and materialised by a count before `sourRdd` is released.
   *
   * @param sourRdd navigation records keyed by (province, city, department, sdk, system);
   *                unpersisted by this method
   * @param incDay  statistics day written into every row
   * @return the persisted RDD of result rows
   */
  def doYawStatistics(sourRdd: RDD[((String, String, String, String, String), JSONObject)],incDay: String): RDD[Row] = {
    logger.error(s"开始进行偏航统计")

    val yawDf =
      sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
        .map { case ((dest_province, dest_citycode, dest_deptcode, sdkVersion, system), resList) =>
          val md5Instance = MD5Util.getMD5Instance
          val id = MD5Util.getMD5(md5Instance,
            Array(incDay, dest_province, dest_citycode, dest_deptcode, sdkVersion, system).mkString("_"))

          // Numeric value of a field stored as text; missing or unparsable values count as 0.
          // Comparing the raw strings would order "10" before "9".
          def numericField(json: JSONObject, field: String): Int =
            scala.util.Try(json.getIntValue(field)).getOrElse(0)

          def isYaw(json: JSONObject): Boolean = "true".equals(json.getString("hasYaw"))
          def hasStartTime(json: JSONObject): Boolean =
            StringUtils.isNotBlank(json.getString("naviStartTime"))

          // Number of navigations. resList is never empty, so this is at least 1.
          val naviAmount = resList.map(_.getString("naviId")).distinct.length

          // Average number of yaw predictions per navigation (integer division).
          // NOTE(review): the fraction is truncated; confirm the target column is integral.
          val yawPredictAvg = resList.length / naviAmount

          // Average number of confirmed yaws per navigation (integer division).
          val hasYawList = resList.filter(isYaw)
          val yawFinalJudgmentAvg = hasYawList.length / naviAmount

          // One hasYaw record per navigation, ordered by numeric route_count.
          val yawRouteCounts = hasYawList
            .groupBy(_.getString("naviId"))
            .values
            .map(_.head)
            .toVector
            .sortBy(numericField(_, "route_count"))

          // route_count at the given percentile, 0 when there is no hasYaw record.
          def percentile(p: Double): java.lang.Integer =
            if (yawRouteCounts.isEmpty) Integer.valueOf(0)
            else yawRouteCounts(Math.round((yawRouteCounts.length - 1) * p).toInt).getInteger("route_count")

          val _99PercentYaw = percentile(0.99)
          val _95PercentYaw = percentile(0.95)
          val _90PercentYaw = percentile(0.90)

          // Source category of the last route (highest route_order) of every navigation,
          // among hasYaw or top3 records that have a start time.
          val yawList = resList
            .filter(json => (isYaw(json) || "top3".equals(json.getString("route_type"))) && hasStartTime(json))
            .groupBy(_.getString("naviId"))
            .values
            .map(routes => getSrcMap(routes.maxBy(numericField(_, "route_order"))))
            .toList

          // Source category of every hasYaw record that has a start time.
          val useList = hasYawList.filter(hasStartTime).map(json => getSrcMap(json))

          // Yaw and use counts per source category; the "高德" pair was added later.
          val experienceYawAmount = yawList.count(src => "经验".equals(src))
          val experienceUseAmount = useList.count(src => "经验".equals(src))
          val traditionalYawAmount = yawList.count(src => "传统".equals(src))
          val traditionalUseAmount = useList.count(src => "传统".equals(src))
          val gdYawAmount = yawList.count(src => "高德".equals(src))
          val gdUseAmount = useList.count(src => "高德".equals(src))

          Row(id,incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system,naviAmount,yawPredictAvg,
            yawFinalJudgmentAvg,_99PercentYaw,_95PercentYaw,_90PercentYaw,experienceYawAmount,experienceUseAmount,
            traditionalYawAmount,traditionalUseAmount,gdYawAmount,gdUseAmount)
        }.persist(StorageLevel.DISK_ONLY)

    // persist() is lazy: count() is the action that computes and caches the result.
    // Releasing sourRdd before it would discard the cached input and recompute it.
    logger.error( s"共统计偏航指标:${yawDf.count()}" )
    sourRdd.unpersist()

    yawDf
  }

  /**
   * Builds one accuracy-metric Row per grouping key of `sourRdd`.
   *
   * Each Row holds: an MD5 id (incDay plus the key dimensions), the dimensions, the total
   * and correct ("IndexValue" == "1") request counts, the same two counts restricted to a
   * "duration" in [20*60, 40*60) and in [50*60, 70*60), then 24 per-hour request counts
   * followed by 24 per-hour correct counts (hour = characters 8..10 of "planDate").
   * Keys must contain 4 or 7 elements; any other length raises a MatchError.
   *
   * @param sourRdd keyed records; unpersisted after the result has been counted
   * @param incDay  day written into every row and hashed into its id
   * @return the persisted (DISK_ONLY) RDD of metric rows
   */
  def doAccuracyStatistics( sourRdd: RDD[(Seq[String],JSONObject)],incDay:String)= {
    logger.error(s"开始进行准确率统计")

    val accDf = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).map { case (key, records) =>
      val md5Instance = MD5Util.getMD5Instance

      // Total requests and the subset judged correct.
      val reqAmount = records.length
      val correct = records.filter(json => "1".equals(json.getString("IndexValue")))
      val accAmount = correct.length

      // Counts records whose non-blank "duration" lies in [fromMinute*60, toMinute*60).
      val countInWindow = (list: List[JSONObject], fromMinute: Int, toMinute: Int) =>
        list.filter(json => StringUtils.isNotBlank(json.getString("duration")))
          .count(json => {
            val duration = json.getString("duration").toDouble
            fromMinute*60 <= duration && duration < toMinute*60
          })

      val halfHourReqAmount = countInWindow(records, 20, 40)
      val halfHourAccAmount = countInWindow(correct, 20, 40)
      val oneHourReqAmount = countInWindow(records, 50, 70)
      val oneHourAccAmount = countInWindow(correct, 50, 70)

      // Leading columns depend on how many dimensions the key carries.
      val prefixRow = key.length match {
        case 4 =>
          val Seq(dest_province, dest_citycode, dest_deptcode, srcMap) = key
          val indexType = records.head.getString("IndexType")
          val id = MD5Util.getMD5(md5Instance, Array(incDay, dest_province, dest_citycode
            , dest_deptcode, srcMap, indexType).mkString("_"))
          Row(id, incDay, dest_province, dest_citycode, dest_deptcode, indexType, srcMap, reqAmount
            , accAmount, halfHourReqAmount, halfHourAccAmount, oneHourReqAmount, oneHourAccAmount)
        case 7 =>
          val Seq(dest_province, dest_citycode, dest_deptcode, compensate, indexType, srcMap, simType) = key
          val id = MD5Util.getMD5(md5Instance, Array(incDay, dest_province, dest_citycode
            , dest_deptcode, compensate, indexType, srcMap, simType).mkString("_"))
          Row(id, incDay, dest_province, dest_citycode, dest_deptcode, compensate, indexType, srcMap, simType, reqAmount
            , accAmount, halfHourReqAmount, halfHourAccAmount, oneHourReqAmount, oneHourAccAmount)
      }

      // 24-slot histogram of the hour-of-day extracted from "planDate".
      val hourlyCounts = (list: List[JSONObject]) => {
        val byHour = list.filter(json => StringUtils.isNotBlank(json.getString("planDate")))
          .map(_.getString("planDate").substring(8,10).replaceAll("^(0?)","").toInt)
          .groupBy(hour => hour)
        (0 until 24).toList.map(hour => byHour.get(hour).map(_.length).getOrElse(0))
      }

      // Hourly request counts first, hourly correct counts second.
      Row.merge(prefixRow, Row.fromSeq(hourlyCounts(records) ++ hourlyCounts(correct)))
    }.persist(StorageLevel.DISK_ONLY)

    logger.error(s"共统计指标:${accDf.count}")
    sourRdd.unpersist()
    accDf
  }

  /**
   * Builds request/accuracy counts per similarity bucket for every grouping key.
   *
   * For each key (dest_province, dest_citycode, dest_deptcode, srcMap, distance) three rows
   * are emitted, one each for "sim1", "sim5" and "sim1And5". A row carries an MD5 id, the
   * dimensions, the total request count, the count of records whose similarity is blank,
   * and then for every bucket [perArray(i+1), perArray(i)) the number of requests in the
   * bucket followed by how many of them have "tl_ft_right" == "1".
   *
   * Records whose similarity value is missing/blank are skipped by the bucket test
   * (previously they caused a NullPointerException when the boxed Double was unboxed).
   *
   * @param sourRdd     keyed records; unpersisted after the result has been counted
   * @param incDay      day written into every row and hashed into its id
   * @param indexString index label written into every row and hashed into its id
   * @param reqString   message logged at start
   * @return the persisted (DISK_ONLY) RDD of metric rows
   */
  def doReqAccStatistics(sourRdd: RDD[(Seq[String], JSONObject)],incDay:String,indexString:String,reqString:String)={
    logger.error(s"$reqString")

    // Descending thresholds; each adjacent pair defines one bucket [next, current).
    val perArray = Array(1,0.99,0.98,0.95,0.9,0.85,0.8,0.6,0)

    val reqRdd = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).flatMap(obj => {
      val (dest_province,dest_citycode,dest_deptcode,srcMap,distance) = (obj._1(0),obj._1(1),obj._1(2),obj._1(3),obj._1(4))
      val resList = obj._2
      val md5Instance = MD5Util.getMD5Instance
      val sim1id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,srcMap,distance,indexString,"sim1").mkString("_"))
      val sim5id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,srcMap,distance,indexString,"sim5").mkString("_"))
      // NOTE(review): this id hashes distance before srcMap, unlike the two above; left as-is
      // so that ids already written downstream stay stable.
      val sim1And5Rowid = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,distance,srcMap,indexString,"sim1And5").mkString("_"))

      // Total number of requests for this key.
      val reqAmount = resList.length
      // Requests whose similarity value is blank.
      val sim1NullReqAmount = resList.count( json => StringUtils.isBlank(json.getString("similarity1")) )
      val sim5NullReqAmount = resList.count( json => StringUtils.isBlank(json.getString("similarity5")) )
      val sim1And5NullReqAmount = resList.count( json => StringUtils.isBlank(json.getString("similarity1")) &&  StringUtils.isBlank(json.getString("similarity5")) )

      // Fixed leading columns of the three output rows.
      val sim1Row = Row(sim1id,incDay,dest_province,dest_citycode,dest_deptcode,indexString,srcMap,"sim1",distance,reqAmount,sim1NullReqAmount)
      val sim5Row = Row(sim5id,incDay,dest_province,dest_citycode,dest_deptcode,indexString,srcMap,"sim5",distance,reqAmount,sim5NullReqAmount)
      val sim1And5Row = Row(sim1And5Rowid,incDay,dest_province,dest_citycode,dest_deptcode,indexString,srcMap,"sim1And5",distance,reqAmount,sim1And5NullReqAmount)

      // Null-safe test: is the record's `field` inside bucket i, i.e. [perArray(i+1), perArray(i))?
      val inBucket = (json: JSONObject, field: String, i: Int) => {
        val similarity = json.getDouble(field)
        similarity != null && similarity < perArray(i) && similarity >= perArray(i+1)
      }

      // For every bucket: (request count, correct count), flattened in bucket order.
      val bucketCounts = (matches: (JSONObject, Int) => Boolean) =>
        (0 until perArray.length-1).toList.flatMap(i => {
          val hits = resList.filter(json => matches(json, i))
          List(hits.length, hits.count(json => "1".equals(json.getString("tl_ft_right"))))
        })

      val sim1PerList = bucketCounts((json, i) => inBucket(json, "similarity1", i))
      val sim5PerList = bucketCounts((json, i) => inBucket(json, "similarity5", i))
      // sim1And5: the record is in the bucket by either similarity.
      val sim1And5PerList = bucketCounts((json, i) =>
        inBucket(json, "similarity1", i) || inBucket(json, "similarity5", i))

      List(
        Row.merge(sim1Row,Row.fromSeq(sim1PerList)),
        Row.merge(sim5Row,Row.fromSeq(sim5PerList)),
        Row.merge(sim1And5Row,Row.fromSeq(sim1And5PerList))
      )
    }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"共统计指标${reqRdd.count()}")

    sourRdd.unpersist()
    reqRdd
  }

  /**
   * Builds abnormal-exit monitoring counts per (province, city, dept, sdkVersion, system).
   *
   * Every metric is a count of distinct "naviId" values among the records matching a
   * condition on "naviStartTime" or "naviEndStatus".
   *
   * @param sourRdd keyed records; unpersisted before the result is counted
   * @param incDay  day written into every row and hashed into its id
   * @return the persisted (DISK_ONLY) RDD of metric rows
   */
  def doAbnormalExitMonitor(sourRdd: RDD[((String, String, String,String, String), JSONObject)],incDay:String)={
    logger.error("开始进行异常退出监控指标统计")

    val aemDf = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).map {
      case ((dest_province,dest_citycode,dest_deptcode,sdkVersion,system), resList) =>
        val md5Instance = MD5Util.getMD5Instance
        val id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system).mkString("_"))

        // Number of distinct naviId values among records accepted by `keep`.
        val distinctNavi = (keep: JSONObject => Boolean) =>
          resList.filter(keep).map(_.getString("naviId")).distinct.length
        // Same, selecting on the set of accepted naviEndStatus codes.
        val endedWith = (codes: Set[String]) =>
          distinctNavi(json => codes.contains(json.getString("naviEndStatus")))

        // Entered the SDK but never started navigating.
        val intoSdkNoNaviAmount = distinctNavi(json => StringUtils.isBlank(json.getString("naviStartTime")))
        // Navigations that were actually started.
        val naviAmount = distinctNavi(json => StringUtils.isNotBlank(json.getString("naviStartTime")))
        // All abnormal endings (status 10, 11 or 12).
        val exceptionAmount = endedWith(Set("10","11","12"))
        // Ended half-way (status 10).
        val halfWayEndAmount = endedWith(Set("10"))
        // Abnormal and not ended (status 12), minus the never-started sessions.
        val exceptionNotEndAmount = endedWith(Set("12")) - intoSdkNoNaviAmount
        // Crashes (status 11).
        val flashBackAmount = endedWith(Set("11"))
        // All normal endings (status 1, 2 or 3).
        val normalAmount = endedWith(Set("1","2","3"))
        // Ended automatically (status 2).
        val autoEndAmount = endedWith(Set("2"))
        // Ended manually (status 1).
        val manualEndAmount = endedWith(Set("1"))

        Row(id,incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system,intoSdkNoNaviAmount,naviAmount,exceptionAmount,
          halfWayEndAmount,exceptionNotEndAmount,flashBackAmount,normalAmount,autoEndAmount,manualEndAmount)
    }.persist(StorageLevel.DISK_ONLY)

    sourRdd.unpersist()
    logger.error(s"共统计指标:${aemDf.count}")

    aemDf
  }

  /**
   * Enriches task records with the actual departure/arrival times from the pass-zone
   * monitor table.
   *
   * Two queries pick, per (task_id, pass_zone_code), the row ranked first by
   * actual_depart_tm desc: one yields "actual_depart_tm", the other "actual_arrive_tm".
   * Tasks are left-joined to the departure data on (task_id, src_zone_code) and then to the
   * arrival data on (task_id, dest_zone_code). The result is re-keyed to (task_id, "").
   *
   * Fixes over the previous version: the timestamp is now read from column 2 (column 3 is
   * the row_number `num`, so every time used to be "1"), and the second log statement
   * prints the arrival query instead of repeating the departure query.
   *
   * @param spark         active session used to run the Hive queries
   * @param taskAmountRdd task records; unpersisted after the join has been counted
   * @param incDay        upper bound (inclusive) of the inc_day range
   * @param yesterday     lower bound (inclusive) of the inc_day range
   * @return persisted (DISK_ONLY) RDD keyed by (task_id, "")
   */
  def getTaskPass( spark:SparkSession,taskAmountRdd:RDD[JSONObject],incDay:String,yesterday:String ) ={
    // Latest departure record per (task, pass zone).
    val passSrcQuerySql =
      s"""
         |select
         |*
         |from(
         |    select
         |    task_id,
         |    pass_zone_code,
         |    actual_depart_tm,
         |    row_number() over (partition by task_id,pass_zone_code order by actual_depart_tm desc ) num
         |    from  ods_russtask.tt_vehicle_task_pass_zone_monitor
         |    where
         |      inc_day >= '${yesterday}' and inc_day <= '${incDay}'
         |      and actual_depart_tm != '' and actual_depart_tm is not null and actual_depart_tm != 'null'
         |      and pass_zone_code <>'' and pass_zone_code is not null and pass_zone_code <> 'null'
         |) t
         |where t.num=1
         |""".stripMargin

    logger.error(passSrcQuerySql)

    // Columns: 0 task_id, 1 pass_zone_code, 2 actual_depart_tm, 3 num.
    val passSrcRdd = spark.sql(passSrcQuerySql).rdd.repartition(100).map(obj => {
      val jsonObj = new JSONObject()
      // The SQL filter guarantees column 2 is non-null.
      jsonObj.put("actual_depart_tm",obj.get(2).toString)

      ((obj.getString(0),obj.getString(1)),jsonObj)
    }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"获取经停实际出发时间非空:${passSrcRdd.count}")

    // NOTE(review): this query still ranks by actual_depart_tm although it selects
    // actual_arrive_tm; confirm whether ranking by arrival time was intended.
    val passDestQuerySql =
      s"""
         |select
         |*
         |from(
         |    select
         |    task_id,
         |    pass_zone_code,
         |    actual_arrive_tm,
         |    row_number() over (partition by task_id,pass_zone_code order by actual_depart_tm desc ) num
         |    from  ods_russtask.tt_vehicle_task_pass_zone_monitor
         |    where
         |    inc_day >= '${yesterday}' and inc_day <= '${incDay}'
         |    and actual_arrive_tm != '' and actual_arrive_tm is not null and actual_arrive_tm != 'null'
         |    and pass_zone_code <>'' and pass_zone_code is not null and pass_zone_code <> 'null'
         |) t
         |where t.num=1
         |""".stripMargin

    logger.error(passDestQuerySql)

    // Columns: 0 task_id, 1 pass_zone_code, 2 actual_arrive_tm, 3 num.
    val passDesrRdd = spark.sql(passDestQuerySql).rdd.repartition(100).map(obj => {
      val jsonObj = new JSONObject()
      // The SQL filter guarantees column 2 is non-null.
      jsonObj.put("actual_arrive_tm",obj.get(2).toString)

      ((obj.getString(0),obj.getString(1)),jsonObj)
    }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"获取经停实际到达时间非空:${passDesrRdd.count}")

    // Copies the optional right-hand record into the task record and re-keys it:
    // flag 2 -> (task_id, dest_zone_code) for the arrival join, flag 1 -> (task_id, "").
    val merge = (obj: ((String, String), (JSONObject, Option[JSONObject])),flag:Int) => {
      val leftBody = obj._2._1
      val rightBody = obj._2._2

      rightBody.foreach(right => leftBody.fluentPutAll(right))

      flag match {
        case 2 =>  ( (leftBody.getString("task_id"),leftBody.getString("dest_zone_code")) ,leftBody )
        case 1 =>  ( (leftBody.getString("task_id"),"") ,leftBody )
      }
    }

    // Join the task with its departure data (by source zone) and arrival data (by destination zone).
    val joinTaskPassRdd = taskAmountRdd.map(json => {
      ((json.getString("task_id"),json.getString("src_zone_code")),json)
    }).leftOuterJoin(passSrcRdd).map( obj => {
      merge(obj,2)
    }).leftOuterJoin(passDesrRdd).map( obj => {
      merge(obj,1)
    }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"共关联经停数据:${joinTaskPassRdd.count()}")

    taskAmountRdd.unpersist()
    passSrcRdd.unpersist()
    passDesrRdd.unpersist()

    joinTaskPassRdd
  }

  /**
   * Left-joins task records with the aggregated driver operation log.
   *
   * The log is grouped by (own_dept_code, driver_task_id, user_code); its task id is
   * concat(own_dept_code, driver_task_id). The joined record is re-keyed by "user_code".
   *
   * Fix over the previous version: "geton_operate_time" and "start_operate_time" are now
   * read from columns 4 and 5. They were read from columns 3 and 4, i.e. min_time and
   * geton_operate_time respectively.
   *
   * @param spark           active session used to run the Hive query
   * @param joinTaskPassRdd task records keyed by (task_id, ""); unpersisted after the join
   * @param incDay          upper bound (inclusive) of the inc_day range
   * @param yesterday       lower bound (inclusive) of the inc_day range
   * @return persisted (DISK_ONLY) RDD keyed by user_code
   */
  def joinDriverTask( spark:SparkSession ,joinTaskPassRdd : RDD[((String, String),JSONObject)],incDay:String,yesterday:String )={
    val querySql =
      s"""
         |select
         |    concat(tt.own_dept_code,tt.driver_task_id) as `task_id`,
         |    tt.user_code,
         |    max(tt.operate_time ) as `max_time`,
         |    min(tt.operate_time ) as `min_time`,
         |    max(tt.operate_time_0) as `geton_operate_time`,
         |    max(tt.operate_time_1) as `start_operate_time`
         |from (
         |  select
         |    own_dept_code,
         |    driver_task_id,
         |    user_code,
         |    operate_time,
         |    case when operate_type='0' then operate_time else "" end as `operate_time_0`,
         |    case when operate_type='1' then operate_time else "" end as `operate_time_1`
         |  from ods_shiva_ground.tm_driver_task_log
         |  Where
         |  inc_day >= '${yesterday}' and inc_day <= '${incDay}'
         |) tt
         |group by tt.own_dept_code,tt.driver_task_id,tt.user_code
         |""".stripMargin
    logger.error(querySql)

    // Columns: 0 task_id, 1 user_code, 2 max_time, 3 min_time,
    //          4 geton_operate_time, 5 start_operate_time.
    val driverTaskRdd = spark.sql(querySql).rdd.repartition(100).map(
      obj => {
        val jsonObj = new JSONObject()
        jsonObj.put("user_code",obj.getString(1))
        jsonObj.put("max_time",obj.getString(2))
        jsonObj.put("min_time",obj.getString(3))
        jsonObj.put("geton_operate_time",obj.getString(4))
        jsonObj.put("start_operate_time",obj.getString(5))

        // The empty second key component matches the key shape of joinTaskPassRdd.
        ((obj.getString(0),""),jsonObj)
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"共查询司机操作日志记录:${driverTaskRdd.count()}")

    val joinDriverTaskRdd = joinTaskPassRdd.leftOuterJoin(driverTaskRdd).map( obj => {
      val leftBody = obj._2._1
      val rightBody = obj._2._2

      // Copy the driver-log fields into the task record when a log entry exists.
      rightBody.foreach(right => leftBody.fluentPutAll(right))

      (leftBody.getString("user_code"),leftBody)
    }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"共关联司机操作日志记录数据:${joinDriverTaskRdd.count()}")

    joinTaskPassRdd.unpersist()
    driverTaskRdd.unpersist()

    joinDriverTaskRdd
  }

  /**
   * Left-joins task records (keyed by user code) with navigation-dialog confirm events and
   * keeps the event fields only when the event time falls inside the task's time window.
   *
   * Window: actual_depart_tm <= min_time <= event time <= min(actual_arrive_tm, max_time).
   *
   * Fixes over the previous version:
   *  - the event time is read from the event record; it was read from the task record
   *    before the event had been merged, where "time" is not yet present;
   *  - a matching event is merged into a copy of the task record, so fields of one event
   *    do not leak into other join pairs that share the same task object;
   *  - a pair with any missing timestamp is treated as "no match" instead of failing with
   *    a NullPointerException. Unparseable timestamps are still logged and rethrown.
   *
   * @param spark             active session used to run the Hive query
   * @param joinDriverTaskRdd task records keyed by user_code; unpersisted after the join
   * @param incDay            upper bound (inclusive) of the dt range
   * @param yesterday         lower bound (inclusive) of the dt range
   * @return persisted (DISK_ONLY) RDD keyed by (task_id, dest_zone_code)
   */
  def joinNaviInfo(spark:SparkSession ,joinDriverTaskRdd: RDD[(String, JSONObject)],incDay:String,yesterday:String)={

    val querySql =
      s"""
         |select
         |  properties_username,
         |  from_unixtime(cast (time / 1000 as int),'yyyy-MM-dd HH:mm:ss') as time,
         |  event_id,
         |  model,
         |  dt
         |from ods_inc_ubas.product_inc_ubas_dev_shiva_trtms_driver
         |where
         | dt >= '${yesterday}' and  dt <= '${incDay}'
         | and event_id='ground_tbp_navigate_dialog_confirm'
         | and properties_username is not null and properties_username <> ''
         | and properties_username <> 'null'
         | and time <> '' and time is not null
         |""".stripMargin

    logger.error(querySql)

    val naviInfoRdd = spark.sql(querySql).rdd.repartition(100).map( obj => {
      val jsonObj = new JSONObject()
      jsonObj.put("properties_username",obj.getString(0))
      jsonObj.put("time",obj.getString(1))
      jsonObj.put("event_id",obj.getString(2))
      jsonObj.put("model",obj.getString(3))
      jsonObj.put("dt",obj.getString(4))

      (obj.getString(0),jsonObj)
    }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"共获取导航信息${naviInfoRdd.count()}")

    // Attach the navigation event to the task when it happened inside the task window.
    val jsoinNaviInfoRdd = joinDriverTaskRdd.leftOuterJoin(naviInfoRdd).map(obj => {
      val leftBody = obj._2._1
      val rightBody = obj._2._2

      val merged = try {
        // True when all timestamps are present and the event lies inside the window.
        val insideWindow = (navi: JSONObject) => {
          val verdict = for {
            arrive   <- Option(leftBody.getDate("actual_arrive_tm"))
            depart   <- Option(leftBody.getDate("actual_depart_tm"))
            minTime  <- Option(leftBody.getDate("min_time"))
            maxTime  <- Option(leftBody.getDate("max_time"))
            naviTime <- Option(navi.getDate("time"))
          } yield {
            // Upper bound is the earlier of arrival and the last logged operation.
            val upper = if (arrive.compareTo(maxTime) > 0) maxTime else arrive
            depart.compareTo(minTime) < 1 &&
              minTime.compareTo(naviTime) < 1 &&
              naviTime.compareTo(upper) < 1
          }
          verdict.getOrElse(false)
        }

        rightBody.filter(insideWindow)
          .map(navi => new JSONObject().fluentPutAll(leftBody).fluentPutAll(navi))
          .getOrElse(leftBody)
      } catch {case e:Exception => logger.error(leftBody.toString ) ;throw e}

      ((merged.getString("task_id"),merged.getString("dest_zone_code")),merged)
    }).persist(StorageLevel.DISK_ONLY)

    logger.error( s"关联导航信息共${jsoinNaviInfoRdd.count()}" )

    joinDriverTaskRdd.unpersist()
    naviInfoRdd.unpersist()
    jsoinNaviInfoRdd
  }

  /**
   * Loads task records from dm_grd.grd_new_task_detail and splits each task into one
   * record per leg of its "stop_over_zone_code" list.
   *
   * When the comma-separated list has more than one code, every adjacent pair (i, i+1)
   * yields a copy of the task with "src_zone_code"/"dest_zone_code" overwritten and a
   * "task_id_jt" added. Otherwise the task is emitted once, unchanged.
   *
   * Robustness changes over the previous version: a NULL stop_over_zone_code is treated
   * as "no stop-over list" (it used to throw a NullPointerException), and NULL values in
   * the numeric columns is_stop/carrier_id/carrier_type are stored as null instead of
   * making Row.getInt/getLong throw.
   *
   * @param spark     active session used to run the Hive query
   * @param incDay    upper bound (inclusive) of the inc_day range
   * @param yesterday lower bound (inclusive) of the inc_day range
   * @return persisted (DISK_ONLY) RDD of per-leg task records
   */
  def getSLTask(spark:SparkSession,incDay:String,yesterday:String) ={
    // Task detail query.
    val taskAmountQuerySql =
      s"""
         | select
         |        task_id,
         |        src_city_code,
         |        dest_city_code,
         |        stop_over_zone_code,
         |        main_driver_account,
         |        is_stop,
         |        carrier_name,
         |        carrier_id,
         |        carrier_type,
         |        driver_source,
         |        dest_province,
         |        src_zone_code,
         |        dest_zone_code
         |    from dm_grd.grd_new_task_detail
         |    where
         |     main_driver_account != '' and main_driver_account is not null and main_driver_account <> 'null'
         |     and driver_source='0'
         |     and inc_day >= '${yesterday}' and inc_day <= '${incDay}'
         |""".stripMargin
    logger.error(taskAmountQuerySql)

    val taskAmountTmpRdd = spark.sql(taskAmountQuerySql).rdd.repartition(100)
      .flatMap( obj => {
        // Renders a numeric column as text, keeping SQL NULL as null.
        val numberText = (i: Int) => if (obj.isNullAt(i)) null else obj.get(i).toString

        val jsonObj = new JSONObject()
        jsonObj.put("task_id",obj.getString(0))
        jsonObj.put("src_city_code",obj.getString(1))
        jsonObj.put("dest_city_code",obj.getString(2))
        jsonObj.put("stop_over_zone_code",obj.getString(3))
        jsonObj.put("main_driver_account",obj.getString(4))
        jsonObj.put("is_stop",numberText(5))
        jsonObj.put("carrier_name",obj.getString(6))
        jsonObj.put("carrier_id",numberText(7))
        jsonObj.put("carrier_type",numberText(8))
        jsonObj.put("driver_source",obj.getString(9))
        jsonObj.put("dest_province",obj.getString(10))
        jsonObj.put("src_zone_code",obj.getString(11))
        jsonObj.put("dest_zone_code",obj.getString(12))

        // Zone codes along the route; empty when the column is NULL.
        val deptCodeArr = Option(obj.getString(3)).map(_.split(",")).getOrElse(Array.empty[String])

        if ( deptCodeArr.length > 1)
          // One record per adjacent pair of zone codes.
          (0 until deptCodeArr.length -1).toList.map( i => {
            val jsonTmp = new JSONObject().fluentPutAll(jsonObj)
            jsonTmp.put("src_zone_code",deptCodeArr(i))
            jsonTmp.put("dest_zone_code",deptCodeArr(i+1))
            // NOTE(review): the leg id repeats the destination code twice; possibly meant
            // to be source then destination. Kept unchanged to preserve existing ids.
            jsonTmp.put("task_id_jt",obj.getString(0)+"_"+deptCodeArr(i+1)+"_"+deptCodeArr(i+1))
            jsonTmp
          })
        else
          // NOTE(review): no "task_id_jt" is set on this path — confirm with consumers.
          List(jsonObj)
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"共获取顺陆任务量数据:${taskAmountTmpRdd.count()}")

    taskAmountTmpRdd
  }

  /**
   * Left-joins task records with navigation records and re-keys the result by
   * (dest_province, dest_city_code, dest_zone_code, sdk_version, system).
   *
   * Fields of a matching navigation record are copied into the task record. Null or empty
   * key components are normalised to "".
   *
   * @param joinNaviInfoRdd task records keyed by (task_id, dest_zone_code); unpersisted afterwards
   * @param naviRdd         navigation records with the same key shape; unpersisted afterwards
   * @return persisted (DISK_ONLY) RDD keyed by the five dimensions
   */
  def joinSlNavi( joinNaviInfoRdd: RDD[((String, String), JSONObject)],naviRdd: RDD[((String, String), JSONObject)])={
    val totalRdd = joinNaviInfoRdd.leftOuterJoin(naviRdd).map { case (_, (taskJson, naviJsonOpt)) =>
      // Merge the navigation fields when the join found a partner.
      naviJsonOpt.foreach(naviJson => taskJson.fluentPutAll(naviJson))

      // Reads a field, mapping null to the empty string.
      val field = (name: String) => Option(taskJson.getString(name)).getOrElse("")

      val dimensions = (
        field("dest_province"),
        field("dest_city_code"),
        field("dest_zone_code"),
        field("sdk_version"),
        field("system")
      )

      (dimensions, taskJson)
    }.persist(StorageLevel.DISK_ONLY)

    logger.error(s"共关联顺路导航数据:${totalRdd.count()}")

    joinNaviInfoRdd.unpersist()
    naviRdd.unpersist()

    totalRdd
  }

  /**
   * Builds navigation usage metrics per (province, city, dept, sdkVersion, system).
   *
   * Metrics per key: distinct "task_id_jt" count, confirm-dialog event count, distinct
   * (route_id, src_dept_code, dest_dept_code) count among records with a route_id, the sum
   * of the previous two, a "whole trip" count, distinct driver count and distinct driver
   * count among records with a route_id.
   *
   * @param sourRdd keyed records; unpersisted before the result is counted
   * @param incDay  day written into every row and hashed into its id
   * @return the persisted (DISK_ONLY) RDD of metric rows
   */
  def doUseRateStatistics(sourRdd: RDD[((String, String, String,String, String), JSONObject)],incDay:String) ={
    val useDf = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).map {
      case (( dest_province,dest_citycode,dest_deptcode,sdkVersion,system ), resList) =>
        val md5Instance = MD5Util.getMD5Instance
        val id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system).mkString("_"))

        val hasRoute = (json: JSONObject) => StringUtils.isNotBlank(json.getString("route_id"))
        val endedNormally = (json: JSONObject) => Array("1","2","3").contains(json.getString("navi_end_status"))

        // Distinct per-leg task ids.
        val naviTaskAmount = resList.map(_.getString("task_id_jt")).distinct.length
        // Records carrying the confirm-dialog event.
        val gdUseAmount = resList.count( json => "ground_tbp_navigate_dialog_confirm".equals(json.getString("event_id")) )
        // Distinct (route, source dept, destination dept) among records with a route.
        val sfUseAmount = resList.filter(hasRoute)
          .map(json =>
            (json.getString("route_id"),json.getString("src_dept_code"),json.getString("dest_dept_code"))
          ).distinct.length
        // Combined navigation usage.
        val naviUseAmount = gdUseAmount + sfUseAmount

        // Whole-trip usage, accumulated per task over records with a non-blank end status:
        //  - a single record contributes its distinct navi_id count if it ended with 1/2/3;
        //  - several records, ordered by navi_starttime, contribute the number of consecutive
        //    pairs whose gap (next start - previous end) is <= 300, but only when the last
        //    record ended with 1/2/3.
        val wholeUseAmount = resList
          .filter(json => StringUtils.isNotBlank(json.getString("navi_end_status")))
          .groupBy(_.getString("task_id"))
          .values
          .map(sessions => {
            if (sessions.length == 1)
              sessions.filter(endedNormally).map(_.getString("navi_id")).distinct.length
            else {
              val ordered = sessions.sortBy(_.getString("navi_starttime"))
              if (endedNormally(ordered.last))
                (0 until ordered.length - 1).count(i =>
                  ordered(i+1).getLong("navi_starttime") - ordered(i).getLong("navi_endtime") <= 300)
              else 0
            }
          }).sum

        // Distinct drivers overall and among records with a route.
        val driverAmount = resList.map(_.getString("main_driver_account")).distinct.length
        val driverUseAmount = resList.filter(hasRoute).map(_.getString("main_driver_account")).distinct.length

        Row(id,incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system,naviTaskAmount,naviUseAmount,gdUseAmount,sfUseAmount,wholeUseAmount,driverAmount,driverUseAmount)
    }.persist(StorageLevel.DISK_ONLY)

    sourRdd.unpersist()
    logger.error( s"共统计使用率率指标:${useDf.count()}" )

    useDf
  }

  /**
   * Builds time-deviation metrics per (province, city, dept, src) and appends the roll-ups
   * produced by calByOtherDmiension.
   *
   * Every row is a sequence of strings: id, incDay, the four dimensions, the request count,
   * mean |diff_time| and mean |diff_time| / navi_time, followed, for each duration bucket
   * [timeArr(i)*60, timeArr(i+1)*60] (both ends inclusive), by the bucket's request count
   * and the same two means.
   *
   * Fix over the previous version: bucket bounds are computed in Double. The Int product
   * `Int.MaxValue*60` overflowed to a negative number, so the open-ended last bucket never
   * matched any record. The duplicated `sourRdd.unpersist()` call was also removed.
   *
   * NOTE(review): an empty bucket yields 0/0 = NaN, emitted as the string "NaN"; confirm
   * how downstream consumers treat it.
   *
   * @param spark   active session, forwarded to calByOtherDmiension
   * @param sourRdd keyed records; unpersisted before the result is counted
   * @param incDay  day written into every row and hashed into its id
   * @return union of the per-key rows and the all/province/city/dept roll-up rows
   */
  def doTimeDeviationRate(spark:SparkSession,sourRdd: RDD[((String, String, String, String), JSONObject)],incDay:String) ={
    val timeDeviationRateRdd = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
      .map( obj => {
        val (destProvince,destPitycode,destDeptcode,src) = obj._1
        val resList = obj._2
        val md5Instance = MD5Util.getMD5Instance
        val id = MD5Util.getMD5(md5Instance, Array(incDay,destProvince,destPitycode,destDeptcode,src).mkString("_"))

        // Mean absolute deviation and mean relative deviation of a record list, as strings.
        val averages = (list: List[JSONObject]) => {
          val diffSum = list.map(elem => Math.abs(elem.getDouble("diff_time"))).sum
          val diffPerSum = list.map(elem => Math.abs(elem.getDouble("diff_time")) / elem.getDouble("navi_time")).sum
          Seq((diffSum / list.size).toString, (diffPerSum / list.size).toString)
        }

        // Total request count for the key.
        val reqAmount = resList.length

        val header = Seq(id,incDay,destProvince,destPitycode,destDeptcode,src,reqAmount.toString) ++ averages(resList)

        // Bucket edges in minutes; the last bucket is open-ended.
        val timeArr = Array(0,10,20,40,50,70,90,120,150,180,240,350,370,Int.MaxValue)

        val bucketColumns = (0 until timeArr.length -1).flatMap( i => {
          // Multiply as Double so the Int.MaxValue edge cannot overflow.
          val lower = timeArr(i) * 60.0
          val upper = timeArr(i+1) * 60.0

          val tmpList = resList.filter(json => {
            val duration: Double = json.getDouble("duration")
            duration >= lower && duration <= upper
          })

          tmpList.length.toString +: averages(tmpList)
        })

        Row.fromSeq(header ++ bucketColumns)
      }).persist(StorageLevel.DISK_ONLY)

    sourRdd.unpersist()
    logger.error( s"共统计时间偏差率指标:${timeDeviationRateRdd.count()}" )

    val (allDf,provinceDf,cityDf,deptDf) = calByOtherDmiension(incDay,timeDeviationRateRdd,spark)

    timeDeviationRateRdd.union(allDf).union(provinceDf).union(cityDf).union(deptDf)
  }

  /**
   * Rolls the per-key deviation rows up to four coarser levels: everything, per province,
   * per (province, city) and per (province, city, dept). Dimensions that are rolled up are
   * written as "all".
   *
   * Input rows are expected to carry six leading columns (id, incDay and four dimensions)
   * followed by numeric columns rendered as strings; the roll-up sums every numeric column
   * position-wise and emits the sums as strings.
   *
   * Fix over the previous version: the summing loop stopped at `length-1`, so the last
   * numeric column was never accumulated and always came out as "0".
   *
   * NOTE(review): columns that hold averages are summed like the count columns; confirm
   * that this is the intended roll-up.
   *
   * @param incDay               day written into every row and hashed into its id
   * @param timeDeviationRateRdd per-key rows to aggregate
   * @param spark                active session, used to parallelise the single "all" row
   * @return (all, per province, per city, per dept) RDDs, each persisted DISK_ONLY
   */
  def calByOtherDmiension(incDay:String,timeDeviationRateRdd:RDD[Row],spark:SparkSession) ={

    // Builds one roll-up row: id + dimensions, then the column-wise sums of `resList`.
    val calLogic = (tup:Tuple4[String,String,String,String],resList:List[Row]) => {
      val md5Instance = MD5Util.getMD5Instance
      val id = MD5Util.getMD5(md5Instance, Array(incDay,tup._1,tup._2,tup._3,tup._4).mkString("_"))
      val rowSeq = Row(id,incDay,tup._1,tup._2,tup._3,tup._4)

      val dataList: Array[String] =
        if (resList != null && resList.nonEmpty) {
          val totals = Array.fill(resList.head.size)(0.0)
          // Sum every column, including the last one.
          for (elem <- resList; i <- 0 until elem.length)
            totals(i) += elem(i).toString.toDouble
          totals.map(_.toString)
        } else Array[String]()

      Row.merge(rowSeq,Row.fromSeq(dataList))
    }

    // Overall roll-up: all numeric parts are collected to the driver and summed there.
    val dayList = timeDeviationRateRdd.map( obj => { Row.fromSeq(obj.toSeq.drop(6)) } ).collect().toList

    val dayRow = calLogic(("all","all","all","all"),dayList)
    val allDf = spark.sparkContext.parallelize(Array(dayRow)).persist(StorageLevel.DISK_ONLY)
    allDf.count()

    logger.error("按天聚合完毕")

    // Roll-up per province.
    val provinceDf = timeDeviationRateRdd.map( obj => { (obj.getString(2),Row.fromSeq(obj.toSeq.drop(6))) })
      .aggregateByKey(List[Row]())(SparkUtils.seqOpRow,SparkUtils.combOpRow)
      .map( obj => {
        val tup4 = (obj._1,"all","all","all")
        val resList = obj._2
        calLogic(tup4,resList)
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"按照省维度统计共:${provinceDf.count()}")

    // Roll-up per (province, city).
    val cityDf = timeDeviationRateRdd.map( obj => { ((obj.getString(2),obj.getString(3)),Row.fromSeq(obj.toSeq.drop(6))) } )
      .aggregateByKey(List[Row]())(SparkUtils.seqOpRow,SparkUtils.combOpRow)
      .map( obj => {
        val tup4 = (obj._1._1,obj._1._2,"all","all")
        val resList = obj._2
        calLogic(tup4,resList)
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"按照城市维度统计共:${cityDf.count()}")

    // Roll-up per (province, city, dept).
    val deptDf = timeDeviationRateRdd.map( obj => { ((obj.getString(2),obj.getString(3),obj.getString(4)),Row.fromSeq(obj.toSeq.drop(6))) } )
      .aggregateByKey(List[Row]())(SparkUtils.seqOpRow,SparkUtils.combOpRow)
      .map( obj => {
        val tup4 = (obj._1._1,obj._1._2,obj._1._3,"all")
        val resList = obj._2
        calLogic(tup4,resList)
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"按照场地维度统计共:${deptDf.count()}")

    (allDf,provinceDf,cityDf,deptDf)
  }

  /**
   * Builds time-deviation metrics per (province, city, dept, src), bucketed by how long
   * before "navi_endtime" the request was made, and appends the roll-ups produced by
   * calByOtherDmiension.
   *
   * Every row is a sequence of strings: id, incDay, the four dimensions, the request count,
   * mean |diff_time| and mean |diff_time| / navi_time, followed, for each entry `time` of
   * timeArr, by the request count and the same two means of the records whose "req_time"
   * lies in [navi_endtime - time min, navi_endtime - (time-20) min].
   *
   * Fix over the previous version: the upper bound of the window used `>=`, which made the
   * lower bound redundant and left the window unbounded above; it is now `<=`.
   *
   * NOTE(review): an empty bucket yields 0/0 = NaN, emitted as the string "NaN"; confirm
   * how downstream consumers treat it.
   *
   * @param spark   active session, forwarded to calByOtherDmiension
   * @param sourRdd keyed records; unpersisted before the result is counted
   * @param incDay  day written into every row and hashed into its id
   * @return union of the per-key rows and the all/province/city/dept roll-up rows
   */
  def doTimePeriodDeviationRate(spark:SparkSession,sourRdd: RDD[((String, String, String, String), JSONObject)],incDay:String) = {
    val timePeriodDeviationRateRdd = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
      .map(obj => {
        val (destProvince,destPitycode,destDeptcode,src) = obj._1
        val resList = obj._2
        val md5Instance = MD5Util.getMD5Instance
        val id = MD5Util.getMD5(md5Instance, Array(incDay,destProvince,destPitycode,destDeptcode,src).mkString("_"))

        // Mean absolute deviation and mean relative deviation of a record list, as strings.
        val averages = (list: List[JSONObject]) => {
          val diffSum = list.map(elem => Math.abs(elem.getDouble("diff_time"))).sum
          val diffPerSum = list.map(elem => Math.abs(elem.getDouble("diff_time")) / elem.getDouble("navi_time")).sum
          Seq((diffSum / list.size).toString, (diffPerSum / list.size).toString)
        }

        // Total request count for the key.
        val reqAmount = resList.length

        val header = Seq(id,incDay,destProvince,destPitycode,destDeptcode,src,reqAmount.toString) ++ averages(resList)

        // Upper edge, in minutes before navigation end, of each 20-minute window.
        val timeArr = Array(40,70,130,190,250,310,370)

        val windowColumns = timeArr.toSeq.flatMap(time => {
          val tmpList = resList.filter(json => {
            val reqTime: Long = json.getLong("req_time")
            val endTime: Long = json.getLong("navi_endtime")
            // Request made between `time` and `time-20` minutes before the navigation ended.
            reqTime >= endTime - time * 60 * 1000  &&
              reqTime <= endTime - (time-20) * 60 * 1000
          })

          tmpList.length.toString +: averages(tmpList)
        })

        Row.fromSeq(header ++ windowColumns)
      }).persist(StorageLevel.DISK_ONLY)

    sourRdd.unpersist()
    logger.error( s"共统计时间偏差率指标:${timePeriodDeviationRateRdd.count()}" )

    val (allDf,provinceDf,cityDf,deptDf) = calByOtherDmiension(incDay,timePeriodDeviationRateRdd,spark)

    timePeriodDeviationRateRdd.union(allDf).union(provinceDf).union(cityDf).union(deptDf)
  }

  /*p2*/
  /**
   * Groups the navigation records by key and emits one task-volume row per key.
   *
   * Each row contains, in order: an MD5 id built from `incDay` and the key, `incDay`, the four
   * key fields, the number of distinct `task_id` values, the number of records, the record
   * counts of the four distance buckets (`dist_0`, `dist_50`, `dist_200`, `dist_500`) and the
   * record counts of the three area categories ("市内", "省内", "省际").
   *
   * @param sourRdd records keyed by (src_province, src_citycode, src_deptcode, src_status);
   *                unpersisted here once the result has been counted
   * @param incDay  day string stored in every row and mixed into the row id
   * @return the result rows, persisted DISK_ONLY and already materialised by a count
   */
  def doTaskAmountStatistics( sourRdd: RDD[((String, String, String, String), JSONObject)],incDay:String )={
    val taskAmountDf = sourRdd
      .aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
      .map { case ((src_province, src_citycode, src_deptcode, src_status), records) =>
        val id = MD5Util.getMD5(MD5Util.getMD5Instance,
          Array(incDay,src_province,src_citycode,src_deptcode,src_status).mkString("_"))

        // Bucket label for a record's "distance": the decimal part is cut off, the rest is
        // divided by 1000 and compared against the 50 / 200 / 500 thresholds.
        def distanceBucket(record: JSONObject): String = {
          val wholePart = record.getString("distance").replaceAll("(\\..*$)","")
          val dist = if (StringUtils.isNotBlank(wholePart)) wholePart.toLong / 1000 else 0

          if (dist >= 0 && dist < 50) "dist_0"
          else if (dist >= 50 && dist < 200) "dist_50"
          else if (dist >= 200 && dist < 500) "dist_200"
          else "dist_500" // also reached by negative values
        }

        // Area category of a record: same city, same province, cross province, or "null"
        // when the source province / city information is missing.
        def areaCategory(record: JSONObject): String = {
          val srcProvince = record.getString("src_province")
          val srcCity = record.getString("src_citycode")
          val hasCity = StringUtils.isNotBlank(srcCity)
          val hasProvinceAndCity = StringUtils.isNotBlank(srcProvince) && hasCity

          if (hasCity && srcCity.equals(record.getString("dest_citycode"))) "市内"
          else if (hasProvinceAndCity && srcProvince.equals(record.getString("dest_province"))) "省内"
          else if (hasProvinceAndCity) "省际"
          else "null"
        }

        def groupSize(groups: Map[String, List[JSONObject]], label: String): Int =
          groups.get(label).map(_.length).getOrElse(0)

        // Number of distinct tasks
        val taskAmount = records.map(_.getString("task_id")).distinct.length
        // Number of navigations
        val naviAmount = records.length
        // Navigation distance buckets (records with a blank distance are ignored)
        val byDistance = records
          .filter(record => StringUtils.isNotBlank(record.getString("distance")))
          .groupBy(distanceBucket)
        // Cross-province / same-province / same-city split
        val byArea = records.groupBy(areaCategory)

        Row(id, incDay, src_province, src_citycode, src_deptcode, src_status, taskAmount, naviAmount,
          groupSize(byDistance, "dist_0"), groupSize(byDistance, "dist_50"),
          groupSize(byDistance, "dist_200"), groupSize(byDistance, "dist_500"),
          groupSize(byArea, "市内"), groupSize(byArea, "省内"), groupSize(byArea, "省际"))
      }
      .persist(StorageLevel.DISK_ONLY)

    logger.error(s"共统计指标:${taskAmountDf.count()}")

    sourRdd.unpersist()
    taskAmountDf
  }

  /**
   * Computes driver reuse counters per key from the historical reuse data.
   *
   * Columns 0-4 of each input row form the key, column 5 is read as `driver_id` and column 6
   * as `his_use_amount`. For every key the result JSON carries the distinct driver count and
   * the number of records whose `his_use_amount` is 1, 2, 3, in [6, 11) or at least 11.
   * NOTE(review): values 4 and 5 fall into no bucket — confirm this is intended.
   *
   * @param hisReuseDf input data frame; unpersisted here once the result has been counted
   * @return (key, counters) pairs, persisted DISK_ONLY and already materialised by a count
   */
  def doDriverReuseStatistics( hisReuseDf:DataFrame ) ={
    val keyedDrivers = hisReuseDf.rdd.map { row =>
      val driver = new JSONObject()
      driver.put("driver_id", row.getString(5))
      driver.put("his_use_amount", row.getLong(6))

      val key = (row.getString(0), row.getString(1), row.getString(2), row.getString(3), row.getString(4))
      (key, driver)
    }

    val driverReuseRdd = keyedDrivers
      .aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
      .map { case (key, drivers) =>
        val counters = new JSONObject()
        // Total number of distinct drivers
        counters.put("driver_amount", drivers.map(_.getString("driver_id")).distinct.length)
        // Drivers without reuse (used once)
        counters.put("reuse_0", drivers.count(driver => driver.getLong("his_use_amount") == 1))
        // Drivers reused once
        counters.put("reuse_1", drivers.count(driver => driver.getLong("his_use_amount") == 2))
        // Drivers reused twice
        counters.put("reuse_2", drivers.count(driver => driver.getLong("his_use_amount") == 3))
        // Drivers reused 5 times or more (but fewer than 10)
        counters.put("reuse_5", drivers.count(driver =>
          driver.getLong("his_use_amount") >= 6 && driver.getLong("his_use_amount") < 11))
        // Drivers reused 10 times or more
        counters.put("reuse_10", drivers.count(driver => driver.getLong("his_use_amount") >= 11))

        (key, counters)
      }
      .persist(StorageLevel.DISK_ONLY)

    logger.error(s"获取司机复用数据共:${driverReuseRdd.count()}")

    hisReuseDf.unpersist()
    driverReuseRdd
  }

  /**
   * Computes driver loss / retention / acquisition counters between two days.
   *
   * The same query is run once for `yesterday` and once for `incDay`: it takes the first task
   * of every `main_driver_account` and left-joins the driver's first navigation record of that
   * day, yielding an `isNavi` flag. The two results are inner-joined on `main_driver_account`,
   * so only drivers that have a task on both days are counted.
   *
   * Rows are keyed by (dest_province, dest_citycode, dest_deptcode, sdk_version, system), taken
   * from the previous day when the driver navigated on that day and from the current day
   * otherwise.
   *
   * @param spark     active session; the temp views `lastTaskNavi` and `currentTaskNavi` are
   *                  (re)registered on it
   * @param incDay    current day, substituted into the query
   * @param yesterday previous day, substituted into the query
   * @return (key, counters) pairs, persisted DISK_ONLY and already materialised by a count
   */
  def doDriverChangeStatistics( spark:SparkSession,incDay:String,yesterday:String)={
    logger.error("开始统计司机变更情况")

    // Query template: ${reuseStatSourTable} is interpolated right away, while the two %s
    // placeholders are filled with the day via format() below.
    val taskNaviSql =
      s"""
         |select
         |       t2.dest_province as dest_province,
         |       t2.dest_citycode as dest_citycode,
         |       t2.dest_deptcode as dest_deptcode,
         |       t2.sdk_version,
         |       t2.system,
         |       t1.main_driver_account,
         |       if(t2.driver_id is null,false,true) as isNavi
         |    from (
         |       select
         |        *
         |       from (
         |          select
         |            dest_province,
         |            dest_city_code,
         |            dest_zone_code,
         |            main_driver_account,
         |            row_number() over( partition by main_driver_account order by actual_depart_tm asc ) num
         |          from dm_grd.grd_new_task_detail
         |          where inc_day ='%s'
         |          and actual_depart_tm is not null and actual_depart_tm <> '' and actual_depart_tm <> 'null'
         |          and actual_arrive_tm is not null and actual_arrive_tm <> '' and actual_arrive_tm <> 'null'
         |          and main_driver_account is not null and main_driver_account <> '' and main_driver_account <> 'null'
         |          and driver_source = 0
         |       ) tmp where num = 1
         |    ) t1
         |    LEFT JOIN
         |    (
         |      select
         |        *
         |      from (
         |        SELECT
         |         dest_province,
         |         dest_citycode,
         |         dest_deptcode,
         |         sdk_version,
         |         system,
         |         driver_id,
         |         row_number() over( partition by driver_id order by navi_starttime asc ) num
         |        from ${reuseStatSourTable}
         |        where inc_day ='%s'
         |          and dest_province is not null and dest_province <> ''
         |          and dest_citycode is not null and dest_citycode <> ''
         |          and dest_deptcode is not null and dest_deptcode <> ''
         |          and sdk_version is not null and sdk_version <> ''
         |          and system is not null and system <> ''
         |      ) tmp where num = 1
         |    ) t2
         |    on t1.main_driver_account = t2.driver_id
         |""".stripMargin

    // Tasks and navigation of each driver on the previous day
    val lastTaskNaviSql = taskNaviSql.format(yesterday,yesterday)
    logger.error(lastTaskNaviSql)
    val lastTaskNaviDF = spark.sql(lastTaskNaviSql).repartition(100).persist(StorageLevel.DISK_ONLY)
    logger.error(s"共获取查询司机前一天的任务和导航数据:${lastTaskNaviDF.count()}")

    // Tasks and navigation of each driver on the current day
    val currentTaskNaviSql = taskNaviSql.format(incDay,incDay)
    logger.error(currentTaskNaviSql)
    val currentTaskNaviDF = spark.sql(currentTaskNaviSql).repartition(100).persist(StorageLevel.DISK_ONLY)
    logger.error(s"共获取查询司机当天的任务和导航数据:${currentTaskNaviDF.count()}")

    // Expose both days as temp views for the join below. createOrReplaceTempView is the
    // non-deprecated equivalent of registerTempTable.
    lastTaskNaviDF.createOrReplaceTempView("lastTaskNavi")
    currentTaskNaviDF.createOrReplaceTempView("currentTaskNavi")

    // Driver loss, acquisition and retention counters
    val driverChangeRdd = spark.sql("""
                                      |select
                                      |  nvl(t1.dest_province,"") dest_province_t1,
                                      |  nvl(t2.dest_province,"") dest_province_t2,
                                      |  nvl(t1.dest_citycode,"") dest_citycode_t1,
                                      |  nvl(t2.dest_citycode,"") dest_citycode_t2,
                                      |  nvl(t1.dest_deptcode,"") dest_deptcode_t1,
                                      |  nvl(t2.dest_deptcode,"") dest_deptcode_t2,
                                      |  nvl(t1.sdk_version,"") sdk_version_t1,
                                      |  nvl(t2.sdk_version,"") sdk_version_t2,
                                      |  nvl(t1.system,"") system_t1,
                                      |  nvl(t2.system,"") system_t2,
                                      |  t1.isNavi as isLastNavi,
                                      |  t2.isNavi as isCurrentNavi
                                      |from lastTaskNavi t1
                                      |join currentTaskNavi t2
                                      |on t1.main_driver_account = t2.main_driver_account
                                      |""".stripMargin )
      .rdd.map( row => {
        val isLastNavi = row.getBoolean(10)

        val flags = new JSONObject()
        flags.put("isLastNavi", isLastNavi)
        flags.put("isCurrentNavi", row.getBoolean(11))

        // Even columns hold the previous day's dimensions, odd columns the current day's.
        val key =
          if (isLastNavi)
            (row.getString(0),row.getString(2),row.getString(4),row.getString(6),row.getString(8))
          else
            (row.getString(1),row.getString(3),row.getString(5),row.getString(7),row.getString(9))

        (key, flags)
      })
      .aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
      .map( obj => {
        val resList = obj._2

        // Drivers that navigated on the previous day: lost vs. kept on the current day
        val lastNaviTaskList = resList.filter( _.getBoolean("isLastNavi") )
        val lastNaviTaskAmount = lastNaviTaskList.length
        val driverLossAmount = lastNaviTaskList.count( !_.getBoolean("isCurrentNavi") )
        val driverKeepAmount = lastNaviTaskList.count( _.getBoolean("isCurrentNavi") )
        // Drivers that did not navigate on the previous day: newly navigating on the current day
        val lastNoNaviTaskList = resList.filter( ! _.getBoolean("isLastNavi") )
        val lastNoNaviTaskAmount = lastNoNaviTaskList.length
        val driverAddAmount = lastNoNaviTaskList.count( _.getBoolean("isCurrentNavi") )

        val jsonObj = new JSONObject()
        jsonObj.put("lastNaviTaskAmount",lastNaviTaskAmount)
        jsonObj.put("driverLossAmount",driverLossAmount)
        jsonObj.put("driverKeepAmount",driverKeepAmount)
        jsonObj.put("lastNoNaviTaskAmount",lastNoNaviTaskAmount)
        jsonObj.put("driverAddAmount",driverAddAmount)

        (obj._1,jsonObj)
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"统计司机变更指标共:${driverChangeRdd.count()}")

    // The result is materialised, so the per-day inputs are no longer needed.
    lastTaskNaviDF.unpersist()
    currentTaskNaviDF.unpersist()

    driverChangeRdd
  }

  /**
   * Full-outer-joins the driver reuse counters with the driver change counters and flattens
   * every joined key into one result row.
   *
   * When both sides exist, the change counters are merged into the reuse JSON object;
   * otherwise the side that exists is used alone, so the counters of the missing side are
   * read as null.
   *
   * @param driverChangeRdd change counters per key; unpersisted here after the count
   * @param driverReuseRdd  reuse counters per key; unpersisted here after the count
   * @param incDay          day string stored in every row and mixed into the row id
   * @return the result rows, persisted DISK_ONLY and already materialised by a count
   */
  def joinDriverReuseChange( driverChangeRdd: RDD[((String, String, String, String, String), JSONObject)],
                             driverReuseRdd: RDD[((String, String, String, String, String), JSONObject)],incDay:String) ={
    // Column order of the counters in the output row.
    val counterFields = Seq("driver_amount", "reuse_0", "reuse_1", "reuse_2", "reuse_5", "reuse_10",
      "lastNaviTaskAmount", "driverLossAmount", "driverKeepAmount", "lastNoNaviTaskAmount", "driverAddAmount")

    val driverReuseChangeDf =
      driverReuseRdd.fullOuterJoin(driverChangeRdd).map {
        case ((dest_province, dest_citycode, dest_deptcode, sdk_version, system), (reuseOpt, changeOpt)) =>
          val id = MD5Util.getMD5(MD5Util.getMD5Instance,
            Array(incDay,dest_province,dest_citycode,dest_deptcode,sdk_version,system).mkString("_"))

          val counters: JSONObject = (reuseOpt, changeOpt) match {
            case (Some(reuse), Some(change)) => reuse.fluentPutAll(change)
            case (None, Some(change))        => change
            case (reuse, _)                  => reuse.get
          }

          val dimensions = Seq[Any](id, incDay, dest_province, dest_citycode, dest_deptcode, sdk_version, system)
          Row.fromSeq(dimensions ++ counterFields.map(field => counters.getInteger(field)))
      }.persist(StorageLevel.DISK_ONLY)

    logger.error(s"共关联司机重用和变更情况共${driverReuseChangeDf.count()}")

    driverChangeRdd.unpersist()
    driverReuseRdd.unpersist()

    driverReuseChangeDf
  }

  /**
   * Loads the questionnaire answers of `incDay` and keys them by (app_version, device_type).
   *
   * The query left-joins `dm_gis.record_tt_cm_question_answer` with
   * `dm_gis.record_tt_cm_question` on `rel_id = id`, keeps one row per
   * (rel_id, question_seq, file_name, account) and only rows with `question_seq < 11`.
   *
   * The result is assigned to `questionnaireRdd`, which is not declared inside this method
   * (NOTE(review): presumably a member of the enclosing object — confirm).
   *
   * @param spark     active session used to run the query
   * @param incDay    day substituted into both `inc_day` filters
   * @param yesterday not used by this method
   * @return the keyed answers, persisted DISK_ONLY and already materialised by a count
   */
  def getQuestionnaireData( spark:SparkSession,incDay:String,yesterday:String ) = {
    logger.error("开始获取问卷调查数据")

    // SQL revised on 2021-04-28

    val querySql =
      s"""
         |select
         |   rel_id,question_seq,answer_txt,template_id,id,account,file_name,device_type,create_time,app_version
         |from (
         |   select
         |      a.rel_id,
         |      a.question_seq,
         |      a.answer_txt,
         |      a.template_id,
         |      b.id,
         |      b.account,
         |      b.file_name,
         |      nvl(b.device_type,'') device_type,
         |      b.create_time,
         |      nvl(b.app_version,'') app_version,
         |      b.inc_day,
         |      row_number() over( PARTITION by  a.rel_id,a.question_seq,b.file_name,b.account order by a.rel_id desc,a.question_seq desc) num
         |    from
         |      (
         |        select
         |          *
         |        from
         |          dm_gis.record_tt_cm_question_answer
         |        where
         |          inc_day = '$incDay'
         |      ) a
         |      left JOIN (
         |        select
         |          *
         |        from
         |          dm_gis.record_tt_cm_question
         |        where
         |          inc_day = '$incDay'
         |      ) b
         |    on a.rel_id = b.id
         |) tmp
         |where num = 1 and question_seq < 11
         |""".stripMargin

    logger.error(querySql)

    // Column indexes follow the outer select list: 0 rel_id, 1 question_seq, 2 answer_txt,
    // 3 template_id, 4 id, 5 account, 6 file_name, 7 device_type, 8 create_time, 9 app_version.
    questionnaireRdd = spark.sql(querySql).repartition(100).rdd.map( obj =>{
      val jsonObj = new JSONObject()
      jsonObj.put("rel_id",obj.getString(0))
      jsonObj.put("question_seq",obj.getString(1))
      jsonObj.put("answer_txt",obj.getString(2))
      jsonObj.put("template_id",obj.getString(3))
      jsonObj.put("id",obj.getString(4))
      jsonObj.put("account",obj.getString(5))
      jsonObj.put("file_name",obj.getString(6))
      jsonObj.put("create_time",obj.getString(8))

      // Key: (app_version, device_type)
      ((obj.getString(9),obj.getString(7)),jsonObj)
    }).persist(StorageLevel.DISK_ONLY)

    // count() materialises the persisted RDD
    logger.error(s"获取问卷调查数据共:${questionnaireRdd.count()}")

    questionnaireRdd
  }

  /**
   * Computes the questionnaire accuracy counters per (app_version, device_type).
   *
   * For questions 1-3 and 5-9 the row carries the number of answers and the number of answers
   * equal to the expected option. Question 4 counts the answers other than "C" and the answers
   * equal to "A". For question 10 the first value is the sum of the numeric `answer_txt`
   * values (truncated to Int) and the second the number of answers.
   *
   * @param questionnaireDataRdd answers keyed by (app_version, device_type); unpersisted here
   *                             once the result has been counted
   * @param incDay               day string stored in every row and mixed into the row id
   * @param yesterday            not used by this method
   * @return the result rows, persisted DISK_ONLY and already materialised by a count
   */
  def doQuestionnaireAccRateStatistics( questionnaireDataRdd: RDD[((String, String), JSONObject)],incDay:String,yesterday:String ) ={
    val questionnaireAccRateDf = questionnaireDataRdd
      .aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
      .map { case ((app_version, device_type), answers) =>
        val id = MD5Util.getMD5(MD5Util.getMD5Instance, Array(incDay,app_version,device_type).mkString("_"))

        // All answers given to the question with the given sequence number.
        def answersTo(questionSeq: String): List[JSONObject] =
          answers.filter(answer => questionSeq.equals(answer.getString("question_seq")))

        // (number of answers, number of answers equal to `expected`) for one question.
        def totalAndExpected(questionSeq: String, expected: String): (Int, Int) = {
          val forQuestion = answersTo(questionSeq)
          (forQuestion.length, forQuestion.count(answer => expected.equals(answer.getString("answer_txt"))))
        }

        // Number of questionnaires
        val questionnaireAmount = answers.map(_.getString("rel_id")).distinct.length
        // Number of drivers
        val driverAmount = answers.map(_.getString("account")).distinct.length

        // Questions 1-3
        val (q1Amount, q1AccAmount) = totalAndExpected("1", "B")
        val (q2Amount, q2AccAmount) = totalAndExpected("2", "B")
        val (q3Amount, q3AccAmount) = totalAndExpected("3", "A")
        // Question 4: answers "C" are excluded from the total
        val q4Answers = answersTo("4")
        val q4Amount = q4Answers.count(answer => ! "C".equals(answer.getString("answer_txt")))
        val q4AccAmount = q4Answers.count(answer => "A".equals(answer.getString("answer_txt")))
        // Questions 5-9
        val (q5Amount, q5AccAmount) = totalAndExpected("5", "A")
        val (q6Amount, q6AccAmount) = totalAndExpected("6", "A")
        val (q7Amount, q7AccAmount) = totalAndExpected("7", "A")
        val (q8Amount, q8AccAmount) = totalAndExpected("8", "A")
        val (q9Amount, q9AccAmount) = totalAndExpected("9", "A")
        // Question 10 (added 2021-04-29): sum of the numeric answers and number of answers
        val q10Answers = answersTo("10")
        val q10Amount = q10Answers.foldLeft(0d)((total, answer) =>
          total + JSONUtils.getJsonValueDouble(answer,"answer_txt",0))
        val q10AccAmount = q10Answers.length

        Row(id, incDay, app_version, device_type, questionnaireAmount, driverAmount,
          q1Amount, q1AccAmount, q2Amount, q2AccAmount, q3Amount, q3AccAmount,
          q4Amount, q4AccAmount, q5Amount, q5AccAmount, q6Amount, q6AccAmount,
          q7Amount, q7AccAmount, q8Amount, q8AccAmount, q9Amount, q9AccAmount,
          q10Amount.toInt, q10AccAmount)
      }
      .persist(StorageLevel.DISK_ONLY)

    logger.error(s"统计问卷调查准确率指标共:${questionnaireAccRateDf.count()}")

    questionnaireDataRdd.unpersist()
    questionnaireAccRateDf
  }

  /**
   * Computes the questionnaire error counters per (app_version, device_type).
   *
   * For questions 1, 2, 4, 5, 6, 7 and 9 an answer equal to "B" counts as an error; the row
   * carries the error count, the error count of the account with the most errors and that
   * account. For questions 3 and 8 every answer other than "A" counts as an error and the row
   * additionally carries the most frequent wrong answer and how often it was given.
   *
   * @param questionnaireDataRdd answers keyed by (app_version, device_type); unpersisted here
   *                             once the result has been counted
   * @param incDay               day string stored in every row and mixed into the row id
   * @param yesterday            not used by this method
   * @return the result rows, persisted DISK_ONLY and already materialised by a count
   */
  def doQuestionnaireErrRateStatistics( questionnaireDataRdd: RDD[((String, String), JSONObject)],incDay:String,yesterday:String ) = {
    val questionnaireErrRateDf = questionnaireDataRdd
      .aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
      .map { case ((app_version, device_type), answers) =>
        val id = MD5Util.getMD5(MD5Util.getMD5Instance, Array(incDay,app_version,device_type).mkString("_"))

        // Largest group of `items` when grouped by `field`: (group value, group size).
        // An empty input yields ("", 0).
        def largestGroup(items: List[JSONObject], field: String): (String, Int) =
          if (items.isEmpty) ("", 0)
          else {
            val (groupValue, members) = items.groupBy(_.getString(field)).maxBy(_._2.length)
            (groupValue, members.length)
          }

        // Errors of a question whose wrong option is "B":
        // (error count, errors of the worst account, worst account).
        def errorsAnsweredB(questionSeq: String): (Int, Int, String) = {
          val wrong = answers.filter(answer =>
            questionSeq.equals(answer.getString("question_seq")) && "B".equals(answer.getString("answer_txt")))
          val (worstDriver, worstDriverErrors) = largestGroup(wrong, "account")
          (wrong.length, worstDriverErrors, worstDriver)
        }

        // Errors of a question whose only right option is "A":
        // (error count, errors of the worst account, worst account, top wrong answer, its count).
        def errorsNotAnsweredA(questionSeq: String): (Int, Int, String, String, Int) = {
          val wrong = answers.filter(answer =>
            questionSeq.equals(answer.getString("question_seq")) && ! "A".equals(answer.getString("answer_txt")))
          val (worstDriver, worstDriverErrors) = largestGroup(wrong, "account")
          val (topWrongAnswer, topWrongAnswerCount) = largestGroup(wrong, "answer_txt")
          (wrong.length, worstDriverErrors, worstDriver, topWrongAnswer, topWrongAnswerCount)
        }

        // Number of questionnaires
        val questionnaireAmount = answers.map(_.getString("rel_id")).distinct.length
        // Number of drivers
        val driverAmount = answers.map(_.getString("account")).distinct.length

        // Error counters of questions 1 to 9
        val (q1Err, q1TopCount, q1TopDriver) = errorsAnsweredB("1")
        val (q2Err, q2TopCount, q2TopDriver) = errorsAnsweredB("2")
        val (q3Err, q3TopCount, q3TopDriver, q3TopAnswer, q3TopAnswerCount) = errorsNotAnsweredA("3")
        val (q4Err, q4TopCount, q4TopDriver) = errorsAnsweredB("4")
        val (q5Err, q5TopCount, q5TopDriver) = errorsAnsweredB("5")
        val (q6Err, q6TopCount, q6TopDriver) = errorsAnsweredB("6")
        val (q7Err, q7TopCount, q7TopDriver) = errorsAnsweredB("7")
        val (q8Err, q8TopCount, q8TopDriver, q8TopAnswer, q8TopAnswerCount) = errorsNotAnsweredA("8")
        val (q9Err, q9TopCount, q9TopDriver) = errorsAnsweredB("9")

        Row(id, incDay, app_version, device_type, questionnaireAmount, driverAmount,
          q1Err, q1TopCount, q1TopDriver,
          q2Err, q2TopCount, q2TopDriver,
          q3Err, q3TopCount, q3TopDriver, q3TopAnswer, q3TopAnswerCount,
          q4Err, q4TopCount, q4TopDriver,
          q5Err, q5TopCount, q5TopDriver,
          q6Err, q6TopCount, q6TopDriver,
          q7Err, q7TopCount, q7TopDriver,
          q8Err, q8TopCount, q8TopDriver, q8TopAnswer, q8TopAnswerCount,
          q9Err, q9TopCount, q9TopDriver)
      }
      .persist(StorageLevel.DISK_ONLY)

    logger.error(s"统计问卷调查错误占比指标共:${questionnaireErrRateDf.count()}")

    questionnaireDataRdd.unpersist()
    questionnaireErrRateDf
  }

  /**
   * Loads the response times of all monitored services for `incDay`, keyed by (module, service).
   *
   * The query is a `union all` of one select per service over the top3 / no-yaw / yaw parse
   * tables; each branch yields `module`, `service`, `cost_time` and `req_time` and skips rows
   * whose cost time is null, empty or the string 'null'.
   *
   * The result is assigned to `serviceCostTimeRdd`, which is not declared inside this method
   * (NOTE(review): presumably a member of the enclosing object — confirm).
   *
   * @param spark     active session used to run the query
   * @param incDay    day substituted into every `inc_day` filter
   * @param yesterday not used by this method
   * @return the keyed records, persisted DISK_ONLY and already materialised by a count
   */
  def getServiceCostTimeData(spark:SparkSession,incDay:String,yesterday:String )={
    logger.error("开始查询服务响应时间")

    // The Yaw-pns-sf branch reads sfpnstop3_starttime as its request time, matching the
    // sfpnstop3_costtime it reports (same pairing as the first Yaw-pns branch). It previously
    // read jypnstop3_starttime, i.e. the start time of the other provider.
    val querySql =
      s"""
         |--top3-eta
         |select
         |    "top3" as `module`,
         |    "top3-eta" as `service`,
         |    req_costtime as `cost_time`,
         |    req_starttime as `req_time`
         |from dm_gis.gis_navi_top3_parse
         |where inc_day='$incDay'
         |    and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null'
         |union all
         |--Top3-pns
         |select
         |    "top3" as `module`,
         |    "top3-pns" as `service`,
         |    pnstop3_costtime as `cost_time`,
         |    pnstop3_starttime as `req_time`
         |from dm_gis.gis_navi_top3_parse
         |where inc_day='$incDay'
         |    and pnstop3_costtime is not null and pnstop3_costtime <> '' and pnstop3_costtime <> 'null'
         |union all
         |--noYaw-eta
         |select
         |    "noYaw" as `module`,
         |    "noYaw-eta" as `service`,
         |    req_costtime as `cost_time`,
         |    req_starttime as `req_time`
         |from dm_gis.gis_navi_no_yaw_parse
         |where inc_day='$incDay'
         |   and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null'
         |union all
         |--noYaw-pns
         |select
         |    "noYaw" as `module`,
         |    "noYaw-pns" as `service`,
         |    qmpoint_costtime as `cost_time`,
         |    qmpoint_starttime as `req_time`
         |from dm_gis.gis_navi_no_yaw_parse
         |where inc_day='$incDay'
         |    and qmpoint_costtime is not null and qmpoint_costtime <> '' and qmpoint_costtime <> 'null'
         |union all
         |--Yaw-eta
         |select
         |    "Yaw" as `module`,
         |    "Yaw-eta" as `service`,
         |    req_costtime as `cost_time`,
         |    req_start_time as `req_time`
         |from dm_gis.gis_navi_yaw_parse
         |where inc_day='$incDay'
         |    and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null'
         |union all
         |--Yaw-eta-sf
         |select
         |    "Yaw" as `module`,
         |    "Yaw-eta-sf" as `service`,
         |    req_costtime as `cost_time`,
         |    req_start_time as `req_time`
         |from dm_gis.gis_navi_yaw_parse
         |where inc_day='$incDay'
         |    and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null'
         |    and sfpnstop3_costtime is not null and sfpnstop3_costtime <> '' and sfpnstop3_costtime <> 'null'
         |    and (jypnstop3_costtime is null or jypnstop3_costtime = '' or jypnstop3_costtime = 'null')
         |union all
         |--Yaw-eta-jy
         |select
         |    "Yaw" as `module`,
         |    "Yaw-eta-jy" as `service`,
         |    req_costtime as `cost_time`,
         |    req_start_time as `req_time`
         |from dm_gis.gis_navi_yaw_parse
         |where inc_day='$incDay'
         |    and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null'
         |    and jypnstop3_costtime is not null and jypnstop3_costtime <> '' and jypnstop3_costtime <> 'null'
         |    and (sfpnstop3_costtime is null or sfpnstop3_costtime = '' or sfpnstop3_costtime = 'null')
         |union all
         |--Yaw-eta-jy-sf
         |select
         |    "Yaw" as `module`,
         |    "Yaw-eta-jy-sf" as `service`,
         |    req_costtime as `cost_time`,
         |    req_start_time as `req_time`
         |from dm_gis.gis_navi_yaw_parse
         |where inc_day='$incDay'
         |    and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null'
         |    and sfpnstop3_costtime is not null and sfpnstop3_costtime <> '' and sfpnstop3_costtime <> 'null'
         |    and jypnstop3_costtime is not null and jypnstop3_costtime <> '' and jypnstop3_costtime <> 'null'
         |union all
         |--Yaw-pns-1
         |select
         |    "Yaw" as `module`,
         |    "Yaw-pns" as `service`,
         |    sfpnstop3_costtime as `cost_time`,
         |    sfpnstop3_starttime as `req_time`
         |from dm_gis.gis_navi_yaw_parse
         |where inc_day='$incDay'
         |    and sfpnstop3_costtime is not null and sfpnstop3_costtime <> '' and sfpnstop3_costtime <> 'null'
         |union all
         |-- Yaw-pns-2
         |select
         |    "Yaw" as `module`,
         |    "Yaw-pns" as `service`,
         |    jypnstop3_costtime as `cost_time`,
         |    jypnstop3_starttime as `req_time`
         |from dm_gis.gis_navi_yaw_parse
         |where inc_day='$incDay'
         |    and jypnstop3_costtime is not null and jypnstop3_costtime <> '' and jypnstop3_costtime <> 'null'
         |union all
         |--Yaw-pns-sf
         |select
         |    "Yaw" as `module`,
         |    "Yaw-pns-sf" as `service`,
         |    sfpnstop3_costtime as `cost_time`,
         |    sfpnstop3_starttime as `req_time`
         |from dm_gis.gis_navi_yaw_parse
         |where inc_day='$incDay'
         |    and sfpnstop3_costtime is not null and sfpnstop3_costtime <> '' and sfpnstop3_costtime <> 'null'
         |union all
         |--Yaw-pns-jy
         |select
         |    "Yaw" as `module`,
         |    "Yaw-pns-jy" as `service`,
         |    jypnstop3_costtime as `cost_time`,
         |    jypnstop3_starttime as `req_time`
         |from dm_gis.gis_navi_yaw_parse
         |where inc_day='$incDay'
         |    and jypnstop3_costtime is not null and jypnstop3_costtime <> '' and jypnstop3_costtime <> 'null'
         |""".stripMargin

    logger.error(querySql)

    // Column indexes follow the select list: 0 module, 1 service, 2 cost_time, 3 req_time.
    serviceCostTimeRdd = spark.sql(querySql).repartition(100).rdd.map( obj =>{
      val jsonObj = new JSONObject()
      jsonObj.put("module",obj.getString(0))
      jsonObj.put("service",obj.getString(1))
      jsonObj.put("cost_time",obj.getString(2))
      jsonObj.put("req_time",obj.getString(3))

      // Key: (module, service)
      ((obj.getString(0),obj.getString(1)),jsonObj)
    }).persist(StorageLevel.DISK_ONLY)

    // The message used to be a copy of the questionnaire loader's and named the wrong data set.
    logger.error(s"获取服务响应时间数据共:${serviceCostTimeRdd.count()}")

    serviceCostTimeRdd
  }

  /**
   * Counts, per (module, service), how many requests fall into each response-time segment.
   *
   * Every record is tagged with a `seg_time` value derived from its `cost_time`
   * (boundaries 200, 500, 1000, 1500, 2000 and 3000) and the records are then counted per
   * segment.
   *
   * @param serviceCostTimeRdd records keyed by (module, service), each carrying `cost_time`
   * @param incDay             day string stored in every row and mixed into the row id
   * @return the result rows, persisted DISK_ONLY and already materialised by a count
   */
  def doServiceResponseStatistics( serviceCostTimeRdd: RDD[((String, String), JSONObject)],incDay:String) ={
    logger.error("开始执行统计服务指标-响应时间")

    val serviceCostTimeDf = serviceCostTimeRdd
      .aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
      .map { case ((module, service), records) =>
        val id = MD5Util.getMD5(MD5Util.getMD5Instance, Array(incDay,module,service).mkString("_"))

        val bySegment = records.map { record =>
          val cost: Long = record.getLong("cost_time")
          val segment =
            if (cost < 200) "resp_0_200"
            else if (cost < 500) "resp_200_500"
            else if (cost < 1000) "resp_500_1000"
            else if (cost < 1500) "resp_1000_1500"
            else if (cost < 2000) "resp_1500_2000"
            else if (cost < 3000) "resp_2000_3000"
            else "res_3000"

          record.put("seg_time", segment)
          record
        }.groupBy(_.getString("seg_time"))

        // Number of records tagged with the given segment, 0 when the segment is absent.
        def hits(segment: String): Int = bySegment.getOrElse(segment, List()).length

        Row(id, incDay, module, service,
          hits("resp_0_200"), hits("resp_200_500"), hits("resp_500_1000"), hits("resp_1000_1500"),
          hits("resp_1500_2000"), hits("resp_2000_3000"), hits("res_3000"))
      }
      .persist(StorageLevel.DISK_ONLY)

    logger.error(s"统计服务指标-响应时间共:${serviceCostTimeDf.count}")

    serviceCostTimeDf
  }

  /**
   * Computes service performance indicators: request peak, average response time and 99%
   * response time, per minute and rolled up per service, per module and for the whole day.
   *
   * Stage 1 reduces the raw records of every (module, service, minute) key to one per-minute
   * summary. Stage 2 combines those summaries into detail rows (one per minute, carrying the
   * service-level aggregates as well) and into roll-up rows whose per-minute columns are -1
   * and whose rolled-up dimensions are "all".
   *
   * NOTE(review): the day-level roll-up collects every per-minute summary, including its full
   * cost-time list, to the driver; confirm the data volume allows this. An empty input makes
   * the roll-up fail on the empty list.
   *
   * @param spark                  active session, used to build the single day-level row
   * @param serviceCostTimeDataRdd records keyed by (module, service, minute), each carrying
   *                               `cost_time`; unpersisted here
   * @param incDay                 day (yyyyMMdd) the minute values are added to
   * @return union of the detail rows and the day / module / service roll-up rows
   */
  def doServicePerformanceStatistics( spark:SparkSession,serviceCostTimeDataRdd:RDD[((String, String, Long),JSONObject)],incDay:String) = {
    logger.error("开始执行统计服务性能-指标统计")

    // Aggregates a list of per-minute summaries:
    // (MD5 instance, request peak, average response time, 99% response time).
    val commonCal = (resList: List[JSONObject]) => {
      val md5Instance = MD5Util.getMD5Instance
      // Request peak: highest per-minute request count
      val reqPeak = resList.maxBy(_.getInteger("minuReqAmount")).getInteger("minuReqAmount")
      // Average response time: mean of the per-minute averages
      val avgCostTime = resList.map(json => {json.getInteger("minAvgCostTime").toInt}).sum / resList.length
      // 99% response time over all individual cost times of all minutes
      val costTimeList = resList
        .flatMap(_.getString("costTimeList").split(",").toList)
        .sortBy(_.toLong)
      val per99CostTime = costTimeList(Math.round((costTimeList.length - 1) * 0.99).toInt).toInt

      (md5Instance,reqPeak,avgCostTime,per99CostTime)
    }

    val serviceCostTimeDfTmp = serviceCostTimeDataRdd
      .aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
      .map(
        obj => {
          val sdf = new SimpleDateFormat("yyyyMMddHHmmss")
          val date = sdf.parse(incDay+"000000")

          val (module,service,minute) = obj._1
          val resList = obj._2
          // Requests in this minute
          val minuReqAmount = resList.length
          // Average response time in this minute
          val minAvgCostTime = resList.map(_.getInteger("cost_time").toInt).sum / minuReqAmount
          // 99% response time in this minute. sortBy returns a new list, so the sorted result
          // must be the one indexed (the sorted list used to be discarded).
          val sortedByCost = resList.sortBy(_.getLong("cost_time"))
          val minPer99CostTime = sortedByCost(Math.round((sortedByCost.length - 1) * 0.99).toInt).getInteger("cost_time")

          val jsonObj = new JSONObject()

          // Minute label yyyyMMddHHmm: start of incDay plus `minute` minutes, seconds dropped
          jsonObj.put("minute",sdf.format(new Date(date.getTime() + minute * 60 * 1000)).dropRight(2))
          jsonObj.put("minuReqAmount",minuReqAmount)
          jsonObj.put("minAvgCostTime",minAvgCostTime)
          jsonObj.put("minPer99CostTime",minPer99CostTime)
          jsonObj.put("costTimeList", resList.map(_.getLong("cost_time")).mkString(","))

          ((module,service),jsonObj)
        }
      ).persist(StorageLevel.DISK_ONLY)

    // Detail rows: one per (module, service, minute)
    val serviceCostTimeDf =
      serviceCostTimeDfTmp.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
        .flatMap(obj => {
          val (module,service) = obj._1
          val resList = obj._2

          val (md5Instance,reqPeak,avgCostTime,per99CostTime) = commonCal(resList)

          for ( jsonObj <- resList ) yield {
            val id = MD5Util.getMD5(md5Instance, Array(incDay,module,service,jsonObj.getString("minute")).mkString("_"))

            Row(id,incDay,module,service,jsonObj.getString("minute"),reqPeak
              ,jsonObj.getInteger("minuReqAmount"),avgCostTime
              ,jsonObj.getInteger("minAvgCostTime"),per99CostTime
              ,jsonObj.getInteger("minPer99CostTime")
            )
          }

        }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"统计服务指标-响应时间共:${serviceCostTimeDf.count}")

    /* Roll-up over the whole day */
    val dateList = serviceCostTimeDfTmp.values.collect()
    val (md5Instance,reqPeak,avgCostTime,per99CostTime) = commonCal(dateList.toList)

    val id = MD5Util.getMD5(md5Instance, Array(incDay,"all","all","all").mkString("_"))

    val allDf = spark.sparkContext.parallelize(Array(Row(
      id,incDay,"all","all","all",reqPeak,-1,avgCostTime,-1,per99CostTime,-1
    ))).persist(StorageLevel.DISK_ONLY)

    logger.error("按天聚合完毕")

    /* Roll-up per module */
    val moduleRdd =
      serviceCostTimeDfTmp.map(obj => (obj._1._1,obj._2))
        .aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
        .map(obj => {
          val module = obj._1
          val resList = obj._2
          val (md5Instance,reqPeak,avgCostTime,per99CostTime) = commonCal(resList)

          val id = MD5Util.getMD5(md5Instance, Array(incDay,module,"all","all").mkString("_"))

          Row(id,incDay,module,"all","all",reqPeak,-1,avgCostTime,-1,per99CostTime,-1)
        }).persist(StorageLevel.DISK_ONLY)
    logger.error(s"按照模块维度统计共${moduleRdd.count}")

    // Roll-up per service
    val serviceRdd =
      serviceCostTimeDfTmp.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
        .map( obj => {
          val (module,service) = obj._1
          val resList = obj._2
          val (md5Instance,reqPeak,avgCostTime,per99CostTime) = commonCal(resList)

          val id = MD5Util.getMD5(md5Instance, Array(incDay,module,service,"all").mkString("_"))

          Row(id,incDay,module,service,"all",reqPeak,-1,avgCostTime,-1,per99CostTime,-1)
        }).persist(StorageLevel.DISK_ONLY)
    logger.error(s"按照服务维度统计共${serviceRdd.count}")

    // Only the inputs are released. allDf and moduleRdd are part of the returned union;
    // unpersisting them here (as was done before) forced the union to recompute them from
    // the already released inputs.
    serviceCostTimeDfTmp.unpersist()
    serviceCostTimeDataRdd.unpersist()

    serviceCostTimeDf.union(allDf).union(moduleRdd).union(serviceRdd)
  }

  /**
    * Dispatches the requested statistics routines by reflection.
    *
    * Expected argument layout: `args(0)` is used as `incDay`, `args(1)` as
    * `yesterday`, and every remaining element is a key looked up in `funcMap`
    * to obtain the name of a method declared on this object. Each resolved
    * method is invoked with `(spark, incDay, yesterday)`, in the order given.
    *
    * When fewer than three arguments are supplied, an error is logged and the
    * JVM exits with status 1.
    *
    * @param spark session passed through to every invoked routine
    * @param args  `incDay`, `yesterday`, followed by one or more function keys
    * @throws IllegalArgumentException if a function key has no entry in `funcMap`
    */
  def start(spark: SparkSession, args: Array[String]): Unit = {
    if (args.length > 2) {
      val incDay = args(0)
      val yesterday = args(1)
      val funcArr = args.drop(2)

      // Explicit tuple: prints the two dates in the form "(incDay,yesterday)".
      println((incDay, yesterday))

      // Invoke each requested statistics method via reflection.
      funcArr.foreach { index =>
        // java.util.HashMap#get returns null for an unknown key; fail with a
        // message naming the key instead of a NullPointerException raised
        // from inside getDeclaredMethod.
        val funcName = Option(funcMap.get(index)).getOrElse(
          throw new IllegalArgumentException(
            s"No statistics function is registered in funcMap for index '$index'"
          )
        )

        println("\n\n\n"+s">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>${funcName}开始<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
        this.getClass
          .getDeclaredMethod(funcName, classOf[SparkSession], classOf[String], classOf[String])
          .invoke(this, spark, incDay, yesterday)
        println("\n"+s">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>${funcName}结束<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<"+"\n\n\n")
      }

      // The file logs progress messages at error level; kept as-is.
      logger.error(">>>统计结束!<<<")
    } else {
      logger.error("参数长度异常")
      System.exit(1)
    }
  }
}

标签:逻辑,obj,报表,val,StructField,中样,IntegerType,getString,true
来源: https://blog.csdn.net/qq_38680817/article/details/119319831

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有