From aae64f1890148654df269296bc177a66da2cdbf2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?moonfang=28=E6=88=BF=E5=AD=9D=E6=95=AC=29?= Date: Thu, 22 Jun 2017 11:25:05 +0800 Subject: [PATCH] add cpu limit to driver/executor pod --- docs/running-on-kubernetes.md | 14 ++++++++++++++ .../apache/spark/deploy/kubernetes/config.scala | 12 ++++++++++++ .../spark/deploy/kubernetes/submit/Client.scala | 17 ++++++++++++++++- .../KubernetesClusterSchedulerBackend.scala | 17 ++++++++++++++++- 4 files changed, 58 insertions(+), 2 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 52d847b4420cf..3a50860f826c5 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -718,6 +718,20 @@ from the other deployment modes. See the [configuration page](configuration.html Docker image pull policy used when pulling Docker images with Kubernetes. + + spark.kubernetes.driver.limit.cores + (none) + + Specify the hard cpu limit for the driver pod + + + + spark.kubernetes.executor.limit.cores + (none) + + Specify the hard cpu limit for a single executor pod + + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 70ea19e44ef8c..e1c1ab9d459fc 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -485,6 +485,18 @@ package object config extends Logging { .stringConf .createOptional + private[spark] val KUBERNETES_DRIVER_LIMIT_CORES = + ConfigBuilder("spark.kubernetes.driver.limit.cores") + .doc("Specify the hard cpu limit for the driver pod") + .stringConf + .createOptional + + private[spark] val KUBERNETES_EXECUTOR_LIMIT_CORES = + ConfigBuilder("spark.kubernetes.executor.limit.cores") + .doc("Specify the hard cpu limit for a single executor pod") + .stringConf + .createOptional + private[spark] def resolveK8sMaster(rawMasterString: String): String = { if (!rawMasterString.startsWith("k8s://")) { throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index ac3a51e74f838..8220127eac449 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -64,6 +64,7 @@ private[spark] class Client( // CPU settings private val driverCpuCores = sparkConf.getOption("spark.driver.cores").getOrElse("1") + private val driverLimitCores = sparkConf.getOption(KUBERNETES_DRIVER_LIMIT_CORES.key) // Memory settings private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY) @@ -139,7 +140,6 @@ private[spark] class Client( .endEnv() .withNewResources() .addToRequests("cpu", driverCpuQuantity) - .addToLimits("cpu", driverCpuQuantity) .addToRequests("memory", driverMemoryQuantity) .addToLimits("memory", driverMemoryLimitQuantity) .endResources() @@ -156,6 +156,21 @@ private[spark] class Client( .addToContainers(driverContainer) .endSpec() + driverLimitCores.map { + limitCores => + val driverCpuLimitQuantity = new QuantityBuilder(false) + .withAmount(limitCores) + .build() + basePod + .editSpec() + .editFirstContainer() + .editResources + .addToLimits("cpu", driverCpuLimitQuantity) + .endResources() + .endContainer() + .endSpec() + } + val maybeSubmittedResourceIdentifiers = initContainerComponentsProvider .provideInitContainerSubmittedDependencyUploader(allDriverLabels) .map { uploader => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 4165eb8cbd067..31cf929b94e8b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -108,6 +108,7 @@ private[spark] class KubernetesClusterSchedulerBackend( private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1") + private val executorLimitCores = conf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key) private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")) @@ -438,7 +439,6 @@ private[spark] class KubernetesClusterSchedulerBackend( .addToRequests("memory", executorMemoryQuantity) .addToLimits("memory", executorMemoryLimitQuantity) .addToRequests("cpu", executorCpuQuantity) - .addToLimits("cpu", executorCpuQuantity) .endResources() .addAllToEnv(requiredEnv.asJava) .addToEnv(executorExtraClasspathEnv.toSeq: _*) @@ -446,6 +446,21 @@ private[spark] class KubernetesClusterSchedulerBackend( .endContainer() .endSpec() + executorLimitCores.map { + limitCores => + val executorCpuLimitQuantity = new QuantityBuilder(false) + .withAmount(limitCores) + .build() + basePodBuilder + .editSpec() + .editFirstContainer() + .editResources + .addToLimits("cpu", executorCpuLimitQuantity) + .endResources() + .endContainer() + .endSpec() + } + val withMaybeShuffleConfigPodBuilder = shuffleServiceConfig .map { config => config.shuffleDirs.foldLeft(basePodBuilder) { (builder, dir) =>