本文分为两个部分:
collect后Job的提交流程
点击「链接」查看DataFrame.collect触发的作业提交流程思维导图。
def collect(): Array[T] = withAction("collect", queryExecution)(collectFromPlan)
触发物理计划的执行,其中 plan 的类型是 SparkPlan
private def collectFromPlan(plan: SparkPlan): Array[T] = {
val fromRow = resolvedEnc.createDeserializer()
plan.executeCollect().map(fromRow)
}
Spark 有很多action 函数,比如:
最终都是通过 collectFromPlan 去创建 Job
executeCollect
这个函数分为三部:
Resilient Distributed Dataset (RDD), 是一种不可变、支持分区的数据集合。由于支持分区,该数据集支持并行访问。
class RDD是一个基类,它有很多子类:
collect 方法的主要职能是提交 Spark 作业,该功能代理给了 SparkContext 去支持:
runJob 方法有很多重载,我们只关心最复杂的一个:
从功能上来说,它实现了
从代码上来说,方法声明如下:
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit
它有两个泛型类型参数:
参数列表:
返回值: Unit 表示没有任何返回值
对于 DAGScheduler 而言,Stage是最小的调度单元。它会
DAGScheduler 对Job的调度是围绕 DAGSchedulerEventProcessLoop 展开的。这是一个经典的EventLoop使用场景。runJob 方法的执行流程如下:
在 EventLoop 的另一端,onReceive 接收到 JobSubmitted事件,交给成员函数 handleJobSubmitted 处理该事件。
JobWaiter 内部有一个 Promise 对象,它会不停接收到 taskSucceeded,增加计数,知道成功task的数量等于task的总数量,将promise置为成功。
onReceive 负责接收各类事件,并分发给特定的 handler 函数处理,具体可以看思维导图或spark代码。
这里我们只看 handleJobSubmitted,它做了五件事情:
由于 stage 是一个有向无环图,所以创建和执行都遵循 topological order。
在 SparkPlan 对象调用 execute 时,会递归地生成 RDD,从而构成了 RDD Lineage Graph,它是一个有向无环图。那么在 RDD Lineage 上如何切分 stage 呢?
RDD依赖分为宽依赖和窄依赖,代码体现为两个类ShuffleDependency和NarrowDependency。在构建 RDD Lineage时,相邻的两个RDD必须有其中一种依赖关系。Spark通过这种依赖关系划分 Stage。根节点的RDD必须分配到 ResultStage里,而之前所有的Stage,不管有多少级依赖,都是 ShuffleMapStage。
在
DAGScheduler.getShuffleDependenciesAndResourceProfiles
方法中,通过一个栈来记录分配到当前stage中的 RDD(窄依赖中的rdd都会被push到栈里),碰到宽依赖,则加到 shuffleDeps 中。
getShuffleDependenciesAndResourceProfiles
留言与评论(共有 0 条评论) “” |