[SPARK-38194][YARN][MESOS][K8S] Make memory overhead factor configurable
### What changes were proposed in this pull request?

Add a new config to set the memory overhead factor for drivers and executors. Currently the memory overhead is hard-coded to 10% (except on Kubernetes), and the only way to increase it is to set a specific memory overhead amount.

### Why are the changes needed?

In dynamic environments where different people or use cases have different memory requirements, it is helpful to raise the memory overhead factor instead of having to set a higher specific memory overhead value. The Kubernetes resource manager already makes this configurable; this change makes it configurable across the board.

### Does this PR introduce _any_ user-facing change?

No change to default behavior, just adds a new config users can change.
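The default behavior being preserved is the standard overhead calculation: `factor * memory`, floored at a 384 MiB minimum, added on top of the requested memory. A minimal Python sketch of that arithmetic (not Spark's actual Scala; names are illustrative):

```python
MIN_OVERHEAD_MIB = 384  # Spark's MEMORY_OVERHEAD_MIN_MIB

def container_request_mib(mem_mib, factor=0.1):
    # Overhead is factor * memory, but never below the 384 MiB floor.
    overhead = max(int(mem_mib * factor), MIN_OVERHEAD_MIB)
    return mem_mib + overhead

# A 4 GiB executor at the 0.1 default requests 4096 + 409 = 4505 MiB.
```

Small containers are dominated by the floor: a 1 GiB executor still gets the full 384 MiB of overhead rather than 102 MiB.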

### How was this patch tested?

New UT to check the memory calculation.

Closes apache#35504 from Kimahriman/yarn-configurable-memory-overhead-factor.

Authored-by: Adam Binford <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
Kimahriman committed Mar 19, 2022
1 parent 28f7d95 commit cebf398
Showing 15 changed files with 248 additions and 35 deletions.
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -636,7 +636,9 @@ private[spark] object SparkConf extends Logging {
DeprecatedConfig("spark.blacklist.killBlacklistedExecutors", "3.1.0",
"Please use spark.excludeOnFailure.killExcludedExecutors"),
DeprecatedConfig("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "3.1.0",
"Please use spark.yarn.executor.launch.excludeOnFailure.enabled")
"Please use spark.yarn.executor.launch.excludeOnFailure.enabled"),
DeprecatedConfig("spark.kubernetes.memoryOverheadFactor", "3.3.0",
"Please use spark.driver.memoryOverheadFactor and spark.executor.memoryOverheadFactor")
)

Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
28 changes: 28 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -105,6 +105,22 @@ package object config {
.bytesConf(ByteUnit.MiB)
.createOptional

private[spark] val DRIVER_MEMORY_OVERHEAD_FACTOR =
ConfigBuilder("spark.driver.memoryOverheadFactor")
.doc("Fraction of driver memory to be allocated as additional non-heap memory per driver " +
"process in cluster mode. This is memory that accounts for things like VM overheads, " +
"interned strings, other native overheads, etc. This tends to grow with the container " +
"size. This value defaults to 0.10 except for Kubernetes non-JVM jobs, which defaults to " +
"0.40. This is done as non-JVM tasks need more non-JVM heap space and such tasks " +
"commonly fail with \"Memory Overhead Exceeded\" errors. This preempts this error " +
"with a higher default. This value is ignored if spark.driver.memoryOverhead is set " +
"directly.")
.version("3.3.0")
.doubleConf
.checkValue(factor => factor > 0,
"Ensure that memory overhead is a double greater than 0")
.createWithDefault(0.1)

private[spark] val DRIVER_LOG_DFS_DIR =
ConfigBuilder("spark.driver.log.dfsDir").version("3.0.0").stringConf.createOptional

@@ -315,6 +331,18 @@ package object config {
.bytesConf(ByteUnit.MiB)
.createOptional

private[spark] val EXECUTOR_MEMORY_OVERHEAD_FACTOR =
ConfigBuilder("spark.executor.memoryOverheadFactor")
.doc("Fraction of executor memory to be allocated as additional non-heap memory per " +
"executor process. This is memory that accounts for things like VM overheads, " +
"interned strings, other native overheads, etc. This tends to grow with the container " +
"size. This value is ignored if spark.executor.memoryOverhead is set directly.")
.version("3.3.0")
.doubleConf
.checkValue(factor => factor > 0,
"Ensure that memory overhead is a double greater than 0")
.createWithDefault(0.1)

private[spark] val CORES_MAX = ConfigBuilder("spark.cores.max")
.doc("When running on a standalone deploy cluster or a Mesos cluster in coarse-grained " +
"sharing mode, the maximum amount of CPU cores to request for the application from across " +
30 changes: 28 additions & 2 deletions docs/configuration.md
@@ -183,7 +183,7 @@ of the most common options to set are:
</tr>
<tr>
<td><code>spark.driver.memoryOverhead</code></td>
<td>driverMemory * 0.10, with minimum of 384 </td>
<td>driverMemory * <code>spark.driver.memoryOverheadFactor</code>, with minimum of 384 </td>
<td>
Amount of non-heap memory to be allocated per driver process in cluster mode, in MiB unless
otherwise specified. This is memory that accounts for things like VM overheads, interned strings,
@@ -198,6 +198,21 @@ of the most common options to set are:
</td>
<td>2.3.0</td>
</tr>
<tr>
<td><code>spark.driver.memoryOverheadFactor</code></td>
<td>0.10</td>
<td>
Fraction of driver memory to be allocated as additional non-heap memory per driver process in cluster mode.
This is memory that accounts for things like VM overheads, interned strings,
other native overheads, etc. This tends to grow with the container size.
This value defaults to 0.10 except for Kubernetes non-JVM jobs, which defaults to
0.40. This is done as non-JVM tasks need more non-JVM heap space and such tasks
commonly fail with "Memory Overhead Exceeded" errors. This preempts this error
with a higher default.
This value is ignored if <code>spark.driver.memoryOverhead</code> is set directly.
</td>
<td>3.3.0</td>
</tr>
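The precedence described in the row above (an explicitly set `spark.driver.memoryOverhead` wins; otherwise the factor applies, with the 384 MiB floor) can be sketched in Python; this is an illustration of the documented rule, not Spark's implementation:

```python
def overhead_mib(mem_mib, factor=0.10, explicit_mib=None):
    # A directly set memoryOverhead takes precedence and the factor is ignored.
    if explicit_mib is not None:
        return explicit_mib
    # Otherwise: factor * memory, floored at the 384 MiB minimum.
    return max(int(mem_mib * factor), 384)

# overhead_mib(8192) -> 819; overhead_mib(2048) -> 384 (floor applies)
```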
<tr>
<td><code>spark.driver.resource.{resourceName}.amount</code></td>
<td>0</td>
@@ -272,7 +287,7 @@ of the most common options to set are:
</tr>
<tr>
<td><code>spark.executor.memoryOverhead</code></td>
<td>executorMemory * 0.10, with minimum of 384 </td>
<td>executorMemory * <code>spark.executor.memoryOverheadFactor</code>, with minimum of 384 </td>
<td>
Amount of additional memory to be allocated per executor process, in MiB unless otherwise specified.
This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
@@ -287,6 +302,17 @@ of the most common options to set are:
</td>
<td>2.3.0</td>
</tr>
<tr>
<td><code>spark.executor.memoryOverheadFactor</code></td>
<td>0.10</td>
<td>
Fraction of executor memory to be allocated as additional non-heap memory per executor process.
This is memory that accounts for things like VM overheads, interned strings,
other native overheads, etc. This tends to grow with the container size.
This value is ignored if <code>spark.executor.memoryOverhead</code> is set directly.
</td>
<td>3.3.0</td>
</tr>
<tr>
<td><code>spark.executor.resource.{resourceName}.amount</code></td>
<td>0</td>
9 changes: 0 additions & 9 deletions docs/running-on-kubernetes.md
@@ -1137,15 +1137,6 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>3.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.memoryOverheadFactor</code></td>
<td><code>0.1</code></td>
<td>
This sets the Memory Overhead Factor that will allocate memory to non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, various systems processes, and <code>tmpfs</code>-based local directories when <code>spark.kubernetes.local.dirs.tmpfs</code> is <code>true</code>. For JVM-based jobs this value will default to 0.10 and 0.40 for non-JVM jobs.
This is done as non-JVM tasks need more non-JVM heap space and such tasks commonly fail with "Memory Overhead Exceeded" errors. This preempts this error with a higher default.
</td>
<td>2.4.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.pyspark.pythonVersion</code></td>
<td><code>"3"</code></td>
@@ -53,18 +53,23 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)

// Memory settings
private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
private val memoryOverheadFactor = if (conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) {
conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)
} else {
conf.get(MEMORY_OVERHEAD_FACTOR)
}

// The memory overhead factor to use. If the user has not set it, then use a different
// value for non-JVM apps. This value is propagated to executors.
private val overheadFactor =
if (conf.mainAppResource.isInstanceOf[NonJVMResource]) {
if (conf.contains(MEMORY_OVERHEAD_FACTOR)) {
conf.get(MEMORY_OVERHEAD_FACTOR)
if (conf.contains(MEMORY_OVERHEAD_FACTOR) || conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) {
memoryOverheadFactor
} else {
NON_JVM_MEMORY_OVERHEAD_FACTOR
}
} else {
conf.get(MEMORY_OVERHEAD_FACTOR)
memoryOverheadFactor
}

private val memoryOverheadMiB = conf
@@ -164,7 +169,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
KUBERNETES_DRIVER_POD_NAME.key -> driverPodName,
"spark.app.id" -> conf.appId,
KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
DRIVER_MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
// try upload local, resolvable files to a hadoop compatible file system
Seq(JARS, FILES, ARCHIVES, SUBMIT_PYTHON_FILES).foreach { key =>
val uris = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
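The driver-side factor selection in `BasicDriverFeatureStep` above — new per-role key first, then the deprecated `spark.kubernetes.memoryOverheadFactor`, then a JVM/non-JVM default — can be sketched in Python. The dict-based `conf` and the function name are illustrative, not Spark's API:

```python
NON_JVM_FACTOR = 0.4  # Spark's NON_JVM_MEMORY_OVERHEAD_FACTOR

def driver_overhead_factor(conf, non_jvm_app):
    # The new per-role key takes precedence over the deprecated K8s key.
    if "spark.driver.memoryOverheadFactor" in conf:
        return conf["spark.driver.memoryOverheadFactor"]
    if "spark.kubernetes.memoryOverheadFactor" in conf:
        return conf["spark.kubernetes.memoryOverheadFactor"]
    # Neither set: non-JVM (Python/R) apps get the higher 0.4 default.
    return NON_JVM_FACTOR if non_jvm_app else 0.1
```

This mirrors why the K8s step checks both keys before falling back to the non-JVM default: a user who set either factor explicitly should not be silently bumped to 0.4.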
@@ -59,11 +59,16 @@ private[spark] class BasicExecutorFeatureStep(
private val isDefaultProfile = resourceProfile.id == ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
private val isPythonApp = kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON)
private val disableConfigMap = kubernetesConf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP)
private val memoryOverheadFactor = if (kubernetesConf.contains(EXECUTOR_MEMORY_OVERHEAD_FACTOR)) {
kubernetesConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
} else {
kubernetesConf.get(MEMORY_OVERHEAD_FACTOR)
}

val execResources = ResourceProfile.getResourcesForClusterManager(
resourceProfile.id,
resourceProfile.executorResources,
kubernetesConf.get(MEMORY_OVERHEAD_FACTOR),
memoryOverheadFactor,
kubernetesConf.sparkConf,
isPythonApp,
Map.empty)
@@ -134,7 +134,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
"spark.app.id" -> KubernetesTestConf.APP_ID,
"spark.kubernetes.submitInDriver" -> "true",
MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
DRIVER_MEMORY_OVERHEAD_FACTOR.key -> DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf)
}

@@ -193,21 +193,21 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
// Memory overhead tests. Tuples are:
// test name, main resource, overhead factor, expected factor
Seq(
("java", JavaMainAppResource(None), None, MEMORY_OVERHEAD_FACTOR.defaultValue.get),
("java", JavaMainAppResource(None), None, DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get),
("python default", PythonMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR),
("python w/ override", PythonMainAppResource(null), Some(0.9d), 0.9d),
("r default", RMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR)
).foreach { case (name, resource, factor, expectedFactor) =>
test(s"memory overhead factor: $name") {
// Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
val driverMem =
ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2

// main app resource, overhead factor
val sparkConf = new SparkConf(false)
.set(CONTAINER_IMAGE, "spark-driver:latest")
.set(DRIVER_MEMORY.key, s"${driverMem.toInt}m")
factor.foreach { value => sparkConf.set(MEMORY_OVERHEAD_FACTOR, value) }
factor.foreach { value => sparkConf.set(DRIVER_MEMORY_OVERHEAD_FACTOR, value) }
val conf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
mainAppResource = resource)
@@ -218,10 +218,63 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
assert(mem === s"${expected}Mi")

val systemProperties = step.getAdditionalPodSystemProperties()
assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
}
}

test(s"SPARK-38194: memory overhead factor precedence") {
// Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
val driverMem =
ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2

// main app resource, overhead factor
val sparkConf = new SparkConf(false)
.set(CONTAINER_IMAGE, "spark-driver:latest")
.set(DRIVER_MEMORY.key, s"${driverMem.toInt}m")

// New config should take precedence
val expectedFactor = 0.2
sparkConf.set(DRIVER_MEMORY_OVERHEAD_FACTOR, expectedFactor)
sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3)

val conf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf)
val step = new BasicDriverFeatureStep(conf)
val pod = step.configurePod(SparkPod.initialPod())
val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
val expected = (driverMem + driverMem * expectedFactor).toInt
assert(mem === s"${expected}Mi")

val systemProperties = step.getAdditionalPodSystemProperties()
assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
}

test(s"SPARK-38194: old memory factor setting is applied if new one isn't given") {
// Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
val driverMem =
ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2

// main app resource, overhead factor
val sparkConf = new SparkConf(false)
.set(CONTAINER_IMAGE, "spark-driver:latest")
.set(DRIVER_MEMORY.key, s"${driverMem.toInt}m")

// Old config still works if new config isn't given
val expectedFactor = 0.3
sparkConf.set(MEMORY_OVERHEAD_FACTOR, expectedFactor)

val conf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf)
val step = new BasicDriverFeatureStep(conf)
val pod = step.configurePod(SparkPod.initialPod())
val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
val expected = (driverMem + driverMem * expectedFactor).toInt
assert(mem === s"${expected}Mi")

val systemProperties = step.getAdditionalPodSystemProperties()
assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
}

test("SPARK-35493: make spark.blockManager.port be able to be fallen back to in driver pod") {
val initPod = SparkPod.initialPod()
val sparkConf = new SparkConf()
@@ -441,6 +441,60 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
))
}

test(s"SPARK-38194: memory overhead factor precedence") {
// Choose an executor memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
val defaultFactor = EXECUTOR_MEMORY_OVERHEAD_FACTOR.defaultValue.get
val executorMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2

// main app resource, overhead factor
val sparkConf = new SparkConf(false)
.set(CONTAINER_IMAGE, "spark-driver:latest")
.set(EXECUTOR_MEMORY.key, s"${executorMem.toInt}m")

// New config should take precedence
val expectedFactor = 0.2
sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, expectedFactor)
sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3)

val conf = KubernetesTestConf.createExecutorConf(
sparkConf = sparkConf)
ResourceProfile.clearDefaultProfile()
val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf),
resourceProfile)
val pod = step.configurePod(SparkPod.initialPod())
val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
val expected = (executorMem + executorMem * expectedFactor).toInt
assert(mem === s"${expected}Mi")
}

test(s"SPARK-38194: old memory factor setting is applied if new one isn't given") {
// Choose an executor memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
val defaultFactor = EXECUTOR_MEMORY_OVERHEAD_FACTOR.defaultValue.get
val executorMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2

// main app resource, overhead factor
val sparkConf = new SparkConf(false)
.set(CONTAINER_IMAGE, "spark-driver:latest")
.set(EXECUTOR_MEMORY.key, s"${executorMem.toInt}m")

// Old config still works if new config isn't given
val expectedFactor = 0.3
sparkConf.set(MEMORY_OVERHEAD_FACTOR, expectedFactor)

val conf = KubernetesTestConf.createExecutorConf(
sparkConf = sparkConf)
ResourceProfile.clearDefaultProfile()
val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf),
resourceProfile)
val pod = step.configurePod(SparkPod.initialPod())
val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
val expected = (executorMem + executorMem * expectedFactor).toInt
assert(mem === s"${expected}Mi")
}


// There is always exactly one controller reference, and it points to the driver pod.
private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
assert(executor.getMetadata.getOwnerReferences.size() === 1)
@@ -105,6 +105,7 @@ private[mesos] class MesosSubmitRequestServlet(
val superviseDriver = sparkProperties.get(config.DRIVER_SUPERVISE.key)
val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key)
val driverMemoryOverhead = sparkProperties.get(config.DRIVER_MEMORY_OVERHEAD.key)
val driverMemoryOverheadFactor = sparkProperties.get(config.DRIVER_MEMORY_OVERHEAD_FACTOR.key)
val driverCores = sparkProperties.get(config.DRIVER_CORES.key)
val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)

@@ -121,8 +122,10 @@ private[mesos] class MesosSubmitRequestServlet(
mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
val actualDriverMemoryFactor = driverMemoryOverheadFactor.map(_.toDouble).getOrElse(
MEMORY_OVERHEAD_FACTOR)
val actualDriverMemoryOverhead = driverMemoryOverhead.map(_.toInt).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * actualDriverMemory).toInt, MEMORY_OVERHEAD_MIN))
math.max((actualDriverMemoryFactor * actualDriverMemory).toInt, MEMORY_OVERHEAD_MIN))
val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES)
val submitDate = new Date()
val submissionId = newDriverId(submitDate)
@@ -387,8 +387,7 @@ trait MesosSchedulerUtils extends Logging {
}
}

// These defaults copied from YARN
private val MEMORY_OVERHEAD_FRACTION = 0.10
// This default copied from YARN
private val MEMORY_OVERHEAD_MINIMUM = 384

/**
@@ -400,8 +399,9 @@
* (whichever is larger)
*/
def executorMemory(sc: SparkContext): Int = {
val memoryOverheadFactor = sc.conf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
sc.conf.get(mesosConfig.EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
math.max(memoryOverheadFactor * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
sc.executorMemory
}
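The Mesos `executorMemory` calculation above now reads the factor from `spark.executor.memoryOverheadFactor` instead of the old hard-coded 0.10. A Python sketch of the resulting total (illustrative names; not the actual Scala):

```python
MIN_OVERHEAD_MIB = 384  # MEMORY_OVERHEAD_MINIMUM

def mesos_executor_total_mib(executor_mem_mib, factor=0.1, explicit_overhead_mib=None):
    # An explicit spark.mesos.executor.memoryOverhead wins; otherwise use
    # factor * memory, floored at 384 MiB, added on top of executor memory.
    if explicit_overhead_mib is not None:
        overhead = explicit_overhead_mib
    else:
        overhead = int(max(factor * executor_mem_mib, MIN_OVERHEAD_MIB))
    return executor_mem_mib + overhead
```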

@@ -415,7 +415,8 @@
* `spark.driver.memoryOverheadFactor (default 0.1) * driverMemory`
*/
def driverContainerMemory(driverDesc: MesosDriverDescription): Int = {
val defaultMem = math.max(MEMORY_OVERHEAD_FRACTION * driverDesc.mem, MEMORY_OVERHEAD_MINIMUM)
val memoryOverheadFactor = driverDesc.conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)
val defaultMem = math.max(memoryOverheadFactor * driverDesc.mem, MEMORY_OVERHEAD_MINIMUM)
driverDesc.conf.get(mesosConfig.DRIVER_MEMORY_OVERHEAD).getOrElse(defaultMem.toInt) +
driverDesc.mem
}
