
Flink time (processing time, event time, watermarks)

2022-07-24 23:01:31




1. Flink's cornerstones

2. Time

3. Processing time

package com.wt.flink.core
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object Demo4ProcessionTime {
  def main(args: Array[String]): Unit = {
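    /**
     * Count each word over the last 10 seconds, updating the result every 5 seconds
     */

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Read comma-separated lines from a socket on host "master", port 8888
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    // Split each line into words and pair each word with a count of 1
    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))

    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))

    // Group by word
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)

    val windowDS: WindowedStream[(String, Int), String, TimeWindow] = keyByDS
      // Sliding processing-time window: size 10 seconds, slide 5 seconds (driven by the machine clock)
      .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))

    // Sum the counts (tuple field 1) within each window
    val countDS: DataStream[(String, Int)] = windowDS.sum(1)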

    countDS.print()

    env.execute()
  }
}
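With a 10 s window sliding every 5 s, each word is counted in size / slide = 2 overlapping windows. The pure-Scala sketch below (hypothetical names, no Flink dependency) reproduces how a sliding assigner picks those windows for a single timestamp; the window-start arithmetic mirrors Flink's `TimeWindow.getWindowStartWithOffset` for an offset of 0, assuming non-negative timestamps. For a processing-time window, the timestamp is the machine clock at the moment the word reaches the window operator.

```scala
object SlidingWindowSketch {
  // Start of the slide-aligned window containing ts
  // (same arithmetic as TimeWindow.getWindowStartWithOffset with offset = 0, ts >= 0)
  def lastStart(ts: Long, slide: Long): Long = ts - (ts + slide) % slide

  // Every [start, end) window of length `size`, sliding by `slide`, that contains ts, newest first
  def assignWindows(ts: Long, size: Long, slide: Long): List[(Long, Long)] =
    Iterator.iterate(lastStart(ts, slide))(_ - slide)
      .takeWhile(start => start > ts - size)
      .map(start => (start, start + size))
      .toList

  def main(args: Array[String]): Unit = {
    // An element seen at t = 12000 ms, with the demo's 10 s window and 5 s slide
    println(assignWindows(12000L, 10000L, 5000L)) // prints List((10000,20000), (5000,15000))
  }
}
```

Each window emits its count once the clock passes its end, which is why the demo prints a fresh result every 5 seconds covering the previous 10.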

4. 事件时间

package com.wt.flink.core
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object Demo5EventTime {
  def main(args: Array[String]): Unit = {
    /**
     * Sample input, one record per line (vehicle, road id, timestamp in ms):
     * A9A7N2,340500,161471180000
     * A9A7N2,340500,161471181000
     * A9A7N2,340500,161471182000
     * A9A7N2,340500,161471183000
     * A9A7N2,340500,161471184000
     * A9A7N2,340500,161471185000
     * A9A7N2,340500,161471186000
     * A9A7N2,340500,161471187000
     * A9A7N2,340500,161471188000
     * A9A7N2,340500,161471189000
     * A9A7N2,340500,161471190000
     * A9A7N2,340500,161471191000
     */

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // Read the vehicle records captured at road checkpoints
    val dataDS: DataStream[String] = env.socketTextStream("master", 8888)

    // Extract the road id and the timestamp
    val kcDS: DataStream[(String, Long)] = dataDS.map(line => {
      val split: Array[String] = line.split(",")
      // Road id
      val roadId: String = split(1)
      // Timestamp
      val ts: Long = split(2).toLong
      (roadId, ts)
    })

    /**
     * To use event time, tell Flink which field holds the event time.
     * The timestamp must be in milliseconds.
     * assignAscendingTimestamps assumes the timestamps never decrease.
     */
    val assDS: DataStream[(String, Long)] = kcDS.assignAscendingTimestamps(kv => kv._2)

    /**
     * Count the traffic on each road over the last 10 seconds, every 5 seconds
     */
    val roadKvDS: DataStream[(String, Int)] = assDS.map(kv => (kv._1, 1))

    // Group by road
    val keyByDS: KeyedStream[(String, Int), String] = roadKvDS.keyBy(_._1)

    val windowDS: WindowedStream[(String, Int), String, TimeWindow] = keyByDS
      // Sliding event-time window (size 10 s, slide 5 s), driven by the record timestamps
      .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))

    val countDS: DataStream[(String, Int)] = windowDS.sum(1)

    countDS.print()

    env.execute()
  }
}

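Suppose a record stamped 2022-01-01 10:10:10 arrives first and a record stamped 2022-01-01 10:10:04 arrives after it. With assignAscendingTimestamps the watermark has already advanced to just below 10:10:10, so the windows that 10:10:04 belongs to have already fired and the 10:10:04 record is dropped. Watermarks with a bounded out-of-orderness delay solve this, at the cost of some extra latency before results are emitted.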
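The loss can be traced with a small pure-Scala model of Flink's lateness check: an element is dropped when every window it belongs to has already fired, i.e. each window's last millisecond (end - 1) is at or below the current watermark. This is a hypothetical sketch, not Flink code. It assumes the sliding 10 s / 5 s windows of Demo5EventTime, a watermark of (largest timestamp seen) - bound - 1 ms (bound 0 behaves like assignAscendingTimestamps), and that the watermark has caught up with the earlier records by the time the last one arrives. The helper `t` (milliseconds since midnight) stands in for real epoch timestamps.

```scala
import java.time.LocalTime

object LateRecordSketch {
  val Size = 10000L  // 10 s windows, as in Demo5EventTime
  val Slide = 5000L  // sliding every 5 s

  // Hypothetical helper: milliseconds since midnight, e.g. "10:10:04" -> 36604000
  def t(hms: String): Long = LocalTime.parse(hms).toSecondOfDay * 1000L

  // Every [start, end) sliding window that contains ts (offset 0, ts >= 0), newest first
  def windowsOf(ts: Long): List[(Long, Long)] =
    Iterator.iterate(ts - (ts + Slide) % Slide)(_ - Slide)
      .takeWhile(_ > ts - Size)
      .map(start => (start, start + Size))
      .toList

  // Windows that can still accept the LAST record after the earlier records pushed the
  // watermark to (largest timestamp seen) - boundMs - 1. Empty result = dropped as late.
  def openWindowsForLast(arrivals: Seq[Long], boundMs: Long): List[(Long, Long)] = {
    val watermark = arrivals.init.foldLeft(Long.MinValue)((wm, ts) => math.max(wm, ts - boundMs - 1))
    // A window has fired once the watermark reaches its last millisecond (end - 1)
    windowsOf(arrivals.last).filter { case (_, end) => end - 1 > watermark }
  }

  def main(args: Array[String]): Unit = {
    val arrivals = Seq(t("10:10:10"), t("10:10:04")) // 10:10:04 arrives after 10:10:10
    for (bound <- Seq(0L, 5000L, 6000L))
      println(s"bound = $bound ms -> ${openWindowsForLast(arrivals, bound)}")
  }
}
```

With no bound the record fits in none of its windows. With a 5 s bound it is still missing from [10:09:55, 10:10:05), because it is 6 s behind 10:10:10; only a bound of at least 6 s keeps it in both windows. The bound therefore has to cover the largest expected disorder, and every window fires that much later in event time.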

5. 水位线

package com.wt.flink.core
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import java.time.Duration
object Demo5ShuiWei {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // Read the vehicle records captured at road checkpoints
    val dataDS: DataStream[String] = env.socketTextStream("master", 8888)

    // Extract the road id and the timestamp
    val kcDS: DataStream[(String, Long)] = dataDS.map(line => {
      val split: Array[String] = line.split(",")
      // Road id
      val roadId: String = split(1)
      // Timestamp
      val ts: Long = split(2).toLong
      (roadId, ts)
    })

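    /**
     * To use event time, tell Flink which field holds the event time.
     * The timestamp must be in milliseconds.
     */
    // With assignAscendingTimestamps the watermark follows the largest timestamp seen so far;
    // a watermark can only advance, never move back
    //val assDS: DataStream[(String, Long)] = kcDS.assignAscendingTimestamps(kv => kv._2)

    val assDS: DataStream[(String, Long)] = kcDS.assignTimestampsAndWatermarks(
      WatermarkStrategy
        // Watermark strategy: keep the watermark 5 seconds behind the largest timestamp seen,
        // so records at most 5 seconds behind it are not dropped.
        // The explicit type argument lets Scala infer the element type.
        .forBoundedOutOfOrderness[(String, Long)](Duration.ofSeconds(5))
        // Tell Flink which field holds the event time
        .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
          override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = {
            // The timestamp field
            element._2
          }
        })
    )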

    /**
     * Count the traffic on each road every 5 seconds, over the last 5 seconds
     */
    val roadKvDS: DataStream[(String, Int)] = assDS.map(kv => (kv._1, 1))

    // Group by road
    val keyByDS: KeyedStream[(String, Int), String] = roadKvDS.keyBy(_._1)

    val windowDS: WindowedStream[(String, Int), String, TimeWindow] = keyByDS
      // Tumbling event-time window of 5 seconds
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))

    val countDS: DataStream[(String, Int)] = windowDS.sum(1)

    countDS.print()

    env.execute()
  }
}
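The latency cost mentioned earlier shows up when a few records are replayed through a model of Demo5ShuiWei's 5 s tumbling windows. This is a hypothetical pure-Scala sketch, not Flink's implementation. It assumes a watermark of (largest timestamp seen) - bound - 1 ms updated after every record, drops a record whose window has already fired, and does not flush open windows at the end of the input. The helper `t` (milliseconds since midnight) stands in for real epoch timestamps.

```scala
import java.time.LocalTime
import scala.collection.mutable

object TumblingWatermarkSketch {
  val WindowSize = 5000L // 5 s tumbling windows, as in Demo5ShuiWei

  // Hypothetical helper: milliseconds since midnight, e.g. "10:10:04" -> 36604000
  def t(hms: String): Long = LocalTime.parse(hms).toSecondOfDay * 1000L

  // Replays (roadId, timestamp) records in arrival order and returns the
  // (roadId, windowStart, count) results in the order the windows fire
  def run(records: Seq[(String, Long)], boundMs: Long): List[(String, Long, Int)] = {
    var watermark = Long.MinValue
    val open = mutable.Map.empty[(String, Long), Int] // (roadId, windowStart) -> count
    val fired = mutable.ListBuffer.empty[(String, Long, Int)]
    for ((road, ts) <- records) {
      val start = ts - (ts + WindowSize) % WindowSize
      // Count the record unless its window has already fired (last ms <= watermark)
      if (start + WindowSize - 1 > watermark) {
        open((road, start)) = open.getOrElse((road, start), 0) + 1
      }
      // Bounded out-of-orderness watermark: largest timestamp seen - bound - 1 ms
      watermark = math.max(watermark, ts - boundMs - 1)
      // Fire, oldest first, every window whose last millisecond the watermark has reached
      val ready = open.keys.toList.filter { case (_, s) => s + WindowSize - 1 <= watermark }.sortBy(_._2)
      for (key <- ready) {
        fired += ((key._1, key._2, open(key)))
        open -= key
      }
    }
    fired.toList
  }

  def main(args: Array[String]): Unit = {
    val input = Seq("10:10:01", "10:10:03", "10:10:07", "10:10:04", "10:10:11").map(s => ("340500", t(s)))
    println(run(input, boundMs = 0L))    // prints List((340500,36600000,2), (340500,36605000,1))
    println(run(input, boundMs = 5000L)) // prints List((340500,36600000,3))
  }
}
```

Without a bound, the window [10:10:00, 10:10:05) fires as soon as 10:10:07 arrives, so the later 10:10:04 record is dropped and the count is 2. With the 5 s bound the same window counts all three records, but its result only appears once a record stamped at least 10:10:10 arrives.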

Source: https://www.cnblogs.com/atao-BigData/p/16515787.html
