Skip to content

Commit

Permalink
KE-17158 renew executor in standalone (apache#171)
Browse files Browse the repository at this point in the history
* dynamic request executors and renew mem/core in standalone
  • Loading branch information
weiwei121723 authored Sep 15, 2020
1 parent c43c495 commit b735acc
Show file tree
Hide file tree
Showing 48 changed files with 206 additions and 42 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.1-kylin-r48</version>
<version>2.4.1-kylin-r51</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/kvstore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.1-kylin-r48</version>
<version>2.4.1-kylin-r51</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/network-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.1-kylin-r48</version>
<version>2.4.1-kylin-r51</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/network-shuffle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.1-kylin-r48</version>
<version>2.4.1-kylin-r51</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/network-yarn/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.1-kylin-r48</version>
<version>2.4.1-kylin-r51</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/sketch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.1-kylin-r48</version>
<version>2.4.1-kylin-r51</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/tags/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.1-kylin-r48</version>
<version>2.4.1-kylin-r51</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/unsafe/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.1-kylin-r48</version>
<version>2.4.1-kylin-r51</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.1-kylin-r48</version>
<version>2.4.1-kylin-r51</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ private[spark] trait ExecutorAllocationClient {
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int]): Boolean

/**
 * Request a new total number of executors, optionally asking the cluster
 * manager to kill the existing executors and to renew the per-executor
 * memory (MB) and core count.
 *
 * Default implementation: the renew-specific arguments
 * (`forceKillOldExecutors`, `newMemoryPerExecutorMB`, `newCoresPerExecutor`)
 * are IGNORED and the call falls back to the plain three-argument
 * `requestTotalExecutors` with 0 locality-aware tasks and an empty
 * host-to-task-count map. Backends that support renewing executor resources
 * (e.g. CoarseGrainedSchedulerBackend in this change) override this method.
 *
 * @return whether the request is acknowledged by the cluster manager.
 */
private[spark] def requestTotalExecutors(
numExecutors: Int,
forceKillOldExecutors: Boolean,
newMemoryPerExecutorMB: Option[Int],
newCoresPerExecutor: Option[Int]): Boolean = {
// NOTE(review): the extra parameters are intentionally dropped by this
// default; only overriding backends honor them.
requestTotalExecutors(numExecutors, 0, Map.empty)
}

/**
* Request an additional number of executors from the cluster manager.
* @return whether the request is acknowledged by the cluster manager.
Expand Down
17 changes: 17 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1643,6 +1643,23 @@ class SparkContext(config: SparkConf) extends Logging {
}
}

/**
 * :: DeveloperApi ::
 * Update the cluster manager on our scheduling needs and renew per-executor
 * resources. Delegates to the scheduler backend when it supports executor
 * allocation; otherwise logs a warning and returns false.
 *
 * @param numExecutors new total number of executors desired
 * @param forceKillOldExecutors whether existing executors should be killed so
 *                              replacements pick up the new settings
 * @param newMemoryPerExecutorMB if set, new memory per executor in MB
 * @param newCoresPerExecutor if set, new number of cores per executor
 * @return whether the request is received (false when the current scheduler
 *         backend is not an ExecutorAllocationClient)
 */
@DeveloperApi
def requestTotalExecutors(
numExecutors: Int,
forceKillOldExecutors: Boolean,
newMemoryPerExecutorMB: Option[Int],
newCoresPerExecutor: Option[Int]
): Boolean = {
schedulerBackend match {
case b: ExecutorAllocationClient =>
b.requestTotalExecutors(numExecutors, forceKillOldExecutors,
newMemoryPerExecutorMB, newCoresPerExecutor)
case _ =>
logWarning("Requesting executors is not supported by current scheduler.")
false
}
}

/**
* :: DeveloperApi ::
* Request an additional number of executors from the cluster manager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import java.net.URI
private[spark] case class ApplicationDescription(
name: String,
maxCores: Option[Int],
memoryPerExecutorMB: Int,
var memoryPerExecutorMB: Int,
command: Command,
appUiUrl: String,
eventLogDir: Option[URI] = None,
// short name of compression codec used when writing event logs, if any (e.g. lzf)
eventLogCodec: Option[String] = None,
coresPerExecutor: Option[Int] = None,
var coresPerExecutor: Option[Int] = None,
// number of executors this application wants to start with,
// only used if dynamic allocation is enabled
initialExecutorLimit: Option[Int] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ object DeployMessages {

case class RequestExecutors(appId: String, requestedTotal: Int)

// AppClient to Master: request a new total executor count and, optionally,
// renewed per-executor memory (MB) and core settings.
case class RequestExecutorsRenew(
    appId: String,
    requestedTotal: Int,
    forceKillOldExecutors: Boolean,
    newMemoryPerExecutorMB: Option[Int],
    newCoresPerExecutor: Option[Int])

case class KillExecutors(appId: String, executorIds: Seq[String])

// Master to AppClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,14 @@ private[spark] class StandaloneAppClient(
logWarning("Attempted to request executors before registering with Master.")
context.reply(false)
}
case r: RequestExecutorsRenew =>
logInfo("send RequestExecutorsRenew rpc")
master match {
case Some(m) => askAndReplyAsync(m, context, r)
case None =>
logWarning("Attempted to request and renew executors.")
context.reply(false)
}

case k: KillExecutors =>
master match {
Expand Down Expand Up @@ -304,6 +312,21 @@ private[spark] class StandaloneAppClient(
}
}

/**
 * Ask the Master, through the client endpoint, for a new total executor count,
 * optionally force-killing current executors and renewing per-executor
 * memory/cores. No-op (false) when the endpoint or app id is not yet set.
 *
 * @return a future holding whether the Master acknowledged the request
 */
def requestTotalExecutors(
    requestedTotal: Int,
    forceKillOldExecutors: Boolean,
    newMemoryPerExecutorMB: Option[Int],
    newCoresPerExecutor: Option[Int]): Future[Boolean] = {
  if (endpoint.get == null || appId.get == null) {
    logWarning("Attempted to request and refresh executors before driver fully initialized.")
    Future.successful(false)
  } else {
    val renewMsg = RequestExecutorsRenew(appId.get, requestedTotal,
      forceKillOldExecutors, newMemoryPerExecutorMB, newCoresPerExecutor)
    endpoint.get.ask[Boolean](renewMsg)
  }
}

/**
* Kill the given list of executors through the Master.
* @return whether the kill request is acknowledged.
Expand Down
65 changes: 65 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,11 @@ private[deploy] class Master(
case RequestExecutors(appId, requestedTotal) =>
context.reply(handleRequestExecutors(appId, requestedTotal))

case RequestExecutorsRenew(appId, requestedTotal, forceKillOldExecutors,
newMemoryPerExecutorMB, newCoresPerExecutor) =>
context.reply(handleRefreshApplicationAndExecutors(appId, requestedTotal,
forceKillOldExecutors, newMemoryPerExecutorMB, newCoresPerExecutor))

case KillExecutors(appId, executorIds) =>
val formattedExecutorIds = formatExecutorIds(executorIds)
context.reply(handleKillExecutors(appId, formattedExecutorIds))
Expand Down Expand Up @@ -921,6 +926,66 @@ private[deploy] class Master(
}
}

/** Log a one-line summary of the application's executors as (id,state) pairs. */
private def printExecutorStatus(appInfo: ApplicationInfo): Unit = {
  val summary = appInfo.executors.values
    .map(executor => s"(${executor.id},${executor.state})")
    .toList
    .mkString(" ")
  logInfo(s"${appInfo.id} current executor:" + summary)
}

/**
 * Handle a RequestExecutorsRenew message: update the application's executor
 * limit and (optionally) its per-executor memory/core settings, kill old
 * executors so their replacements pick up the new settings, and reschedule.
 *
 * @param appId id of the application to refresh
 * @param requestedTotal new executor limit for the application
 * @param forceKillOldExecutors if true, kill ALL current executors, not just
 *                              the ones already in a finished state
 * @param newMemoryPerExecutorMB if set, new memory per executor in MB
 * @param newCoresPerExecutor if non-null, replaces the cores-per-executor
 *                            setting (a None clears it — preserved behavior)
 * @return true when the application was found and refreshed, false otherwise
 */
private def handleRefreshApplicationAndExecutors(
    appId: String,
    requestedTotal: Int,
    forceKillOldExecutors: Boolean,
    newMemoryPerExecutorMB: Option[Int],
    newCoresPerExecutor: Option[Int]): Boolean = {
  idToApp.get(appId) match {
    case Some(appInfo) =>
      val appDesc = appInfo.desc
      logInfo(s"Application $appId before requested and " +
        s"renew to set total executors to $requestedTotal, " +
        s"newMemoryPerExecutorMB to ${appDesc.memoryPerExecutorMB}, " +
        s"newCoresPerExecutor to ${appDesc.coresPerExecutor}, ")
      printExecutorStatus(appInfo)

      // BUG FIX: the previous `match` handled only Some(_) and threw a
      // MatchError whenever no new memory size was supplied; foreach is a
      // safe no-op on None.
      newMemoryPerExecutorMB.foreach { mem => appDesc.memoryPerExecutorMB = mem }

      // Null guard kept for callers passing a null Option (e.g. via Java
      // interop). Note an explicit None deliberately clears the setting.
      if (newCoresPerExecutor != null) {
        appDesc.coresPerExecutor = newCoresPerExecutor
      }
      appInfo.executorLimit = requestedTotal

      // Remove old executors (all of them when forced, otherwise only the
      // finished ones) before scheduling replacements with the new settings.
      handleKillOldExecutors(appInfo, forceKillOldExecutors)
      schedule()

      logInfo(s"Application $appId after requested and renew to set " +
        s"newMemoryPerExecutorMB to ${appDesc.memoryPerExecutorMB}, " +
        s"newCoresPerExecutor to ${appDesc.coresPerExecutor}, ")
      printExecutorStatus(appInfo)
      true
    case None =>
      logWarning(s"Unknown application $appId requested $requestedTotal total executors.")
      false
  }
}

/**
 * Kill the application's existing executors: all of them when
 * `forceKillOldExecutors` is set, otherwise only those whose state is
 * already finished.
 *
 * @return always true (kept for message-reply compatibility)
 */
private def handleKillOldExecutors(
    appInfo: ApplicationInfo,
    forceKillOldExecutors: Boolean): Boolean = {
  // BUG FIX: snapshot the executors first. The old code iterated over
  // appInfo.executors.keys while removeExecutor mutated that same map —
  // mutating a map during traversal is undefined behavior. The snapshot also
  // makes the old "executor not found when kill" branch unnecessary, since it
  // could only fire because of that concurrent mutation.
  val executorsSnapshot = appInfo.executors.values.toList
  for (executorDesc <- executorsSnapshot) {
    if (forceKillOldExecutors || ExecutorState.isFinished(executorDesc.state)) {
      appInfo.removeExecutor(executorDesc)
      killExecutor(executorDesc)
      logInfo("remote kill executor " + executorDesc.fullId +
        " on worker " + executorDesc.worker.id)
    }
  }
  true
}

/**
* Handle a kill request from the given application.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,32 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
defaultAskTimeout.awaitResult(response)
}

/**
 * Update the cluster manager on our scheduling needs and renew per-executor
 * resources. Records the new target locally, then delegates to
 * `doRequestTotalExecutors` and BLOCKS on the backend's reply.
 *
 * @param numExecutors new total number of executors (must be non-negative)
 * @param forceKillOldExecutors whether existing executors should be killed
 * @param newMemoryPerExecutorMB if set, new memory per executor in MB
 * @param newCoresPerExecutor if set, new number of cores per executor
 * @throws IllegalArgumentException if `numExecutors` is negative
 * @return whether the request is acknowledged by the cluster manager
 */
final override def requestTotalExecutors(numExecutors: Int,
forceKillOldExecutors: Boolean,
newMemoryPerExecutorMB: Option[Int],
newCoresPerExecutor: Option[Int]): Boolean = {
if (numExecutors < 0) {
throw new IllegalArgumentException(
"Attempted to request a negative number of executor(s) " +
s"$numExecutors from the cluster manager. Please specify a positive number!")
}
logInfo(s"request to total executor and renew $numExecutors , " +
s"forceKillOldExecutors:$forceKillOldExecutors , " +
s"newMemoryPerExecutorMB:$newMemoryPerExecutorMB ," +
s"newCoresPerExecutor:$newCoresPerExecutor")

// Bookkeeping and the backend request happen under the backend lock so the
// pending-executor count stays consistent with the requested total.
val response = synchronized {
this.requestedTotalExecutors = numExecutors
// Pending = shortfall between the new target and what is already running,
// adjusting for executors queued for removal; never negative.
numPendingExecutors =
math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)

doRequestTotalExecutors(numExecutors,
forceKillOldExecutors, newMemoryPerExecutorMB, newCoresPerExecutor)
}

// Blocks the caller until the backend replies or the ask timeout elapses.
defaultAskTimeout.awaitResult(response)
}

/**
* Request executors from the cluster manager by specifying the total number desired,
* including existing pending and running executors.
Expand All @@ -603,6 +629,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
Future.successful(false)

/**
 * Renewing variant of `doRequestTotalExecutors`: request the given total and
 * optionally kill old executors / renew per-executor memory and cores.
 * Defaults to an immediate false (unsupported); backends that can renew
 * executor resources (e.g. the standalone backend) override this.
 *
 * @return a future reporting whether the request is acknowledged
 */
protected def doRequestTotalExecutors(requestedTotal: Int,
forceKillOldExecutors: Boolean,
newMemoryPerExecutorMB: Option[Int],
newCoresPerExecutor: Option[Int]): Future[Boolean] =
Future.successful(false)
/**
* Request that the cluster manager kill the specified executors.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,21 @@ private[spark] class StandaloneSchedulerBackend(
}
}

/**
 * Forward the renewing total-executor request to the standalone Master via
 * the app client, if the client has been started; otherwise warn and report
 * failure.
 *
 * @return a future reporting whether the Master acknowledged the request
 */
protected override def doRequestTotalExecutors(
    requestedTotal: Int,
    forceKillOldExecutors: Boolean,
    newMemoryPerExecutorMB: Option[Int],
    newCoresPerExecutor: Option[Int]): Future[Boolean] = {
  if (client == null) {
    logWarning("Attempted to request executors before driver fully initialized.")
    Future.successful(false)
  } else {
    client.requestTotalExecutors(requestedTotal, forceKillOldExecutors,
      newMemoryPerExecutorMB, newCoresPerExecutor)
  }
}
/**
* Kill the given list of executors through the Master.
* @return whether the kill request is acknowledged.
Expand Down
2 changes: 1 addition & 1 deletion examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.1-kylin-r48</version>
<version>2.4.1-kylin-r51</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.1-kylin-r48</version>
<version>2.4.1-kylin-r51</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/docker-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.1-kylin-r48</version>
<version>2.4.1-kylin-r51</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/flume-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.1-kylin-r48</version>
<version>2.4.1-kylin-r51</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/flume-sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.1-kylin-r48</version>
<version>2.4.1-kylin-r51</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/flume/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.1-kylin-r48</version>
<version>2.4.1-kylin-r51</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kafka-0-10-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.1-kylin-r48</version>
<version>2.4.1-kylin-r51</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Loading

0 comments on commit b735acc

Please sign in to comment.