diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 5f37a1abb1909..cf121749b7348 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -636,7 +636,9 @@ private[spark] object SparkConf extends Logging { DeprecatedConfig("spark.blacklist.killBlacklistedExecutors", "3.1.0", "Please use spark.excludeOnFailure.killExcludedExecutors"), DeprecatedConfig("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "3.1.0", - "Please use spark.yarn.executor.launch.excludeOnFailure.enabled") + "Please use spark.yarn.executor.launch.excludeOnFailure.enabled"), + DeprecatedConfig("spark.kubernetes.memoryOverheadFactor", "3.3.0", + "Please use spark.driver.memoryOverheadFactor and spark.executor.memoryOverheadFactor") ) Map(configs.map { cfg => (cfg.key -> cfg) } : _*) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index dbec61a1fdb76..ffe4501248f43 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -105,6 +105,22 @@ package object config { .bytesConf(ByteUnit.MiB) .createOptional + private[spark] val DRIVER_MEMORY_OVERHEAD_FACTOR = + ConfigBuilder("spark.driver.memoryOverheadFactor") + .doc("Fraction of driver memory to be allocated as additional non-heap memory per driver " + + "process in cluster mode. This is memory that accounts for things like VM overheads, " + + "interned strings, other native overheads, etc. This tends to grow with the container " + + "size. This value defaults to 0.10 except for Kubernetes non-JVM jobs, which defaults to " + + "0.40. This is done as non-JVM tasks need more non-JVM heap space and such tasks " + + "commonly fail with \"Memory Overhead Exceeded\" errors. 
This preempts this error " + + "with a higher default. This value is ignored if spark.driver.memoryOverhead is set " + + "directly.") + .version("3.3.0") + .doubleConf + .checkValue(factor => factor > 0, + "Ensure that memory overhead is a double greater than 0") + .createWithDefault(0.1) + private[spark] val DRIVER_LOG_DFS_DIR = ConfigBuilder("spark.driver.log.dfsDir").version("3.0.0").stringConf.createOptional @@ -315,6 +331,18 @@ package object config { .bytesConf(ByteUnit.MiB) .createOptional + private[spark] val EXECUTOR_MEMORY_OVERHEAD_FACTOR = + ConfigBuilder("spark.executor.memoryOverheadFactor") + .doc("Fraction of executor memory to be allocated as additional non-heap memory per " + + "executor process. This is memory that accounts for things like VM overheads, " + + "interned strings, other native overheads, etc. This tends to grow with the container " + + "size. This value is ignored if spark.executor.memoryOverhead is set directly.") + .version("3.3.0") + .doubleConf + .checkValue(factor => factor > 0, + "Ensure that memory overhead is a double greater than 0") + .createWithDefault(0.1) + private[spark] val CORES_MAX = ConfigBuilder("spark.cores.max") .doc("When running on a standalone deploy cluster or a Mesos cluster in coarse-grained " + "sharing mode, the maximum amount of CPU cores to request for the application from across " + diff --git a/docs/configuration.md b/docs/configuration.md index ae3f422f34b3a..a2e6797b55e2f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -183,7 +183,7 @@ of the most common options to set are: spark.driver.memoryOverhead - driverMemory * 0.10, with minimum of 384 + driverMemory * spark.driver.memoryOverheadFactor, with minimum of 384 Amount of non-heap memory to be allocated per driver process in cluster mode, in MiB unless otherwise specified. 
This is memory that accounts for things like VM overheads, interned strings, @@ -198,6 +198,21 @@ of the most common options to set are: 2.3.0 + + spark.driver.memoryOverheadFactor + 0.10 + + Fraction of driver memory to be allocated as additional non-heap memory per driver process in cluster mode. + This is memory that accounts for things like VM overheads, interned strings, + other native overheads, etc. This tends to grow with the container size. + This value defaults to 0.10 except for Kubernetes non-JVM jobs, which defaults to + 0.40. This is done as non-JVM tasks need more non-JVM heap space and such tasks + commonly fail with "Memory Overhead Exceeded" errors. This preempts this error + with a higher default. + This value is ignored if spark.driver.memoryOverhead is set directly. + + 3.3.0 + spark.driver.resource.{resourceName}.amount 0 @@ -272,7 +287,7 @@ of the most common options to set are: spark.executor.memoryOverhead - executorMemory * 0.10, with minimum of 384 + executorMemory * spark.executor.memoryOverheadFactor, with minimum of 384 Amount of additional memory to be allocated per executor process, in MiB unless otherwise specified. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. @@ -287,6 +302,17 @@ of the most common options to set are: 2.3.0 + + spark.executor.memoryOverheadFactor + 0.10 + + Fraction of executor memory to be allocated as additional non-heap memory per executor process. + This is memory that accounts for things like VM overheads, interned strings, + other native overheads, etc. This tends to grow with the container size. + This value is ignored if spark.executor.memoryOverhead is set directly. 
+ + 3.3.0 + spark.executor.resource.{resourceName}.amount 0 diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 8553d7886acf0..cf4aef5a84c38 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -1137,15 +1137,6 @@ See the [configuration page](configuration.html) for information on Spark config 3.0.0 - - spark.kubernetes.memoryOverheadFactor - 0.1 - - This sets the Memory Overhead Factor that will allocate memory to non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, various systems processes, and tmpfs-based local directories when spark.kubernetes.local.dirs.tmpfs is true. For JVM-based jobs this value will default to 0.10 and 0.40 for non-JVM jobs. - This is done as non-JVM tasks need more non-JVM heap space and such tasks commonly fail with "Memory Overhead Exceeded" errors. This preempts this error with a higher default. - - 2.4.0 - spark.kubernetes.pyspark.pythonVersion "3" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index f2104d433ad49..e490bea6382e2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -53,18 +53,23 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) // Memory settings private val driverMemoryMiB = conf.get(DRIVER_MEMORY) + private val memoryOverheadFactor = if (conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) { + conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR) + } else { + conf.get(MEMORY_OVERHEAD_FACTOR) + } // The memory overhead factor to use. If the user has not set it, then use a different // value for non-JVM apps. This value is propagated to executors. 
private val overheadFactor = if (conf.mainAppResource.isInstanceOf[NonJVMResource]) { - if (conf.contains(MEMORY_OVERHEAD_FACTOR)) { - conf.get(MEMORY_OVERHEAD_FACTOR) + if (conf.contains(MEMORY_OVERHEAD_FACTOR) || conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) { + memoryOverheadFactor } else { NON_JVM_MEMORY_OVERHEAD_FACTOR } } else { - conf.get(MEMORY_OVERHEAD_FACTOR) + memoryOverheadFactor } private val memoryOverheadMiB = conf @@ -163,7 +168,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) KUBERNETES_DRIVER_POD_NAME.key -> driverPodName, "spark.app.id" -> conf.appId, KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true", - MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString) + DRIVER_MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString) // try upload local, resolvable files to a hadoop compatible file system Seq(JARS, FILES, ARCHIVES, SUBMIT_PYTHON_FILES).foreach { key => val uris = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri)) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index c6084720c56fe..b0e5298d6f39d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -59,11 +59,16 @@ private[spark] class BasicExecutorFeatureStep( private val isDefaultProfile = resourceProfile.id == ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID private val isPythonApp = kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON) private val disableConfigMap = kubernetesConf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP) + private val memoryOverheadFactor = if (kubernetesConf.contains(EXECUTOR_MEMORY_OVERHEAD_FACTOR)) { + 
kubernetesConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR) + } else { + kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) + } val execResources = ResourceProfile.getResourcesForClusterManager( resourceProfile.id, resourceProfile.executorResources, - kubernetesConf.get(MEMORY_OVERHEAD_FACTOR), + memoryOverheadFactor, kubernetesConf.sparkConf, isPythonApp, Map.empty) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 83444e5518e32..4c95041d4f139 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -129,7 +129,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod", "spark.app.id" -> KubernetesTestConf.APP_ID, "spark.kubernetes.submitInDriver" -> "true", - MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString) + DRIVER_MEMORY_OVERHEAD_FACTOR.key -> DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString) assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf) } @@ -188,7 +188,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { // Memory overhead tests. 
Tuples are: // test name, main resource, overhead factor, expected factor Seq( - ("java", JavaMainAppResource(None), None, MEMORY_OVERHEAD_FACTOR.defaultValue.get), + ("java", JavaMainAppResource(None), None, DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get), ("python default", PythonMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR), ("python w/ override", PythonMainAppResource(null), Some(0.9d), 0.9d), ("r default", RMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR) @@ -196,13 +196,13 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { test(s"memory overhead factor: $name") { // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB val driverMem = - ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 + ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 // main app resource, overhead factor val sparkConf = new SparkConf(false) .set(CONTAINER_IMAGE, "spark-driver:latest") .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m") - factor.foreach { value => sparkConf.set(MEMORY_OVERHEAD_FACTOR, value) } + factor.foreach { value => sparkConf.set(DRIVER_MEMORY_OVERHEAD_FACTOR, value) } val conf = KubernetesTestConf.createDriverConf( sparkConf = sparkConf, mainAppResource = resource) @@ -213,10 +213,63 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { assert(mem === s"${expected}Mi") val systemProperties = step.getAdditionalPodSystemProperties() - assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString) + assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString) } } + + test(s"SPARK-38194: memory overhead factor precedence") { + // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB + val driverMem = + ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 + + // main app resource, overhead factor + val
sparkConf = new SparkConf(false) + .set(CONTAINER_IMAGE, "spark-driver:latest") + .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m") + + // New config should take precedence + val expectedFactor = 0.2 + sparkConf.set(DRIVER_MEMORY_OVERHEAD_FACTOR, expectedFactor) + sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3) + + val conf = KubernetesTestConf.createDriverConf( + sparkConf = sparkConf) + val step = new BasicDriverFeatureStep(conf) + val pod = step.configurePod(SparkPod.initialPod()) + val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory")) + val expected = (driverMem + driverMem * expectedFactor).toInt + assert(mem === s"${expected}Mi") + + val systemProperties = step.getAdditionalPodSystemProperties() + assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString) + } + + test(s"SPARK-38194: old memory factor settings is applied if new one isn't given") { + // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB + val driverMem = + ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 + + // main app resource, overhead factor + val sparkConf = new SparkConf(false) + .set(CONTAINER_IMAGE, "spark-driver:latest") + .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m") + + // Old config still works if new config isn't given + val expectedFactor = 0.3 + sparkConf.set(MEMORY_OVERHEAD_FACTOR, expectedFactor) + + val conf = KubernetesTestConf.createDriverConf( + sparkConf = sparkConf) + val step = new BasicDriverFeatureStep(conf) + val pod = step.configurePod(SparkPod.initialPod()) + val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory")) + val expected = (driverMem + driverMem * expectedFactor).toInt + assert(mem === s"${expected}Mi") + + val systemProperties = step.getAdditionalPodSystemProperties() + assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString) + } + test("SPARK-35493: make 
spark.blockManager.port be able to be fallen back to in driver pod") { val initPod = SparkPod.initialPod() val sparkConf = new SparkConf() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index f5f2712481604..731a9b77d2059 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -441,6 +441,60 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { )) } + + test(s"SPARK-38194: memory overhead factor precedence") { + // Choose an executor memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB + val defaultFactor = EXECUTOR_MEMORY_OVERHEAD_FACTOR.defaultValue.get + val executorMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2 + + // main app resource, overhead factor + val sparkConf = new SparkConf(false) + .set(CONTAINER_IMAGE, "spark-driver:latest") + .set(EXECUTOR_MEMORY.key, s"${executorMem.toInt}m") + + // New config should take precedence + val expectedFactor = 0.2 + sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, expectedFactor) + sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3) + + val conf = KubernetesTestConf.createExecutorConf( + sparkConf = sparkConf) + ResourceProfile.clearDefaultProfile() + val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf) + val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf), + resourceProfile) + val pod = step.configurePod(SparkPod.initialPod()) + val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory")) + val expected = (executorMem + executorMem * expectedFactor).toInt + assert(mem === s"${expected}Mi") 
+ } + + test(s"SPARK-38194: old memory factor settings is applied if new one isn't given") { + // Choose an executor memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB + val defaultFactor = EXECUTOR_MEMORY_OVERHEAD_FACTOR.defaultValue.get + val executorMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2 + + // main app resource, overhead factor + val sparkConf = new SparkConf(false) + .set(CONTAINER_IMAGE, "spark-driver:latest") + .set(EXECUTOR_MEMORY.key, s"${executorMem.toInt}m") + + // Old config still works if new config isn't given + val expectedFactor = 0.3 + sparkConf.set(MEMORY_OVERHEAD_FACTOR, expectedFactor) + + val conf = KubernetesTestConf.createExecutorConf( + sparkConf = sparkConf) + ResourceProfile.clearDefaultProfile() + val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf) + val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf), + resourceProfile) + val pod = step.configurePod(SparkPod.initialPod()) + val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory")) + val expected = (executorMem + executorMem * expectedFactor).toInt + assert(mem === s"${expected}Mi") + } + + // There is always exactly one controller reference, and it points to the driver pod. 
private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = { assert(executor.getMetadata.getOwnerReferences.size() === 1) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala index 2fd13a5903243..9e4187837b680 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala @@ -105,6 +105,7 @@ private[mesos] class MesosSubmitRequestServlet( val superviseDriver = sparkProperties.get(config.DRIVER_SUPERVISE.key) val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key) val driverMemoryOverhead = sparkProperties.get(config.DRIVER_MEMORY_OVERHEAD.key) + val driverMemoryOverheadFactor = sparkProperties.get(config.DRIVER_MEMORY_OVERHEAD_FACTOR.key) val driverCores = sparkProperties.get(config.DRIVER_CORES.key) val name = request.sparkProperties.getOrElse("spark.app.name", mainClass) @@ -121,8 +122,10 @@ private[mesos] class MesosSubmitRequestServlet( mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts) val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE) val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY) + val actualDriverMemoryFactor = driverMemoryOverheadFactor.map(_.toDouble).getOrElse( + MEMORY_OVERHEAD_FACTOR) val actualDriverMemoryOverhead = driverMemoryOverhead.map(_.toInt).getOrElse( - math.max((MEMORY_OVERHEAD_FACTOR * actualDriverMemory).toInt, MEMORY_OVERHEAD_MIN)) + math.max((actualDriverMemoryFactor * actualDriverMemory).toInt, MEMORY_OVERHEAD_MIN)) val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES) val submitDate = new Date() val submissionId = newDriverId(submitDate) diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 38f83df00e428..524b1d514fafe 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -387,8 +387,7 @@ trait MesosSchedulerUtils extends Logging { } } - // These defaults copied from YARN - private val MEMORY_OVERHEAD_FRACTION = 0.10 + // This default copied from YARN private val MEMORY_OVERHEAD_MINIMUM = 384 /** @@ -400,8 +399,9 @@ trait MesosSchedulerUtils extends Logging { * (whichever is larger) */ def executorMemory(sc: SparkContext): Int = { + val memoryOverheadFactor = sc.conf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR) sc.conf.get(mesosConfig.EXECUTOR_MEMORY_OVERHEAD).getOrElse( - math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) + + math.max(memoryOverheadFactor * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) + sc.executorMemory } @@ -415,7 +415,8 @@ trait MesosSchedulerUtils extends Logging { * `MEMORY_OVERHEAD_FRACTION (=0.1) * driverMemory` */ def driverContainerMemory(driverDesc: MesosDriverDescription): Int = { - val defaultMem = math.max(MEMORY_OVERHEAD_FRACTION * driverDesc.mem, MEMORY_OVERHEAD_MINIMUM) + val memoryOverheadFactor = driverDesc.conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR) + val defaultMem = math.max(memoryOverheadFactor * driverDesc.mem, MEMORY_OVERHEAD_MINIMUM) driverDesc.conf.get(mesosConfig.DRIVER_MEMORY_OVERHEAD).getOrElse(defaultMem.toInt) + driverDesc.mem } diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala index 344fc38c84fb1..8bed43a54d5d0 
100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala @@ -35,10 +35,16 @@ class MesosRestServerSuite extends SparkFunSuite testOverheadMemory(new SparkConf(), "2000M", 2384) } - test("test driver overhead memory with overhead factor") { + test("test driver overhead memory with default overhead factor") { testOverheadMemory(new SparkConf(), "5000M", 5500) } + test("test driver overhead memory with overhead factor") { + val conf = new SparkConf() + conf.set(config.DRIVER_MEMORY_OVERHEAD_FACTOR.key, "0.2") + testOverheadMemory(conf, "5000M", 6000) + } + test("test configured driver overhead memory") { val conf = new SparkConf() conf.set(config.DRIVER_MEMORY_OVERHEAD.key, "1000") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index ae85ea8d6110a..f364b79216098 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -54,6 +54,7 @@ import org.apache.spark.api.python.PythonUtils import org.apache.spark.deploy.{SparkApplication, SparkHadoopUtil} import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.deploy.yarn.ResourceRequestHelper._ +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -70,7 +71,6 @@ private[spark] class Client( extends Logging { import Client._ - import YarnSparkHadoopUtil._ private val yarnClient = YarnClient.createYarnClient private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf)) @@ -85,6 +85,12 @@ private[spark] class Client( 
private var appMaster: ApplicationMaster = _ private var stagingDirPath: Path = _ + private val amMemoryOverheadFactor = if (isClusterMode) { + sparkConf.get(DRIVER_MEMORY_OVERHEAD_FACTOR) + } else { + AM_MEMORY_OVERHEAD_FACTOR + } + // AM related configurations private val amMemory = if (isClusterMode) { sparkConf.get(DRIVER_MEMORY).toInt @@ -94,7 +100,7 @@ private[spark] class Client( private val amMemoryOverhead = { val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD sparkConf.get(amMemoryOverheadEntry).getOrElse( - math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, + math.max((amMemoryOverheadFactor * amMemory).toLong, ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt } private val amCores = if (isClusterMode) { @@ -107,8 +113,10 @@ private[spark] class Client( private val executorMemory = sparkConf.get(EXECUTOR_MEMORY) // Executor offHeap memory in MiB. protected val executorOffHeapMemory = Utils.executorOffHeapMemorySizeAsMb(sparkConf) + + private val executorMemoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR) private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( - math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, + math.max((executorMemoryOverheadFactor * executorMemory).toLong, ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt private val isPython = sparkConf.get(IS_PYTHON_APP) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 54ab643f2755b..a85b7174673af 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -163,6 +163,8 @@ private[yarn] class YarnAllocator( private val isPythonApp = sparkConf.get(IS_PYTHON_APP) + private val memoryOverheadFactor = 
sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR) + private val launcherPool = ThreadUtils.newDaemonCachedThreadPool( "ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS)) @@ -280,9 +282,10 @@ private[yarn] class YarnAllocator( // track the resource profile if not already there getOrUpdateRunningExecutorForRPId(rp.id) logInfo(s"Resource profile ${rp.id} doesn't exist, adding it") + val resourcesWithDefaults = ResourceProfile.getResourcesForClusterManager(rp.id, rp.executorResources, - MEMORY_OVERHEAD_FACTOR, sparkConf, isPythonApp, resourceNameMapping) + memoryOverheadFactor, sparkConf, isPythonApp, resourceNameMapping) val customSparkResources = resourcesWithDefaults.customResources.map { case (name, execReq) => (name, execReq.amount.toString) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index f347e37ba24ab..1869c739e4844 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -34,11 +34,10 @@ import org.apache.spark.util.Utils object YarnSparkHadoopUtil { - // Additional memory overhead + // Additional memory overhead for application masters in client mode. // 10% was arrived at experimentally. In the interest of minimizing memory waste while covering // the common cases. Memory overhead tends to grow with container size. 
- - val MEMORY_OVERHEAD_FACTOR = 0.10 + val AM_MEMORY_OVERHEAD_FACTOR = 0.10 val ANY_HOST = "*" diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index db65d128b07f0..ae010f11503dd 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -706,4 +706,33 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter sparkConf.set(MEMORY_OFFHEAP_SIZE, originalOffHeapSize) } } + + test("SPARK-38194: Configurable memory overhead factor") { + val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toLong + try { + sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.5) + val (handler, _) = createAllocator(maxExecutors = 1, + additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString)) + val defaultResource = handler.rpIdToYarnResource.get(defaultRPId) + val memory = defaultResource.getMemory + assert(memory == (executorMemory * 1.5).toLong) + } finally { + sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1) + } + } + + test("SPARK-38194: Memory overhead takes precedence over factor") { + val executorMemory = sparkConf.get(EXECUTOR_MEMORY) + try { + sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.5) + sparkConf.set(EXECUTOR_MEMORY_OVERHEAD, (executorMemory * 0.4).toLong) + val (handler, _) = createAllocator(maxExecutors = 1, + additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString)) + val defaultResource = handler.rpIdToYarnResource.get(defaultRPId) + val memory = defaultResource.getMemory + assert(memory == (executorMemory * 1.4).toLong) + } finally { + sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1) + } + } }