From 6419db7639d62644eee3dd23906836b76b1c4fd4 Mon Sep 17 00:00:00 2001
From: IceMimosa
Date: Tue, 8 May 2018 14:14:00 +0800
Subject: [PATCH] [SPARK-18113] Use ask to replace askWithRetry in canCommit
 and make receiver idempotent.@PR[#16503]

---
 .../scheduler/OutputCommitCoordinator.scala | 21 +++++++++++++++++----
 .../OutputCommitCoordinatorSuite.scala      | 16 ++++++++++++++++
 .../aggregate/gio/CollectSBitmap.scala      |  2 +-
 3 files changed, 34 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 7bed6851d0cde..e4eb475624230 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
+import org.apache.spark.util.{RpcUtils, ThreadUtils}
 
 private sealed trait OutputCommitCoordinationMessage extends Serializable
 
@@ -88,7 +89,9 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
     val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
     coordinatorRef match {
       case Some(endpointRef) =>
-        endpointRef.askWithRetry[Boolean](msg)
+        // endpointRef.askWithRetry[Boolean](msg)
+        ThreadUtils.awaitResult(endpointRef.ask[Boolean](msg),
+          RpcUtils.askRpcTimeout(conf).duration)
       case None =>
         logError(
           "canCommit called after coordinator was stopped (is SparkEnv shutdown in progress)?")
@@ -164,10 +167,20 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
             s"partition=$partition")
           authorizedCommitters(partition) = attemptNumber
           true
+
+        // Coordinator should be idempotent when receiving AskPermissionToCommit.
         case existingCommitter =>
-          logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
-            s"partition=$partition; existingCommitter = $existingCommitter")
-          false
+          if (existingCommitter == attemptNumber) {
+            logWarning(s"Authorizing duplicate request to commit for " +
+              s"attemptNumber=$attemptNumber to commit for stage=$stage," +
+              s" partition=$partition; existingCommitter = $existingCommitter." +
+              s" This can indicate dropped network traffic.")
+            true
+          } else {
+            logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
+              s"partition=$partition; existingCommitter = $existingCommitter")
+            false
+          }
       case None =>
         logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +
           s"partition $partition to commit")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 83288db92bb43..224301c6c42cd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -190,6 +190,12 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(
       !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
   }
+
+  test("Duplicate calls to canCommit from the authorized committer gets idempotent responses.") {
+    val rdd = sc.parallelize(Seq(1), 1)
+    sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).callCanCommitMultipleTimes _,
+      rdd.partitions.indices)
+  }
 }
 
 /**
@@ -222,6 +228,16 @@ private case class OutputCommitFunctions(tempDirPath: String) {
       if (ctx.attemptNumber == 0) failingOutputCommitter else successfulOutputCommitter)
   }
 
+  // Receiver should be idempotent for AskPermissionToCommitOutput
+  def callCanCommitMultipleTimes(iter: Iterator[Int]): Unit = {
+    val ctx = TaskContext.get()
+    val canCommit1 = SparkEnv.get.outputCommitCoordinator
+      .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
+    val canCommit2 = SparkEnv.get.outputCommitCoordinator
+      .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
+    assert(canCommit1 && canCommit2)
+  }
+
   private def runCommitWithProvidedCommitter(
       ctx: TaskContext,
       iter: Iterator[Int],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/gio/CollectSBitmap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/gio/CollectSBitmap.scala
index 7ec6c4411e416..a747a5fda4578 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/gio/CollectSBitmap.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/gio/CollectSBitmap.scala
@@ -43,7 +43,7 @@ case class CollectSBitmap(
   extends TypedImperativeAggregate[SBitMap] {
 
   def this(bucket: Expression, uid: Expression, session: Expression) = {
-    this(bucket, uid, session)
+    this(bucket, uid, session, 0, 0)
   }
 
   override def createAggregationBuffer(): SBitMap = new SBitMap()