生产环境如果 Job 突然消费不及时了,或者 Job 就根本不在消费数据了,那么该怎么办?首先得看下相关的监控查看 Job 是否在正常运行,是否出现反压的情况,是否这会生产数据量过大然而并行度却是根据之前数据量设置的,种种原因都需要一个个排查一下,然后找到根因才能够对应的去解决。这节来讲解下遇到这种问题后如何合理配置并行度呢?

Source 端并行度的配置

假设数据源端是 Kafka,在出现作业消费不及时的时候,首先看下 Kafka 的监控是不是现在生产者生产的数据上涨速度较快,从而导致作业目前的消费速度就是跟不上 Kafka 生产者的生产速度,如果是这样的话,那么就得查看作业的并行度和 Kafka 的分区数是否一致,如果小于 Kafka 的分区数,那么可以增大并行度至 Kafka 的分区数,然后再观察作业消费速度是否可以跟上数据生产速度;如果已经等于 Kafka 的分区数了,那得考虑下是否 Kafka 要扩大分区,但是这样可能会带来 Kafka 其他的问题,这个操作需要谨慎。

Kafka 中数据出现堆积的话,还可以分析下数据的类型,如果数据不重要,但是又要保证数据的及时性,可以修改作业让作业始终从最新的数据开始消费,丢弃之前堆积的数据,这样就可以保证数据的及时性。举个例子,假如一个实时告警作业它突然消费不及时,Kafka 中堆积了几亿条数据(数据延迟几个小时),那么如果作业调高并行度重启后,它还是从上一次提交的 offset 处开始消费的话,这样告警作业即使现在消费速度跟的上了,但是它要处理掉之前堆积的几亿条数据也是要一段时间的,那么就意味着这个作业仍将有段时间处于 ‘不可用’。因为即使判断出来要告警,可能这个告警信息的原数据已经是几个小时前的了,没准这个告警此时已经恢复了,但是还发出来告警这就意味着延迟性比较大,还会对告警消息接收者造成一定的干扰,所以这种场景下建议重启作业就直接开始从最新的数据开始消费。当然不同的场景可能不一样,如果金融行业的交易数据,那么是肯定不能允许这样丢数据的,即使堆积了,也要慢慢的去消费堆积的数据,直到后面追平至最新的数据。

在 Source 端设置并行度的话,如果数据源是 Kafka 的话,建议并行度不要超过 Kafka 的分区数,因为一个并行度会去处理一至多个分区的数据,如果设置过多的话,会出现部分并行度空闲。如果是其他的数据源,可以根据实际情况合理增大并行度来提高作业的处理数据能力。

中间 Operator 并行度的配置

数据从 Source 端流入后,通常会进行一定的数据转换、聚合才能够满足需求,在数据转换中可能会和第三方系统进行交互,在交互的过程中可能会因为网络原因或者第三方服务原因导致有一定的延迟,从而导致这个数据交互的算子处理数据的吞吐量会降低,可能会造成反压,从而会影响上游的算子的消费。那么在这种情况下这些与第三方系统有交互的算子得稍微提高并行度,防止出现这种反压问题(当然反压问题不一定就这样可以解决,具体如何处理参见 9.1 节)。

除了这种与第三方服务做交互的外,另外可能的性能瓶颈也会出现在这类算子中,比如你 Kafka 过来的数据是 JSON 串的 String,然后需要转换成对象,在大数据量的情况下这个转换也是比较耗费性能的。

所以数据转换中间过程的算子也是非常重要的,如果哪一步算子的并行度设置的不合理,可能就会造成各种各样的问题出现。

Sink 端并行度的配置

Sink 端是数据流向下游的地方,可以根据 Sink 端的数据量进行评估,可能有的作业是 Source 端的数据量最大,然后数据量不断的变少,最后到 Sink 端的数据就一点点了,比较常见的就是监控告警的场景。Source 端的数据是海量的,但是通过逐层的过滤和转换,到最后判断要告警的数据其实已经减少很多很多了,那么在最后的这个地方就可以将并行度设置的小一些。

当然也可能会有这样的情况,在 Source 端的数据量是最小的,拿到 Source 端流过来的数据后做了细粒度的拆分,那么数据量就不断的增加了,到 Sink 端的数据量就非常非常的大了。那么在 Sink 到下游的存储中间件的时候就需要提高并行度。

另外 Sink 端也是要与下游的服务进行交互,并行度还得根据下游的服务抗压能力来设置,如果在 Flink Sink 这端的数据量过大的话,然后在 Sink 处并行度也设置的很大,但是下游的服务完全撑不住这么大的并发写入,也是可能会造成下游服务直接被写挂的,下游服务可能还要对外提供一些其他的服务,如果稳定性不能保证的话,会造成很大的影响,所以最终还是要在 Sink 处的并行度做一定的权衡。

Operator Chain

对于一般的作业(无特殊耗性能处),可以尽量让算子的并行度从 Source 端到 Sink 端都保持一致,这样可以尽可能的让 Job 中的算子进行 chain 在一起,形成链,数据在链中可以直接传输,而不需要再次进行序列化与反序列化,这样带来的性能消耗就会得到降低。在 9.2 节中具体讲解了算子 chain 在一起的条件,忘记的话可以去回顾一下。