Flink CEP API 学习
准备依赖要开发 Flink CEP 应用程序,首先你得在项目的 pom.xml 中添加依赖。
12345<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_${scala.binary.version}</artifactId> <version>${flink.version}</version></dependency>
这个依赖有两种,一个是 Java 版本的,一个是 Scala 版本,根据项目的开发语言自行选择。
Flink CEP 应用入门准备好依赖后,我们开始第一个 Flink CEP 应用程序,这里我们只做一个简单的数据流匹配,当匹配成功后将匹配的两条数据打印出来。首先定义实体类 Event 如下:
1234public class Event { private Integer id; private ...
Flink CEP深入理解
CEP 的英文全称是 Complex Event Processing,翻译成中文为复杂事件处理。它可以用于处理实时数据并在事件流到达时从事件流中提取信息,并根据定义的规则来判断事件是否匹配,如果匹配则会触发新的事件做出响应。除了支持单个事件的简单无状态的模式匹配(例如基于事件中的某个字段进行筛选过滤),也可以支持基于关联/聚合/时间窗口等多个事件的复杂有状态模式的匹配(例如判断用户下单事件后 30 分钟内是否有支付事件)。
CEP 是什么?CEP 的英文全称是 Complex Event Processing,翻译成中文为复杂事件处理。
CEP 可以用于处理实时数据并在事件流到达时从事件流中提取信息,根据定义的规则来判断事件是否匹配,如果匹配则会触发新的事件做出响应。除了支持单个事件的简单无状态的模式匹配(例如基于事件中的某个字段进行筛选过滤),也可以支持基于关联/聚合/时间窗口等多个事件的复杂有状态模式的匹配(例如判断用户下单事件后 30 分钟内是否有支付事件)。
因为这种事件匹配通常是根据提前制定好的规则去匹配的,而这些规则一般来说不仅多,而且复杂,所以就会引入一些规则引擎来 ...
Flink 状态后端存储
Flink 提供了以下三种开箱即用的状态后端(用于存储状态数据),可以为所有 Flink 作业配置相同的状态后端,也可以为每个 Flink 作业配置指定的状态后端。当需要对具体的某一种 State 做 Checkpoint 时,此时就需要具体的状态后端存储,刚好 Flink 内置提供了不同的状态后端存储,用于指定状态的存储方式和位置。状态可以存储在 Java 堆内存中或者堆外
State Backends当需要对具体的某一种 State 做 Checkpoint 时,此时就需要具体的状态后端存储,Flink 内置提供了不同的状态后端存储,用于指定状态的存储方式和位置。状态可以存储在 Java 堆内存中或者堆外,在 Flink 安装路径下 conf 目录中的 flink-conf.yaml 配置文件中也有状态后端存储相关的配置,Flink 还特有一个 CheckpointingOptions 类来控制 state 存储的相关配置,该类中有如下配置:
state.backend: 用于存储和进行状态 checkpoint 的状态后端存储方式,无默认值
state.checkpoint ...
Flink 常用的 Source 和 Sink Connectors
Flink Job 的大致结构就是 Source ——> Transformation ——> Sink。
Data Source 介绍Data Source 是什么呢?就字面意思其实就可以知道:数据来源。
Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即处理实时的数据流(做计算操作),然后将处理后的数据实时下发,只要数据源源不断过来,Flink 就能够一直计算下去。
Flink 中你可以使用 StreamExecutionEnvironment.addSource(sourceFunction) 来为你的程序添加数据来源。
Flink 已经提供了若干实现好了的 source function,当然你也可以通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。
那么常用的 Data Source 有哪些呢?
常用的 Data SourceSt ...
Flink 多种时间语义对比
Flink 在流应用程序中支持不同的 Time 概念,有 Processing Time、Event Time 和 Ingestion Time。下面我们一起来看看这三个 Time。
Processing TimeProcessing Time 是指事件被处理时机器的系统时间。
如果 Flink Job 设置的时间策略是 Processing Time 的话,那么后面所有基于时间的操作(如时间窗口)都将会使用当时机器的系统时间。每小时 Processing Time 窗口将包括在系统时钟指示整个小时之间到达特定操作的所有事件。
例如,如果应用程序在上午 9:15 开始运行,则第一个每小时 Processing Time 窗口将包括在上午 9:15 到上午 10:00 之间处理的事件,下一个窗口将包括在上午 10:00 到 11:00 之间处理的事件。
Processing Time 是最简单的 “Time” 概念,不需要流和机器之间的协调,它提供了最好的性能和最低的延迟。但是,在分布式和异步的环境下,Processing Time 不能提供确定性,因为它容易受到事件到达系统的速度( ...
Flink Window 机制深入理解
目前有许多数据分析的场景从批处理到流处理的演变, 虽然可以将批处理作为流处理的特殊情况来处理,但是分析无穷集的流数据通常需要思维方式的转变并且具有其自己的术语,例如,“windowing(窗口化)”、“at-least-once(至少一次)”、“exactly-once(只有一次)” 。
Flink Window 基础概念与实现原理Apache Flink 是一个为生产环境而生的流处理器,具有易于使用的 API,可以用于定义高级流分析程序。Flink 的 API 在数据流上具有非常灵活的窗口定义,使其在其他开源流处理框架中脱颖而出。
什么是 Window?下面我们结合一个现实的例子来说明。
就拿交通传感器的示例:统计经过某红绿灯的汽车数量之和?
假设在一个红绿灯处,我们每隔 15 秒统计一次通过此红绿灯的汽车数量,如下图:
可以把汽车的经过看成一个流,无穷的流,不断有汽车经过此红绿灯,因此无法统计总共的汽车数量。但是,我们可以换一种思路,每隔 15 秒,我们都将与上一次的结果进行 sum 操作(滑动聚合),如下:
这个结果似乎还是无法回答我们的问题,根本原因在于流是无界的,我们 ...
状态一致性
1. 概述2. 状态一致性分类2.1. AT-MOST-ONCE2.2. AT-LEAST-ONCE2.3. EXACTLY-ONCE
Flink 使用 checkpoint保证 EXACTLY-ONCE
3. 端到端状态一致性
Flink 通过快照机制和 Barrier 来实现一致性的保证,当任务中途 crash 或者cancel 之后,可以通过checkpoing 或者 savepoint 来进行恢复,实现数据流的重放。从而让任务达到一致性的效果,这种一致性需要开启 exactly_once模式之后才行。
需要记住的是这边的 Flink exactly_once 只是说在 Flink 内部是 exactly_once 的,并不能保证与外部存储交互时的 exactly_once,如果要实现外部存储连接后的 exactly_once,需要进行做一些特殊的处理。
3.1. 预写日志3.2. 两阶段提交4. Flink + Kafka 端到端状态一致性
Spark Streaming 和 Kafka 整合开发
Apache Kafka 是一个分布式的消息发布-订阅系统。可以说,任何实时大数据处理工具缺少与 Kafka 整合都是不完整的。本文将介绍如何使用 Spark Streaming 从 Kafka 中接收数据,
这里介绍两种方法
使用 Receivers 和 Kafka 高层次的 API
使用 Direct API,这是使用低层次的 KafkaAPI,并没有使用到 Receivers,是 Spark 1.3.0中开始引入的。这两种方法有不同的编程模型,性能特点和语义担保
基于 Receivers 的方法使用了 Receivers 来接收数据。Receivers 的实现使用到 Kafka 高层次的消费者API。对于所有的Receivers,接收到的数据将会保存在 Spark executors 中,然后由 Spark Streaming 启动的 Job 来处理这些数据。
然而,在默认的配置下,这种方法在失败的情况下会丢失数据,为了保证零数据丢失,你可以在 Spark Streaming 中使用 WAL 日志,这是在 Spark 1.2.0 才引入的功能,这使得我们可以将接收到的数据 ...
Watermark 与 Window 结合来处理延迟数据
在设置 Periodic Watermark 时,是允许提供一个参数,表示数据最大的延迟时间。其实这个值要结合自己的业务以及数据的情况来设置,如果该值设置的太小会导致数据因为网络或者其他的原因从而导致乱序或者延迟的数据太多,那么最后窗口触发的时候,可能窗口里面的数据量很少,那么这样计算的结果很可能误差会很大,对于有的场景(要求正确性比较高)是不太符合需求的。但是如果该值设置的太大,那么就会导致很多窗口一直在等待延迟的数据,从而一直不触发,这样首先就会导致数据的实时性降低,另外将这么多窗口的数据存在内存中,也会增加作业的内存消耗,从而可能会导致作业发生 OOM 的问题。
综上建议:
合理设置允许数据最大延迟时间
不太依赖事件时间的场景就不要设置时间策略为 EventTime
延迟数据该如何处理(三种方法)丢弃(默认)在 Flink 中,对这么延迟数据的默认处理方式是丢弃。
allowedLateness 再次指定允许数据延迟的时间allowedLateness 表示允许数据延迟的时间,这个方法是在 WindowedStream 中的,用来设置允许窗口数据延迟的时间,超过这个时间的 ...
Flink 数据倾斜
在大数据计算场景,无论使用 MapReduce、Spark 还是 Flink 计算框架,无论是批处理还是流处理都存在数据倾斜的问题,通过本节学习产生数据倾斜的原因及如何在生产环境解决数据倾斜。
数据倾斜简介分析一个计算各 app PV 的案例,如下图所示,圆球表示 app1 的日志,方块表示 app2 的日志,Source 端从外部系统读取用户上报的各 app 行为日志,要计算各 app 的 PV,所以按照 app 进行 keyBy,相同 app 的数据发送到同一个 Operator 实例中处理,keyBy 后对 app 的 PV 值进行累加来,最后将计算的 PV 结果输出到外部 Sink 端。
可以看到在任务运行过程中,计算 Count 的算子有两个并行度,其中一个并行度处理 app1 的数据,另一个并行度处理 app2 的数据。由于 app1 比较热门,所以 app1 的日志量远大于 app2 的日志量,造成计算 app1 PV 的并行度压力过大成为整个系统的瓶颈,而计算 app2 PV 的并行度数据量较少所以 CPU、内存以及网络资源的使用率整体都比较低,这就是产生数据倾斜 ...