shuffle 的性能高低直接影响了整个程序的性能和吞吐量。因为在分布式情况下,reduce task 需要跨节点去拉取其它节点上的 map task 结果。这一过程将会产生网络资源消耗和内存,磁盘 IO 的消耗。

概述

Shuffle 描述着数据从 map task 输出到 reduce task 输入的这段过程。

Shuffle 是连接 Map 和 Reduce 之间的桥梁, Map 的输出要用到 Reduce 中必须经过 shuffle 这个环节.

shuffle 的性能高低直接影响了整个程序的性能和吞吐量。因为在分布式情况下,reduce task 需要跨节点去拉取其它节点上的 map task 结果。这一过程将会产生网络资源消耗和内存,磁盘 IO 的消耗。

通常 shuffle 分为两部分:Map阶段的数据准备和 Reduce 阶段的数据拷贝处理。

一般将在 map 端的 shuffle 称之为 Shuffle Write, 在Reduce 端的 Shuffle 称之为 Shuffle Read

导致 Shuffle 操作算子

重分区类的操作

重分区类算子一般会 shuffle,因为需要在整个集群中,对之前所有的分区的数据进行随机,均匀的打乱,然后把数据放入下游新的指定数量的分区内。

比如 repartition、repartitionAndSortWithinPartitions等

byKey 类的操作

比如 reduceByKey、groupByKey、sortByKey 等,对一个 key 进行聚合操作时要保证集群中,所有节点上相同的 key 分配到同一个节点上进行处理

Join 类的操作

比如 join、cogroup 等。两个 rdd 进行 join,就必须将相同 key 的数据,shuffle 到同一个节点上,然后进行相同 key 的两个 rdd 数据操作。

Shuffle 原理

ShuffleMapStage 的结束伴随着 shuffle 文件的写磁盘

ResultStage基本上对应代码中的 action 算子, 即将一个函数应用在 RDD 的各个 partition 的数据集上,意味着一个 job 的运行结束

在划分 stage 时, 最后一个 stage 称为 finalStage, 它本质上是一个 ResultStage

HashShuffle

通常 shuffle 分为两部分:write 阶段的数据准备和 read 阶段的数据拷贝处理。

shuffle write

shuffle write 阶段,ShuffleMapStage 结束后,每一个 task 中的数据按照 key 进行分类,根据 hash 算法将相同的 key 写入到一个磁盘文件中,而每一个磁盘文件都只属于下游 stage 的一个 task。在将数据写入磁盘之前,会先将数据写入到内存缓冲,当内存缓冲填满之后,溢写到磁盘文件中。

shuffle read

shuffle read,通常就是一个 stage 刚开始时要做的事情。此时该 stage 的每一个 task 需要将上一个 stage 的计算结果中的所有相同 key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行 key 的聚合或连接等操作。由于 shuffle write 的过程中,task 给下游 stage 的每个 task 都创建了一个磁盘文件,因此 shuffle read 的过程中,每个 task 只要从上游 stage 的所有 task 所在节点上,拉取属于自己的那一个磁盘文件即可.

shuffle read 的拉取过程是一边拉取一边进行聚合的。每个 shuffle read task 都会有一个自己的buffer 缓冲,每次都只能拉取与 buffer 缓冲相同大小的数据,然后通过内存中的一个 Map 进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到 buffer 缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

这种策略的不足在于,下游有几个 task,上游的每一个 task 都就都需要创建几个临时文件,每个文件中只存储 key 取 hash 之后相同的数据,导致了当下游的 task 任务过多的时候,上游会堆积大量的小文件.

屏幕快照 2020-02-21 下午8.24.28

  1. Shuffle 前在磁盘上会产生海量的小文件,此时会产生大量耗时低效的 IO 操作
  2. 内存不够用,由于内存中需要保存海量文件操作信息和临时信息,如果数据处理的规模比较庞大的话,内存不可承受,会出现 OOM 等问题。

优化之后的 HashShuffle

这里说的优化,是指我们可以设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为 true 即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

开启这个机制之后,在 shuffle write 过程中,task 就不是为下游 stage 的每个 task 创建一个磁盘文件。出现 shuffleFileGroup 的概念。一个 Executor 上有多少个 CPU core ,就可以并行执行多少个 task。第一批并行执行的每个 task 都会创建一个 shuffleFileGroup,并将数据写入对应的磁盘文件内。当 Executor 的 CPU core 接着执行下一批 task 时,下一批 task 就会复用之前已有的 shuffleFileGroup ,包括其中的磁盘文件。而不会写入新的磁盘文件中。

consolidate 机制允许不同的 task 复用同一批磁盘文件,这样就可以有效将多个 task 的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升 shuffle write 的性能。

SortShuffle

SortShuffleManager 的运行机制主要分成两种,一种是普通运行机制,另一种是 bypass 运 行 机 制 。 当 shuffle read task 的 数量小于等于 spark.shuffle.sort.bypassMergeThreshold 参数的值时[默认为 200 ], 就会启用 bypass 机制。

普通 SortShuffle

Task 将数据会先写入一个内存数据结构。

[根据不同的 shuffle 算子,可能选用不同的数据结构。如果是 reduceByKey 这种聚合类的 shuffle 算子,那么会选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存;如果是 join 这种普通的 shuffle 算子,那么会选用 Array 数据结构,直接写入内存。]

每写一条数据进入内存数据结构之后,就会判断是否达到了某个临界值,如果达到了临界值的话,就会尝试的将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

注:此时的临界值为动态变化的,并非固定值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
var shouldSpill = false

if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
// Claim up to double our current memory from the shuffle memory pool
val amountToRequest = 2 * currentMemory - myMemoryThreshold
val granted = acquireMemory(amountToRequest)
myMemoryThreshold += granted
// If we were granted too little memory to grow further (either tryToAcquire returned 0,
// or we already had more memory than myMemoryThreshold), spill the current collection
shouldSpill = currentMemory >= myMemoryThreshold
}
shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
// Actually spill
if (shouldSpill) {
_spillCount += 1
logSpillage(currentMemory)
spill(collection)
_elementsRead = 0
_memoryBytesSpilled += currentMemory
releaseMemory()
}
shouldSpill
}

在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序,排序之后,会分批将数据写入磁盘文件。

默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批次 1 万条数据的形式分批写入磁盘文件,写入磁盘文件是通过Java 的 BufferedOutputStream 实现的。BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

一个 Task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写,产生多个临时文件,最后会将之前所有的临时文件都进行合并,最后会合并成为一个大文件。最终只剩下两个文件,一个是合并之后的数据文件,一个是索引文件,索引文件标识了下游各个 Task 的数据在文件中的 start offset 与 end offset。最终再由下游的 task 根据索引文件读取相应的数据文件。

SortShuffleManager 由于有一个磁盘文件 merge 的过程,因此大大减少了文件数量。 比如第一个 stage 有 50 个 task , 总共有 10 个 Executor , 每个 Executor 执行 5个 task ,而第二个 stage 有 100 个 task 。由于每个 task 最终只有一个磁盘 文件,因此 此时每个 Executor 上只有 5 个磁盘文件, 所有 Executor 只有 50 个磁盘文件。

bypassSortShuffle

此时 Task 会为每个下游 Task 都创建一个临时磁盘文件,并将数据按 key 进行 hash 然后根据 key 的hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件, 只是在最后会做一个磁盘文件的合并而已,因此产生少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。

而该机制与普通 SortShuffleManager 运行机制的不同在于

  1. 磁盘写机制不同
  2. 不会进行排序

也就是说,启用该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

触发条件

  1. shuffle map task 的数量小于 spark.shuffle.sort.bypassMergeThreshold 参数的值[默认200]

  2. 不是聚合类的shuffle 算子[比如groupByKey]

Spark Shuffle vs MR Shuffle

Shuffle 管理器

Hadoop 2.7.x Shuffle 过程是 sort-based 过程,在 shuffle 过程中会发生排序行为

Spark 2.2.x Spark ShuffleManager 分为HashShuffleManager和 SortShuffleManager。Spark 1.2 后 默认为SortShuffleManager,在普通模式下,shuffle 过程中会发生排序行为;Spark 可以根据业务场景需要进行ShuffleManager 选择Hash Shuffle Manager / Sort ShuffleManager[普通模式和bypass模式]。

Shuffle 过程排序次数

  • Hadoop Shuffle 过程总共会发生 3 次排序行为,详细分别如下:
    • 第一次排序行为:在 map 阶段,由环形缓冲区溢出到磁盘上时,落地磁盘的文件会按照 key 进行分区和排序,属于分区内有序,排序算法为快速排序
    • 第二次排序行为:在 map 阶段,对溢出的文件进行 combiner合并过程中,需要对溢出的小文件进行归并排序、合并,排序算法为归并排序;
    • 第三次排序行为:在 reduce 阶段,reduce task 将不同 maptask 端文件拉去到同一个reduce 分区后,对文件进行合并,归并排序,排序算法为归并排序;
  • Spark Shuffle 过程在满足Shuffle Manager 为 SortShuffleManager ,且运行模式为普通模式的情况下才会发生排序行为,排序行为发生在数据结构中保存数据内存达到阈值,在溢出磁盘文件之前会对内存数据结构中数据进行排序;
    • Spark 中 Sorted-Based Shuffle 在 Mapper 端是进行排序的,包括 partition 的排序和每个partition 内部元素进行排序。但是在 Reducer 端没有进行排序,所以 job 的结果默认情况下不是排序的。
    • Sorted-Based Shuffle 采用 Tim-Sort 排序算法,好处是可以极为高效的使用 Mapper 端的排序成果完成全局排序。

Shuffle 逻辑流划分

  • Hadoop Shuffle 过程可以划分为:map(),spill,merge,shuffle,sort,reduce()等,是按照流程顺次执行的,属于Push类型;

  • Spark Shuffle过程是由算子进行驱动,由于Spark的算子懒加载特性,属于Pull类型,整个Shuffle过程可以划分为Shuffle Write 和Shuffle Read两个阶段;

数据结构不同

  • Hadoop 是基于文件的数据结构
  • Spark是基于RDD的数据结构,计算性能要比 Hadoop 要高

Shuffle Fetch 后数据存放位置

  • Hadoop reduce 端将 map task 的文件拉去到同一个 reduce 分区,是将文件进行归并排序、合并,将文件直接保存在磁盘上
  • Spark Shuffle Read 拉取来的数据首先肯定是放在 Reducer 端的内存缓存区中的,实现是内存+磁盘的方式,当然也可以通过 Spark.shuffle.spill=false 来设置只能使用内存。使用 ExternalAppendOnlyMap的方式时候如果内存使用达到一定临界值,会首先尝试在内存中扩大 ExternalAppendOnlyMap,如果不能扩容的话才会spill到磁盘。

Fetch 操作与数据计算粒度

  • Hadoop 的 MapReduce 是粗粒度的,Hadoop Shuffle Reducer Fetch到的数据 record先暂时被存放到Buffer 中,当 Buffer 快满时才进行 combine() 操作
  • Spark 的 Shuffle Fetch 是细粒度的,Reducer 是对 Map 端数据 Record 边拉去边聚合

Spark Shuffle 调优

大多数 Spark 作业的性能主要就是消耗在了 shuffle 环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对 shuffle 过程进行调优。

shuffle 相关参数调优

  1. spark.shuffle.file.buffer

    默认值:32k

    参数说明:该参数用于设置 shuffle write task 的 BufferedOutputStream 的 buffer 缓冲大小。将数据写到磁盘文件之前,会先写入 buffer 缓冲中,待缓冲写满之后,才会溢写到磁盘。
    调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

  2. spark.reducer.maxSizeInFlight

    默认值:48m
    参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
    调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

  3. spark.shuffle.io.maxRetries

    默认值:3
    参数说明:shuffle read task 从 shuffle write task 所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
    调优建议:对于那些包含了特别耗时的 shuffle 操作的作业,建议增加重试最大次数(比如60次),以避免由于 JVM 的 full gc 或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的 shuffle 过程,调节该参数可以大幅度提升稳定性。

  4. spark.shuffle.io.retryWait

    默认值:5s
    参数说明:该参数代表了每次重试拉取数据的等待间隔,默认是 5s。
    调优建议:建议加大间隔时长(比如60s),以增加 shuffle 操作的稳定性。