标签:03 30 窗口 watermark flink window 2018 import
首先window的时间范围是一个自然时间范围,比如你定义了一个
TumblingEventTimeWindows.of(Time.seconds(3))
窗口,那么会生成类似如下的窗口(左闭右开):
[2018-03-03 03:30:00,2018-03-03 03:30:03)
[2018-03-03 03:30:03,2018-03-03 03:30:06)
...
[2018-03-03 03:30:57,2018-03-03 03:31:00)
当一个event time=2018-03-03 03:30:01的消息到来时,就会生成[2018-03-03 03:30:00,2018-03-03 03:30:03)这个窗口(而不是[2018-03-03 03:30:01,2018-03-03 03:30:04)这样的窗口),然后将消息buffer在这个窗口中(此时还不会触发窗口的计算),
当watermark(可以翻译成水位线,只会不断上升,不会下降,用来保证在水位线之前的数据一定都到达,org.apache.flink.streaming.api.datastream.DataStream#assignTimestampsAndWatermarks(org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks<T>)来定制水位线生成策略)超过窗口的endTime时,才会真正触发窗口计算逻辑,然后清除掉该窗口。这样后续再有在该窗口区间内的数据到来时(延迟到来的数据),将被丢弃不被处理。有时候我们希望可以容忍一定时间的数据延迟,我们可以通过org.apache.flink.streaming.api.datastream.WindowedStream#allowedLateness方法来指定一个允许的延迟时间,比如allowedLateness(Time.seconds(5))允许5秒延迟,还是用上面的样例距离,当watermark到达2018-03-03 03:30:03时,将不会移除窗口,但是会触发窗口计算函数,由于窗口还在,所以当还有延迟的消息时间落在该窗口范围内时,比如又有一条消息2018-03-03 03:30:02到来(已经小于水位线时间2018-03-03 03:30:03),将会再次触发窗口计算函数。
那么什么时候这个窗口会被移除后续的延迟数据将不被处理呢?比如[2018-03-03 03:30:00,2018-03-03 03:30:03)这个窗口,当允许延迟5秒时,将在watermark到达2018-03-03 03:30:08时(即[watermark-5秒=2018-03-03 03:30:03]到达窗口的endTime时),[2018-03-03 03:30:00,2018-03-03 03:30:03)这个窗口将被移除,后续再有事件时间落在该窗口的数据到来,将丢弃不处理。
附一个demo测试程序:
package windowing; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.text.SimpleDateFormat; import java.util.Comparator; import java.util.Date; import java.util.LinkedList; /** * * @author : xiaojun * @since 9:47 2018/4/2 */ public class WatermarkTest { public static void main(String[] args) throws Exception { //2018/3/3 3:30:0 Long baseTimestamp = 1520019000000L; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getConfig().setAutoWatermarkInterval(2000); env.setParallelism(1); DataStream<Tuple2<String, Long>> raw = env.socketTextStream("localhost", 9999, "\n").map(new MapFunction<String, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map(String value) throws Exception { //每行输入数据形如: key1@0,key1@13等等,即在baseTimestamp的基础上加多少秒,作为当前event time String[] tmp = value.split("@"); Long ts = baseTimestamp + Long.parseLong(tmp[1]) * 1000; return Tuple2.of(tmp[0], ts); } }).assignTimestampsAndWatermarks(new MyTimestampExtractor(Time.seconds(10))); //允许10秒乱序,watermark为当前接收到的最大事件时间戳减10秒 DataStream<String> window = raw.keyBy(0) //窗口都为自然时间窗口,而不是说从收到的消息时间为窗口开始时间来进行开窗,比如3秒的窗口,那么窗口一次是[0,3),[3,6)....[57,0),如果10秒窗口,那么[0,10),[10,20),... .window(TumblingEventTimeWindows.of(Time.seconds(3))) // 允许5秒延迟 //比如窗口[2018-03-03 03:30:00,2018-03-03 03:30:03),如果没有允许延迟的话,那么当watermark到达2018-03-03 03:30:03的时候,将会触发窗口函数并移除窗口,这样2018-03-03 03:30:03之前的数据再来,将被丢弃 //在允许5秒延迟的情况下,那么窗口的移除时间将到watermark为2018-03-03 03:30:08,在watermark没有到达这个时间之前,你输入2018-03-03 03:30:00这个时间,将仍然会触发[2018-03-03 03:30:00,2018-03-03 03:30:03)这个窗口的计算 .allowedLateness(Time.seconds(5)) .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() { @Override public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception { LinkedList<Tuple2<String, Long>> data = new LinkedList<>(); for (Tuple2<String, Long> tuple2 : input) { data.add(tuple2); } data.sort(new Comparator<Tuple2<String, Long>>() { @Override public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) { return o1.f1.compareTo(o2.f1); } }); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String msg = String.format("key:%s, window:[ %s , %s ), elements count:%d, elements time range:[ %s , %s ]", tuple.getField(0) , format.format(new Date(window.getStart())) , format.format(new Date(window.getEnd())) , data.size() , format.format(new Date(data.getFirst().f1)) , format.format(new Date(data.getLast().f1)) ); out.collect(msg); } }); window.print(); env.execute(); } public static class MyTimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> { private static final long serialVersionUID = 1L; /** * The current maximum timestamp seen so far. */ private long currentMaxTimestamp; /** * The timestamp of the last emitted watermark. */ private long lastEmittedWatermark = Long.MIN_VALUE; /** * The (fixed) interval between the maximum seen timestamp seen in the records * and that of the watermark to be emitted. */ private final long maxOutOfOrderness; private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public MyTimestampExtractor(Time maxOutOfOrderness) { if (maxOutOfOrderness.toMilliseconds() < 0) { throw new RuntimeException("Tried to set the maximum allowed " + "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative."); } this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds(); this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness; } public long getMaxOutOfOrdernessInMillis() { return maxOutOfOrderness; } @Override public final Watermark getCurrentWatermark() { // this guarantees that the watermark never goes backwards. long potentialWM = currentMaxTimestamp - maxOutOfOrderness; if (potentialWM >= lastEmittedWatermark) { lastEmittedWatermark = potentialWM; } System.out.println(String.format("call getCurrentWatermark======currentMaxTimestamp:%s , lastEmittedWatermark:%s", format.format(new Date(currentMaxTimestamp)), format.format(new Date(lastEmittedWatermark)))); return new Watermark(lastEmittedWatermark); } @Override public final long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) { long timestamp = element.f1; if (timestamp > currentMaxTimestamp) { currentMaxTimestamp = timestamp; } return timestamp; } } }
赋windows上的nc工具:
https://download.csdn.net/download/xiao_jun_0820/10322207
监听本地9999端口:
nc.exe -l -p 9999
---------------------
原文:https://blog.csdn.net/xiao_jun_0820/article/details/79786517
标签:03,30,窗口,watermark,flink,window,2018,import 来源: https://www.cnblogs.com/cxhfuujust/p/10969068.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。