在设置 Periodic Watermark 时,是允许提供一个参数,表示数据最大的延迟时间。其实这个值要结合自己的业务以及数据的情况来设置,如果该值设置的太小会导致数据因为网络或者其他的原因从而导致乱序或者延迟的数据太多,那么最后窗口触发的时候,可能窗口里面的数据量很少,那么这样计算的结果很可能误差会很大,对于有的场景(要求正确性比较高)是不太符合需求的。但是如果该值设置的太大,那么就会导致很多窗口一直在等待延迟的数据,从而一直不触发,这样首先就会导致数据的实时性降低,另外将这么多窗口的数据存在内存中,也会增加作业的内存消耗,从而可能会导致作业发生 OOM 的问题。

综上建议:

  • 合理设置允许数据最大延迟时间
  • 不太依赖事件时间的场景就不要设置时间策略为 EventTime

延迟数据该如何处理(三种方法)

丢弃(默认)

在 Flink 中,对这么延迟数据的默认处理方式是丢弃。

allowedLateness 再次指定允许数据延迟的时间

allowedLateness 表示允许数据延迟的时间,这个方法是在 WindowedStream 中的,用来设置允许窗口数据延迟的时间,超过这个时间的元素就会被丢弃,这个的默认值是 0,该设置仅针对于以事件时间开的窗口,它的源码如下:

1
2
3
4
5
6
7
public WindowedStream<T, K, W> allowedLateness(Time lateness) {
final long millis = lateness.toMilliseconds();
checkArgument(millis >= 0, "The allowed lateness cannot be negative.");

this.allowedLateness = millis;
return this;
}

之前有多个小伙伴问过我 Watermark 中允许的数据延迟和这个数据延迟的区别是啥?我的回复是该允许延迟的时间是在 Watermark 允许延迟的基础上增加的时间。那么具体该如何使用 allowedLateness 呢。

1
2
3
4
5
6
7
dataStream.assignTimestampsAndWatermarks(new TestWatermarkAssigner())
.keyBy(new TestKeySelector())
.timeWindow(Time.milliseconds(1), Time.milliseconds(1))
.allowedLateness(Time.milliseconds(2)) //表示允许再次延迟 2 毫秒
.apply(new WindowFunction<Integer, String, Integer, TimeWindow>() {
//计算逻辑
});

sideOutputLateData 收集迟到的数据

sideOutputLateData 这个方法同样是 WindowedStream 中的方法,该方法会将延迟的数据发送到给定 OutputTag 的 side output 中去,然后你可以通过 SingleOutputStreamOperator.getSideOutput(OutputTag) 来获取这些延迟的数据。具体的操作方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//定义 OutputTag
OutputTag<Integer> lateDataTag = new OutputTag<Integer>("late"){};

SingleOutputStreamOperator<String> windowOperator = dataStream
.assignTimestampsAndWatermarks(new TestWatermarkAssigner())
.keyBy(new TestKeySelector())
.timeWindow(Time.milliseconds(1), Time.milliseconds(1))
.allowedLateness(Time.milliseconds(2))
.sideOutputLateData(lateDataTag) //指定 OutputTag
.apply(new WindowFunction<Integer, String, Integer, TimeWindow>() {
//计算逻辑
});

windowOperator.addSink(resultSink);

//通过指定的 OutputTag 从 Side Output 中获取到延迟的数据之后,你可以通过 addSink() 方法存储下来,这样可以方便你后面去排查哪些数据是延迟的。
windowOperator.getSideOutput(lateDataTag)
.addSink(lateResultSink);