在执行 Spark 的应用程序时,Spark 集群会启动 Driver 和 Executor 两种 JVM 进程,前者为主控进程,负责创建 Spark 上下文,提交 Spark 作业[Job],并将作业转化为计算任务[Task],在各个 Executor 进程间协调任务的调度;后者负责在工作节点上执行具体的计算任务,并将结果返回给 Driver, 同时为需要持久化的 RDD 提供存储功能。由于 Driver 的内存管理相对来说较为简单,本节主要对 Executor 的内存管理进行分析,下文中的 Spark 内存均特指 Executor 的内存。

Execuor 内存模型

堆内和堆外内存

作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内 [On-heap]空间进行了更为详细的分配,以充分利用内存。

同时,Spark 引入了堆外[Off-heap]内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。

堆内内存受到 JVM 统一管理,堆外内存是直接向操作系统进行内存的申请和释放。

堆内内存

堆内内存的大小,由 Spark 应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置。

Executor 内运行的并发任务共享 JVM 堆内内存, 这些任务在缓存 RDD 数据和广播 Broadcast 数据时占用的内存被规划为存储 [Storage] 内存, 而这些任务在执行 Shuffle 时占用的内存被规划为执行 [Execution] 内存,剩余的部分不做特殊规划,Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间 。不同的管理模式下, 这三部分占用的空间大小各不相同

堆外内存

JVM 对于内存的清理无法准确指定时间点,因此无法实现精确的释放。为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外[Off-heap]内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。由于内存的申请和释放不再通过 JVM 机制,而是直接向操作系统申请,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说,堆外内存可以被精确地申请和释放,降低了管理的难度,也降低了误差

在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存 。

内存空间分配

静态内存管理

在 Spark 最初采用的静态内存管理机制下,**[存储内存][执行内存][其他内存]**的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置。

  • 可用的存储内存

    systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction

  • 可用的执行内存

    systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction

其中 systemMaxMemory 取决于当前 JVM 堆内内存的大小,最后可用的执行内存或者存储内存要在此基础上与各自的 memoryFraction 参数和 safetyFraction 参数相乘得出。

上述计算公式中的两个 safetyFraction 参数,其意义在于在逻辑上预留出 [1-safetyFraction] 这么一块保险区域,降低因实际内存超出当前预设范围而导致 OOM 的风险。

值得注意的是,这个预留的保险区域仅仅是一种逻辑上的规划,在具体使用时 Spark 并没有区别对待,和”其它内存”一样交给了 JVM 去管理。

Storage 内存和 Execution 内存都有预留空间,目的是防止 OOM ,因为 Spark 堆内内存大小的记录是不准确的,需要留出保险区域。

堆外的空间分配较为简单,只有存储内存和执行内存。可用的执行内存和存储内存占用的空间大小直接由参数 spark.memory.storageFraction 决定,由于堆外内存占用的空间可以被精确计算,所以无需再设定保险区域

静态内存管理机制实现起来较为简单,但如果用户不熟悉 Spark 的存储机制,或没有根据具体的数据规模和计算任务或做相应的配置,很容易造成存储内存和执行内存中的一方剩余大量的空间,而另一方却早早被占满,不得不淘汰或移出旧的内容以存储新的内容。由于新的内存管理机制的出现,这种方式目前已经很少有开发者使用,出于兼容旧版本的应用程序的目的,Spark 仍然保留了它的实现。

统一内存管理

Spark1.6 之后引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域

其中最重要的优化在于动态占用机制, 其规则如下:

  • 设定基本的存储内存和执行内存区域(spark.storage.storageFraction参数),该设定确定了双方各自拥有的空间的范围;

  • 双方的空间都不足时,则存储到硬盘

    若己方空间不足而对方空余时,可借用对方的空间; [注:存储空间不足是指不足以放下一个完整的Block]

  • 执行内存的空间被对方占用后,可让对方将占用的部分转存到磁盘,然后”归还”借用的空间;

  • 存储内存的空间被对方占用后,无法让对方 “归还”,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂。

存储内存管理

概述

Storage 管理着 Spark 应用在运行过程中产生的各种数据。比如 RDD 缓存,shuffle 过程中缓存及写入磁盘的数据,广播变量等。

Storage 模块在逻辑上以 Block 为基本存储单位,RDD 的每个 Partition 经过处理后唯一对应一个 Block。Driver 端 BlockManager 负责整个 Spark 应用程序的 Block 的元数据信息的管理和维护,而 Executor 端的 BlockManager 需要将 Block 的更新等状态上报到 Master,同时接收 Master 的命令, 例如新增或删除一个 RDD。

  • BlockManager

    BlockManager 是 整个 Spark 底层负责数据存储与管理的一个组件 , Driver 和 Executor 的所有数据都由对应的 BlockManager 进行管理。

    Driver 上有 BlockManager Master ,负责对各个节点上的 BlockManager 内部管理的数据的元数据进行维护, 比如 block 的增删改等操作, 都会在这里维护好元数据 的变更。

    每个节点都有一个 BlockManager,每个 BlockManager 创建之后, 第一件事即使去向 BlockManag erMaster 进行注册。

  • CacheManager

    CacheManager 管理 spark 的缓存,而缓存可以基于内存的缓存,也可以是基于磁盘的缓存;
    CacheManager 需要通过 BlockManager 来操作数据

RDD 的持久化机制

弹性分布式数据集 RDD 作为 Spark 最根本的数据抽象,是只读的分区记录的集合,基于在稳定物理存储中的数据集上创建,或者在其他已有的 RDD 上执行转换[Transformation]操作产生一个新的 RDD。转换后的 RDD 与 原始的 RDD 之间产生的依赖关系构成了血统[Lineag]。凭借血统,Spark 保证了每一个 RDD 都可以被重新恢复。

Task 在启动之初读取一个分区时,会先判断这个分区是否已经被持久化,如果没有则需要检查 Checkpoint 或按照血统重新计算。所以如果一个 RDD 要执行多次 action 操作, 可以在第一次 action 操作中使用 persist 或 cache 方法,在内存或磁盘中持久化或缓存这个 RDD,从而在后面的行动时提升计算速度。

其中 cache 这个方法是个 Tranformation ,当第一次遇到 action 算子的时才会进行持久化

cache 内部调用了 persist(StorageLevel.MEMORY_ONLY)方法,所以执行 cache 算子其实就是执行了 persist 算子且持久化级别为 MEMORY_ONLY。 故缓存是一种特殊的持久化。堆内和堆外存储内存的设计,便可以对缓存 RDD 时使用的内存做统一的规划和管理。

RDD 的持久化由 Spark 的 Storage 模块负责,实现了 RDD 与物理存储的解耦合。

在对 RDD 持久化时,Spark 规定了 MEMORY_ONLY 、MEMORY_AND_DISK 等 7 种不同的存储级别 , 而存储级别是以下 5 个变量的组合:

1
2
3
4
5
6
7
class StorageLevel private(
private var _useDisk: Boolean, //磁盘
private var _useMemory: Boolean, //这里其实是指堆内内存
private var _useOffHeap: Boolean, //堆外内存
private var _deserialized: Boolean, //是否为非序列化
private var _replication: Int = 1 //副本个数
)
存储级别 含义
MEMORY_ONLY 以非序列化的 Java 对象的方式持久化在 JVM 内存中。如果内存无法完全存储 RDD 所有的 partition,那么那些没有持久化的 partition 就会在下一次需要使用它们的时候,重新被计算
MEMORY_AND_DISK 同上,但是当 RDD 某些 partition 无法存储在内存中时,会持久化到磁盘中。下次需要使用这些 partition 时,需要从磁盘上读取
MEMORY_ONLY_SER 同 MEMORY_ONLY,但是会使用 Java 序列化方式,将 Java 对象序列化后进行持久化。可以减少内存开销,但是需要进行反序列化,因此会加大 CPU 开销
MEMORY_AND_DISK_SER 同 MEMORY_AND_DISK,但是使用序列化方式持久化 Java 对象
DISK_ONLY 使用非序列化 Java 对象的方式持久化,完全存储到磁盘上
MEMORY_ONLY_2 MEMORY_AND_DISK_2 如果是尾部加了 2 的持久化级别,表示将持久化数据复用一份,保存到其他节点,从而在数据丢失时,不需要再次计算,只需要使用备份数据即可

RDD的缓存过程

RDD 在缓存到存储内存之前,数据项 [Record]的对象实例在逻辑上占用了 JVM 堆内内存的 other 部分的空间,同一 Partition 的不同数据项的存储空间并不连续。

缓存到存储内存之后, Partition 被转换成 Block,Record 在堆内或堆外存储内存中占用一块连续的空间。将 Partition 由不连续的存储空间转换为连续存储空间的过程,Spark 称之为”展开” [Unroll]

Block 有序列化和非序列化两种存储格式,具体以哪种方式取决于该 RDD 的存储级别。非序列化的 Block 用一个数组存储所有的对象实例,序列化的 Block 则用字节缓冲区 ByteBuffer 来存储二进制数据。

每个 Executor 的 Storage 模块用一个 LinkedHashMap 来管理堆内和堆外存储内存中所有的 Block ,对这个 LinkedHashMap 新增和删除间接记录了内存的申请和释放。

因为不能保证存储空间可以一次容纳 Iterator 中的所有数据,当前的计算任务在 Unroll 时要向 MemoryManager 申请足够的 Unroll 空间来临时占位,空间不足则 Unroll 失败,空间足够时可以继续进行。在静态内存管理时,Spark 在存储内存中专门划分了一块 Unroll 空间,其大小是固定的,统一内存管理时则没有对 Unroll 空间进行特别区分,对于序列化的 Partition ,其所需的 Unroll 空间可以直接累加计算,一次申请。

对于序列化的 Partition,其所需的 Unroll 空间可以直接累加计算,一次申请。

对于非序列化的 Partition 则要在遍历 Record 的过程中依次申请,即每读取一条 Record,采样估算其所需的 Unroll 空间并进行申请,空间不足时可以中断,释放已占用的 Unroll 空间。

如果最终 Unroll 成功, 当前 Partition 所占用的 Unroll 空间被转换为正常的缓存 RDD 的存储空间

淘汰与落盘

由于同一个 Executor 的所有的计算任务共享有限的存储内存空间,当有新的 Block 需要缓存但是剩余空间不足且无法动态占用时,就要对 LinkedHashMap 中 的旧 Block 进行淘汰,而被淘汰的 Block 如果其存储级别中同时包含存储到磁盘的要求,则要对其进行落盘,否则直接删除该 Block。

  • 被淘汰的旧 Block 要与新 Block 的 MemoryMode 相同,即同属于堆外或堆内内存
  • 新旧 Block 不能属于同一个RDD,避免循环淘汰
  • 旧 Block 所属 RDD 不能处于被读状态,避免引发一致性问题
  • 遍历 LinkedHashMap中 Block,按照最近最少使用 LRU 的顺序淘汰,直到满足新 Block 所需的空间。 其中 LRU 是 LinkedHashMap 的特性。

执行内存管理

执行内存主要用来存储任务在执行 Shuffle 时占用的内存,Shuffle 是按照一定规则对 RDD 数据重新分区的过程, Shuffle 的 Write 和 Read 两阶段对执行内存的使用.

Shuffle Write

在 map 端会采用 ExternalSorter 进行外排,在内存中存储数据时主要占用堆内执行空间。

Shuffle Read

  • 在对 reduce端的数据进行聚合时, 要将数据交给 Aggregator处理, 在内存中存储数据时占用堆内执行空间。
  • 如果需要进行最终结果排序,则要将再次将数据交给 ExternalSorter处理,占用堆内执行空间

在 ExternalSorter 和 Aggregator 中, Spark 会使用一种叫 AppendOnlyMap 的哈希表在堆内执行内存中存储数据 , 但在 Shuffle 过程中所有数据并不能都保存到该哈希表中, 当这个哈希表占用的内存会进行周期性地采样估算, 当其大到一定程度, 无法再从 MemoryManager 申请到新的执行内存时, Spark 就会将其全部内容存储到磁盘文件中, 这 个过程被称为溢存 [Spill] , 溢存到磁盘的文件最后会被归 并 [Merge]

总结

Spark 的存储内存和执行内存有着截然不同的管理方式

  • 对于存储内存来说,Spark 用一个 **LinkedHashMap **来集中管理所有的 Block,Block 由需要缓存的 RDD 的 Partition 转化而成;

  • 对于执行内存,Spark 用 AppendOnlyMap 来存储 Shuffle 过程中的数据, 在 Tungsten 排序中甚至抽象成为页式内存管理,开辟了全新的 JVM 内存管理机制 。

Spark Shuffle 内存使用

在使用 Spark 进行计算时,我们经常会碰到作业 (Job) Out Of Memory(OOM) 的情况,而且很大一部分情况是发生在 Shuffle 阶段。那么在 Spark Shuffle 中具体是哪些地方会使用比较多的内存而有可能导致 OOM 呢? 为此,本文将围绕以上问题梳理 Spark 内存管理和 Shuffle 过程中与内存使用相关的知识;然后,简要分析下在 Spark Shuffle 中有可能导致 OOM 的原因。

OOM

内存不够,数据太多就会抛出 OOM的 Exeception,主要有driver OOM和 executor OOM两种

1
java.lang.OutOfMemoryError: Java heap space

driver OOM

  • 用户在 Driver 端口生成大对象, 比如创建了一个大的集合数据结构
  • 使用了collect 等操作,将所有 executor 的数据聚合到 driver 导致

一般是使用了collect 等操作,将所有 executor 的数据聚合到 driver 导致。尽量不要使用 collect操作即可。

executor OOM

数据倾斜导致内存溢出

数据不平衡除了有可能导致内存溢出外,也有可能导致性能的问题,调用 repartition 重新分区

Reduce OOM

reduce task 去 map 端获取数据,reduce一边拉取数据一边聚合,reduce端有一块聚合内存[executor memory * 0.2],也就是这块内存不够
解决方法

  • 增加 reduce 聚合操作的内存的比例
  • 增加 Executor memory 的大小 –executor-memory 5G
  • 减少 reduce task 每次拉取的数据量 设置 spak.reducer.maxSizeInFlight 24m, 拉取的次数就多了,因此建立连接的次数增多,有可能会连接不上[正好赶上 map task 端进行GC]

shuffle 后内存溢出

shuffle 后单个文件过大导致内存溢出。在 Spark 中,join,reduceByKey 这一类型的过程,都会有shuffle的过程,在shuffle的使用,需要传入一个partitioner,大部分 Spark 中的 shuffle 操作,默认的 partitioner 都是 HashPatitioner,默认值是父 RDD 中最大的分区数,这个参数通过spark.default.parallelism 控制 [在spark-sql中用spark.sql.shuffle.partitions]

spark.default.parallelism 参数只对 HashPartitioner 有效,所以如果是别的 Partitioner 或者自己实现的 Partitioner 就不能使用 spark.default.parallelism 这个参数来控制 shuffle 的并发量了。如果是别的partitioner 导致的 shuffle 内存溢出,就需要从 partitioner 的代码增加 partitions 的数量

coalesce 调用导致内存溢出

因为 hdfs 中不适合存小问题,所以 Spark 计算后如果产生的文件太小,调用 coalesce 合并文件再存入 hdfs中。但会导致一个问题,例如在 coalesce 之前有100个文件,这也意味着能够有100个 Task,现在调用coalesce(10),最后只产生10个文件,因为 coalesce 并不是 shuffle 操作,这意味着 coalesce并不是先执行100个 Task,再将 Task 的执行结果合并成10个,而是从头到位只有10个 Task 在执行,原本100个文件是分开执行的,现在每个 Task 同时一次读取10个文件,使用的内存是原来的10倍,这导致了OOM。

解决这个问题的方法是令程序按照我们想的先执行100个 Task 再将结果合并成10个文件,这个问题同样可以通过repartition 解决,调用 repartition(10)

standalone 模式下资源分配不均匀导致内存溢出

在 standalone 的模式下如果配置了 –total-executor-cores 和 –executor-memory 这两个参数,但是没有配置 –executor-cores 参数,有可能导致,每个 Executor 的 memory 是一样的,但是 cores 的数量不同,那么在 cores 数量多的 Executor 中,由于能够同时执行多个Task,就容易导致内存溢出的情况。

这种情况的解决方法就是同时配置–executor-cores或者spark.executor.cores参数,确保Executor资源分配均匀。

map 过程产生大量对象导致内存溢出

这种溢出的原因是在单个 map 中产生了大量的对象导致的

例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),这个操作在rdd中,每个对象都产生了10000 个对象,这肯定很容易产生内存溢出的问题。

针对这种问题,在不增加内存的情况下,可以通过减少每个 Task 的大小,以便达到每个 Task 即使产生大量的对象 Executor 的内存也能够装得下。具体做法可以在会产生大量对象的 map 操作之前调用 repartition方法,分区成更小的块传入map。

例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)

参数

spark.driver.memory

用来设置 Driver 的内存。在 Spark 程序中,SparkContext,DAGScheduler 都是运行在Driver端的。对应Stage 切分也是在 Driver 端运行,如果用户自己写的程序有过多的步骤,切分出过多的 Stage,这部分信息消耗的是 Driver 的内存,这个时候就需要调大 Driver 的内存