
Spark Source Code Study (4): Scheduler

The basic Scheduler flow

When a user-submitted Job reaches the DAGScheduler, it is wrapped into an ActiveJob, and a JobWaiter is started to monitor the job's completion.

At the same time, based on the dependency attributes of the RDDs in the job (narrow dependencies, NarrowDependency; wide/shuffle dependencies, ShuffleDependency), the DAGScheduler builds a DAG of stages (result stages and shuffle map stages) according to the order of those dependencies.

Within each stage, the corresponding tasks (ResultTask or ShuffleMapTask) are generated, one per partition according to the number and distribution of the RDD's partitions; these tasks are packaged into a TaskSet and submitted to the TaskScheduler.
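To make this concrete, here is a minimal, self-contained sketch (assuming spark-core on the classpath; the app name and master setting are arbitrary) showing where the DAGScheduler cuts stages: the narrow flatMap/map dependencies are pipelined inside one stage, while reduceByKey introduces a ShuffleDependency and therefore a stage boundary.

import org.apache.spark.{SparkConf, SparkContext}

object StageBoundaryDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("stage-boundary-demo").setMaster("local[2]"))

    // flatMap and map are narrow dependencies: each output partition depends on
    // exactly one parent partition, so they stay inside a single stage.
    val pairs = sc.parallelize(Seq("a b", "b c"), numSlices = 2)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // reduceByKey introduces a ShuffleDependency: the DAGScheduler cuts the DAG
    // here into a ShuffleMapStage (upstream) and a ResultStage for collect().
    val counts = pairs.reduceByKey(_ + _)

    // collect() is the action that submits the Job: two stages, each of which
    // becomes a TaskSet handed to the TaskScheduler.
    counts.collect().foreach(println)
    sc.stop()
  }
}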

DAGScheduler

The DAGScheduler is the high-level scheduler. It implements stage-oriented scheduling: it computes a DAG of stages for each job, tracks which stage outputs are materialized, and finally submits each stage to the TaskScheduler in the form of a TaskSet.

The DAGScheduler has to receive messages from the layers above and below it, so it is also a message processor (an actor in older Spark versions; later versions use a dedicated event loop). Here we mainly look at some of its event handling. The following are the events it handles:

private[scheduler] case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
  extends DAGSchedulerEvent

private[scheduler] case class StageCancelled(stageId: Int) extends DAGSchedulerEvent

private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent

private[scheduler] case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent

private[scheduler] case object AllJobsCancelled extends DAGSchedulerEvent

private[scheduler] case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent

private[scheduler] case class GettingResultEvent(taskInfo: TaskInfo) extends DAGSchedulerEvent

There are many more; they are not all listed here.
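The dispatch pattern itself is easy to model. The sketch below is a deliberately simplified stand-in (the ToyEventLoop and event names are mine, not Spark's; Scala 2.12+ is assumed for the Runnable lambda): events are posted to a queue and handled one at a time on a dedicated thread, which is essentially what the DAGScheduler's event processor does.

import java.util.concurrent.LinkedBlockingQueue

sealed trait ToyEvent
case class ToyJobSubmitted(jobId: Int) extends ToyEvent
case class ToyJobCancelled(jobId: Int) extends ToyEvent

class ToyEventLoop {
  private val queue = new LinkedBlockingQueue[ToyEvent]()
  private val worker = new Thread(() => while (true) handle(queue.take()))

  def start(): Unit = { worker.setDaemon(true); worker.start() }

  // Callers post events; they never invoke the handlers directly.
  def post(event: ToyEvent): Unit = queue.put(event)

  // All scheduler state is touched from this one thread, so handlers need no locks.
  private def handle(event: ToyEvent): Unit = event match {
    case ToyJobSubmitted(id) => println(s"handling JobSubmitted($id)")
    case ToyJobCancelled(id) => println(s"handling JobCancelled($id)")
  }
}

object ToyEventLoopDemo {
  def main(args: Array[String]): Unit = {
    val loop = new ToyEventLoop
    loop.start()
    loop.post(ToyJobSubmitted(0))
    loop.post(ToyJobCancelled(0))
    Thread.sleep(200) // let the daemon thread drain the queue before the JVM exits
  }
}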

JobSubmitted: handles a submitted job, converting the Job into stages, creating the finalStage, and submitting it via submitStage.

private[scheduler] def handleJobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
}
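One detail before moving on: the listener passed into handleJobSubmitted is the JobWaiter mentioned at the start. submitJob posts the JobSubmitted event and immediately returns the waiter, which the calling thread blocks on until every result task has reported in. A toy model of that handshake (ToyJobWaiter is illustrative, not Spark's class):

import java.util.concurrent.CountDownLatch

// Toy stand-in for JobWaiter: the submitting thread blocks until all tasks finish.
class ToyJobWaiter(numTasks: Int) {
  private val latch = new CountDownLatch(numTasks)
  def taskSucceeded(): Unit = latch.countDown() // called once per finished result task
  def awaitResult(): Unit = latch.await()       // what the action (e.g. collect) blocks on
}

object ToyJobWaiterDemo {
  def main(args: Array[String]): Unit = {
    val waiter = new ToyJobWaiter(numTasks = 2)
    // Stand-in for the scheduler: tasks complete asynchronously on another thread.
    new Thread(() => { waiter.taskSucceeded(); waiter.taskSucceeded() }).start()
    waiter.awaitResult()
    println("job finished")
  }
}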

Now enter the submitStage method. submitStage submits a stage, but the first stages actually submitted are those with no missing parent dependencies.

/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // No missing parent stages: run this stage's tasks now
        submitMissingTasks(stage, jobId.get)
      } else {
        // Parent stages are missing: recursively submit them first
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

If the computation finds that the current stage has no missing parent dependencies, its tasks are submitted directly.
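The effect of the recursion is that stages are submitted leaf-first. The toy model below shows the resulting order (ToyStage is illustrative; real Spark parks a blocked stage in waitingStages and revisits it when a parent completes, rather than re-submitting inline as done here):

import scala.collection.mutable

case class ToyStage(id: Int, parents: List[ToyStage])

object ParentFirstDemo {
  // Submit parents before the stage itself, mirroring submitStage's recursion.
  def submit(stage: ToyStage, submitted: mutable.Set[Int]): Unit = {
    val missing = stage.parents.filterNot(p => submitted(p.id)).sortBy(_.id)
    if (missing.isEmpty) {
      println(s"submitting stage ${stage.id}") // analogous to submitMissingTasks
      submitted += stage.id
    } else {
      missing.foreach(submit(_, submitted)) // recurse into missing parents first
      submit(stage, submitted)              // then this stage (Spark defers via waitingStages)
    }
  }

  def main(args: Array[String]): Unit = {
    val s0 = ToyStage(0, Nil)
    val s1 = ToyStage(1, List(s0))
    val s2 = ToyStage(2, List(s0, s1))
    submit(s2, mutable.Set.empty[Int]) // prints stages 0, 1, 2 in that order
  }
}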

In the source above, getMissingParentStages is what finds the missing parent stages.

Its source is as follows:

// Collect all missing parent stages and return them as a list
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        for (dep <- rdd.dependencies) {
          dep match {
            // A ShuffleDependency marks a shuffle boundary: get or create the
            // corresponding ShuffleMapStage, and if that stage is not yet
            // available, add it to the missing set
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            // A NarrowDependency stays within the current stage: keep walking
            // up the RDD lineage
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  waitingForVisit.push(stage.rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList
}

Continuing with submitStage, we next step into the submitMissingTasks method.
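As a closing note on getMissingParentStages: the manually maintained stack matters because RDD lineages can be thousands of links deep, so Spark walks them iteratively instead of recursively to avoid StackOverflowError. The stripped-down sketch below (toy ToyNode type, not Spark's RDD) keeps that pattern: narrow edges are pushed and walked, while shuffle boundaries are recorded but not crossed, just as the real method records a missing ShuffleMapStage.

import scala.collection.mutable

// Toy lineage node: deps are its dependencies (parents in RDD terms).
case class ToyNode(id: Int, deps: List[ToyNode], isShuffle: Boolean = false)

object IterativeVisitDemo {
  def shuffleBoundaries(root: ToyNode): List[Int] = {
    val found = mutable.ListBuffer[Int]()
    val visited = mutable.Set[Int]()
    // Explicit stack instead of recursion, as in getMissingParentStages,
    // so a very deep lineage cannot blow the JVM call stack.
    val waitingForVisit = mutable.Stack[ToyNode](root)
    while (waitingForVisit.nonEmpty) {
      val node = waitingForVisit.pop()
      if (visited.add(node.id)) {
        for (dep <- node.deps) {
          if (dep.isShuffle) found += dep.id // record the boundary, do not cross it
          else waitingForVisit.push(dep)     // narrow edge: keep walking this stage
        }
      }
    }
    found.toList
  }

  def main(args: Array[String]): Unit = {
    val shuffled = ToyNode(2, Nil, isShuffle = true)
    val mapped = ToyNode(1, List(shuffled))
    println(shuffleBoundaries(ToyNode(0, List(mapped)))) // List(2)
  }
}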
