Skip to content

Commit

Permalink
Use actor only for RPC; call methods directly in DAGScheduler.
Browse files Browse the repository at this point in the history
This makes the interleaving of local and remote calls easier to reason about.
  • Loading branch information
JoshRosen committed Feb 4, 2015
1 parent f582574 commit 97da5fe
Showing 1 changed file with 34 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,8 @@ import org.apache.spark.util.{AkkaUtils, ActorLogReceive}

private sealed trait OutputCommitCoordinationMessage extends Serializable

private case class StageStarted(stage: Int) extends OutputCommitCoordinationMessage
private case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage
private case object StopCoordinator extends OutputCommitCoordinationMessage

// This message is sent from executors to the driver-side coordinator actor over Akka,
// so it must extend the sealed message trait (which extends Serializable). The
// duplicate declaration that omitted `extends` has been merged into this one.
private case class AskPermissionToCommitOutput(
    stage: Int,
    task: Long,
    taskAttempt: Long)
  extends OutputCommitCoordinationMessage

private case class TaskCompleted(
    stage: Int,
    task: Long,
    attempt: Long,
    reason: TaskEndReason)
  extends OutputCommitCoordinationMessage

/**
* Authority that decides whether tasks can commit output to HDFS.
Expand All @@ -64,6 +50,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
private type TaskAttemptId = Long
private type CommittersByStageMap = mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]]

// Access to this state should be guarded by synchronizing on the instance.
private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()

/**
Expand All @@ -82,47 +69,60 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
stage: StageId,
task: TaskId,
attempt: TaskAttemptId): Boolean = {
askActor(AskPermissionToCommitOutput(stage, task, attempt))
val msg = AskPermissionToCommitOutput(stage, task, attempt)
coordinatorActor match {
case Some(actor) =>
AkkaUtils.askWithReply[Boolean](msg, actor, maxAttempts, retryInterval, timeout)
case None =>
logError(
"canCommit called after coordinator was stopped (is SparkEnv shutdown in progress)?")
false
}
}

// Called by DAGScheduler. Registers a newly started stage with an empty map of
// authorized committers. Synchronized because the RPC path (commit-permission
// requests from executors) may read this state concurrently.
private[scheduler] def stageStart(stage: StageId): Unit = synchronized {
  authorizedCommittersByStage(stage) = mutable.HashMap[TaskId, TaskAttemptId]()
}

// Called by DAGScheduler. Discards all committer bookkeeping for a finished stage.
// Synchronized for the same reason as stageStart.
private[scheduler] def stageEnd(stage: StageId): Unit = synchronized {
  authorizedCommittersByStage.remove(stage)
}

// Called by DAGScheduler when a task attempt finishes. If the stage is still
// tracked, releases the commit lock held by a failed authorized committer so that
// another attempt of the same task may commit. Synchronized because the RPC path
// mutates the same state concurrently. Uses an explicit `match` rather than a
// `return` inside a by-name `getOrElse` argument (a nonlocal return implemented by
// throwing NonLocalReturnControl).
private[scheduler] def taskCompleted(
    stage: StageId,
    task: TaskId,
    attempt: TaskAttemptId,
    reason: TaskEndReason): Unit = synchronized {
  authorizedCommittersByStage.get(stage) match {
    case None =>
      // The stage already ended and its state was cleaned up; nothing to do.
      logDebug(s"Ignoring task completion for completed stage")
    case Some(authorizedCommitters) =>
      reason match {
        case Success =>
          // The task output has been committed successfully
        case TaskCommitDenied(jobID, splitID, attemptID) =>
          logInfo(s"Task was denied committing, stage: $stage, taskId: $task, attempt: $attempt")
        case otherReason =>
          // The authorized committer failed; clear its lock so a retry can commit.
          logDebug(s"Authorized committer $attempt (stage=$stage, task=$task) failed;" +
            s" clearing lock")
          authorizedCommitters.remove(task)
      }
  }
}

/**
 * Stops the coordinator: tells the RPC actor to shut down, forgets the actor
 * reference so later calls are no-ops, and clears all per-stage committer state.
 * Synchronized so shutdown does not race with in-flight stage/task updates.
 */
def stop(): Unit = synchronized {
  coordinatorActor.foreach(_ ! StopCoordinator)
  coordinatorActor = None
  // Empty the inner maps before dropping the outer one to release references promptly.
  authorizedCommittersByStage.foreach(_._2.clear())
  authorizedCommittersByStage.clear()
}

/** Actor-side handler: create an empty committer map for a newly started stage. */
private def handleStageStart(stage: StageId): Unit = {
  val freshCommitters = mutable.HashMap.empty[TaskId, TaskAttemptId]
  authorizedCommittersByStage(stage) = freshCommitters
}

/** Actor-side handler: drop all committer bookkeeping for a finished stage. */
private def handleStageEnd(stage: StageId): Unit = {
  authorizedCommittersByStage -= stage
}

private def handleAskPermissionToCommit(
stage: StageId,
task: TaskId,
attempt: TaskAttemptId): Boolean = {
attempt: TaskAttemptId): Boolean = synchronized {
authorizedCommittersByStage.get(stage) match {
case Some(authorizedCommitters) =>
authorizedCommitters.get(task) match {
Expand All @@ -140,56 +140,17 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
false
}
}

/**
 * Actor-side handler for TaskCompleted. If the stage is still tracked and the
 * completed attempt was an authorized committer that failed, releases its commit
 * lock so another attempt of the same task may commit.
 *
 * Rewritten as an explicit `match` instead of a `return` inside the by-name
 * `getOrElse` argument: that `return` is a nonlocal return implemented by throwing
 * NonLocalReturnControl, which is a well-known Scala pitfall. Behavior is unchanged.
 */
private def handleTaskCompletion(
    stage: StageId,
    task: TaskId,
    attempt: TaskAttemptId,
    reason: TaskEndReason): Unit = {
  authorizedCommittersByStage.get(stage) match {
    case None =>
      logDebug(s"Ignoring task completion for completed stage")
    case Some(authorizedCommitters) =>
      reason match {
        case Success =>
          // The task output has been committed successfully
        case TaskCommitDenied(jobID, splitID, attemptID) =>
          logInfo(s"Task was denied committing, stage: $stage, taskId: $task, attempt: $attempt")
        case otherReason =>
          logDebug(s"Authorized committer $attempt (stage=$stage, task=$task) failed;" +
            s" clearing lock")
          authorizedCommitters.remove(task)
      }
  }
}

/** Fire-and-forget send to the coordinator actor; a no-op once the coordinator is stopped. */
private def sendToActor(msg: OutputCommitCoordinationMessage): Unit = {
  coordinatorActor match {
    case Some(actor) => actor ! msg
    case None => // coordinator already stopped; silently drop the message
  }
}

/**
 * Sends `msg` to the coordinator actor and synchronously awaits its Boolean reply.
 *
 * @return the actor's reply, or `false` (deny) if the coordinator has already been
 *         stopped. The stopped case now logs an error instead of failing silently,
 *         matching the error handling on the other post-shutdown code path in this
 *         file.
 */
private def askActor(msg: OutputCommitCoordinationMessage): Boolean = {
  coordinatorActor match {
    case Some(actor) =>
      AkkaUtils.askWithReply[Boolean](msg, actor, maxAttempts, retryInterval, timeout)
    case None =>
      logError(
        "askActor called after coordinator was stopped (is SparkEnv shutdown in progress?)")
      false
  }
}
}

private[spark] object OutputCommitCoordinator {

// This actor is used only for RPC
class OutputCommitCoordinatorActor(outputCommitCoordinator: OutputCommitCoordinator)
extends Actor with ActorLogReceive with Logging {

override def receiveWithLogging = {
case StageStarted(stage) =>
outputCommitCoordinator.handleStageStart(stage)
case StageEnded(stage) =>
outputCommitCoordinator.handleStageEnd(stage)
case AskPermissionToCommitOutput(stage, task, taskAttempt) =>
sender ! outputCommitCoordinator.handleAskPermissionToCommit(stage, task, taskAttempt)
case TaskCompleted(stage, task, attempt, reason) =>
outputCommitCoordinator.handleTaskCompletion(stage, task, attempt, reason)
case StopCoordinator =>
logInfo("OutputCommitCoordinator stopped!")
context.stop(self)
Expand Down

0 comments on commit 97da5fe

Please sign in to comment.