Spark 存储模块
在执行 Spark 的应用程序时,Spark 集群会启动 Driver 和 Executor 两种 JVM 进程,前者为主控进程,负责创建 Spark 上下文,提交 Spark 作业[Job],并将作业转化为计算任务[Task],在各个 Executor 进程间协调任务的调度;后者负责在工作节点上执行具体的计算任务,并将结果返回给 Driver, 同时为需要持久化的 RDD 提供存储功能。由于 Driver 的内存管理相对来说较为简单,本节主要对 Executor 的内存管理进行分析,下文中的 Spark 内存均特指 Executor 的内存。
Execuor 内存模型堆内和堆外内存作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内 [On-heap]空间进行了更为详细的分配,以充分利用内存。
同时,Spark 引入了堆外[Off-heap]内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。
堆内内存受到 JVM 统一管理,堆外内存是直接向操作系统进行内存的申请和释放。
堆内内存堆内内存的大小, ...
Cloudera平台搭建
1.服务器环境准备1.1 服务器环境概述数据集群包含两台专业服务器,通过XenServer服务器虚拟化软件把两台专业服务器虚拟化为五台虚拟服务器(1个master和4个slave);五台虚拟服务器都安装了CentOS7(Linux)操作系统,在此基础上安装了 Java、C/C++、Scala等基本开发工具,以及Hadoop(HDFS,YARN)、MySQL、ZooKeeper、Kafka 、Spark2
Hbase、Spark等数据集群必须的大数据存储及处理软件。数据集群需要安装的软件及其层次关系如表1.1所示。数据集群除了安装Hadoop、Spark、Hbase等组件外,在Master节点和data1节点安装了MySQL数据库。
主机
所在服务器
密码
192.168.10.96
192.168.10.90
123456
192.168.10.98
192.168.10.90
123456
192.168.10.100
192.168.10.90
123456
192.168.10.102
192.168.10.120
123456
192.168.1 ...
无题
生产者,Broker,消费者都是有可能丢数据的
生产端生产者丢数据,即发送的数据根本没有保存到 Broker 端。出现这个情况的原因可能是,网络抖动,导致消息压根就没有发送到 Broker 端;也可能是消息本身不合格导致 Broker 拒绝接收(比如消息太大了,超过了 Broker 的承受能力)等等。
上面所说比如网络原因导致消息没有成功发送到 broker 端,常见,也并不可怕。可怕的不是没发送成功,而是发送失败了你不做任何处理。
很简单的一个重试配置,基本就可以解决这种网络瞬时抖动问题。
1props.put("retries", 10);
当然还有很多其他原因导致的,不能只依靠 kafka 的配置来做处理,我们看一下 kafka 发送端的源码,其实人家是提供了两个方法的,通常会出问题的方法是那个简单的 send,没有 callback(回调)。简单的 send发送后不会去管它的结果是否成功,而 callback 能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。
因此,一定要使用带有回调通知的 send 方法。
我 ...
使用 Flink ProcessFunction 处理宕机告警
Flink 的底层 API 就是 ProcessFunction,它是一个低阶的流处理操作,它可以访问流处理程序的基础构建模块:Event、State、Timer。ProcessFunction 可以被认为是一种提供了对 KeyedState 和定时器访问的 FlatMapFunction。每当数据源中接收到一个事件,就会调用来此函数来处理。对于容错的状态,ProcessFunction 可以通过 RuntimeContext 访问 KeyedState。
ProcessFunction 介绍Flink 的底层 API 就是 ProcessFunction,它是一个低阶的流处理操作,它可以访问流处理程序的基础构建模块:Event、State、Timer。ProcessFunction 可以被认为是一种提供了对 KeyedState 和定时器访问的 FlatMapFunction。每当数据源中接收到一个事件,就会调用来此函数来处理。对于容错的状态,ProcessFunction 可以通过 RuntimeContext 访问 KeyedState。
定时器可以对处理时间和事件时间的变化 ...
Flink Job 并行度设置
生产环境如果 Job 突然消费不及时了,或者 Job 就根本不在消费数据了,那么该怎么办?首先得看下相关的监控查看 Job 是否在正常运行,是否出现反压的情况,是否这会生产数据量过大然而并行度却是根据之前数据量设置的,种种原因都需要一个个排查一下,然后找到根因才能够对应的去解决。这节来讲解下遇到这种问题后如何合理配置并行度呢?
Source 端并行度的配置假设数据源端是 Kafka,在出现作业消费不及时的时候,首先看下 Kafka 的监控是不是现在生产者生产的数据上涨速度较快,从而导致作业目前的消费速度就是跟不上 Kafka 生产者的生产速度,如果是这样的话,那么就得查看作业的并行度和 Kafka 的分区数是否一致,如果小于 Kafka 的分区数,那么可以增大并行度至 Kafka 的分区数,然后再观察作业消费速度是否可以跟上数据生产速度;如果已经等于 Kafka 的分区数了,那得考虑下是否 Kafka 要扩大分区,但是这样可能会带来 Kafka 其他的问题,这个操作需要谨慎。
Kafka 中数据出现堆积的话,还可以分析下数据的类型,如果数据不重要,但是又要保证数据的及时性,可以修改 ...
Flink Side Output 分流
通常,在 Kafka 的 topic 中会有很多数据,这些数据虽然结构是一致的,但是类型可能不一致,举个例子:Kafka 中的监控数据有很多种:机器、容器、应用、中间件等,如果要对这些数据分别处理,就需要对这些数据流进行一个拆分。
使用 Filter 分流使用 filter 算子根据数据的字段进行过滤分成机器、容器、应用、中间件等。伪代码如下:
12345DataStreamSource<MetricEvent> data = KafkaConfigUtil.buildSource(env); //从 Kafka 获取到所有的数据流SingleOutputStreamOperator<MetricEvent> machineData = data.filter(m -> "machine".equals(m.getTags().get("type"))); //过滤出机器的数据SingleOutputStreamOperator<MetricEvent> dockerData = data.fil ...
Flink Checkpoint 和 Savepoint 区别
Checkpoint 在 Flink 中是一个非常重要的 Feature,Checkpoint 使 Flink 的状态具有良好的容错性,通过 Checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。
Checkpoint 介绍及使用为了保障的容错,Flink 需要对状态进行快照。Flink 可以从 Checkpoint 中恢复流的状态和位置,从而使得应用程序发生故障后能够得到与无故障执行相同的语义。
Flink 的 Checkpoint 有以下先决条件:
需要具有持久性且支持重放一定时间范围内数据的数据源。
例如:Kafka、RabbitMQ 等。这里为什么要求支持重放一定时间范围内的数据呢?因为 Flink 的容错机制决定了,当 Flink 任务失败后会自动从最近一次成功的 Checkpoint 处恢复任务,此时可能需要把任务失败前消费的部分数据再消费一遍,所以必须要求数据源支持重放。假如一个Flink 任务消费 Kafka 并将数据写入到 MySQL 中,任务从 Kafka 读取到数据,还未将数据输出到 MySQL 时任务突然失败了,此时如果 Kafka ...
Flink Parallelism 和 Slot 深度理解
parallelism 是并行的意思,在 Flink 里面代表每个算子的并行度,适当的提高并行度可以大大提高 Job 的执行效率,比如你的 Job 消费 Kafka 数据过慢,适当调大可能就消费正常了。
相信使用过 Flink 的你或多或少遇到过下面这个问题(笔者自己的项目曾经也出现过这样的问题),错误信息如下:
123Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/taskmanager_0#15608456]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalRpcInvocation".
跟着这问题在 Flink 的 Issue 列表里看到了一个类似的问题:https://issues.apache.org/jira/browse/FLINK-90 ...
使用 Flink ParameterTool 读取配置
Flink 配置的管理很不方便,比如像算子的并行度配置、Kafka 数据源的配置(broker 地址、topic 名、group.id)、Checkpoint 是否开启、状态后端存储路径、数据库地址、用户名和密码等,Flink 作为流计算引擎,处理源源不断的数据是其本意,但是在处理数据的过程中,往往可能需要一些参数的传递
Flink Job 配置在 Flink 中其实是有几种方法来管理配置。
使用 ConfigurationFlink 提供了 withParameters 方法,它可以传递 Configuration 中的参数给,要使用它,需要实现那些 Rich 函数,比如实现 RichMapFunction,而不是 MapFunction,因为 Rich 函数中有 open 方法,然后可以重写 open 方法通过 Configuration 获取到传入的参数值。
12345678910111213141516171819202122232425262728ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnv ...