Skip to content
This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Config for hard cpu limit on pods; default unlimited #356

Merged
merged 1 commit into from
Jun 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,20 @@ from the other deployment modes. See the [configuration page](configuration.html
Docker image pull policy used when pulling Docker images with Kubernetes.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.limit.cores</code></td>
<td>(none)</td>
<td>
Specifies the hard CPU limit for the driver pod.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.limit.cores</code></td>
<td>(none)</td>
<td>
Specifies the hard CPU limit for a single executor pod.
</td>
</tr>
</table>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,18 @@ package object config extends Logging {
.stringConf
.createOptional

// Optional hard CPU limit for the driver pod's container; applied to the
// container's "limits.cpu" resource field at pod construction time.
// Unset (None) means no limit is added, i.e. the driver's CPU is unlimited.
// Kept as a raw string so the value passes through to Kubernetes unchanged
// — presumably any Kubernetes CPU quantity (e.g. "1", "500m") is accepted;
// no validation is performed here.
private[spark] val KUBERNETES_DRIVER_LIMIT_CORES =
  ConfigBuilder("spark.kubernetes.driver.limit.cores")
    .doc("Specify the hard cpu limit for the driver pod")
    .stringConf
    .createOptional

// Optional hard CPU limit for each executor pod's container; applied to the
// container's "limits.cpu" resource field when executor pods are built.
// Unset (None) means no limit is added, i.e. executor CPU is unlimited.
// Kept as a raw string so the value passes through to Kubernetes unchanged
// — presumably any Kubernetes CPU quantity (e.g. "2", "1500m") is accepted;
// no validation is performed here.
private[spark] val KUBERNETES_EXECUTOR_LIMIT_CORES =
  ConfigBuilder("spark.kubernetes.executor.limit.cores")
    .doc("Specify the hard cpu limit for a single executor pod")
    .stringConf
    .createOptional

private[spark] def resolveK8sMaster(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ private[spark] class Client(

// CPU settings
private val driverCpuCores = sparkConf.getOption("spark.driver.cores").getOrElse("1")
private val driverLimitCores = sparkConf.getOption(KUBERNETES_DRIVER_LIMIT_CORES.key)

// Memory settings
private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)
Expand Down Expand Up @@ -139,7 +140,6 @@ private[spark] class Client(
.endEnv()
.withNewResources()
.addToRequests("cpu", driverCpuQuantity)
.addToLimits("cpu", driverCpuQuantity)
.addToRequests("memory", driverMemoryQuantity)
.addToLimits("memory", driverMemoryLimitQuantity)
.endResources()
Expand All @@ -156,6 +156,21 @@ private[spark] class Client(
.addToContainers(driverContainer)
.endSpec()

// If a hard CPU limit was configured, attach it to the driver container's
// resource limits. The fabric8 nested builders write back into `basePod`
// when the matching end*() calls run, so this is executed purely for its
// side effect — use foreach (not map) so no discarded value is produced.
driverLimitCores.foreach { limitCores =>
  val driverCpuLimitQuantity = new QuantityBuilder(false)
    .withAmount(limitCores)
    .build()
  basePod
    .editSpec()
      .editFirstContainer()
        .editResources()
          .addToLimits("cpu", driverCpuLimitQuantity)
        .endResources()
      .endContainer()
    .endSpec()
}

val maybeSubmittedResourceIdentifiers = initContainerComponentsProvider
.provideInitContainerSubmittedDependencyUploader(allDriverLabels)
.map { uploader =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb

private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1")
private val executorLimitCores = conf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)

private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests"))
Expand Down Expand Up @@ -438,14 +439,28 @@ private[spark] class KubernetesClusterSchedulerBackend(
.addToRequests("memory", executorMemoryQuantity)
.addToLimits("memory", executorMemoryLimitQuantity)
.addToRequests("cpu", executorCpuQuantity)
.addToLimits("cpu", executorCpuQuantity)
.endResources()
.addAllToEnv(requiredEnv.asJava)
.addToEnv(executorExtraClasspathEnv.toSeq: _*)
.withPorts(requiredPorts.asJava)
.endContainer()
.endSpec()

// If a hard CPU limit was configured, attach it to the executor container's
// resource limits. The fabric8 nested builders write back into
// `basePodBuilder` when the matching end*() calls run, so this is executed
// purely for its side effect — use foreach (not map) so no discarded value
// is produced.
executorLimitCores.foreach { limitCores =>
  val executorCpuLimitQuantity = new QuantityBuilder(false)
    .withAmount(limitCores)
    .build()
  basePodBuilder
    .editSpec()
      .editFirstContainer()
        .editResources()
          .addToLimits("cpu", executorCpuLimitQuantity)
        .endResources()
      .endContainer()
    .endSpec()
}

val withMaybeShuffleConfigPodBuilder = shuffleServiceConfig
.map { config =>
config.shuffleDirs.foldLeft(basePodBuilder) { (builder, dir) =>
Expand Down