标签:EventTime val Watermark Flink 00 Window env
第七章 EventTime 与 Window
7.1 EventTime 的引入
在 Flink 的 流 式 处 理中 , 绝 大 部 分 的 业务都 会 使 用 eventTime,一般只在 eventTime 无法使用时,才会被迫使用 ProcessingTime 或者 IngestionTime。 如果要使用 EventTime,那么需要引入 EventTime 的时间属性, 引入方式如下所示:val env = StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给 env 创建的每一个 stream 追加时间特征 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
7.2 Watermark
7.2.1 基本概念
我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过 程和时间的,虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺 序来的,但是也不排除由于网络等原因,导致乱序的产生,所谓乱序,就是指 Flink 接收到的事件的先后顺序不是严格按照事件的 Event Time 顺序排列的。![](https://www.icode9.com/i/l/?n=18&i=blog/1604514/201907/1604514-20190713172956964-273970204.png)
7.2.2 Watermark 的引入
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给 env 创建的每一个 stream 追加时间特征 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = env.socketTextStream("localhost", 11111).assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(200)) { override def extractTimestamp(t: String): Long = { // EventTime 是日志生成时间,我们从日志中解析 EventTime t.split(" ")(0).toLong } })
7.3 EvnetTimeWindow API
当使用 EventTimeWindow 时,所有的 Window 在 EventTime 的时间轴上进行划 分,也就是说,在 Window 启动后,会根据初始的 EventTime 时间每隔一段时间划 分一个窗口,如果 Window 大小是 3 秒,那么 1 分钟内会把 Window 划分为如下的 形式:[00:00:00,00:00:03) [00:00:03,00:00:06) ... [00:00:57,00:01:00)
如果 Window 大小是 10 秒,则 Window 会被分为如下的形式:
[00:00:00,00:00:10) [00:00:10,00:00:20) ... [00:00:50,00:01:00)注意,窗口是左闭右开的,形式为:[window_start_time,window_end_time)。 Window 的设定无关数据本身,而是系统定义好了的,也就是说,Window 会一 直按照指定的时间间隔进行划分,不论这个 Window 中有没有数据,EventTime 在 这个 Window 期间的数据会进入这个 Window。 Window 会不断产生,属于这个 Window 范围的数据会被不断加入到 Window 中, 所有未被触发的 Window 都会等待触发,只要 Window 还没触发,属于这个 Window 范围的数据就会一直被加入到 Window 中,直到 Window 被触发才会停止数据的追 加,而当 Window 触发之后才接受到的属于被触发 Window 的数据会被丢弃。 Window 会在以下的条件满足时被触发执行: watermark 时间 >= window_end_time; 在[window_start_time,window_end_time)中有数据存在。 我们通过下图来说明 Watermark、EventTime 和 Window 的关系。
7.3.1 滚动窗口(TumblingEventTimeWindows)
// 获取执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)结果是按照 Event Time 的时间窗口计算得出的,而无关系统的时间(包括输入的快慢)。
// 创建 SocketSource val stream = env.socketTextStream("localhost", 11111)
// 对 stream 进行处理并按 key 聚合 val streamKeyBy = stream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(3000)) { override def extractTimestamp(element: String): Long = { val sysTime = element.split(" ")(0).toLong println(sysTime) sysTime }
}
).map(item => (item.split(" ")(1), 1)).keyBy(0)
// 引入滚动窗口 val streamWindow = streamKeyBy.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 执行聚合操作 val streamReduce = streamWindow.reduce( (item1, item2) => (item1._1, item1._2 + item2._2) )
// 将聚合数据写入文件 streamReduce.print
// 执行程序 env.execute("TumblingWindow")
7.3.2 滑动窗口(SlidingEventTimeWindows)
// 获取执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 创建 SocketSource val stream = env.socketTextStream("localhost", 11111)
// 对 stream 进行处理并按 key 聚合 val streamKeyBy = stream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(0)) { override def extractTimestamp(element: String): Long = { val sysTime = element.split(" ")(0).toLong println(sysTime) sysTime }
}
).map(item => (item.split(" ")(1), 1)).keyBy(0)
// 引入滚动窗口 val streamWindow = streamKeyBy.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
// 执行聚合操作 val streamReduce = streamWindow.reduce( (item1, item2) => (item1._1, item1._2 + item2._2) )
// 将聚合数据写入文件 streamReduce.print
// 执行程序 env.execute("TumblingWindow")
7.3.3 会话窗口(EventTimeSessionWindows)
相邻两次数据的 EventTime 的时间差超过指定的时间间隔就会触发执行。如果 加入 Watermark,那么当触发执行时,所有满足时间间隔而还没有触发的 Window 会 同时触发执行。// 获取执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 创建 SocketSource val stream = env.socketTextStream("localhost", 11111)
// 对 stream 进行处理并按 key 聚合 val streamKeyBy = stream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(0)) { override def extractTimestamp(element: String): Long = { val sysTime = element.split(" ")(0).toLong println(sysTime) sysTime }
}
).map(item => (item.split(" ")(1), 1)).keyBy(0)
// 引入滚动窗口 val streamWindow = streamKeyBy.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
// 执行聚合操作 val streamReduce = streamWindow.reduce( (item1, item2) => (item1._1, item1._2 + item2._2) )
// 将聚合数据写入文件 streamReduce.print
// 执行程序 env.execute("TumblingWindow")
总结
Flink 是一个真正意义上的流计算引擎,在满足低延迟和低容错开销的基础之上,完美 的解决了 exactly-once 的目标,真是由于 Flink 具有诸多优点,越来越多的企业开始使用 Flink 作为流处理框架,逐步替换掉了原本的 Storm 和 Spark 技术框架。
标签:EventTime,val,Watermark,Flink,00,Window,env 来源: https://www.cnblogs.com/LXL616/p/11181488.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。