Skip to content

Commit

Permalink
Making the OutputCommitCoordinatorMessage serializable
Browse files Browse the repository at this point in the history
  • Loading branch information
mccheah committed Jan 23, 2015
1 parent abc7db4 commit 83de900
Showing 1 changed file with 2 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import akka.actor.{PoisonPill, ActorRef, Actor}
import org.apache.spark.Logging
import org.apache.spark.util.{AkkaUtils, ActorLogReceive}

private[spark] sealed trait OutputCommitCoordinationMessage
private[spark] sealed trait OutputCommitCoordinationMessage extends Serializable

private[spark] case class StageStarted(stage: Int) extends OutputCommitCoordinationMessage
private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage
Expand Down Expand Up @@ -54,14 +54,14 @@ private[spark] class OutputCommitCoordinator extends Logging {
// Initialized by SparkEnv
var coordinatorActor: ActorRef = _

// TODO: handling stage attempt ids?
private type StageId = Int
private type TaskId = Long
private type TaskAttemptId = Long

private val authorizedCommittersByStage:
mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] = mutable.HashMap()


def stageStart(stage: StageId) {
coordinatorActor ! StageStarted(stage)
}
Expand Down Expand Up @@ -102,7 +102,6 @@ private[spark] class OutputCommitCoordinator extends Logging {
}

private def handleStageStart(stage: StageId): Unit = {
// TODO: assert that we're not overwriting an existing entry?
authorizedCommittersByStage(stage) = mutable.HashMap[TaskId, TaskAttemptId]()
}

Expand Down

0 comments on commit 83de900

Please sign in to comment.