diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index d6726bcb9a71a..ee20896de1449 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -25,7 +25,7 @@ import akka.actor.{PoisonPill, ActorRef, Actor}
 import org.apache.spark.Logging
 import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
 
-private[spark] sealed trait OutputCommitCoordinationMessage
+private[spark] sealed trait OutputCommitCoordinationMessage extends Serializable
 
 private[spark] case class StageStarted(stage: Int) extends OutputCommitCoordinationMessage
 private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage
@@ -54,7 +54,6 @@ private[spark] class OutputCommitCoordinator extends Logging {
   // Initialized by SparkEnv
   var coordinatorActor: ActorRef = _
 
-  // TODO: handling stage attempt ids?
   private type StageId = Int
   private type TaskId = Long
   private type TaskAttemptId = Long
@@ -62,6 +61,7 @@ private[spark] class OutputCommitCoordinator extends Logging {
   private val authorizedCommittersByStage:
     mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] = mutable.HashMap()
 
+
   def stageStart(stage: StageId) {
     coordinatorActor ! StageStarted(stage)
   }
@@ -102,7 +102,6 @@ private[spark] class OutputCommitCoordinator extends Logging {
   }
 
   private def handleStageStart(stage: StageId): Unit = {
-    // TODO: assert that we're not overwriting an existing entry?
     authorizedCommittersByStage(stage) = mutable.HashMap[TaskId, TaskAttemptId]()
   }
 
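
Side note on the first hunk: marking the sealed message trait as Serializable guarantees that anything typed as OutputCommitCoordinationMessage can be shipped through the coordinator's Akka actor, which uses Java serialization for remote messages. Below is a minimal, self-contained sketch (not part of the patch; the simplified declarations and the MessageSerializationCheck object are illustrative only) that round-trips a message through Java serialization.

// Illustrative sketch only: simplified copies of the message types, plus a
// round-trip check through Java serialization.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

sealed trait OutputCommitCoordinationMessage extends Serializable
case class StageStarted(stage: Int) extends OutputCommitCoordinationMessage
case object StopCoordinator extends OutputCommitCoordinationMessage

object MessageSerializationCheck {
  private def roundTrip(msg: OutputCommitCoordinationMessage): OutputCommitCoordinationMessage = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(msg)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    in.readObject().asInstanceOf[OutputCommitCoordinationMessage]
  }

  def main(args: Array[String]): Unit = {
    // Case classes are serializable on their own; the trait-level Serializable
    // additionally guarantees it for any value typed as the sealed trait.
    assert(roundTrip(StageStarted(1)) == StageStarted(1))
    // Case objects keep singleton identity via the compiler-generated readResolve.
    assert(roundTrip(StopCoordinator) == StopCoordinator)
    println("OutputCommitCoordinationMessage values round-trip through Java serialization")
  }
}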