ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Spark-SubmitTask

2019-08-06 22:57:11  阅读:265  来源: 互联网

标签:case val jobId rdd SubmitTask Spark partitions stage


1.Rdd rdd中 reduce、fold、aggregate 这些ShuffleTask  还有collect、count这些finalTask 都会调用 sparkContext.runJob def reduce(f: (T, T) => T): T = withScope {   val cleanF = sc.clean(f)   val reducePartition: Iterator[T] => Option[T] = iter => {     if (iter.hasNext) {       Some(iter.reduceLeft(cleanF))     } else {       None     }   }   var jobResult: Option[T] = None   val mergeResult = (index: Int, taskResult: Option[T]) => {     if (taskResult.isDefined) {       jobResult = jobResult match {         case Some(value) => Some(f(value, taskResult.get))         case None => taskResult       }     }   }   sc.runJob(this, reducePartition, mergeResult)   // Get the final result out of our Option, or throw an exception if the RDD was empty   jobResult.getOrElse(throw new UnsupportedOperationException("empty collection")) }     def runJob[T, U: ClassTag](     rdd: RDD[T],     processPartition: Iterator[T] => U,     resultHandler: (Int, U) => Unit) {   val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)   runJob[T, U](rdd, processFunc, 0 until rdd.partitions.length, resultHandler) }   2.SparkContext def runJob[T, U: ClassTag](     rdd: RDD[T],     func: (TaskContext, Iterator[T]) => U,     partitions: Seq[Int],     resultHandler: (Int, U) => Unit): Unit = {   if (stopped.get()) {     throw new IllegalStateException("SparkContext has been shutdown")   }   val callSite = getCallSite   val cleanedFunc = clean(func)   logInfo("Starting job: " + callSite.shortForm)   if (conf.getBoolean("spark.logLineage", false)) {     logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)   }   dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)   progressBar.foreach(_.finishAll())   rdd.doCheckpoint() }   3.DAGSchedule def runJob[T, U](     rdd: RDD[T],     func: (TaskContext, Iterator[T]) => U,     partitions: Seq[Int],     callSite: CallSite,     resultHandler: (Int, U) => Unit,     properties: Properties): Unit = {   val start = System.nanoTime   val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)   ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)   waiter.completionFuture.value.get match {     case scala.util.Success(_) =>       logInfo("Job %d finished: %s, took %f s".format         (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))     case scala.util.Failure(exception) =>       logInfo("Job %d failed: %s, took %f s".format         (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))       // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.       val callerStackTrace = Thread.currentThread().getStackTrace.tail       exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)       throw exception   } }     def submitJob[T, U](     rdd: RDD[T],     func: (TaskContext, Iterator[T]) => U,     partitions: Seq[Int],     callSite: CallSite,     resultHandler: (Int, U) => Unit,     properties: Properties): JobWaiter[U] = {   // Check to make sure we are not launching a task on a partition that does not exist.   val maxPartitions = rdd.partitions.length   partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>     throw new IllegalArgumentException(       "Attempting to access a non-existent partition: " + p + ". " +         "Total number of partitions: " + maxPartitions)   }       val jobId = nextJobId.getAndIncrement()   if (partitions.size == 0) {     // Return immediately if the job is running 0 tasks     return new JobWaiter[U](this, jobId, 0, resultHandler)   }       assert(partitions.size > 0)   val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]   val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)   eventProcessLoop.post((     jobId, rdd, func2, partitions.toArray, callSite, waiter,     SerializationUtils.clone(properties)))   waiter }   4.DAGSchedulerEventProcessLoop override def onReceive(event: DAGSchedulerEvent): Unit = {   val timerContext = timer.time()   try {     doOnReceive(event)   } finally {     timerContext.stop()   } }     private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {   case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>     dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)       case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>     dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)       case StageCancelled(stageId, reason) =>     dagScheduler.handleStageCancellation(stageId, reason)       case JobCancelled(jobId, reason) =>     dagScheduler.handleJobCancellation(jobId, reason)       case JobGroupCancelled(groupId) =>     dagScheduler.handleJobGroupCancelled(groupId)       case AllJobsCancelled =>     dagScheduler.doCancelAllJobs()       case ExecutorAdded(execId, host) =>     dagScheduler.handleExecutorAdded(execId, host)       case ExecutorLost(execId, reason) =>     val workerLost = reason match {       case SlaveLost(_, true) => true       case _ => false     }     dagScheduler.handleExecutorLost(execId, workerLost)       case WorkerRemoved(workerId, host, message) =>     dagScheduler.handleWorkerRemoved(workerId, host, message)       case BeginEvent(task, taskInfo) =>     dagScheduler.handleBeginEvent(task, taskInfo)       case SpeculativeTaskSubmitted(task) =>     dagScheduler.handleSpeculativeTaskSubmitted(task)       case GettingResultEvent(taskInfo) =>     dagScheduler.handleGetTaskResult(taskInfo)       case completion: CompletionEvent =>     dagScheduler.handleTaskCompletion(completion)       case TaskSetFailed(taskSet, reason, exception) =>     dagScheduler.handleTaskSetFailed(taskSet, reason, exception)       case ResubmitFailedStages =>     dagScheduler.resubmitFailedStages() }   5.DAGScheduler   M-submitStage 和 M-getMissingParentStages 构成spark stage划分  划分过程中创建stage 是 M-getOrCreateShuffleMapStage 第一次会创建,第二次就是从map中取(也就是从内存中取)   把一个app 划分成多个stage 使用M-submitMissingTasks 提交过去   M-submitStage 划分过程 ResultStage 是最后一个stage , 假如ResultStage 依赖ShuffleMapStage B ShuffleMapStage B 依赖ShuffleMapStage A 会优先提交A,提交后把 B 和Result 放入 waitingStages   M-submitMissingTasks  根据不同的Stage  将rdd 和 func 或者 stage.shuffleDep 封装到 taskBinaryBytes 最后更具不同的partition id放入Task 中  存入taskset 中   等A 运行完之后,最后一行 submitWaitingChildStages(stage)   M-submitWaitingChildStages 根据当前的stage 从waitingStages 找出当前的stage 的子stage  然后再次提交到  submitStage   M-getMissingParentStages if (!mapStage.isAvailable)  则不为true 则不会再次提交 这个是获取mapOutputTrackerMaster 中  _numAvailableOutputs 数量是否和分区数相等。如果相等,则表示 该Stage 已经处理过   taskBinaryBytes = stage match {   case stage: ShuffleMapStage =>     JavaUtils.bufferToArray(       closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))   case stage: ResultStage =>     JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) }     taskBinary = sc.broadcast(taskBinaryBytes)     new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,   taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),   Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())   new ResultTask(stage.id, stage.latestInfo.attemptNumber,   taskBinary, part, locs, id, properties, serializedTaskMetrics,   Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,   stage.rdd.isBarrier())     private[scheduler] def handleJobSubmitted(jobId: Int,     finalRDD: RDD[_],     func: (TaskContext, Iterator[_]) => _,     partitions: Array[Int],     callSite: CallSite,     listener: JobListener,     properties: Properties) {   var finalStage: ResultStage = null   try {     // New stage creation may throw an exception if, for example, jobs are run on a     // HadoopRDD whose underlying HDFS files have been deleted.     finalStage =  createResultStage(finalRDD, func, partitions, jobId, callSite)   } catch {     case e: BarrierJobSlotsNumberCheckFailed =>       logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +         "than the total number of slots in the cluster currently.")       // If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.       val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,         new BiFunction[Int, Int, Int] {           override def apply(key: Int, value: Int): Int = value + 1         })       if (numCheckFailures <= maxFailureNumTasksCheck) {         messageScheduler.schedule(           new Runnable {             override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,               partitions, callSite, listener, properties))           },           timeIntervalNumTasksCheck,           TimeUnit.SECONDS         )         return       } else {         // Job failed, clear internal data.         barrierJobIdToNumTasksCheckFailures.remove(jobId)         listener.jobFailed(e)         return       }         case e: Exception =>       logWarning("Creating new stage failed due to exception - job: " + jobId, e)       listener.jobFailed(e)       return   }   // Job submitted, clear internal data.   barrierJobIdToNumTasksCheckFailures.remove(jobId)       val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)   clearCacheLocs()   logInfo("Got job %s (%s) with %d output partitions".format(     job.jobId, callSite.shortForm, partitions.length))   logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")   logInfo("Parents of final stage: " + finalStage.parents)   logInfo("Missing parents: " + getMissingParentStages(finalStage))       val jobSubmissionTime = clock.getTimeMillis()   jobIdToActiveJob(jobId) = job   activeJobs += job   finalStage.setActiveJob(job)   val stageIds = jobIdToStageIds(jobId).toArray   val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))   listenerBus.post(     SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))   submitStage(finalStage) }         private def submitStage(stage: Stage) {   val jobId = activeJobForStage(stage)   if (jobId.isDefined) {     logDebug("submitStage(" + stage + ")")     if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {       val missing = getMissingParentStages(stage).sortBy(_.id)       logDebug("missing: " + missing)       if (missing.isEmpty) {         logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")         submitMissingTasks(stage, jobId.get)       } else {         for (parent <- missing) {           submitStage(parent)         }         waitingStages += stage       }     }   } else {     abortStage(stage, "No active job for stage " + stage.id, None)   } }     private def getMissingParentStages(stage: Stage): List[Stage] = {   val missing = new HashSet[Stage]   val visited = new HashSet[RDD[_]]   // We are manually maintaining a stack here to prevent StackOverflowError   // caused by recursively visiting   val waitingForVisit = new ArrayStack[RDD[_]]   def visit(rdd: RDD[_]) {     if (!visited(rdd)) {       visited += rdd       val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)       if (rddHasUncachedPartitions) {         for (dep <- rdd.dependencies) {           dep match {             case shufDep: ShuffleDependency[_, _, _] =>               val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)               if (!mapStage.isAvailable) {                 missing += mapStage               }             case narrowDep: NarrowDependency[_] =>               waitingForVisit.push(narrowDep.rdd)           }         }       }     }   }   waitingForVisit.push(stage.rdd)   while (waitingForVisit.nonEmpty) {     visit(waitingForVisit.pop())   }   missing.toList }       private def submitMissingTasks(stage: Stage, jobId: Int) {   logDebug("submitMissingTasks(" + stage + ")")       // First figure out the indexes of partition ids to compute.   val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()       // Use the scheduling pool, job group, description, etc. from an ActiveJob associated   // with this Stage   val properties = jobIdToActiveJob(jobId).properties       runningStages += stage   // SparkListenerStageSubmitted should be posted before testing whether tasks are   // serializable. If tasks are not serializable, a SparkListenerStageCompleted event   // will be posted, which should always come after a corresponding SparkListenerStageSubmitted   // event.   stage match {     case s: ShuffleMapStage =>       outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)     case s: ResultStage =>       outputCommitCoordinator.stageStart(         stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)   }   val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {     stage match {       case s: ShuffleMapStage =>         partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap       case s: ResultStage =>         partitionsToCompute.map { id =>           val p = s.partitions(id)           (id, getPreferredLocs(stage.rdd, p))         }.toMap     }   } catch {     case NonFatal(e) =>       stage.makeNewStageAttempt(partitionsToCompute.size)       listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))       abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))       runningStages -= stage       return   }       stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)       // If there are tasks to execute, record the submission time of the stage. Otherwise,   // post the even without the submission time, which indicates that this stage was   // skipped.   if (partitionsToCompute.nonEmpty) {     stage.latestInfo.submissionTime = Some(clock.getTimeMillis())   }   listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))       // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.   // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast   // the serialized copy of the RDD and for each task we will deserialize it, which means each   // task gets a different copy of the RDD. This provides stronger isolation between tasks that   // might modify state of objects referenced in their closures. This is necessary in Hadoop   // where the JobConf/Configuration object is not thread-safe.   var taskBinary: Broadcast[Array[Byte]] = null   var partitions: Array[Partition] = null   try {     // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).     // For ResultTask, serialize and broadcast (rdd, func).     var taskBinaryBytes: Array[Byte] = null     // taskBinaryBytes and partitions are both effected by the checkpoint status. We need     // this synchronization in case another concurrent job is checkpointing this RDD, so we get a     // consistent view of both variables.     RDDCheckpointData.synchronized {       taskBinaryBytes = stage match {         case stage: ShuffleMapStage =>           JavaUtils.bufferToArray(             closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))         case stage: ResultStage =>           JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))       }           partitions = stage.rdd.partitions     }         taskBinary = sc.broadcast(taskBinaryBytes)   } catch {     // In the case of a failure during serialization, abort the stage.     case e: NotSerializableException =>       abortStage(stage, "Task not serializable: " + e.toString, Some(e))       runningStages -= stage           // Abort execution       return     case NonFatal(e) =>       abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))       runningStages -= stage       return   }       val tasks: Seq[Task[_]] = try {     val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()     stage match {       case stage: ShuffleMapStage =>         stage.pendingPartitions.clear()         partitionsToCompute.map { id =>           val locs = taskIdToLocations(id)           val part = partitions(id)           stage.pendingPartitions += id           new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,             taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),             Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())         }           case stage: ResultStage =>         partitionsToCompute.map { id =>           val p: Int = stage.partitions(id)           val part = partitions(p)           val locs = taskIdToLocations(id)           new ResultTask(stage.id, stage.latestInfo.attemptNumber,             taskBinary, part, locs, id, properties, serializedTaskMetrics,             Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,             stage.rdd.isBarrier())         }     }   } catch {     case NonFatal(e) =>       abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))       runningStages -= stage       return   }       if (tasks.size > 0) {     logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +       s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")     taskScheduler.submitTasks(new TaskSet(       tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))   } else {     // Because we posted SparkListenerStageSubmitted earlier, we should mark     // the stage as completed here in case there are no tasks to run     markStageAsFinished(stage, None)         stage match {       case stage: ShuffleMapStage =>         logDebug(s"Stage ${stage} is actually done; " +             s"(available: ${stage.isAvailable}," +             s"available outputs: ${stage.numAvailableOutputs}," +             s"partitions: ${stage.numPartitions})")         markMapStageJobsAsFinished(stage)       case stage : ResultStage =>         logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")     }     submitWaitingChildStages(stage)   } }       private def submitWaitingChildStages(parent: Stage) {   logTrace(s"Checking if any dependencies of $parent are now runnable")   logTrace("running: " + runningStages)   logTrace("waiting: " + waitingStages)   logTrace("failed: " + failedStages)   val childStages = waitingStages.filter(_.parents.contains(parent)).toArray   waitingStages --= childStages   for (stage <- childStages.sortBy(_.firstJobId)) {     submitStage(stage)   } }   6.TaskScheduleImpl 这部实际是对taskset 进行封装成TaskSetManager 放入队列 override def submitTasks(taskSet: TaskSet) {   val tasks = taskSet.tasks   logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")   this.synchronized {     val manager = createTaskSetManager(taskSet, maxTaskFailures)     val stage = taskSet.stageId     val stageTaskSets =       taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])     stageTaskSets(taskSet.stageAttemptId) = manager     val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>       ts.taskSet != taskSet && !ts.isZombie     }     if (conflictingTaskSet) {       throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +         s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")     }     //这一步实际上把taskset放入调度队列中     schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)         if (!isLocal && !hasReceivedTask) {       starvationTimer.scheduleAtFixedRate(new TimerTask() {         override def run() {           if (!hasLaunchedTask) {             logWarning("Initial job has not accepted any resources; " +               "check your cluster UI to ensure that workers are registered " +               "and have sufficient resources")           } else {             this.cancel()           }         }       }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)     }     hasReceivedTask = true   }     //通知 StandaloneSchedulerBackend 进行通知,对任务队列中的task 进行分配executor    backend.reviveOffers() }     7.FIFOSchedulableBuilder //将TaskSetManager 放入调度队列中 override def addTaskSetManager(manager: Schedulable, properties: Properties) {   rootPool.addSchedulable(manager) }     8.CoarseGrainedSchedulerBackend 主要是对executor进行过滤,然后executor 和 task 分配 最后启动task,也就是向executor 发送launchtask 的消息  launchTask 其实发送的是TaskDescription,TaskDescription 包含了 task 和 executor 信息 TaskSetManager 生成的 TaskDescription   private def makeOffers() {   // Make sure no executor is killed while some task is launching on it   val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {     // Filter out executors under killing     val activeExecutors = executorDataMap.filterKeys(executorIsAlive)     val workOffers = activeExecutors.map {       case (id, executorData) =>         new WorkerOffer(id, executorData.executorHost, executorData.freeCores,           Some(executorData.executorAddress.hostPort))     }.toIndexedSeq     scheduler.resourceOffers(workOffers)   }   if (!taskDescs.isEmpty) {     launchTasks(taskDescs)   } }       def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {   // Mark each slave as alive and remember its hostname   // Also track if new executor is added   var newExecAvail = false   for (o <- offers) {     if (!hostToExecutors.contains(o.host)) {       hostToExecutors(o.host) = new HashSet[String]()     }     if (!executorIdToRunningTaskIds.contains(o.executorId)) {       hostToExecutors(o.host) += o.executorId       executorAdded(o.executorId, o.host)       executorIdToHost(o.executorId) = o.host       executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()       newExecAvail = true     }     for (rack <- getRackForHost(o.host)) {       hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host     }   }       // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do   // this here to avoid a separate thread and added synchronization overhead, and also because   // updating the blacklist is only relevant when task offers are being made.   blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())       val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>     offers.filter { offer =>       !blacklistTracker.isNodeBlacklisted(offer.host) &&         !blacklistTracker.isExecutorBlacklisted(offer.executorId)     }   }.getOrElse(offers)       val shuffledOffers = shuffleOffers(filteredOffers)   // Build a list of tasks to assign to each worker.   val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))   val availableCpus = shuffledOffers.map(o => o.cores).toArray   val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum   val sortedTaskSets = rootPool.getSortedTaskSetQueue   for (taskSet <- sortedTaskSets) {     logDebug("parentName: %s, name: %s, runningTasks: %s".format(       taskSet.parent.name, taskSet.name, taskSet.runningTasks))     if (newExecAvail) {       taskSet.executorAdded()     }   }       // Take each TaskSet in our scheduling order, and then offer it each node in increasing order   // of locality levels so that it gets a chance to launch local tasks on all of them.   // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY   for (taskSet <- sortedTaskSets) {     // Skip the barrier taskSet if the available slots are less than the number of pending tasks.     if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {       // Skip the launch process.       // TODO SPARK-24819 If the job requires more slots than available (both busy and free       // slots), fail the job on submit.       logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +         s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +         s"number of available slots is $availableSlots.")     } else {       var launchedAnyTask = false       // Record all the executor IDs assigned barrier tasks on.       val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()       for (currentMaxLocality <- taskSet.myLocalityLevels) {         var launchedTaskAtCurrentMaxLocality = false         do {           launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,             currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)           launchedAnyTask |= launchedTaskAtCurrentMaxLocality         } while (launchedTaskAtCurrentMaxLocality)       }           if (!launchedAnyTask) {         taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>             // If the taskSet is unschedulable we try to find an existing idle blacklisted             // executor. If we cannot find one, we abort immediately. Else we kill the idle             // executor and kick off an abortTimer which if it doesn't schedule a task within the             // the timeout will abort the taskSet if we were unable to schedule any task from the             // taskSet.             // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per             // task basis.             // Note 2: The taskSet can still be aborted when there are more than one idle             // blacklisted executors and dynamic allocation is on. This can happen when a killed             // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on             // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort             // timer to expire and abort the taskSet.             executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {               case Some ((executorId, _)) =>                 if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {                   blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))                       val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000                   unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout                   logInfo(s"Waiting for $timeout ms for completely "                     + s"blacklisted task to be schedulable again before aborting $taskSet.")                   abortTimer.schedule(                     createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)                 }               case None => // Abort Immediately                 logInfo("Cannot schedule any task because of complete blacklisting. No idle" +                   s" executors can be found to kill. Aborting $taskSet." )                 taskSet.abortSinceCompletelyBlacklisted(taskIndex)             }         }       } else {         // We want to defer killing any taskSets as long as we have a non blacklisted executor         // which can be used to schedule a task from any active taskSets. This ensures that the         // job can make progress.         // Note: It is theoretically possible that a taskSet never gets scheduled on a         // non-blacklisted executor and the abort timer doesn't kick in because of a constant         // submission of new TaskSets. See the PR for more details.         if (unschedulableTaskSetToExpiryTime.nonEmpty) {           logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +             "recently scheduled.")           unschedulableTaskSetToExpiryTime.clear()         }       }           if (launchedAnyTask && taskSet.isBarrier) {         // Check whether the barrier tasks are partially launched.         // TODO SPARK-24818 handle the assert failure case (that can happen when some locality         // requirements are not fulfilled, and we should revert the launched tasks).         require(addressesWithDescs.size == taskSet.numTasks,           s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +             s"because only ${addressesWithDescs.size} out of a total number of " +             s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +             "been blacklisted or cannot fulfill task locality requirements.")             // materialize the barrier coordinator.         maybeInitBarrierCoordinator()             // Update the taskInfos into all the barrier task properties.         val addressesStr = addressesWithDescs           // Addresses ordered by partitionId           .sortBy(_._2.partitionId)           .map(_._1)           .mkString(",")         addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))             logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +           s"stage ${taskSet.stageId}.")       }     }   }       // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get   // launched within a configured time.   if (tasks.size > 0) {     hasLaunchedTask = true   }   return tasks }         private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {   for (task <- tasks.flatten) {     val serializedTask = TaskDescription.encode(task)     if (serializedTask.limit() >= maxRpcMessageSize) {       Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>         try {           var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +             "spark.rpc.message.maxSize (%d bytes). Consider increasing " +             "spark.rpc.message.maxSize or using broadcast variables for large values."           msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)           taskSetMgr.abort(msg)         } catch {           case e: Exception => logError("Exception in error callback", e)         }       }     }     else {       val executorData = executorDataMap(task.executorId)       executorData.freeCores -= scheduler.CPUS_PER_TASK           logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +         s"${executorData.executorHost}.")           executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))     }   } }   1.Rdd rdd中 reduce、fold、aggregate 这些ShuffleTask  还有collect、count这些finalTask 都会调用 sparkContext.runJob def reduce(f: (T, T) => T): T = withScope {   val cleanF = sc.clean(f)   val reducePartition: Iterator[T] => Option[T] = iter => {     if (iter.hasNext) {       Some(iter.reduceLeft(cleanF))     } else {       None     }   }   var jobResult: Option[T] = None   val mergeResult = (index: Int, taskResult: Option[T]) => {     if (taskResult.isDefined) {       jobResult = jobResult match {         case Some(value) => Some(f(value, taskResult.get))         case None => taskResult       }     }   }   sc.runJob(this, reducePartition, mergeResult)   // Get the final result out of our Option, or throw an exception if the RDD was empty   jobResult.getOrElse(throw new UnsupportedOperationException("empty collection")) }     def runJob[T, U: ClassTag](     rdd: RDD[T],     processPartition: Iterator[T] => U,     resultHandler: (Int, U) => Unit) {   val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)   runJob[T, U](rdd, processFunc, 0 until rdd.partitions.length, resultHandler) }   2.SparkContext def runJob[T, U: ClassTag](     rdd: RDD[T],     func: (TaskContext, Iterator[T]) => U,     partitions: Seq[Int],     resultHandler: (Int, U) => Unit): Unit = {   if (stopped.get()) {     throw new IllegalStateException("SparkContext has been shutdown")   }   val callSite = getCallSite   val cleanedFunc = clean(func)   logInfo("Starting job: " + callSite.shortForm)   if (conf.getBoolean("spark.logLineage", false)) {     logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)   }   dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)   progressBar.foreach(_.finishAll())   rdd.doCheckpoint() }   3.DAGSchedule def runJob[T, U](     rdd: RDD[T],     func: (TaskContext, Iterator[T]) => U,     partitions: Seq[Int],     callSite: CallSite,     resultHandler: (Int, U) => Unit,     properties: Properties): Unit = {   val start = System.nanoTime   val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)   ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)   waiter.completionFuture.value.get match {     case scala.util.Success(_) =>       logInfo("Job %d finished: %s, took %f s".format         (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))     case scala.util.Failure(exception) =>       logInfo("Job %d failed: %s, took %f s".format         (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))       // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.       val callerStackTrace = Thread.currentThread().getStackTrace.tail       exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)       throw exception   } }     def submitJob[T, U](     rdd: RDD[T],     func: (TaskContext, Iterator[T]) => U,     partitions: Seq[Int],     callSite: CallSite,     resultHandler: (Int, U) => Unit,     properties: Properties): JobWaiter[U] = {   // Check to make sure we are not launching a task on a partition that does not exist.   val maxPartitions = rdd.partitions.length   partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>     throw new IllegalArgumentException(       "Attempting to access a non-existent partition: " + p + ". " +         "Total number of partitions: " + maxPartitions)   }       val jobId = nextJobId.getAndIncrement()   if (partitions.size == 0) {     // Return immediately if the job is running 0 tasks     return new JobWaiter[U](this, jobId, 0, resultHandler)   }       assert(partitions.size > 0)   val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]   val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)   eventProcessLoop.post((     jobId, rdd, func2, partitions.toArray, callSite, waiter,     SerializationUtils.clone(properties)))   waiter }   4.DAGSchedulerEventProcessLoop override def onReceive(event: DAGSchedulerEvent): Unit = {   val timerContext = timer.time()   try {     doOnReceive(event)   } finally {     timerContext.stop()   } }     private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {   case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>     dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)       case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>     dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)       case StageCancelled(stageId, reason) =>     dagScheduler.handleStageCancellation(stageId, reason)       case JobCancelled(jobId, reason) =>     dagScheduler.handleJobCancellation(jobId, reason)       case JobGroupCancelled(groupId) =>     dagScheduler.handleJobGroupCancelled(groupId)       case AllJobsCancelled =>     dagScheduler.doCancelAllJobs()       case ExecutorAdded(execId, host) =>     dagScheduler.handleExecutorAdded(execId, host)       case ExecutorLost(execId, reason) =>     val workerLost = reason match {       case SlaveLost(_, true) => true       case _ => false     }     dagScheduler.handleExecutorLost(execId, workerLost)       case WorkerRemoved(workerId, host, message) =>     dagScheduler.handleWorkerRemoved(workerId, host, message)       case BeginEvent(task, taskInfo) =>     dagScheduler.handleBeginEvent(task, taskInfo)       case SpeculativeTaskSubmitted(task) =>     dagScheduler.handleSpeculativeTaskSubmitted(task)       case GettingResultEvent(taskInfo) =>     dagScheduler.handleGetTaskResult(taskInfo)       case completion: CompletionEvent =>     dagScheduler.handleTaskCompletion(completion)       case TaskSetFailed(taskSet, reason, exception) =>     dagScheduler.handleTaskSetFailed(taskSet, reason, exception)       case ResubmitFailedStages =>     dagScheduler.resubmitFailedStages() }   5.DAGScheduler   M-submitStage 和 M-getMissingParentStages 构成spark stage划分  划分过程中创建stage 是 M-getOrCreateShuffleMapStage 第一次会创建,第二次就是从map中取(也就是从内存中取)   把一个app 划分成多个stage 使用M-submitMissingTasks 提交过去   M-submitStage 划分过程 ResultStage 是最后一个stage , 假如ResultStage 依赖ShuffleMapStage B ShuffleMapStage B 依赖ShuffleMapStage A 会优先提交A,提交后把 B 和Result 放入 waitingStages   M-submitMissingTasks  根据不同的Stage  将rdd 和 func 或者 stage.shuffleDep 封装到 taskBinaryBytes 最后更具不同的partition id放入Task 中  存入taskset 中   等A 运行完之后,最后一行 submitWaitingChildStages(stage)   M-submitWaitingChildStages 根据当前的stage 从waitingStages 找出当前的stage 的子stage  然后再次提交到  submitStage   M-getMissingParentStages if (!mapStage.isAvailable)  则不为true 则不会再次提交 这个是获取mapOutputTrackerMaster 中  _numAvailableOutputs 数量是否和分区数相等。如果相等,则表示 该Stage 已经处理过   taskBinaryBytes = stage match {   case stage: ShuffleMapStage =>     JavaUtils.bufferToArray(       closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))   case stage: ResultStage =>     JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) }     taskBinary = sc.broadcast(taskBinaryBytes)     new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,   taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),   Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())   new ResultTask(stage.id, stage.latestInfo.attemptNumber,   taskBinary, part, locs, id, properties, serializedTaskMetrics,   Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,   stage.rdd.isBarrier())     private[scheduler] def handleJobSubmitted(jobId: Int,     finalRDD: RDD[_],     func: (TaskContext, Iterator[_]) => _,     partitions: Array[Int],     callSite: CallSite,     listener: JobListener,     properties: Properties) {   var finalStage: ResultStage = null   try {     // New stage creation may throw an exception if, for example, jobs are run on a     // HadoopRDD whose underlying HDFS files have been deleted.     finalStage =  createResultStage(finalRDD, func, partitions, jobId, callSite)   } catch {     case e: BarrierJobSlotsNumberCheckFailed =>       logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +         "than the total number of slots in the cluster currently.")       // If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.       val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,         new BiFunction[Int, Int, Int] {           override def apply(key: Int, value: Int): Int = value + 1         })       if (numCheckFailures <= maxFailureNumTasksCheck) {         messageScheduler.schedule(           new Runnable {             override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,               partitions, callSite, listener, properties))           },           timeIntervalNumTasksCheck,           TimeUnit.SECONDS         )         return       } else {         // Job failed, clear internal data.         barrierJobIdToNumTasksCheckFailures.remove(jobId)         listener.jobFailed(e)         return       }         case e: Exception =>       logWarning("Creating new stage failed due to exception - job: " + jobId, e)       listener.jobFailed(e)       return   }   // Job submitted, clear internal data.   barrierJobIdToNumTasksCheckFailures.remove(jobId)       val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)   clearCacheLocs()   logInfo("Got job %s (%s) with %d output partitions".format(     job.jobId, callSite.shortForm, partitions.length))   logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")   logInfo("Parents of final stage: " + finalStage.parents)   logInfo("Missing parents: " + getMissingParentStages(finalStage))       val jobSubmissionTime = clock.getTimeMillis()   jobIdToActiveJob(jobId) = job   activeJobs += job   finalStage.setActiveJob(job)   val stageIds = jobIdToStageIds(jobId).toArray   val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))   listenerBus.post(     SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))   submitStage(finalStage) }         private def submitStage(stage: Stage) {   val jobId = activeJobForStage(stage)   if (jobId.isDefined) {     logDebug("submitStage(" + stage + ")")     if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {       val missing = getMissingParentStages(stage).sortBy(_.id)       logDebug("missing: " + missing)       if (missing.isEmpty) {         logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")         submitMissingTasks(stage, jobId.get)       } else {         for (parent <- missing) {           submitStage(parent)         }         waitingStages += stage       }     }   } else {     abortStage(stage, "No active job for stage " + stage.id, None)   } }     private def getMissingParentStages(stage: Stage): List[Stage] = {   val missing = new HashSet[Stage]   val visited = new HashSet[RDD[_]]   // We are manually maintaining a stack here to prevent StackOverflowError   // caused by recursively visiting   val waitingForVisit = new ArrayStack[RDD[_]]   def visit(rdd: RDD[_]) {     if (!visited(rdd)) {       visited += rdd       val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)       if (rddHasUncachedPartitions) {         for (dep <- rdd.dependencies) {           dep match {             case shufDep: ShuffleDependency[_, _, _] =>               val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)               if (!mapStage.isAvailable) {                 missing += mapStage               }             case narrowDep: NarrowDependency[_] =>               waitingForVisit.push(narrowDep.rdd)           }         }       }     }   }   waitingForVisit.push(stage.rdd)   while (waitingForVisit.nonEmpty) {     visit(waitingForVisit.pop())   }   missing.toList }       private def submitMissingTasks(stage: Stage, jobId: Int) {   logDebug("submitMissingTasks(" + stage + ")")       // First figure out the indexes of partition ids to compute.   val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()       // Use the scheduling pool, job group, description, etc. from an ActiveJob associated   // with this Stage   val properties = jobIdToActiveJob(jobId).properties       runningStages += stage   // SparkListenerStageSubmitted should be posted before testing whether tasks are   // serializable. If tasks are not serializable, a SparkListenerStageCompleted event   // will be posted, which should always come after a corresponding SparkListenerStageSubmitted   // event.   stage match {     case s: ShuffleMapStage =>       outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)     case s: ResultStage =>       outputCommitCoordinator.stageStart(         stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)   }   val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {     stage match {       case s: ShuffleMapStage =>         partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap       case s: ResultStage =>         partitionsToCompute.map { id =>           val p = s.partitions(id)           (id, getPreferredLocs(stage.rdd, p))         }.toMap     }   } catch {     case NonFatal(e) =>       stage.makeNewStageAttempt(partitionsToCompute.size)       listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))       abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))       runningStages -= stage       return   }       stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)       // If there are tasks to execute, record the submission time of the stage. Otherwise,   // post the even without the submission time, which indicates that this stage was   // skipped.   if (partitionsToCompute.nonEmpty) {     stage.latestInfo.submissionTime = Some(clock.getTimeMillis())   }   listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))       // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.   // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast   // the serialized copy of the RDD and for each task we will deserialize it, which means each   // task gets a different copy of the RDD. This provides stronger isolation between tasks that   // might modify state of objects referenced in their closures. This is necessary in Hadoop   // where the JobConf/Configuration object is not thread-safe.   var taskBinary: Broadcast[Array[Byte]] = null   var partitions: Array[Partition] = null   try {     // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).     // For ResultTask, serialize and broadcast (rdd, func).     var taskBinaryBytes: Array[Byte] = null     // taskBinaryBytes and partitions are both effected by the checkpoint status. We need     // this synchronization in case another concurrent job is checkpointing this RDD, so we get a     // consistent view of both variables.     RDDCheckpointData.synchronized {       taskBinaryBytes = stage match {         case stage: ShuffleMapStage =>           JavaUtils.bufferToArray(             closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))         case stage: ResultStage =>           JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))       }           partitions = stage.rdd.partitions     }         taskBinary = sc.broadcast(taskBinaryBytes)   } catch {     // In the case of a failure during serialization, abort the stage.     case e: NotSerializableException =>       abortStage(stage, "Task not serializable: " + e.toString, Some(e))       runningStages -= stage           // Abort execution       return     case NonFatal(e) =>       abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))       runningStages -= stage       return   }       val tasks: Seq[Task[_]] = try {     val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()     stage match {       case stage: ShuffleMapStage =>         stage.pendingPartitions.clear()         partitionsToCompute.map { id =>           val locs = taskIdToLocations(id)           val part = partitions(id)           stage.pendingPartitions += id           new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,             taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),             Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())         }           case stage: ResultStage =>         partitionsToCompute.map { id =>           val p: Int = stage.partitions(id)           val part = partitions(p)           val locs = taskIdToLocations(id)           new ResultTask(stage.id, stage.latestInfo.attemptNumber,             taskBinary, part, locs, id, properties, serializedTaskMetrics,             Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,             stage.rdd.isBarrier())         }     }   } catch {     case NonFatal(e) =>       abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))       runningStages -= stage       return   }       if (tasks.size > 0) {     logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +       s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")     taskScheduler.submitTasks(new TaskSet(       tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))   } else {     // Because we posted SparkListenerStageSubmitted earlier, we should mark     // the stage as completed here in case there are no tasks to run     markStageAsFinished(stage, None)         stage match {       case stage: ShuffleMapStage =>         logDebug(s"Stage ${stage} is actually done; " +             s"(available: ${stage.isAvailable}," +             s"available outputs: ${stage.numAvailableOutputs}," +             s"partitions: ${stage.numPartitions})")         markMapStageJobsAsFinished(stage)       case stage : ResultStage =>         logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")     }     submitWaitingChildStages(stage)   } }       private def submitWaitingChildStages(parent: Stage) {   logTrace(s"Checking if any dependencies of $parent are now runnable")   logTrace("running: " + runningStages)   logTrace("waiting: " + waitingStages)   logTrace("failed: " + failedStages)   val childStages = waitingStages.filter(_.parents.contains(parent)).toArray   waitingStages --= childStages   for (stage <- childStages.sortBy(_.firstJobId)) {     submitStage(stage)   } }   6.TaskScheduleImpl 这部实际是对taskset 进行封装成TaskSetManager 放入队列 override def submitTasks(taskSet: TaskSet) {   val tasks = taskSet.tasks   logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")   this.synchronized {     val manager = createTaskSetManager(taskSet, maxTaskFailures)     val stage = taskSet.stageId     val stageTaskSets =       taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])     stageTaskSets(taskSet.stageAttemptId) = manager     val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>       ts.taskSet != taskSet && !ts.isZombie     }     if (conflictingTaskSet) {       throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +         s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")     }     //这一步实际上把taskset放入调度队列中     schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)         if (!isLocal && !hasReceivedTask) {       starvationTimer.scheduleAtFixedRate(new TimerTask() {         override def run() {           if (!hasLaunchedTask) {             logWarning("Initial job has not accepted any resources; " +               "check your cluster UI to ensure that workers are registered " +               "and have sufficient resources")           } else {             this.cancel()           }         }       }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)     }     hasReceivedTask = true   }     //通知 StandaloneSchedulerBackend 进行通知,对任务队列中的task 进行分配executor    backend.reviveOffers() }     7.FIFOSchedulableBuilder //将TaskSetManager 放入调度队列中 override def addTaskSetManager(manager: Schedulable, properties: Properties) {   rootPool.addSchedulable(manager) }     8.CoarseGrainedSchedulerBackend 主要是对executor进行过滤,然后executor 和 task 分配 最后启动task,也就是向executor 发送launchtask 的消息  launchTask 其实发送的是TaskDescription,TaskDescription 包含了 task 和 executor 信息 TaskSetManager 生成的 TaskDescription   private def makeOffers() {   // Make sure no executor is killed while some task is launching on it   val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {     // Filter out executors under killing     val activeExecutors = executorDataMap.filterKeys(executorIsAlive)     val workOffers = activeExecutors.map {       case (id, executorData) =>         new WorkerOffer(id, executorData.executorHost, executorData.freeCores,           Some(executorData.executorAddress.hostPort))     }.toIndexedSeq     scheduler.resourceOffers(workOffers)   }   if (!taskDescs.isEmpty) {     launchTasks(taskDescs)   } }       def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {   // Mark each slave as alive and remember its hostname   // Also track if new executor is added   var newExecAvail = false   for (o <- offers) {     if (!hostToExecutors.contains(o.host)) {       hostToExecutors(o.host) = new HashSet[String]()     }     if (!executorIdToRunningTaskIds.contains(o.executorId)) {       hostToExecutors(o.host) += o.executorId       executorAdded(o.executorId, o.host)       executorIdToHost(o.executorId) = o.host       executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()       newExecAvail = true     }     for (rack <- getRackForHost(o.host)) {       hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host     }   }       // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do   // this here to avoid a separate thread and added synchronization overhead, and also because   // updating the blacklist is only relevant when task offers are being made.   blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())       val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>     offers.filter { offer =>       !blacklistTracker.isNodeBlacklisted(offer.host) &&         !blacklistTracker.isExecutorBlacklisted(offer.executorId)     }   }.getOrElse(offers)       val shuffledOffers = shuffleOffers(filteredOffers)   // Build a list of tasks to assign to each worker.   val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))   val availableCpus = shuffledOffers.map(o => o.cores).toArray   val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum   val sortedTaskSets = rootPool.getSortedTaskSetQueue   for (taskSet <- sortedTaskSets) {     logDebug("parentName: %s, name: %s, runningTasks: %s".format(       taskSet.parent.name, taskSet.name, taskSet.runningTasks))     if (newExecAvail) {       taskSet.executorAdded()     }   }       // Take each TaskSet in our scheduling order, and then offer it each node in increasing order   // of locality levels so that it gets a chance to launch local tasks on all of them.   // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY   for (taskSet <- sortedTaskSets) {     // Skip the barrier taskSet if the available slots are less than the number of pending tasks.     if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {       // Skip the launch process.       // TODO SPARK-24819 If the job requires more slots than available (both busy and free       // slots), fail the job on submit.       logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +         s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +         s"number of available slots is $availableSlots.")     } else {       var launchedAnyTask = false       // Record all the executor IDs assigned barrier tasks on.       val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()       for (currentMaxLocality <- taskSet.myLocalityLevels) {         var launchedTaskAtCurrentMaxLocality = false         do {           launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,             currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)           launchedAnyTask |= launchedTaskAtCurrentMaxLocality         } while (launchedTaskAtCurrentMaxLocality)       }           if (!launchedAnyTask) {         taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>             // If the taskSet is unschedulable we try to find an existing idle blacklisted             // executor. If we cannot find one, we abort immediately. Else we kill the idle             // executor and kick off an abortTimer which if it doesn't schedule a task within the             // the timeout will abort the taskSet if we were unable to schedule any task from the             // taskSet.             // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per             // task basis.             // Note 2: The taskSet can still be aborted when there are more than one idle             // blacklisted executors and dynamic allocation is on. This can happen when a killed             // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on             // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort             // timer to expire and abort the taskSet.             executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {               case Some ((executorId, _)) =>                 if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {                   blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))                       val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000                   unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout                   logInfo(s"Waiting for $timeout ms for completely "                     + s"blacklisted task to be schedulable again before aborting $taskSet.")                   abortTimer.schedule(                     createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)                 }               case None => // Abort Immediately                 logInfo("Cannot schedule any task because of complete blacklisting. No idle" +                   s" executors can be found to kill. Aborting $taskSet." )                 taskSet.abortSinceCompletelyBlacklisted(taskIndex)             }         }       } else {         // We want to defer killing any taskSets as long as we have a non blacklisted executor         // which can be used to schedule a task from any active taskSets. This ensures that the         // job can make progress.         // Note: It is theoretically possible that a taskSet never gets scheduled on a         // non-blacklisted executor and the abort timer doesn't kick in because of a constant         // submission of new TaskSets. See the PR for more details.         if (unschedulableTaskSetToExpiryTime.nonEmpty) {           logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +             "recently scheduled.")           unschedulableTaskSetToExpiryTime.clear()         }       }           if (launchedAnyTask && taskSet.isBarrier) {         // Check whether the barrier tasks are partially launched.         // TODO SPARK-24818 handle the assert failure case (that can happen when some locality         // requirements are not fulfilled, and we should revert the launched tasks).         require(addressesWithDescs.size == taskSet.numTasks,           s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +             s"because only ${addressesWithDescs.size} out of a total number of " +             s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +             "been blacklisted or cannot fulfill task locality requirements.")             // materialize the barrier coordinator.         maybeInitBarrierCoordinator()             // Update the taskInfos into all the barrier task properties.         val addressesStr = addressesWithDescs           // Addresses ordered by partitionId           .sortBy(_._2.partitionId)           .map(_._1)           .mkString(",")         addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))             logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +           s"stage ${taskSet.stageId}.")       }     }   }       // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get   // launched within a configured time.   if (tasks.size > 0) {     hasLaunchedTask = true   }   return tasks }         private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {   for (task <- tasks.flatten) {     val serializedTask = TaskDescription.encode(task)     if (serializedTask.limit() >= maxRpcMessageSize) {       Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>         try {           var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +             "spark.rpc.message.maxSize (%d bytes). Consider increasing " +             "spark.rpc.message.maxSize or using broadcast variables for large values."           msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)           taskSetMgr.abort(msg)         } catch {           case e: Exception => logError("Exception in error callback", e)         }       }     }     else {       val executorData = executorDataMap(task.executorId)       executorData.freeCores -= scheduler.CPUS_PER_TASK           logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +         s"${executorData.executorHost}.")           executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))     }   } }              

标签:case,val,jobId,rdd,SubmitTask,Spark,partitions,stage
来源: https://www.cnblogs.com/chouc/p/11312380.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有