From d2f5b1a3d1355fab8c038ebd538047d43a53225b Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Fri, 13 Jul 2018 01:38:58 +0800 Subject: [PATCH 01/14] implement barrier execution mode. --- .../org/apache/spark/BarrierTaskContext.scala | 28 +++ .../apache/spark/BarrierTaskContextImpl.scala | 49 ++++++ .../org/apache/spark/BarrierTaskInfo.scala | 23 +++ .../org/apache/spark/MapOutputTracker.scala | 11 ++ .../apache/spark/rdd/MapPartitionsRDD.scala | 5 +- .../main/scala/org/apache/spark/rdd/RDD.scala | 15 ++ .../org/apache/spark/rdd/RDDBarrier.scala | 48 +++++ .../org/apache/spark/rdd/ShuffledRDD.scala | 2 + .../apache/spark/scheduler/ActiveJob.scala | 6 + .../apache/spark/scheduler/DAGScheduler.scala | 165 ++++++++++++++---- .../apache/spark/scheduler/ResultTask.scala | 9 +- .../spark/scheduler/ShuffleMapTask.scala | 7 +- .../org/apache/spark/scheduler/Stage.scala | 8 +- .../org/apache/spark/scheduler/Task.scala | 39 +++-- .../spark/scheduler/TaskDescription.scala | 7 +- .../spark/scheduler/TaskSchedulerImpl.scala | 65 +++++-- .../spark/scheduler/TaskSetManager.scala | 9 +- .../apache/spark/scheduler/WorkerOffer.scala | 6 +- .../CoarseGrainedSchedulerBackend.scala | 6 +- .../local/LocalSchedulerBackend.scala | 3 +- .../org/apache/spark/SparkContextSuite.scala | 42 +++++ .../apache/spark/executor/ExecutorSuite.scala | 1 + .../apache/spark/rdd/RDDBarrierSuite.scala | 43 +++++ .../spark/scheduler/DAGSchedulerSuite.scala | 58 ++++++ .../scheduler/TaskDescriptionSuite.scala | 2 + 25 files changed, 580 insertions(+), 77 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/BarrierTaskContext.scala create mode 100644 core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala create mode 100644 core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala create mode 100644 core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala create mode 100644 core/src/test/scala/org/apache/spark/rdd/RDDBarrierSuite.scala diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala new file mode 100644 index 0000000000000..4d42ab322baf7 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +/** A [[TaskContext]] with extra info and tooling for a barrier stage. */ +trait BarrierTaskContext extends TaskContext { + + /** Sets a global barrier and waits until all tasks in this stage hit this barrier. */ + def barrier(): Unit + + /** Returns the all task infos in this barrier stage. 
*/ + def getTaskInfos(): Array[BarrierTaskInfo] +} diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala new file mode 100644 index 0000000000000..6fe104aa40256 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.Properties + +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.metrics.MetricsSystem + +/** A [[BarrierTaskContext]] implementation. */ +class BarrierTaskContextImpl( + override val stageId: Int, + override val stageAttemptNumber: Int, + override val partitionId: Int, + override val taskAttemptId: Long, + override val attemptNumber: Int, + override val taskMemoryManager: TaskMemoryManager, + localProperties: Properties, + @transient private val metricsSystem: MetricsSystem, + // The default value is only used in tests. + override val taskMetrics: TaskMetrics = TaskMetrics.empty) + extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, taskAttemptId, attemptNumber, + taskMemoryManager, localProperties, metricsSystem, taskMetrics) + with BarrierTaskContext { + + // TODO implement global barrier. + override def barrier(): Unit = {} + + override def getTaskInfos(): Array[BarrierTaskInfo] = { + val hostsStr = localProperties.getProperty("hosts", "") + hostsStr.trim().split(",").map(_.trim()).map(new BarrierTaskInfo(_)) + } +} diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala b/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala new file mode 100644 index 0000000000000..88c99b101bd2c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +/** + * Carries all task infos of a barrier task. 
+ */ +private[spark] class BarrierTaskInfo(val host: String) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 73646051f264c..aeec2876a0f03 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -434,6 +434,17 @@ private[spark] class MapOutputTrackerMaster( } } + /** Unregister all map output information of the given shuffle. */ + def unregisterAllMapOutput(shuffleId: Int) { + shuffleStatuses.get(shuffleId) match { + case Some(shuffleStatus) => + shuffleStatus.removeOutputsByFilter(x => true) + incrementEpoch() + case None => + throw new SparkException("unregisterAllMapOutput called for nonexistent shuffle ID") + } + } + /** Unregister shuffle data */ def unregisterShuffle(shuffleId: Int) { shuffleStatuses.remove(shuffleId).foreach { shuffleStatus => diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala index e4587c96eae1c..e33c6cd9a3f54 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala @@ -27,7 +27,8 @@ import org.apache.spark.{Partition, TaskContext} private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( var prev: RDD[T], f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator) - preservesPartitioning: Boolean = false) + preservesPartitioning: Boolean = false, + isFromBarrier: Boolean = false) extends RDD[U](prev) { override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None @@ -41,4 +42,6 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( super.clearDependencies() prev = null } + + override def isBarrier(): Boolean = isFromBarrier || prev.isBarrier() } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 0574abdca32ac..b645e54eed94c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1647,6 +1647,11 @@ abstract class RDD[T: ClassTag]( } } + /** + * Indicates that Spark must launch the tasks together for the current stage. + */ + def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this)) + // ======================================================================= // Other internal methods and fields // ======================================================================= @@ -1839,6 +1844,16 @@ abstract class RDD[T: ClassTag]( def toJavaRDD() : JavaRDD[T] = { new JavaRDD(this)(elementClassTag) } + + /** + * Whether the RDD is in a barrier stage. Spark must launch all the tasks at the same time for a + * barrier stage. + * + * An RDD is in a barrier stage, if at least one of its parent RDD(s), or itself, are mapped from + * a RDDBarrier. This function always returns false for a [[ShuffledRDD]], since a + * [[ShuffledRDD]] indicates start of a new stage. 
+ */ + def isBarrier(): Boolean = dependencies.exists(_.rdd.isBarrier()) } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala new file mode 100644 index 0000000000000..064ab222b5b67 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import scala.reflect.ClassTag + +import org.apache.spark.BarrierTaskContext +import org.apache.spark.TaskContext + +/** Represents an RDD barrier, which forces Spark to launch tasks of this stage together. */ +class RDDBarrier[T: ClassTag](rdd: RDD[T]) { + + /** + * Maps partitions together with a provided [[BarrierTaskContext]]. + * + * `preservesPartitioning` indicates whether the input function preserves the partitioner, which + * should be `false` unless `rdd` is a pair RDD and the input function doesn't modify the keys. + * */ + def mapPartitions[S: ClassTag]( + f: (Iterator[T], BarrierTaskContext) => Iterator[S], + preservesPartitioning: Boolean = false): RDD[S] = rdd.withScope { + val cleanedF = rdd.sparkContext.clean(f) + new MapPartitionsRDD( + rdd, + (context: TaskContext, index: Int, iter: Iterator[T]) => + cleanedF(iter, context.asInstanceOf[BarrierTaskContext]), + preservesPartitioning, + isFromBarrier = true + ) + } + + /** TODO extra conf(e.g. timeout) */ +} diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala index 26eaa9aa3d03f..489fb8cb80312 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala @@ -110,4 +110,6 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]( super.clearDependencies() prev = null } + + override def isBarrier(): Boolean = false } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala index 949e88f606275..7d1cf4dd3e709 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala @@ -60,4 +60,10 @@ private[spark] class ActiveJob( val finished = Array.fill[Boolean](numPartitions)(false) var numFinished = 0 + + // Mark all the partitions of the stage to be not finished. 
+ def clearResult(): Unit = { + (0 until numPartitions).map(finished.update(_, false)) + numFinished = 0 + } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index f74425d73b392..e52d699603742 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1062,7 +1062,7 @@ class DAGScheduler( stage.pendingPartitions += id new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), - Option(sc.applicationId), sc.applicationAttemptId) + Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier()) } case stage: ResultStage => @@ -1072,7 +1072,8 @@ class DAGScheduler( val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, id, properties, serializedTaskMetrics, - Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) + Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, + stage.rdd.isBarrier()) } } } catch { @@ -1311,17 +1312,6 @@ class DAGScheduler( } } - case Resubmitted => - logInfo("Resubmitted " + task + ", so marking it as still running") - stage match { - case sms: ShuffleMapStage => - sms.pendingPartitions += task.partitionId - - case _ => - assert(false, "TaskSetManagers should only send Resubmitted task statuses for " + - "tasks in ShuffleMapStages.") - } - case FetchFailed(bmAddress, shuffleId, mapId, _, failureMessage) => val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleIdToMapStage(shuffleId) @@ -1331,9 +1321,9 @@ class DAGScheduler( s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + s"(attempt ${failedStage.latestInfo.attemptNumber}) running") } else { - failedStage.fetchFailedAttemptIds.add(task.stageAttemptId) + failedStage.failedAttemptIds.add(task.stageAttemptId) val shouldAbortStage = - failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts || + failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || disallowStageRetryForTest // It is likely that we receive multiple FetchFailed for a single stage (because we have @@ -1349,6 +1339,48 @@ class DAGScheduler( s"longer running") } + if (mapStage.rdd.isBarrier()) { + // Mark all the map as broken in the map stage, to ensure retry all the tasks on + // resubmitted stage attempt. + mapOutputTracker.unregisterAllMapOutput(shuffleId) + } else if (mapId != -1) { + // Mark the map whose fetch failed as broken in the map stage + mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) + } + + if (failedStage.rdd.isBarrier()) { + failedStage match { + case mapStage: ShuffleMapStage => + // Mark all the map as broken in the map stage, to ensure retry all the tasks on + // resubmitted stage attempt. + mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId) + + case resultStage: ResultStage => + // Mark all the partitions of the result stage to be not finished, to ensure retry + // all the tasks on resubmitted stage attempt. 
+ resultStage.activeJob.map(_.clearResult()) + } + } + + // TODO: mark the executor as failed only if there were lots of fetch failures on it + if (bmAddress != null) { + val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled && + unRegisterOutputOnHostOnFetchFailure) { + // We had a fetch failure with the external shuffle service, so we + // assume all shuffle data on the node is bad. + Some(bmAddress.host) + } else { + // Unregister shuffle data just for one executor (we don't have any + // reason to believe shuffle data has been lost for the entire host). + None + } + removeExecutorAndUnregisterOutputs( + execId = bmAddress.executorId, + fileLost = true, + hostToUnregisterOutputs = hostToUnregisterOutputs, + maybeEpoch = Some(task.epoch)) + } + if (shouldAbortStage) { val abortMessage = if (disallowStageRetryForTest) { "Fetch failure will not retry stage due to testing config" @@ -1375,7 +1407,7 @@ class DAGScheduler( // simpler while not producing an overwhelming number of scheduler events. logInfo( s"Resubmitting $mapStage (${mapStage.name}) and " + - s"$failedStage (${failedStage.name}) due to fetch failure" + s"$failedStage (${failedStage.name}) due to fetch failure" ) messageScheduler.schedule( new Runnable { @@ -1386,29 +1418,90 @@ class DAGScheduler( ) } } - // Mark the map whose fetch failed as broken in the map stage - if (mapId != -1) { - mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) - } + } - // TODO: mark the executor as failed only if there were lots of fetch failures on it - if (bmAddress != null) { - val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled && - unRegisterOutputOnHostOnFetchFailure) { - // We had a fetch failure with the external shuffle service, so we - // assume all shuffle data on the node is bad. - Some(bmAddress.host) - } else { - // Unregister shuffle data just for one executor (we don't have any - // reason to believe shuffle data has been lost for the entire host). - None + case failure: TaskFailedReason if task.isBarrier => + // Also handle the task failed reasons here. + failure match { + case Resubmitted => + logInfo("Resubmitted " + task + ", so marking it as still running") + stage match { + case sms: ShuffleMapStage => + sms.pendingPartitions += task.partitionId + + case _ => + assert(false, "TaskSetManagers should only send Resubmitted task statuses for " + + "tasks in ShuffleMapStages.") } - removeExecutorAndUnregisterOutputs( - execId = bmAddress.executorId, - fileLost = true, - hostToUnregisterOutputs = hostToUnregisterOutputs, - maybeEpoch = Some(task.epoch)) + + case _ => // Do nothing. + } + + // Always fail the current stage and retry all the tasks when a barrier task fail. + val failedStage = stageIdToStage(task.stageId) + logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + + "failed.") + val message = "Stage failed because a barrier task finished unsuccessfully. " + + s"${failure.toErrorString}" + try { + // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, interruptThread = false) + } catch { + case e: UnsupportedOperationException => + // Cannot continue with barrier stage if failed to cancel zombie barrier tasks. 
+ logInfo(s"Could not cancel tasks for stage $stageId", e) + abortStage(failedStage, "Could not cancel zombie barrier tasks for stage " + + s"$failedStage (${failedStage.name})", Some(e)) + } + markStageAsFinished(failedStage, Some(message)) + + failedStage.failedAttemptIds.add(task.stageAttemptId) + val shouldAbortStage = + failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || + disallowStageRetryForTest + + if (shouldAbortStage) { + val abortMessage = if (disallowStageRetryForTest) { + "Barrier stage will not retry stage due to testing config" + } else { + s"""$failedStage (${failedStage.name}) + |has failed the maximum allowable number of + |times: $maxConsecutiveStageAttempts. + |Most recent failure reason: $message + """.stripMargin.replaceAll("\n", " ") + } + abortStage(failedStage, abortMessage, None) + } else { + failedStage match { + case mapStage: ShuffleMapStage => + // Mark all the map as broken in the map stage, to ensure retry all the tasks on + // resubmitted stage attempt. + mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId) + + case resultStage: ResultStage => + // Mark all the partitions of the result stage to be not finished, to ensure retry + // all the tasks on resubmitted stage attempt. + resultStage.activeJob.map(_.clearResult()) } + + // update failedStages and make sure a ResubmitFailedStages event is enqueued + failedStages += failedStage + logInfo(s"Resubmitting $failedStage (${failedStage.name}) due to barrier stage " + + "failure.") + messageScheduler.schedule(new Runnable { + override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) + }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) + } + + case Resubmitted => + logInfo("Resubmitted " + task + ", so marking it as still running") + stage match { + case sms: ShuffleMapStage => + sms.pendingPartitions += task.partitionId + + case _ => + assert(false, "TaskSetManagers should only send Resubmitted task statuses for " + + "tasks in ShuffleMapStages.") } case _: TaskCommitDenied => diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index e36c759a42556..aafeae05b566c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -48,7 +48,9 @@ import org.apache.spark.rdd.RDD * @param jobId id of the job this task belongs to * @param appId id of the app this task belongs to * @param appAttemptId attempt id of the app this task belongs to - */ + * @param isBarrier whether this task belongs to a barrier stage. Spark must launch all the tasks + * at the same time for a barrier stage. 
+ */ private[spark] class ResultTask[T, U]( stageId: Int, stageAttemptId: Int, @@ -60,9 +62,10 @@ private[spark] class ResultTask[T, U]( serializedTaskMetrics: Array[Byte], jobId: Option[Int] = None, appId: Option[String] = None, - appAttemptId: Option[String] = None) + appAttemptId: Option[String] = None, + isBarrier: Boolean = false) extends Task[U](stageId, stageAttemptId, partition.index, localProperties, serializedTaskMetrics, - jobId, appId, appAttemptId) + jobId, appId, appAttemptId, isBarrier) with Serializable { @transient private[this] val preferredLocs: Seq[TaskLocation] = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 7a25c47e2cab3..f2cd65fd523ab 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -49,6 +49,8 @@ import org.apache.spark.shuffle.ShuffleWriter * @param jobId id of the job this task belongs to * @param appId id of the app this task belongs to * @param appAttemptId attempt id of the app this task belongs to + * @param isBarrier whether this task belongs to a barrier stage. Spark must launch all the tasks + * at the same time for a barrier stage. */ private[spark] class ShuffleMapTask( stageId: Int, @@ -60,9 +62,10 @@ private[spark] class ShuffleMapTask( serializedTaskMetrics: Array[Byte], jobId: Option[Int] = None, appId: Option[String] = None, - appAttemptId: Option[String] = None) + appAttemptId: Option[String] = None, + isBarrier: Boolean = false) extends Task[MapStatus](stageId, stageAttemptId, partition.index, localProperties, - serializedTaskMetrics, jobId, appId, appAttemptId) + serializedTaskMetrics, jobId, appId, appAttemptId, isBarrier) with Logging { /** A constructor used only in test suites. This does not require passing in an RDD. */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 290fd073caf27..26cca334d3bd5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -82,15 +82,15 @@ private[scheduler] abstract class Stage( private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId) /** - * Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these - * failures in order to avoid endless retries if a stage keeps failing with a FetchFailure. + * Set of stage attempt IDs that have failed. We keep track of these failures in order to avoid + * endless retries if a stage keeps failing. * We keep track of each attempt ID that has failed to avoid recording duplicate failures if * multiple tasks from the same stage attempt fail (SPARK-5945). */ - val fetchFailedAttemptIds = new HashSet[Int] + val failedAttemptIds = new HashSet[Int] private[scheduler] def clearFailures() : Unit = { - fetchFailedAttemptIds.clear() + failedAttemptIds.clear() } /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. 
*/ diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index f536fc2a5f0a1..155ffba4789a7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -49,6 +49,8 @@ import org.apache.spark.util._ * @param jobId id of the job this task belongs to * @param appId id of the app this task belongs to * @param appAttemptId attempt id of the app this task belongs to + * @param isBarrier whether this task belongs to a barrier stage. Spark must launch all the tasks + * at the same time for a barrier stage. */ private[spark] abstract class Task[T]( val stageId: Int, @@ -60,7 +62,8 @@ private[spark] abstract class Task[T]( SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array(), val jobId: Option[Int] = None, val appId: Option[String] = None, - val appAttemptId: Option[String] = None) extends Serializable { + val appAttemptId: Option[String] = None, + val isBarrier: Boolean = false) extends Serializable { @transient lazy val metrics: TaskMetrics = SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(serializedTaskMetrics)) @@ -77,16 +80,30 @@ private[spark] abstract class Task[T]( attemptNumber: Int, metricsSystem: MetricsSystem): T = { SparkEnv.get.blockManager.registerTask(taskAttemptId) - context = new TaskContextImpl( - stageId, - stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal - partitionId, - taskAttemptId, - attemptNumber, - taskMemoryManager, - localProperties, - metricsSystem, - metrics) + context = if (isBarrier) { + new BarrierTaskContextImpl( + stageId, + stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal + partitionId, + taskAttemptId, + attemptNumber, + taskMemoryManager, + localProperties, + metricsSystem, + metrics) + } else { + new TaskContextImpl( + stageId, + stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal + partitionId, + taskAttemptId, + attemptNumber, + taskMemoryManager, + localProperties, + metricsSystem, + metrics) + } + TaskContext.setTaskContext(context) taskThread = Thread.currentThread() diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index c98b87148e404..bb4a4442b9433 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -50,6 +50,7 @@ private[spark] class TaskDescription( val executorId: String, val name: String, val index: Int, // Index within this task's TaskSet + val partitionId: Int, val addedFiles: Map[String, Long], val addedJars: Map[String, Long], val properties: Properties, @@ -76,6 +77,7 @@ private[spark] object TaskDescription { dataOut.writeUTF(taskDescription.executorId) dataOut.writeUTF(taskDescription.name) dataOut.writeInt(taskDescription.index) + dataOut.writeInt(taskDescription.partitionId) // Write files. serializeStringLongMap(taskDescription.addedFiles, dataOut) @@ -117,6 +119,7 @@ private[spark] object TaskDescription { val executorId = dataIn.readUTF() val name = dataIn.readUTF() val index = dataIn.readInt() + val partitionId = dataIn.readInt() // Read files. 
val taskFiles = deserializeStringLongMap(dataIn) @@ -138,7 +141,7 @@ private[spark] object TaskDescription { // Create a sub-buffer for the serialized task into its own buffer (to be deserialized later). val serializedTask = byteBuffer.slice() - new TaskDescription(taskId, attemptNumber, executorId, name, index, taskFiles, taskJars, - properties, serializedTask) + new TaskDescription(taskId, attemptNumber, executorId, name, index, partitionId, taskFiles, + taskJars, properties, serializedTask) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 56c0bf6c09351..8407bcac3faa5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -274,7 +274,9 @@ private[spark] class TaskSchedulerImpl( maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], - tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { + tasks: IndexedSeq[ArrayBuffer[TaskDescription]], + hosts: ArrayBuffer[String], + taskDescs: ArrayBuffer[TaskDescription]) : Boolean = { var launchedTask = false // nodes and executors that are blacklisted for the entire application have already been // filtered out by this point @@ -291,6 +293,12 @@ private[spark] class TaskSchedulerImpl( executorIdToRunningTaskIds(execId).add(tid) availableCpus(i) -= CPUS_PER_TASK assert(availableCpus(i) >= 0) + // Only update hosts for a barrier task. + if (taskSet.isBarrier) { + // The executor address is expected to be non empty. + hosts += shuffledOffers(i).host + taskDescs += task + } launchedTask = true } } catch { @@ -346,6 +354,7 @@ private[spark] class TaskSchedulerImpl( // Build a list of tasks to assign to each worker. val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK)) val availableCpus = shuffledOffers.map(o => o.cores).toArray + val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( @@ -359,17 +368,49 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { - do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( - taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality - } while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { - taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { + // Skip the launch process. + logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + + s"number of available slots is ${availableSlots}.") + } else { + var launchedAnyTask = false + var launchedTaskAtCurrentMaxLocality = false + // Record all the executor IDs assigned barrier tasks on. 
+ val hosts = ArrayBuffer[String]() + val taskDescs = ArrayBuffer[TaskDescription]() + for (currentMaxLocality <- taskSet.myLocalityLevels) { + do { + launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, + currentMaxLocality, shuffledOffers, availableCpus, tasks, hosts, taskDescs) + launchedAnyTask |= launchedTaskAtCurrentMaxLocality + } while (launchedTaskAtCurrentMaxLocality) + } + if (!launchedAnyTask) { + taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + } + if (launchedAnyTask && taskSet.isBarrier) { + // Check whether the barrier tasks are partially launched. + // TODO handle the assert failure case (that can happen when some locality requirements + // are not fulfilled, and we should revert the launched tasks) + require(taskDescs.size == taskSet.numTasks, + s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because only ${taskDescs.size} out of a total number of ${taskSet.numTasks} " + + "tasks got resource offers. The resource offers may have been blacklisted or " + + "cannot fulfill task locality requirements.") + + // Update the taskInfos into all the barrier task properties. + val hostsStr = hosts.zip(taskDescs) + // Addresses ordered by partitionId + .sortBy(_._2.partitionId) + .map(_._1) + .mkString(",") + taskDescs.foreach(_.properties.setProperty("hosts", hostsStr)) + + logInfo(s"Successfully scheduled all the ${taskDescs.size} tasks for barrier stage " + + s"${taskSet.stageId}.") + } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index defed1e0f9c6c..0b21256ab6cce 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -123,6 +123,10 @@ private[spark] class TaskSetManager( // TODO: We should kill any running task attempts when the task set manager becomes a zombie. private[scheduler] var isZombie = false + // Whether the taskSet run tasks from a barrier stage. Spark must launch all the tasks at the + // same time for a barrier stage. + private[scheduler] def isBarrier = taskSet.tasks.nonEmpty && taskSet.tasks(0).isBarrier + // Set of pending tasks for each executor. These collections are actually // treated as stacks, in which new tasks are added to the end of the // ArrayBuffer and removed from the end. This makes it faster to detect @@ -512,6 +516,7 @@ private[spark] class TaskSetManager( execId, taskName, index, + task.partitionId, addedFiles, addedJars, task.localProperties, @@ -979,8 +984,8 @@ private[spark] class TaskSetManager( */ override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = { // Can't speculate if we only have one task, and no need to speculate if the task set is a - // zombie. - if (isZombie || numTasks == 1) { + // zombie or is from a barrier stage. + if (isZombie || isBarrier || numTasks == 1) { return false } var foundTasks = false diff --git a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala index 810b36cddf835..c65feb72a1d10 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala @@ -21,4 +21,8 @@ package org.apache.spark.scheduler * Represents free resources available on an executor. 
*/ private[spark] -case class WorkerOffer(executorId: String, host: String, cores: Int) +case class WorkerOffer( + executorId: String, + host: String, + cores: Int, + address: Option[String] = None) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 9b90e309d2e04..375aeb0c34661 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -242,7 +242,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val activeExecutors = executorDataMap.filterKeys(executorIsAlive) val workOffers = activeExecutors.map { case (id, executorData) => - new WorkerOffer(id, executorData.executorHost, executorData.freeCores) + new WorkerOffer(id, executorData.executorHost, executorData.freeCores, + Some(executorData.executorAddress.hostPort)) }.toIndexedSeq scheduler.resourceOffers(workOffers) } @@ -267,7 +268,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp if (executorIsAlive(executorId)) { val executorData = executorDataMap(executorId) val workOffers = IndexedSeq( - new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores)) + new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores, + Some(executorData.executorAddress.hostPort))) scheduler.resourceOffers(workOffers) } else { Seq.empty diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala index 4c614c5c0f602..cf8b0ff4f7019 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala @@ -81,7 +81,8 @@ private[spark] class LocalEndpoint( } def reviveOffers() { - val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) + val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores, + Some(rpcEnv.address.hostPort))) for (task <- scheduler.resourceOffers(offers).flatten) { freeCores -= scheduler.CPUS_PER_TASK executor.launchTask(executorBackend, task) diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index ce9f2be1c02dd..500b87c038b59 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -627,6 +627,48 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu assert(exc.getCause() != null) stream.close() } + + test("support barrier sync under local mode") { + val conf = new SparkConf().setAppName("test").setMaster("local[2]") + sc = new SparkContext(conf) + val rdd = sc.makeRDD(Seq(1, 2, 3, 4), 2) + val rdd2 = rdd.barrier().mapPartitions { (it, context) => + // If we don't get the expected taskInfos, the job shall abort due to stage failure. 
+ if (context.getTaskInfos().length != 2) { + throw new SparkException("Expected taksInfos length is 2, actual length is " + + s"${context.getTaskInfos().length}.") + } + context.barrier() + it + } + rdd2.collect + + eventually(timeout(10.seconds)) { + assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0) + } + } + + test("support barrier sync under local-cluster mode") { + val conf = new SparkConf() + .setMaster("local-cluster[3, 1, 1024]") + .setAppName("test-cluster") + sc = new SparkContext(conf) + val rdd = sc.makeRDD(Seq(1, 2, 3, 4), 2) + val rdd2 = rdd.barrier().mapPartitions { (it, context) => + // If we don't get the expected taskInfos, the job shall abort due to stage failure. + if (context.getTaskInfos().length != 2) { + throw new SparkException("Expected taksInfos length is 2, actual length is " + + s"${context.getTaskInfos().length}.") + } + context.barrier() + it + } + rdd2.collect + + eventually(timeout(10.seconds)) { + assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0) + } + } } object SparkContextSuite { diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index 1a7bebe2c53cd..77a7668d3a1d1 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -275,6 +275,7 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug executorId = "", name = "", index = 0, + partitionId = 0, addedFiles = Map[String, Long](), addedJars = Map[String, Long](), properties = new Properties, diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDBarrierSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDBarrierSuite.scala new file mode 100644 index 0000000000000..1489666a988cb --- /dev/null +++ b/core/src/test/scala/org/apache/spark/rdd/RDDBarrierSuite.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.rdd + +import org.apache.spark.{SharedSparkContext, SparkFunSuite} + +class RDDBarrierSuite extends SparkFunSuite with SharedSparkContext { + + test("create an RDDBarrier") { + val rdd = sc.parallelize(1 to 10, 4) + assert(rdd.isBarrier() === false) + + val rdd2 = rdd.barrier().mapPartitions((iter, context) => iter) + assert(rdd2.isBarrier() === true) + } + + test("create an RDDBarrier in the middle of a chain of RDDs") { + val rdd = sc.parallelize(1 to 10, 4) + val rdd2 = rdd.barrier().mapPartitions((iter, context) => iter).map(x => (x, x + 1)) + assert(rdd2.isBarrier() === true) + } + + test("RDDBarrier with shuffle") { + val rdd = sc.parallelize(1 to 10, 4) + val rdd2 = rdd.barrier().mapPartitions((iter, context) => iter).repartition(2) + assert(rdd2.isBarrier() === false) + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 2987170bf5026..b3db5e29fb82e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1055,6 +1055,64 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(sparkListener.failedStages.size == 1) } + test("Retry all the tasks on a resubmitted attempt of a barrier stage caused by FetchFailure") { + val shuffleMapRdd = new MyRDD(sc, 2, Nil).barrier().mapPartitions((it, context) => it) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker) + submit(reduceRdd, Array(0, 1)) + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", reduceRdd.partitions.length)), + (Success, makeMapStatus("hostB", reduceRdd.partitions.length)))) + assert(mapOutputTracker.findMissingPartitions(shuffleId) === Some(Seq.empty)) + + // The first result task fails, with a fetch failure for the output from the first mapper. + runEvent(makeCompletionEvent( + taskSets(1).tasks(0), + FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), + null)) + assert(mapOutputTracker.findMissingPartitions(shuffleId) === Some(Seq(0, 1))) + + scheduler.resubmitFailedStages() + // Complete the map stage. + completeShuffleMapStageSuccessfully(0, 1, numShufflePartitions = 2) + + // Complete the result stage. + completeNextResultStageWithSuccess(1, 1) + + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + assertDataStructuresEmpty() + } + + test("Retry all the tasks on a resubmitted attempt of a barrier stage caused by TaskKilled") { + val shuffleMapRdd = new MyRDD(sc, 2, Nil).barrier().mapPartitions((it, context) => it) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker) + submit(reduceRdd, Array(0, 1)) + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", reduceRdd.partitions.length)))) + assert(mapOutputTracker.findMissingPartitions(shuffleId) === Some(Seq(1))) + + // The second map task fails with TaskKilled. + runEvent(makeCompletionEvent( + taskSets(0).tasks(1), + TaskKilled("test"), + null)) + assert(sparkListener.failedStages === Seq(0)) + assert(mapOutputTracker.findMissingPartitions(shuffleId) === Some(Seq(0, 1))) + + scheduler.resubmitFailedStages() + // Complete the map stage. 
+ completeShuffleMapStageSuccessfully(0, 1, numShufflePartitions = 2) + + // Complete the result stage. + completeNextResultStageWithSuccess(1, 0) + + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + assertDataStructuresEmpty() + } + /** * This tests the case where another FetchFailed comes in while the map stage is getting * re-run. diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala index 97487ce1d2ca8..ba62eec0522db 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala @@ -62,6 +62,7 @@ class TaskDescriptionSuite extends SparkFunSuite { executorId = "testExecutor", name = "task for test", index = 19, + partitionId = 1, originalFiles, originalJars, originalProperties, @@ -77,6 +78,7 @@ class TaskDescriptionSuite extends SparkFunSuite { assert(decodedTaskDescription.executorId === originalTaskDescription.executorId) assert(decodedTaskDescription.name === originalTaskDescription.name) assert(decodedTaskDescription.index === originalTaskDescription.index) + assert(decodedTaskDescription.partitionId === originalTaskDescription.partitionId) assert(decodedTaskDescription.addedFiles.equals(originalFiles)) assert(decodedTaskDescription.addedJars.equals(originalJars)) assert(decodedTaskDescription.properties.equals(originalTaskDescription.properties)) From 93ccf4f5a6d898cf97aba1c738dd559d65c3ad60 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Fri, 13 Jul 2018 15:11:32 +0800 Subject: [PATCH 02/14] update --- .../cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala index 2d2f90c63a309..31f84310485a0 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala @@ -253,6 +253,7 @@ class MesosFineGrainedSchedulerBackendSuite executorId = "s1", name = "n1", index = 0, + partitionId = 0, addedFiles = mutable.Map.empty[String, Long], addedJars = mutable.Map.empty[String, Long], properties = new Properties(), @@ -361,6 +362,7 @@ class MesosFineGrainedSchedulerBackendSuite executorId = "s1", name = "n1", index = 0, + partitionId = 0, addedFiles = mutable.Map.empty[String, Long], addedJars = mutable.Map.empty[String, Long], properties = new Properties(), From 424bad2edfd3ce8087ddb09ac55f806d5ae1dd3e Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Mon, 16 Jul 2018 16:06:08 +0800 Subject: [PATCH 03/14] better log message. 
--- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index e52d699603742..3800c546196bd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1441,15 +1441,16 @@ class DAGScheduler( val failedStage = stageIdToStage(task.stageId) logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + "failed.") - val message = "Stage failed because a barrier task finished unsuccessfully. " + - s"${failure.toErrorString}" + val taskAttemptId = task.asInstanceOf[BarrierTaskContextImpl].taskAttemptId + val message = s"Stage failed because barrier task ${taskAttemptId} finished " + + s"unsuccessfully. ${failure.toErrorString}" try { // cancelTasks will fail if a SchedulerBackend does not implement killTask taskScheduler.cancelTasks(stageId, interruptThread = false) } catch { case e: UnsupportedOperationException => // Cannot continue with barrier stage if failed to cancel zombie barrier tasks. - logInfo(s"Could not cancel tasks for stage $stageId", e) + logWarning(s"Could not cancel tasks for stage $stageId", e) abortStage(failedStage, "Could not cancel zombie barrier tasks for stage " + s"$failedStage (${failedStage.name})", Some(e)) } From 377d1c0099460145fe651ec05f54ac812592051b Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Wed, 18 Jul 2018 00:27:44 +0800 Subject: [PATCH 04/14] update --- .../org/apache/spark/BarrierTaskContext.scala | 10 ++++++++-- .../apache/spark/BarrierTaskContextImpl.scala | 8 ++++---- .../org/apache/spark/BarrierTaskInfo.scala | 2 +- .../org/apache/spark/MapOutputTracker.scala | 3 ++- .../main/scala/org/apache/spark/rdd/RDD.scala | 4 +++- .../apache/spark/scheduler/ActiveJob.scala | 2 +- .../apache/spark/scheduler/DAGScheduler.scala | 9 ++++----- .../spark/scheduler/TaskSchedulerImpl.scala | 20 +++++++++++-------- .../apache/spark/scheduler/WorkerOffer.scala | 2 ++ .../org/apache/spark/SparkContextSuite.scala | 4 ++-- 10 files changed, 39 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala index 4d42ab322baf7..778640472884f 100644 --- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala +++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala @@ -20,9 +20,15 @@ package org.apache.spark /** A [[TaskContext]] with extra info and tooling for a barrier stage. */ trait BarrierTaskContext extends TaskContext { - /** Sets a global barrier and waits until all tasks in this stage hit this barrier. */ + /** + * Sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to + * MPI_Barrier function in MPI, the barrier() function call blocks until all tasks in the same + * stage have reached this routine. + */ def barrier(): Unit - /** Returns the all task infos in this barrier stage. */ + /** + * Returns the all task infos in this barrier stage, the task infos are ordered by partitionId. 
+ */ def getTaskInfos(): Array[BarrierTaskInfo] } diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala index 6fe104aa40256..8ac705757a382 100644 --- a/core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala @@ -24,7 +24,7 @@ import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.metrics.MetricsSystem /** A [[BarrierTaskContext]] implementation. */ -class BarrierTaskContextImpl( +private[spark] class BarrierTaskContextImpl( override val stageId: Int, override val stageAttemptNumber: Int, override val partitionId: Int, @@ -39,11 +39,11 @@ class BarrierTaskContextImpl( taskMemoryManager, localProperties, metricsSystem, taskMetrics) with BarrierTaskContext { - // TODO implement global barrier. + // TODO SPARK-24817 implement global barrier. override def barrier(): Unit = {} override def getTaskInfos(): Array[BarrierTaskInfo] = { - val hostsStr = localProperties.getProperty("hosts", "") - hostsStr.trim().split(",").map(_.trim()).map(new BarrierTaskInfo(_)) + val addressesStr = localProperties.getProperty("addresses", "") + addressesStr.split(",").map(_.trim()).map(new BarrierTaskInfo(_)) } } diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala b/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala index 88c99b101bd2c..d91f7b1441349 100644 --- a/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala @@ -20,4 +20,4 @@ package org.apache.spark /** * Carries all task infos of a barrier task. */ -private[spark] class BarrierTaskInfo(val host: String) +class BarrierTaskInfo(val address: String) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index aeec2876a0f03..7649cefb95039 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -441,7 +441,8 @@ private[spark] class MapOutputTrackerMaster( shuffleStatus.removeOutputsByFilter(x => true) incrementEpoch() case None => - throw new SparkException("unregisterAllMapOutput called for nonexistent shuffle ID") + throw new SparkException( + s"unregisterAllMapOutput called for nonexistent shuffle ID ${shuffleId}.") } } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index b645e54eed94c..edbef65aab3b4 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1853,7 +1853,9 @@ abstract class RDD[T: ClassTag]( * a RDDBarrier. This function always returns false for a [[ShuffledRDD]], since a * [[ShuffledRDD]] indicates start of a new stage. 
*/ - def isBarrier(): Boolean = dependencies.exists(_.rdd.isBarrier()) + def isBarrier(): Boolean = isBarrier_ + + @transient private lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier()) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala index 7d1cf4dd3e709..0ea5f0eb457e5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala @@ -62,7 +62,7 @@ private[spark] class ActiveJob( var numFinished = 0 // Mark all the partitions of the stage to be not finished. - def clearResult(): Unit = { + def markAllPartitionsAsUnfinished(): Unit = { (0 until numPartitions).map(finished.update(_, false)) numFinished = 0 } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 3800c546196bd..82c1bb1b6d451 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1358,7 +1358,7 @@ class DAGScheduler( case resultStage: ResultStage => // Mark all the partitions of the result stage to be not finished, to ensure retry // all the tasks on resubmitted stage attempt. - resultStage.activeJob.map(_.clearResult()) + resultStage.activeJob.map(_.markAllPartitionsAsUnfinished()) } } @@ -1441,9 +1441,8 @@ class DAGScheduler( val failedStage = stageIdToStage(task.stageId) logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + "failed.") - val taskAttemptId = task.asInstanceOf[BarrierTaskContextImpl].taskAttemptId - val message = s"Stage failed because barrier task ${taskAttemptId} finished " + - s"unsuccessfully. ${failure.toErrorString}" + val message = s"Stage failed because barrier task $task finished unsuccessfully. " + + s"${failure.toErrorString}" try { // cancelTasks will fail if a SchedulerBackend does not implement killTask taskScheduler.cancelTasks(stageId, interruptThread = false) @@ -1482,7 +1481,7 @@ class DAGScheduler( case resultStage: ResultStage => // Mark all the partitions of the result stage to be not finished, to ensure retry // all the tasks on resubmitted stage attempt. - resultStage.activeJob.map(_.clearResult()) + resultStage.activeJob.map(_.markAllPartitionsAsUnfinished()) } // update failedStages and make sure a ResubmitFailedStages event is enqueued diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 8407bcac3faa5..15ec1456a283c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -275,7 +275,7 @@ private[spark] class TaskSchedulerImpl( shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], tasks: IndexedSeq[ArrayBuffer[TaskDescription]], - hosts: ArrayBuffer[String], + addresses: ArrayBuffer[String], taskDescs: ArrayBuffer[TaskDescription]) : Boolean = { var launchedTask = false // nodes and executors that are blacklisted for the entire application have already been @@ -296,7 +296,7 @@ private[spark] class TaskSchedulerImpl( // Only update hosts for a barrier task. if (taskSet.isBarrier) { // The executor address is expected to be non empty. 
- hosts += shuffledOffers(i).host + addresses += shuffledOffers(i).address.get taskDescs += task } launchedTask = true @@ -371,6 +371,8 @@ private[spark] class TaskSchedulerImpl( // Skip the barrier taskSet if the available slots are less than the number of pending tasks. if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { // Skip the launch process. + // TODO SPARK-24819 If the job requires more slots than available (both busy and free + // slots), fail the job on submit. logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + s"number of available slots is ${availableSlots}.") @@ -378,12 +380,12 @@ private[spark] class TaskSchedulerImpl( var launchedAnyTask = false var launchedTaskAtCurrentMaxLocality = false // Record all the executor IDs assigned barrier tasks on. - val hosts = ArrayBuffer[String]() + val addresses = ArrayBuffer[String]() val taskDescs = ArrayBuffer[TaskDescription]() for (currentMaxLocality <- taskSet.myLocalityLevels) { do { launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, - currentMaxLocality, shuffledOffers, availableCpus, tasks, hosts, taskDescs) + currentMaxLocality, shuffledOffers, availableCpus, tasks, addresses, taskDescs) launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } @@ -392,8 +394,8 @@ private[spark] class TaskSchedulerImpl( } if (launchedAnyTask && taskSet.isBarrier) { // Check whether the barrier tasks are partially launched. - // TODO handle the assert failure case (that can happen when some locality requirements - // are not fulfilled, and we should revert the launched tasks) + // TODO SPARK-24818 handle the assert failure case (that can happen when some locality + // requirements are not fulfilled, and we should revert the launched tasks). require(taskDescs.size == taskSet.numTasks, s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + s"because only ${taskDescs.size} out of a total number of ${taskSet.numTasks} " + @@ -401,12 +403,12 @@ private[spark] class TaskSchedulerImpl( "cannot fulfill task locality requirements.") // Update the taskInfos into all the barrier task properties. - val hostsStr = hosts.zip(taskDescs) + val addressesStr = addresses.zip(taskDescs) // Addresses ordered by partitionId .sortBy(_._2.partitionId) .map(_._1) .mkString(",") - taskDescs.foreach(_.properties.setProperty("hosts", hostsStr)) + taskDescs.foreach(_.properties.setProperty("addresses", addressesStr)) logInfo(s"Successfully scheduled all the ${taskDescs.size} tasks for barrier stage " + s"${taskSet.stageId}.") @@ -414,6 +416,8 @@ private[spark] class TaskSchedulerImpl( } } + // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get + // launched within a configured time. if (tasks.size > 0) { hasLaunchedTask = true } diff --git a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala index c65feb72a1d10..6ec74913e42f2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala @@ -25,4 +25,6 @@ case class WorkerOffer( executorId: String, host: String, cores: Int, + // `address` is an optional hostPort string, it provide more useful information than `host` + // when multiple executors are launched on the same host. 
address: Option[String] = None) diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 500b87c038b59..a5d7eef1e277a 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -641,7 +641,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu context.barrier() it } - rdd2.collect + rdd2.collect() eventually(timeout(10.seconds)) { assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0) @@ -663,7 +663,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu context.barrier() it } - rdd2.collect + rdd2.collect() eventually(timeout(10.seconds)) { assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0) From ed89af8bdb4cfdf0c4384ec6beb84f5665be3c95 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Thu, 19 Jul 2018 21:51:22 +0800 Subject: [PATCH 05/14] update and add test cases. --- .../org/apache/spark/BarrierTaskInfo.scala | 1 + .../main/scala/org/apache/spark/rdd/RDD.scala | 4 ++- .../apache/spark/scheduler/ActiveJob.scala | 4 +-- .../apache/spark/scheduler/DAGScheduler.scala | 24 ++++++------- .../org/apache/spark/SparkContextSuite.scala | 4 +-- .../org/apache/spark/scheduler/FakeTask.scala | 24 +++++++++++-- .../scheduler/TaskSchedulerImplSuite.scala | 34 +++++++++++++++++++ 7 files changed, 76 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala b/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala index d91f7b1441349..36f94db780dac 100644 --- a/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala @@ -19,5 +19,6 @@ package org.apache.spark /** * Carries all task infos of a barrier task. + * @param address the IPv4 address of the executor that a barrier task is running on */ class BarrierTaskInfo(val address: String) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index edbef65aab3b4..c7068dc16edfb 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1853,8 +1853,10 @@ abstract class RDD[T: ClassTag]( * a RDDBarrier. This function always returns false for a [[ShuffledRDD]], since a * [[ShuffledRDD]] indicates start of a new stage. */ - def isBarrier(): Boolean = isBarrier_ + private[spark] def isBarrier(): Boolean = isBarrier_ + // From performance concern, cache the value to avoid repeatedly compute `isBarrier()` on a long + // RDD chain. @transient private lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier()) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala index 0ea5f0eb457e5..b3bbfb1683c5c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala @@ -62,8 +62,8 @@ private[spark] class ActiveJob( var numFinished = 0 // Mark all the partitions of the stage to be not finished. 
- def markAllPartitionsAsUnfinished(): Unit = { - (0 until numPartitions).map(finished.update(_, false)) + def resetAllPartitions(): Unit = { + (0 until numPartitions).foreach(finished.update(_, false)) numFinished = 0 } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 82c1bb1b6d451..6928eb1cfe8da 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1350,15 +1350,15 @@ class DAGScheduler( if (failedStage.rdd.isBarrier()) { failedStage match { - case mapStage: ShuffleMapStage => + case failedMapStage: ShuffleMapStage => // Mark all the map as broken in the map stage, to ensure retry all the tasks on // resubmitted stage attempt. - mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId) + mapOutputTracker.unregisterAllMapOutput(failedMapStage.shuffleDep.shuffleId) - case resultStage: ResultStage => + case failedResultStage: ResultStage => // Mark all the partitions of the result stage to be not finished, to ensure retry // all the tasks on resubmitted stage attempt. - resultStage.activeJob.map(_.markAllPartitionsAsUnfinished()) + failedResultStage.activeJob.map(_.resetAllPartitions()) } } @@ -1430,8 +1430,8 @@ class DAGScheduler( sms.pendingPartitions += task.partitionId case _ => - assert(false, "TaskSetManagers should only send Resubmitted task statuses for " + - "tasks in ShuffleMapStages.") + throw new SparkException("TaskSetManagers should only send Resubmitted task " + + "statuses for tasks in ShuffleMapStages.") } case _ => // Do nothing. @@ -1473,15 +1473,15 @@ class DAGScheduler( abortStage(failedStage, abortMessage, None) } else { failedStage match { - case mapStage: ShuffleMapStage => + case failedMapStage: ShuffleMapStage => // Mark all the map as broken in the map stage, to ensure retry all the tasks on // resubmitted stage attempt. - mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId) + mapOutputTracker.unregisterAllMapOutput(failedMapStage.shuffleDep.shuffleId) - case resultStage: ResultStage => + case failedResultStage: ResultStage => // Mark all the partitions of the result stage to be not finished, to ensure retry // all the tasks on resubmitted stage attempt. 
- resultStage.activeJob.map(_.markAllPartitionsAsUnfinished()) + failedResultStage.activeJob.map(_.resetAllPartitions()) } // update failedStages and make sure a ResubmitFailedStages event is enqueued @@ -1500,8 +1500,8 @@ class DAGScheduler( sms.pendingPartitions += task.partitionId case _ => - assert(false, "TaskSetManagers should only send Resubmitted task statuses for " + - "tasks in ShuffleMapStages.") + throw new SparkException("TaskSetManagers should only send Resubmitted task " + + "statuses for tasks in ShuffleMapStages.") } case _: TaskCommitDenied => diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index a5d7eef1e277a..e5f31a04c6e7e 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -628,7 +628,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu stream.close() } - test("support barrier sync under local mode") { + test("support barrier execution mode under local mode") { val conf = new SparkConf().setAppName("test").setMaster("local[2]") sc = new SparkContext(conf) val rdd = sc.makeRDD(Seq(1, 2, 3, 4), 2) @@ -648,7 +648,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu } } - test("support barrier sync under local-cluster mode") { + test("support barrier execution mode under local-cluster mode") { val conf = new SparkConf() .setMaster("local-cluster[3, 1, 1024]") .setAppName("test-cluster") diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala index 109d4a0a870b8..b29d32f7b35c5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala @@ -27,8 +27,10 @@ class FakeTask( partitionId: Int, prefLocs: Seq[TaskLocation] = Nil, serializedTaskMetrics: Array[Byte] = - SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array()) - extends Task[Int](stageId, 0, partitionId, new Properties, serializedTaskMetrics) { + SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array(), + isBarrier: Boolean = false) + extends Task[Int](stageId, 0, partitionId, new Properties, serializedTaskMetrics, + isBarrier = isBarrier) { override def runTask(context: TaskContext): Int = 0 override def preferredLocations: Seq[TaskLocation] = prefLocs @@ -74,4 +76,22 @@ object FakeTask { } new TaskSet(tasks, stageId, stageAttemptId, priority = 0, null) } + + def createBarrierTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = { + createBarrierTaskSet(numTasks, stageId = 0, stageAttempId = 0, prefLocs: _*) + } + + def createBarrierTaskSet( + numTasks: Int, + stageId: Int, + stageAttempId: Int, + prefLocs: Seq[TaskLocation]*): TaskSet = { + if (prefLocs.size != 0 && prefLocs.size != numTasks) { + throw new IllegalArgumentException("Wrong number of task locations") + } + val tasks = Array.tabulate[Task[_]](numTasks) { i => + new FakeTask(stageId, i, if (prefLocs.size != 0) prefLocs(i) else Nil, isBarrier = true) + } + new TaskSet(tasks, stageId, stageAttempId, priority = 0, null) + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 33f2ea1c94e75..624384abcd71d 100644 --- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1021,4 +1021,38 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject()) } } + + test("don't schedule for a barrier taskSet if available slots are less than pending tasks") { + val taskCpus = 2 + val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString) + + val numFreeCores = 3 + val workerOffers = IndexedSeq( + new WorkerOffer("executor0", "host0", numFreeCores, Some("192.168.0.101:49625")), + new WorkerOffer("executor1", "host1", numFreeCores, Some("192.168.0.101:49627"))) + val attempt1 = FakeTask.createBarrierTaskSet(3) + + // submit attempt 1, offer some resources, since the available slots are less than pending + // tasks, don't schedule barrier tasks on the resource offer. + taskScheduler.submitTasks(attempt1) + val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(0 === taskDescriptions.length) + } + + test("schedule tasks for a barrier taskSet if all tasks can be launched together") { + val taskCpus = 2 + val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString) + + val numFreeCores = 3 + val workerOffers = IndexedSeq( + new WorkerOffer("executor0", "host0", numFreeCores, Some("192.168.0.101:49625")), + new WorkerOffer("executor1", "host1", numFreeCores, Some("192.168.0.101:49627")), + new WorkerOffer("executor2", "host2", numFreeCores, Some("192.168.0.101:49629"))) + val attempt1 = FakeTask.createBarrierTaskSet(3) + + // submit attempt 1, offer some resources, all tasks get launched together + taskScheduler.submitTasks(attempt1) + val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(3 === taskDescriptions.length) + } } From 51bbb0f3849666c139716ea24645f30503645ced Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Fri, 20 Jul 2018 00:22:29 +0800 Subject: [PATCH 06/14] mark new apis as developerApi or experimental --- .../main/scala/org/apache/spark/BarrierTaskContext.scala | 6 ++++++ core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala | 4 ++++ core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4 +++- core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala | 5 ++++- 4 files changed, 17 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala index 778640472884f..2a7f40bb2d481 100644 --- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala +++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala @@ -17,6 +17,8 @@ package org.apache.spark +import org.apache.spark.annotation.{Experimental, Since} + /** A [[TaskContext]] with extra info and tooling for a barrier stage. */ trait BarrierTaskContext extends TaskContext { @@ -25,10 +27,14 @@ trait BarrierTaskContext extends TaskContext { * MPI_Barrier function in MPI, the barrier() function call blocks until all tasks in the same * stage have reached this routine. */ + @Experimental + @Since("2.4.0") def barrier(): Unit /** * Returns the all task infos in this barrier stage, the task infos are ordered by partitionId. 
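// A usage sketch of the two experimental calls documented here, in the spirit of the
// SparkContextSuite tests above; `sc` is an assumed SparkContext.
val rdd = sc.makeRDD(Seq(1, 2, 3, 4), 2)
val rdd2 = rdd.barrier().mapPartitions { (it, context) =>
  // Block until every task of this barrier stage has reached this point.
  context.barrier()
  // Peer task infos come back ordered by partitionId.
  val peers = context.getTaskInfos().map(_.address).mkString(", ")
  Iterator(s"partition ${context.partitionId()} sees peers: $peers")
}
rdd2.collect().foreach(println)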
*/ + @Experimental + @Since("2.4.0") def getTaskInfos(): Array[BarrierTaskInfo] } diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala b/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala index 36f94db780dac..af4a07c562f0c 100644 --- a/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala @@ -17,8 +17,12 @@ package org.apache.spark +import org.apache.spark.annotation.DeveloperApi + /** * Carries all task infos of a barrier task. + * * @param address the IPv4 address of the executor that a barrier task is running on */ +@DeveloperApi class BarrierTaskInfo(val address: String) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index c7068dc16edfb..2c99884b36509 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -33,7 +33,7 @@ import org.apache.hadoop.mapred.TextOutputFormat import org.apache.spark._ import org.apache.spark.Partitioner._ -import org.apache.spark.annotation.{DeveloperApi, Since} +import org.apache.spark.annotation.{DeveloperApi, Experimental, Since} import org.apache.spark.api.java.JavaRDD import org.apache.spark.internal.Logging import org.apache.spark.partial.BoundedDouble @@ -1650,6 +1650,8 @@ abstract class RDD[T: ClassTag]( /** * Indicates that Spark must launch the tasks together for the current stage. */ + @Experimental + @Since("2.4.0") def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this)) // ======================================================================= diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala index 064ab222b5b67..0f21965cee517 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala @@ -21,6 +21,7 @@ import scala.reflect.ClassTag import org.apache.spark.BarrierTaskContext import org.apache.spark.TaskContext +import org.apache.spark.annotation.{Experimental, Since} /** Represents an RDD barrier, which forces Spark to launch tasks of this stage together. */ class RDDBarrier[T: ClassTag](rdd: RDD[T]) { @@ -30,7 +31,9 @@ class RDDBarrier[T: ClassTag](rdd: RDD[T]) { * * `preservesPartitioning` indicates whether the input function preserves the partitioner, which * should be `false` unless `rdd` is a pair RDD and the input function doesn't modify the keys. 
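// A sketch of the pair-RDD case called out above: the keys are left untouched, so the
// partitioner of `pairs` can safely be kept on the result; `sc` is an assumed SparkContext.
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")), 2)
  .partitionBy(new HashPartitioner(2))
val upper = pairs.barrier().mapPartitions(
  (it, context) => it.map { case (k, v) => (k, v.toUpperCase) },
  preservesPartitioning = true)
assert(upper.partitioner == pairs.partitioner)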
- * */ + */ + @Experimental + @Since("2.4.0") def mapPartitions[S: ClassTag]( f: (Iterator[T], BarrierTaskContext) => Iterator[S], preservesPartitioning: Boolean = false): RDD[S] = rdd.withScope { From 8c2306b2a24329c0734cf809d215c951cc312a79 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Sat, 21 Jul 2018 01:02:07 +0800 Subject: [PATCH 07/14] update --- .../org/apache/spark/BarrierTaskContext.scala | 2 + .../org/apache/spark/BarrierTaskInfo.scala | 9 ++- .../main/scala/org/apache/spark/rdd/RDD.scala | 1 + .../org/apache/spark/rdd/RDDBarrier.scala | 1 + .../apache/spark/scheduler/DAGScheduler.scala | 73 +++++++++---------- .../org/apache/spark/scheduler/Task.scala | 2 + .../spark/scheduler/TaskSchedulerImpl.scala | 27 +++---- 7 files changed, 60 insertions(+), 55 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala index 2a7f40bb2d481..4c358629dee96 100644 --- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala +++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala @@ -23,6 +23,7 @@ import org.apache.spark.annotation.{Experimental, Since} trait BarrierTaskContext extends TaskContext { /** + * :: Experimental :: * Sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to * MPI_Barrier function in MPI, the barrier() function call blocks until all tasks in the same * stage have reached this routine. @@ -32,6 +33,7 @@ trait BarrierTaskContext extends TaskContext { def barrier(): Unit /** + * :: Experimental :: * Returns the all task infos in this barrier stage, the task infos are ordered by partitionId. */ @Experimental diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala b/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala index af4a07c562f0c..ce2653df2e845 100644 --- a/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala @@ -17,12 +17,15 @@ package org.apache.spark -import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.annotation.{Experimental, Since} + /** + * :: Experimental :: * Carries all task infos of a barrier task. * - * @param address the IPv4 address of the executor that a barrier task is running on + * @param address the IPv4 address(host:port) of the executor that a barrier task is running on */ -@DeveloperApi +@Experimental +@Since("2.4.0") class BarrierTaskInfo(val address: String) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 2c99884b36509..4e5b035c88c02 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1648,6 +1648,7 @@ abstract class RDD[T: ClassTag]( } /** + * :: Experimental :: * Indicates that Spark must launch the tasks together for the current stage. */ @Experimental diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala index 0f21965cee517..8d91e4898f27c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala @@ -27,6 +27,7 @@ import org.apache.spark.annotation.{Experimental, Since} class RDDBarrier[T: ClassTag](rdd: RDD[T]) { /** + * :: Experimental :: * Maps partitions together with a provided [[BarrierTaskContext]]. 
* * `preservesPartitioning` indicates whether the input function preserves the partitioner, which diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 6928eb1cfe8da..3481e5e97f5d6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1362,25 +1362,6 @@ class DAGScheduler( } } - // TODO: mark the executor as failed only if there were lots of fetch failures on it - if (bmAddress != null) { - val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled && - unRegisterOutputOnHostOnFetchFailure) { - // We had a fetch failure with the external shuffle service, so we - // assume all shuffle data on the node is bad. - Some(bmAddress.host) - } else { - // Unregister shuffle data just for one executor (we don't have any - // reason to believe shuffle data has been lost for the entire host). - None - } - removeExecutorAndUnregisterOutputs( - execId = bmAddress.executorId, - fileLost = true, - hostToUnregisterOutputs = hostToUnregisterOutputs, - maybeEpoch = Some(task.epoch)) - } - if (shouldAbortStage) { val abortMessage = if (disallowStageRetryForTest) { "Fetch failure will not retry stage due to testing config" @@ -1418,21 +1399,32 @@ class DAGScheduler( ) } } + + // TODO: mark the executor as failed only if there were lots of fetch failures on it + if (bmAddress != null) { + val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled && + unRegisterOutputOnHostOnFetchFailure) { + // We had a fetch failure with the external shuffle service, so we + // assume all shuffle data on the node is bad. + Some(bmAddress.host) + } else { + // Unregister shuffle data just for one executor (we don't have any + // reason to believe shuffle data has been lost for the entire host). + None + } + removeExecutorAndUnregisterOutputs( + execId = bmAddress.executorId, + fileLost = true, + hostToUnregisterOutputs = hostToUnregisterOutputs, + maybeEpoch = Some(task.epoch)) + } } case failure: TaskFailedReason if task.isBarrier => // Also handle the task failed reasons here. failure match { case Resubmitted => - logInfo("Resubmitted " + task + ", so marking it as still running") - stage match { - case sms: ShuffleMapStage => - sms.pendingPartitions += task.partitionId - - case _ => - throw new SparkException("TaskSetManagers should only send Resubmitted task " + - "statuses for tasks in ShuffleMapStages.") - } + handleResubmittedFailure(task, stage) case _ => // Do nothing. } @@ -1449,6 +1441,7 @@ class DAGScheduler( } catch { case e: UnsupportedOperationException => // Cannot continue with barrier stage if failed to cancel zombie barrier tasks. + // TODO SPARK-24877 leave the zombie tasks and ignore their completion events. logWarning(s"Could not cancel tasks for stage $stageId", e) abortStage(failedStage, "Could not cancel zombie barrier tasks for stage " + s"$failedStage (${failedStage.name})", Some(e)) @@ -1456,6 +1449,8 @@ class DAGScheduler( markStageAsFinished(failedStage, Some(message)) failedStage.failedAttemptIds.add(task.stageAttemptId) + // TODO Refactor the failure handling logic to combine similar code with that of + // FetchFailed. 
val shouldAbortStage = failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || disallowStageRetryForTest @@ -1494,15 +1489,7 @@ class DAGScheduler( } case Resubmitted => - logInfo("Resubmitted " + task + ", so marking it as still running") - stage match { - case sms: ShuffleMapStage => - sms.pendingPartitions += task.partitionId - - case _ => - throw new SparkException("TaskSetManagers should only send Resubmitted task " + - "statuses for tasks in ShuffleMapStages.") - } + handleResubmittedFailure(task, stage) case _: TaskCommitDenied => // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits @@ -1519,6 +1506,18 @@ class DAGScheduler( } } + private def handleResubmittedFailure(task: Task[_], stage: Stage): Unit = { + logInfo("Resubmitted " + task + ", so marking it as still running") + stage match { + case sms: ShuffleMapStage => + sms.pendingPartitions += task.partitionId + + case _ => + throw new SparkException("TaskSetManagers should only send Resubmitted task " + + "statuses for tasks in ShuffleMapStages.") + } + } + private[scheduler] def markMapStageJobsAsFinished(shuffleStage: ShuffleMapStage): Unit = { // Mark any map-stage jobs waiting on this stage as finished if (shuffleStage.isAvailable && shuffleStage.mapStageJobs.nonEmpty) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 155ffba4789a7..89ff2038e5f8a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -80,6 +80,8 @@ private[spark] abstract class Task[T]( attemptNumber: Int, metricsSystem: MetricsSystem): T = { SparkEnv.get.blockManager.registerTask(taskAttemptId) + // TODO SPARK-24874 Allow create BarrierTaskContext based on partitions, instead of whether + // the stage is barrier. context = if (isBarrier) { new BarrierTaskContextImpl( stageId, diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 15ec1456a283c..853523959a270 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -275,8 +275,7 @@ private[spark] class TaskSchedulerImpl( shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], tasks: IndexedSeq[ArrayBuffer[TaskDescription]], - addresses: ArrayBuffer[String], - taskDescs: ArrayBuffer[TaskDescription]) : Boolean = { + addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) : Boolean = { var launchedTask = false // nodes and executors that are blacklisted for the entire application have already been // filtered out by this point @@ -296,8 +295,7 @@ private[spark] class TaskSchedulerImpl( // Only update hosts for a barrier task. if (taskSet.isBarrier) { // The executor address is expected to be non empty. - addresses += shuffledOffers(i).address.get - taskDescs += task + addressesWithDescs += Tuple2(shuffledOffers(i).address.get, task) } launchedTask = true } @@ -380,12 +378,11 @@ private[spark] class TaskSchedulerImpl( var launchedAnyTask = false var launchedTaskAtCurrentMaxLocality = false // Record all the executor IDs assigned barrier tasks on. 
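// A tiny sketch of how the "addresses" string is assembled from the recorded pairs: ordered by
// partitionId, then joined with commas. Plain Ints stand in for TaskDescription here, and the
// host:port values are made up.
val addressesWithPartitions = Seq(("host0:7337", 1), ("host1:7337", 0))
val addressesStr = addressesWithPartitions.sortBy(_._2).map(_._1).mkString(",")
assert(addressesStr == "host1:7337,host0:7337")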
- val addresses = ArrayBuffer[String]() - val taskDescs = ArrayBuffer[TaskDescription]() + val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]() for (currentMaxLocality <- taskSet.myLocalityLevels) { do { launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, - currentMaxLocality, shuffledOffers, availableCpus, tasks, addresses, taskDescs) + currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs) launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } @@ -396,22 +393,22 @@ private[spark] class TaskSchedulerImpl( // Check whether the barrier tasks are partially launched. // TODO SPARK-24818 handle the assert failure case (that can happen when some locality // requirements are not fulfilled, and we should revert the launched tasks). - require(taskDescs.size == taskSet.numTasks, + require(addressesWithDescs.size == taskSet.numTasks, s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + - s"because only ${taskDescs.size} out of a total number of ${taskSet.numTasks} " + - "tasks got resource offers. The resource offers may have been blacklisted or " + - "cannot fulfill task locality requirements.") + s"because only ${addressesWithDescs.size} out of a total number of " + + s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " + + "been blacklisted or cannot fulfill task locality requirements.") // Update the taskInfos into all the barrier task properties. - val addressesStr = addresses.zip(taskDescs) + val addressesStr = addressesWithDescs // Addresses ordered by partitionId .sortBy(_._2.partitionId) .map(_._1) .mkString(",") - taskDescs.foreach(_.properties.setProperty("addresses", addressesStr)) + addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr)) - logInfo(s"Successfully scheduled all the ${taskDescs.size} tasks for barrier stage " + - s"${taskSet.stageId}.") + logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " + + s"stage ${taskSet.stageId}.") } } } From ef7dc5ac052f5206acc552fecd46643d2d3867df Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Sat, 21 Jul 2018 01:15:31 +0800 Subject: [PATCH 08/14] update RDD.isBarrier() --- core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala | 2 +- core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala index e33c6cd9a3f54..e25425e15ea26 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala @@ -43,5 +43,5 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( prev = null } - override def isBarrier(): Boolean = isFromBarrier || prev.isBarrier() + private[spark] override def isBarrier(): Boolean = isFromBarrier || prev.isBarrier() } diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala index 489fb8cb80312..e8f9b27b7eb55 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala @@ -111,5 +111,5 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]( prev = null } - override def isBarrier(): Boolean = false + private[spark] override def isBarrier(): Boolean = false } From 
7b6def8d01ac2c8a1dbce6819978454ec486b551 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Sat, 21 Jul 2018 22:15:21 +0800 Subject: [PATCH 09/14] update --- core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala index 8d91e4898f27c..85565d16e2717 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala @@ -28,7 +28,7 @@ class RDDBarrier[T: ClassTag](rdd: RDD[T]) { /** * :: Experimental :: - * Maps partitions together with a provided [[BarrierTaskContext]]. + * Maps partitions together with a provided BarrierTaskContext. * * `preservesPartitioning` indicates whether the input function preserves the partitioner, which * should be `false` unless `rdd` is a pair RDD and the input function doesn't modify the keys. From 92297714fb6b01ad6e6a693ab2b35de6dfa88b37 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Sun, 22 Jul 2018 22:25:34 +0800 Subject: [PATCH 10/14] fix checkpoint() failure. --- .../src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala | 3 ++- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala index e25425e15ea26..d4ad09143e80d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala @@ -43,5 +43,6 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( prev = null } - private[spark] override def isBarrier(): Boolean = isFromBarrier || prev.isBarrier() + @transient protected lazy override val isBarrier_ : Boolean = + isFromBarrier || dependencies.exists(_.rdd.isBarrier()) } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 4e5b035c88c02..04698132adf83 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1860,7 +1860,7 @@ abstract class RDD[T: ClassTag]( // From performance concern, cache the value to avoid repeatedly compute `isBarrier()` on a long // RDD chain. 
- @transient private lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier()) + @transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier()) } diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala index e8f9b27b7eb55..93207182176f8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala @@ -111,5 +111,5 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]( prev = null } - private[spark] override def isBarrier(): Boolean = false + @transient protected lazy override val isBarrier_ : Boolean = false } From 94e5237b71ce97769936f0506eb044219a1e7021 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Sun, 22 Jul 2018 22:43:50 +0800 Subject: [PATCH 11/14] update --- core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 2 +- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 5 ++--- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 4 ++-- .../test/scala/org/apache/spark/rdd/RDDBarrierSuite.scala | 2 +- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 7649cefb95039..1c4fa4bc6541f 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -442,7 +442,7 @@ private[spark] class MapOutputTrackerMaster( incrementEpoch() case None => throw new SparkException( - s"unregisterAllMapOutput called for nonexistent shuffle ID ${shuffleId}.") + s"unregisterAllMapOutput called for nonexistent shuffle ID $shuffleId.") } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 3481e5e97f5d6..f6b68039c1f8e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1462,8 +1462,7 @@ class DAGScheduler( s"""$failedStage (${failedStage.name}) |has failed the maximum allowable number of |times: $maxConsecutiveStageAttempts. - |Most recent failure reason: $message - """.stripMargin.replaceAll("\n", " ") + |Most recent failure reason: $message""".stripMargin.replaceAll("\n", " ") } abortStage(failedStage, abortMessage, None) } else { @@ -1507,7 +1506,7 @@ class DAGScheduler( } private def handleResubmittedFailure(task: Task[_], stage: Stage): Unit = { - logInfo("Resubmitted " + task + ", so marking it as still running") + logInfo(s"Resubmitted $task, so marking it as still running.") stage match { case sms: ShuffleMapStage => sms.pendingPartitions += task.partitionId diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 853523959a270..3ecc24501fc28 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -373,13 +373,13 @@ private[spark] class TaskSchedulerImpl( // slots), fail the job on submit. 
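// One way to picture the slot check guarding this branch, mirroring the TaskSchedulerImplSuite
// scenario above (spark.task.cpus = 2, two executors with 3 free cores each, a 3-task barrier
// stage); the per-executor arithmetic is an assumption for illustration, not code from the patch.
val cpusPerTask = 2
val freeCoresPerExecutor = Seq(3, 3)
val availableSlots = freeCoresPerExecutor.map(_ / cpusPerTask).sum // 1 + 1 = 2
val numBarrierTasks = 3
val launchedThisRound = availableSlots >= numBarrierTasks // false, so this round of offers is skipped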
logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + - s"number of available slots is ${availableSlots}.") + s"number of available slots is $availableSlots.") } else { var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false // Record all the executor IDs assigned barrier tasks on. val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]() for (currentMaxLocality <- taskSet.myLocalityLevels) { + var launchedTaskAtCurrentMaxLocality = false do { launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDBarrierSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDBarrierSuite.scala index 1489666a988cb..39d4618e4c6c5 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDBarrierSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDBarrierSuite.scala @@ -30,7 +30,7 @@ class RDDBarrierSuite extends SparkFunSuite with SharedSparkContext { } test("create an RDDBarrier in the middle of a chain of RDDs") { - val rdd = sc.parallelize(1 to 10, 4) + val rdd = sc.parallelize(1 to 10, 4).map(x => x * 2) val rdd2 = rdd.barrier().mapPartitions((iter, context) => iter).map(x => (x, x + 1)) assert(rdd2.isBarrier() === true) } From 9ae56d12b580f0a3cecb90dfe275d067e8f3a7f3 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Mon, 23 Jul 2018 21:11:23 +0800 Subject: [PATCH 12/14] update --- core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala | 2 +- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala index 93207182176f8..e8f9b27b7eb55 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala @@ -111,5 +111,5 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]( prev = null } - @transient protected lazy override val isBarrier_ : Boolean = false + private[spark] override def isBarrier(): Boolean = false } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 3ecc24501fc28..587ed4b5243b7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -295,7 +295,7 @@ private[spark] class TaskSchedulerImpl( // Only update hosts for a barrier task. if (taskSet.isBarrier) { // The executor address is expected to be non empty. 
- addressesWithDescs += Tuple2(shuffledOffers(i).address.get, task) + addressesWithDescs += (shuffledOffers(i).address.get -> task) } launchedTask = true } From c16a47f0d15998133b9d61d8df5310f1f66b11b0 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Mon, 23 Jul 2018 21:14:22 +0800 Subject: [PATCH 13/14] update --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index f6b68039c1f8e..003d64f78e853 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1434,7 +1434,7 @@ class DAGScheduler( logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + "failed.") val message = s"Stage failed because barrier task $task finished unsuccessfully. " + - s"${failure.toErrorString}" + failure.toErrorString try { // cancelTasks will fail if a SchedulerBackend does not implement killTask taskScheduler.cancelTasks(stageId, interruptThread = false) From c7600c24221d29fde31dca921d9d5863af2666e9 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Thu, 26 Jul 2018 11:24:01 +0800 Subject: [PATCH 14/14] update --- .../scala/org/apache/spark/rdd/MapPartitionsRDD.scala | 9 +++++++++ core/src/main/scala/org/apache/spark/rdd/RDD.scala | 5 ++++- .../scala/org/apache/spark/scheduler/ActiveJob.scala | 2 +- 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala index d4ad09143e80d..904d9c025629f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala @@ -23,6 +23,15 @@ import org.apache.spark.{Partition, TaskContext} /** * An RDD that applies the provided function to every partition of the parent RDD. + * + * @param prev the parent RDD. + * @param f The function used to map a tuple of (TaskContext, partition index, input iterator) to + * an output iterator. + * @param preservesPartitioning Whether the input function preserves the partitioner, which should + * be `false` unless `prev` is a pair RDD and the input function + * doesn't modify the keys. + * @param isFromBarrier Indicates whether this RDD is transformed from an RDDBarrier, a stage + * containing at least one RDDBarrier shall be turned into a barrier stage. */ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( var prev: RDD[T], diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 04698132adf83..cbc1143126d8e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1853,8 +1853,11 @@ abstract class RDD[T: ClassTag]( * barrier stage. * * An RDD is in a barrier stage, if at least one of its parent RDD(s), or itself, are mapped from - * a RDDBarrier. This function always returns false for a [[ShuffledRDD]], since a + * an [[RDDBarrier]]. This function always returns false for a [[ShuffledRDD]], since a * [[ShuffledRDD]] indicates start of a new stage. + * + * A [[MapPartitionsRDD]] can be transformed from an [[RDDBarrier]], under that case the + * [[MapPartitionsRDD]] shall be marked as barrier. 
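// A sketch of how the flag propagates, in the spirit of RDDBarrierSuite; isBarrier() is
// private[spark], so the asserts below assume code living under org.apache.spark (for example
// a test suite), with `sc` an assumed SparkContext.
val rdd = sc.parallelize(1 to 10, 4)
assert(!rdd.isBarrier())

val barriered = rdd.barrier().mapPartitions((it, context) => it).map(_ * 2)
assert(barriered.isBarrier()) // MapPartitionsRDDs downstream of an RDDBarrier stay barrier

val shuffled = barriered.map(x => (x, x)).reduceByKey(_ + _)
assert(!shuffled.isBarrier()) // a ShuffledRDD starts a new, non-barrier stage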
*/ private[spark] def isBarrier(): Boolean = isBarrier_ diff --git a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala index b3bbfb1683c5c..6e4d062749d5f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala @@ -61,7 +61,7 @@ private[spark] class ActiveJob( var numFinished = 0 - // Mark all the partitions of the stage to be not finished. + /** Resets the status of all partitions in this stage so they are marked as not finished. */ def resetAllPartitions(): Unit = { (0 until numPartitions).foreach(finished.update(_, false)) numFinished = 0