在设置 Periodic Watermark 时,是允许提供一个参数,表示数据最大的延迟时间。其实这个值要结合自己的业务以及数据的情况来设置,如果该值设置的太小会导致数据因为网络或者其他的原因从而导致乱序或者延迟的数据太多,那么最后窗口触发的时候,可能窗口里面的数据量很少,那么这样计算的结果很可能误差会很大,对于有的场景(要求正确性比较高)是不太符合需求的。但是如果该值设置的太大,那么就会导致很多窗口一直在等待延迟的数据,从而一直不触发,这样首先就会导致数据的实时性降低,另外将这么多窗口的数据存在内存中,也会增加作业的内存消耗,从而可能会导致作业发生 OOM 的问题。
综上建议:
- 合理设置允许数据最大延迟时间
- 不太依赖事件时间的场景就不要设置时间策略为 EventTime
延迟数据该如何处理(三种方法)
丢弃(默认)
在 Flink 中,对这么延迟数据的默认处理方式是丢弃。
allowedLateness 再次指定允许数据延迟的时间
allowedLateness 表示允许数据延迟的时间,这个方法是在 WindowedStream 中的,用来设置允许窗口数据延迟的时间,超过这个时间的元素就会被丢弃,这个的默认值是 0,该设置仅针对于以事件时间开的窗口,它的源码如下:
1 | public WindowedStream<T, K, W> allowedLateness(Time lateness) { |
之前有多个小伙伴问过我 Watermark 中允许的数据延迟和这个数据延迟的区别是啥?我的回复是该允许延迟的时间是在 Watermark 允许延迟的基础上增加的时间。那么具体该如何使用 allowedLateness 呢。
1 | dataStream.assignTimestampsAndWatermarks(new TestWatermarkAssigner()) |
sideOutputLateData 收集迟到的数据
sideOutputLateData 这个方法同样是 WindowedStream 中的方法,该方法会将延迟的数据发送到给定 OutputTag 的 side output 中去,然后你可以通过 SingleOutputStreamOperator.getSideOutput(OutputTag)
来获取这些延迟的数据。具体的操作方法如下:
1 | //定义 OutputTag |