在生产环境下, Spark 集群的部署方式一般为 YARN-Cluster 模式,因此本文基于YARN-Cluster 模式

Spark 任务提交流程

YARN-Cluster 模式中提交 Spark 应用程序

首先通过 Client 向 ResourceManager 请求启动一个 Application,同时检查是否有足够的资源满足 Application 的需求,如果资源条件满足,则准备 ApplicationMaster 的启动上下文,交给ResourceManager,并循环监控 Application 状态。

当提交的资源队列中有资源时,ResourceManager 会在某个 NodeManager 上启动 ApplicationMaster 进程,ApplicationMaster 会单独启动 Driver 后台线程,当 Driver 启动后,ApplicationMaster 会通过本地的 RPC 连接 Driver ,并开始向 ResourceManager 申请 Container 资源运行 Executor 进程, 当 ResourceManager 返回 Container 资源,ApplicationMaster 则在对应的 Container 上启动 Executor 。

Driver 线程主要是初始化 SparkContext 对象,准备运行所需的上下文,然后一方面保持与 ApplicationMaster 的 RPC 连接,通过 ApplicationMaster 申请资源,另一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲 Executor 上。

当 ResourceManager 向 ApplicationMaster 返 回 Container 资源时 , ApplicationMaster 就尝试在对应的 Container 上启动 Executor 进程,Executor 进程起来后, 会向 Driver 反向注册, 注册成功后保持与 Driver 的心跳,同时等待 Driver 分发任务,当分发的任务执行完毕后,将任务状态上报给 Driver。

Spark 任务调度概述

Driver 线程初始化 SparkContext 对象,准备运行所需的上下文,一方面保持与 ApplicationMaster 的 RPC 连接,通过 ApplicationMaster 申请资源,另一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲 Executor 上。

Driver 会根据用户程序逻辑准备任务,并根据 Executor 资源情况逐步分发任务。 在详细阐述任务调度前,首先说明下 Spark 里的几个概念。一个 Spark 应用程序包括Job、Stage 以及 Task 三个概念:

  • Job 是以 Action 方法为界,遇到一个 Action 方法则触发一个Job
  • Stage 是 Job 的子集,以宽依赖为界。遇到 Shuffle 做一次划分
  • Task 是 Stage 的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个 task

Spark 的任务调度总体来说分两路进行,一路是 Stage 级的调度, 一路是 Task 级的调度,总体调度流程如下图所示:

Spark RDD 通过其 Transactions 操作,形成了 RDD 血缘关系图,即 DAG ,最后通过 Action 的调用,触发 Job 并调度执行。

DAGScheduler 负责 Stage 级的调度,主要是将 Job 切分成若干 Stages,并将每个 Stage 打包成 TaskSet 交给 TaskScheduler 调度。

TaskScheduler 负责 Task 级的调度,将 DAGScheduler 给过来的 TaskSet 按照指定的调度策略分发到 Executor 上执行,调度过程中 SchedulerBackend 负责提供可用资源, 其中 SchedulerBackend 有多种实现, 分别对接不同的资源管理系统。

Spark Stage 级调度

DAGScheduler 是实现了面向 stage 的调度,它可以为每个 Job 计算出一个 DAG,追踪 RDD 和 stage 的输出是否被持久化,并且寻找到一个最优调度机制来运行 Job.

  1. 接收用户提交的 Job;

  2. 将 Job 划分为不同 stage 的 DAG图,记录哪些 RDD、Stage 被物化存储,并在每一个 stage 内产生一系列的 task,并封装成 TaskSet;

  3. 要保证相互依赖的 Job/stage 能够得到顺利的调度执行,DAGScheduler 必然需要监控当前Job / Stage乃至Task的完成情况。

  4. 结合当前的缓存情况,决定每个 Task 的最佳位置(移动计算而不是移动数据,任务在数据所在的节点上运行),将 TaskSet 提交给 TaskScheduler;

    DAGScheduler 找到哪些 RDDs 已经被 cache 了来避免重计算它们,而且同样地记住哪些ShuffleMapStages 已经生成了输出文件来避免重建一个 shuffle 的 map 侧计算任务。

  5. 重新提交 Shuffle 输出丢失的 Stage 给 TaskScheduler

    处理由于 shuffle 输出文件丢失导致的失败,在这种情况下,旧的 stage 可能会被重新提交。一个 stage 内部的失败,如果不是由于 shuffle 文件丢失导致的,会被 TaskScheduler 处理,它会被多次重试每一个 task,直到最后一个。实在不行,才会被取消整个 stage。

Stage 划分

SparkContext 将 Job 提交给 DAGScheduler,DAGScheduler 将一个 Job 划分为若干 Stages ,具体划分策略是,以 Shuffle 为界,划分 Stage ,由最终的 RDD 不断通过依赖回溯判断父依赖是否是宽依赖,窄依赖的 RDD 被划分到同一个 Stage 中,进行 pipeline 式的计算,划分的 Stages 分两类,一类叫做 ResultStage 为 DAG 下游的 Stage,由 Action 方法决定; 另一类叫做 ShuffleMapStage,其为下游 Stage 准备数据。

生成 Job,提交 Stage

一个 Stage 是否被提交,需要判断它的父 Stage 是否执行,只有在父 Stage 执行完毕才能提交当前 Stage,如果一个 Stage 没有父 Stage,那么从该 Stage 开始提交。Stage 提交时会将 Task 信息[分区信息以及方法等]序列化并被打包成 TaskSet 交给 TaskScheduler,一个 Partition 对应一个 Task。

Spark Task 级调度

Spark Task 的调度是由 TaskScheduler 来完成。DAGScheduler 将 Stage 打包到 TaskSet 交给 TaskScheduler,TaskScheduler 会将 TaskSet 封装为 TaskSetManager 加入到调度队列中。

TaskSetManager 负责监控管理同一个 Stage 中的 Tasks,TaskScheduler 就是以 TaskSetManager 为单元来调度任务 。

TaskScheduler 初始化后会启动 SchedulerBackend,它负责跟外界打交道,接收 Executor 的注册信息,并维护 Executor 的状态,同时它在启动后会定期地去询问 TaskScheduler 是否有任务要运行,也就是说, 它会定期地问 TaskScheduler “我有这么余量,你要不要啊”,TaskScheduler 在 SchedulerBackend 问它的时候,会从调度队列中按照指定的调度策略选择 TaskSetManager 去运行。

SchedulerBackend负责与Cluster Manager交互,取得分配给 Application 的资源,并将资源传给TaskScheduler,由 TaskScheduler 为 Task 最终分配计算资源

调度策略

TaskScheduler 会先把 DAGScheduler 提交过来的 TaskSet 封装成 TaskSetManager 放到任务队列里,然后再从任务队列里按照一定的规则把它们取出来放在 SchedulerBackend 给过来的 Executor 上运行。这个调度过程实际上还是比较粗粒度的,是面向 TaskSetManager 的。

调度队列的层次结构如下图所示

TaskScheduler 支持两种调度策略,一种是 FIFO,也是默认的调度策略,另一种是 FAIR。

在 TaskScheduler 初始化过程中会实例化 rootPool,表示树的根节点, 是 Pool 类型。

  • FIFO 调度策略

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
    override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority // jobId
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
    val stageId1 = s1.stageId
    val stageId2 = s2.stageId
    res = math.signum(stageId1 - stageId2)
    }
    res < 0
    }
    }
    • 比较 s1和 s2 所属的 JobId,值越小,优先级越高
    • 如果两个 JobId 的优先级相同, 则对 s1,s2所属的 StageId 进行比,值越小,优先级越高
  • Fair 调度策略

    FAIR 模式中有一个 Root Pool 和多个子 Pool,各个子 Pool 中 存储着所有待分配的 TaskSetManager 。

    可以通过在 Properties 中指定 spark.scheduler.pool 属性,指定某个调度池作为 TaskSetManager 的父调度池,如果根调度池不存在此属性值对应的调度池,会创建以此属性值为名称的调度池作为 TaskSetManager 的父调度池,并将此调度池作为根调度池的子调度池。

    在 FAIR 模式中,需要先对 子Pool 进行排序,再对 子Pool 里面的 TaskSetManager 进行排序,因为 Pool 和 TaskSetManager 都继承了 Schedulable 特质,因此使用相同的排序算法 。

    每个要排序的对象包含三个属性 : runningTasks 值[正在运行的 Task 数]、 minShare 值、 weight 值,比较时会综合考量三个属性值。

    注意,minShare 、weight 的值均在公平调度配置文件 fairscheduler.xml 中被指定, 调度池在构建阶段会读取此文件的相关配置。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
      <allocations>
    <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
    </pool>
    <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
    </pool>
    </allocations>
    • runningTasks 比 minShare 小的先执行

      如果 A 对象的 runningTasks 大于它的 minShare,B 对象的 runningTasks 小于它的 minShare,那么 B 排在 A 前面

    • minShare 使用率低的先执行

      如果A,B 对象的 runningTasks 都小于它的 minShare ,那么就比较 runningTasks 和 minShare 的比值 [minShare使用率]谁小谁排前面

    • 权重使用率低的先执行

      如果A、B 对象 的 runningTasks 都大于它们的 minShare ,那么就比较 runningTasks 与 weight 的比值(权重使用率),谁小谁排前面。

    • 如果上述比较均相等,则比较名字

    FAIR 模式排序完成后,所有的 TaskSetManager 被放入一个 ArrayBuffer 里,之后依次被取出并发送给 Executor 执行 。

从调度队列中拿到 TaskSetManager 后,由于 TaskSetManager 封装了一个 Stage 的所有 Task, 并负责管理调度这些 Task,接下来 TaskSetManager 按照一定的规则逐个取出 Task 给 TaskScheduler,TaskScheduler 提交给 SchedulerBackend 去发到 Executor 执行。

本地化调度

DAGScheduler 划分 Stage, 通过调用 submitStage 来提交一个 Stage 对应的 tasks, submitStage 会调用 submitMissingTasks,submitMissingTasks 确定每个需要计算的 task 的 preferredLocations,通过调用 getPreferrdeLocations() 得到 partition 的优先位置,由于一个 partition 对应一个task, 此 partition 的优先位置就是 task 的优先位置,对于要提交到 TaskScheduler 的 TaskSet 中的每一个 task ,该 task 优先位置与其对应的 partition 对应的优先位置一致

根据每个 task 的优先位置,确定 task 的 Locality 级别,Locality一共有五种,优先级由高到低顺序

PROCESS_LOCAL 进程本地化,task 和数据在同一个 Executor 中,性能最好。
NODE_LOCAL 节点本地化,task 和数据在同一个节点中,但是 task 和数据不在同一个 Executor 中,数据需要在进程间进行传输。
RACK_LOCAL 机架本地化,task 和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输。
NO_PREF 数据从哪里访问都一样快,不需要位置优先
ANY task 和数据不在一个机架中,性能最差。

在调度执行时,Spark 总是会尽量让每个 task 以最高的本地性级别来启动,当一个 task 以X本地性级别启动,但是该本地性级别对应的所有节点都没有空闲资源而启动失败,此时并不会马上降低本地性级别启动而是在某个时间长度内再次以 X 本地性级别来启动该 task,若超过限时时间则降级启动,去尝试下一个本地性级别,依次类推。

可以通过调大每个类别的最大容忍延迟时间,在等待阶段对应的 Executor 可能 就会有相应的资源去执行此 task,这就在在一定程度上提到了运行性能。

失败重试与黑名单机制

除了选择合适的 Task 调度机制外,还需要监控 Task 的执行状态,与外部通信的是 SchedulerBackend。

Task 被提交到 Executor 启动执行后,Executor 会将执行状态上报给 SchedulerBackend, SchedulerBackend 则通知该 Task 对应的 TaskSetManager,TaskSetManager 获取得知 Task 的执行状态,对于失败的 Task,TaskSetManager 会记录失败次数,如果失败次数还没有超过最大重试次数,则把该 Task 放回待调度的 Task 池子中,否则整个 Application 失败。

在记录 Task 失败次数过程中,会记录其上一次失败所在的 ExecutorId 和 Host,下次调度该 Task 时,会使用黑名单机制,避免再次被调度到上一次失败的节点上,起到一定的容错作用。

黑名单记录 Task 上一次失败所在的 ExecutorId 和 Host,以及其对应的 “拉黑时间”.

“拉黑时间”是指这段时间内不要再往这个节点上调度这个 Task 了。