diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 6db95a45189b3..467c8e8ff8d14 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -45,7 +45,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) private val lostMasters = new HashSet[Address] private var activeMasterActor: ActorSelection = null - val timeout = RpcUtils.askTimeout(conf) + val timeout = RpcUtils.askRpcTimeout(conf) override def preStart(): Unit = { context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 8d3d3024f6efc..648c0ff2c3cec 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -193,7 +193,7 @@ private[spark] class AppClient( def stop() { if (actor != null) { try { - val timeout = RpcUtils.askTimeout(conf) + val timeout = RpcUtils.askRpcTimeout(conf) val future = actor.ask(StopAppClient)(timeout.duration) timeout.awaitResult(future) } catch { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 01015af3e8dda..e5aa058cf2ae3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -938,7 +938,7 @@ private[deploy] object Master extends Logging { securityManager = securityMgr) val actor = actorSystem.actorOf( Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName) - val timeout = RpcUtils.askTimeout(conf) + val timeout = RpcUtils.askRpcTimeout(conf) val portsRequest = actor.ask(BoundPortsRequest)(timeout.duration) val portsResponse = timeout.awaitResult(portsRequest).asInstanceOf[BoundPortsResponse] (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 2111a8581f2e4..5fc3dd5752058 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -34,7 +34,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) with UIRoot { val masterActorRef = master.self - val timeout = RpcUtils.askTimeout(master.conf) + val timeout = RpcUtils.askRpcTimeout(master.conf) val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true) val masterPage = new MasterPage(this) diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index 502b9bb701ccf..e93e9f93c0c6b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -71,7 +71,7 @@ private[rest] class StandaloneKillRequestServlet(masterActor: ActorRef, conf: Sp extends KillRequestServlet { protected def handleKill(submissionId: String): KillSubmissionResponse = { - val askTimeout = RpcUtils.askTimeout(conf) + val askTimeout = RpcUtils.askRpcTimeout(conf) val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse]( DeployMessages.RequestKillDriver(submissionId), masterActor, askTimeout) val k = new KillSubmissionResponse @@ -90,7 +90,7 @@ private[rest] class StandaloneStatusRequestServlet(masterActor: ActorRef, conf: extends StatusRequestServlet { protected def handleStatus(submissionId: String): SubmissionStatusResponse = { - val askTimeout = RpcUtils.askTimeout(conf) + val askTimeout = RpcUtils.askRpcTimeout(conf) val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse]( DeployMessages.RequestDriverStatus(submissionId), masterActor, askTimeout) val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) } @@ -175,7 +175,7 @@ private[rest] class StandaloneSubmitRequestServlet( responseServlet: HttpServletResponse): SubmitRestProtocolResponse = { requestMessage match { case submitRequest: CreateSubmissionRequest => - val askTimeout = RpcUtils.askTimeout(conf) + val askTimeout = RpcUtils.askRpcTimeout(conf) val driverDescription = buildDriverDescription(submitRequest) val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse]( DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index b3bb5f911dbd7..334a5b10142aa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -38,7 +38,7 @@ class WorkerWebUI( extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI") with Logging { - private[ui] val timeout = RpcUtils.askTimeout(worker.conf) + private[ui] val timeout = RpcUtils.askRpcTimeout(worker.conf) initialize() diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala index 4f5df21499adc..6ae47894598be 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala @@ -31,7 +31,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf) private[this] val maxRetries = RpcUtils.numRetries(conf) private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf) - private[this] val defaultAskTimeout = RpcUtils.askTimeout(conf) + private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf) /** * return the address for the [[RpcEndpointRef]] @@ -118,4 +118,5 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf) throw new SparkException( s"Error sending message [message = $message]", lastException) } + } diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala index 3b21e4e53dd08..c749be99ba9c4 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala @@ -68,7 +68,7 @@ private[spark] object RpcEnv { */ private[spark] abstract class RpcEnv(conf: SparkConf) { - private[spark] val defaultLookupTimeout = RpcUtils.lookupTimeout(conf) + private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf) /** * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 214c6e8f96bf6..89c1f86647922 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -33,7 +33,7 @@ class BlockManagerMaster( isDriver: Boolean) extends Logging { - val timeout = RpcUtils.askTimeout(conf) + val timeout = RpcUtils.askRpcTimeout(conf) /** Remove a dead executor from the driver endpoint. This is only called on the driver side. */ def removeExecutor(execId: String) { diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 22c92468679bc..cd54d8d8fc6a1 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -197,7 +197,7 @@ private[spark] object AkkaUtils extends Logging { val driverPort: Int = conf.getInt("spark.driver.port", 7077) Utils.checkHost(driverHost, "Expected hostname") val url = address(protocol(actorSystem), driverActorSystemName, driverHost, driverPort, name) - val timeout = RpcUtils.lookupTimeout(conf) + val timeout = RpcUtils.lookupRpcTimeout(conf) logInfo(s"Connecting to $name: $url") timeout.awaitResult(actorSystem.actorSelection(url).resolveOne(timeout.duration)) } @@ -211,7 +211,7 @@ private[spark] object AkkaUtils extends Logging { val executorActorSystemName = SparkEnv.executorActorSystemName Utils.checkHost(host, "Expected hostname") val url = address(protocol(actorSystem), executorActorSystemName, host, port, name) - val timeout = RpcUtils.lookupTimeout(conf) + val timeout = RpcUtils.lookupRpcTimeout(conf) logInfo(s"Connecting to $name: $url") timeout.awaitResult(actorSystem.actorSelection(url).resolveOne(timeout.duration)) } diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala index e8950ecdfbd5a..7e15cad626f71 100644 --- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala @@ -17,6 +17,7 @@ package org.apache.spark.util +import scala.concurrent.duration.FiniteDuration import scala.language.postfixOps import org.apache.spark.{SparkEnv, SparkConf} @@ -46,12 +47,22 @@ object RpcUtils { } /** Returns the default Spark timeout to use for RPC ask operations. */ - def askTimeout(conf: SparkConf): RpcTimeout = { + def askRpcTimeout(conf: SparkConf): RpcTimeout = { RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s") } + @deprecated("use askRpcTimeout instead", "1.5.0") + def askTimeout(conf: SparkConf): FiniteDuration = { + askRpcTimeout(conf).duration + } + /** Returns the default Spark timeout to use for RPC remote endpoint lookup. */ - def lookupTimeout(conf: SparkConf): RpcTimeout = { + def lookupRpcTimeout(conf: SparkConf): RpcTimeout = { RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s") } + + @deprecated("use lookupRpcTimeout instead", "1.5.0") + def lookupTimeout(conf: SparkConf): FiniteDuration = { + lookupRpcTimeout(conf).duration + } }