Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-38194][YARN][MESOS][K8S] Make memory overhead factor configurable #35504

4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,9 @@ private[spark] object SparkConf extends Logging {
DeprecatedConfig("spark.blacklist.killBlacklistedExecutors", "3.1.0",
"Please use spark.excludeOnFailure.killExcludedExecutors"),
DeprecatedConfig("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "3.1.0",
"Please use spark.yarn.executor.launch.excludeOnFailure.enabled")
"Please use spark.yarn.executor.launch.excludeOnFailure.enabled"),
DeprecatedConfig("spark.kubernetes.memoryOverheadFactor", "3.3.0",
"Please use spark.driver.memoryOverheadFactor and spark.executor.memoryOverheadFactor")
)

Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
Expand Down
28 changes: 28 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,22 @@ package object config {
.bytesConf(ByteUnit.MiB)
.createOptional

// SPARK-38194: driver-side memory overhead factor. Supersedes the deprecated
// spark.kubernetes.memoryOverheadFactor (listed in SparkConf's deprecated configs);
// the factor is ignored when spark.driver.memoryOverhead is set directly.
private[spark] val DRIVER_MEMORY_OVERHEAD_FACTOR =
ConfigBuilder("spark.driver.memoryOverheadFactor")
.doc("Fraction of driver memory to be allocated as additional non-heap memory per driver " +
"process in cluster mode. This is memory that accounts for things like VM overheads, " +
"interned strings, other native overheads, etc. This tends to grow with the container " +
"size. This value defaults to 0.10 except for Kubernetes non-JVM jobs, which defaults to " +
"0.40. This is done as non-JVM tasks need more non-JVM heap space and such tasks " +
"commonly fail with \"Memory Overhead Exceeded\" errors. This preempts this error " +
"with a higher default. This value is ignored if spark.driver.memoryOverhead is set " +
"directly.")
.version("3.3.0")
.doubleConf
// Any strictly positive factor is accepted (values above 1.0 are allowed);
// zero and negative values are rejected at set time.
.checkValue(factor => factor > 0,
"Ensure that memory overhead is a double greater than 0")
.createWithDefault(0.1)

private[spark] val DRIVER_LOG_DFS_DIR =
ConfigBuilder("spark.driver.log.dfsDir").version("3.0.0").stringConf.createOptional

Expand Down Expand Up @@ -315,6 +331,18 @@ package object config {
.bytesConf(ByteUnit.MiB)
.createOptional

// SPARK-38194: executor-side counterpart of spark.driver.memoryOverheadFactor.
// Ignored when spark.executor.memoryOverhead is set directly.
private[spark] val EXECUTOR_MEMORY_OVERHEAD_FACTOR =
ConfigBuilder("spark.executor.memoryOverheadFactor")
.doc("Fraction of executor memory to be allocated as additional non-heap memory per " +
"executor process. This is memory that accounts for things like VM overheads, " +
"interned strings, other native overheads, etc. This tends to grow with the container " +
"size. This value is ignored if spark.executor.memoryOverhead is set directly.")
.version("3.3.0")
.doubleConf
// Any strictly positive factor is accepted; zero and negative values are rejected.
.checkValue(factor => factor > 0,
"Ensure that memory overhead is a double greater than 0")
.createWithDefault(0.1)

private[spark] val CORES_MAX = ConfigBuilder("spark.cores.max")
.doc("When running on a standalone deploy cluster or a Mesos cluster in coarse-grained " +
"sharing mode, the maximum amount of CPU cores to request for the application from across " +
Expand Down
30 changes: 28 additions & 2 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ of the most common options to set are:
</tr>
<tr>
<td><code>spark.driver.memoryOverhead</code></td>
<td>driverMemory * 0.10, with minimum of 384 </td>
<td>driverMemory * <code>spark.driver.memoryOverheadFactor</code>, with minimum of 384 </td>
<td>
Amount of non-heap memory to be allocated per driver process in cluster mode, in MiB unless
otherwise specified. This is memory that accounts for things like VM overheads, interned strings,
Expand All @@ -198,6 +198,21 @@ of the most common options to set are:
</td>
<td>2.3.0</td>
</tr>
<tr>
<td><code>spark.driver.memoryOverheadFactor</code></td>
<td>0.10</td>
<td>
Fraction of driver memory to be allocated as additional non-heap memory per driver process in cluster mode.
This is memory that accounts for things like VM overheads, interned strings,
other native overheads, etc. This tends to grow with the container size.
This value defaults to 0.10 except for Kubernetes non-JVM jobs, which defaults to
0.40. This is done as non-JVM tasks need more non-JVM heap space and such tasks
commonly fail with "Memory Overhead Exceeded" errors. This preempts this error
with a higher default.
This value is ignored if <code>spark.driver.memoryOverhead</code> is set directly.
</td>
<td>3.3.0</td>
</tr>
<tr>
<td><code>spark.driver.resource.{resourceName}.amount</code></td>
<td>0</td>
Expand Down Expand Up @@ -272,7 +287,7 @@ of the most common options to set are:
</tr>
<tr>
<td><code>spark.executor.memoryOverhead</code></td>
<td>executorMemory * 0.10, with minimum of 384 </td>
<td>executorMemory * <code>spark.executor.memoryOverheadFactor</code>, with minimum of 384 </td>
<td>
Amount of additional memory to be allocated per executor process, in MiB unless otherwise specified.
This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
Expand All @@ -287,6 +302,17 @@ of the most common options to set are:
</td>
<td>2.3.0</td>
</tr>
<tr>
<td><code>spark.executor.memoryOverheadFactor</code></td>
<td>0.10</td>
<td>
Fraction of executor memory to be allocated as additional non-heap memory per executor process.
This is memory that accounts for things like VM overheads, interned strings,
other native overheads, etc. This tends to grow with the container size.
This value is ignored if <code>spark.executor.memoryOverhead</code> is set directly.
</td>
<td>3.3.0</td>
</tr>
<tr>
<td><code>spark.executor.resource.{resourceName}.amount</code></td>
<td>0</td>
Expand Down
9 changes: 0 additions & 9 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -1137,15 +1137,6 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>3.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.memoryOverheadFactor</code></td>
<td><code>0.1</code></td>
<td>
This sets the Memory Overhead Factor that will allocate memory to non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, various systems processes, and <code>tmpfs</code>-based local directories when <code>spark.kubernetes.local.dirs.tmpfs</code> is <code>true</code>. For JVM-based jobs this value will default to 0.10 and 0.40 for non-JVM jobs.
This is done as non-JVM tasks need more non-JVM heap space and such tasks commonly fail with "Memory Overhead Exceeded" errors. This preempts this error with a higher default.
</td>
<td>2.4.0</td>
</tr>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is (3). We should not remove documentation during the deprecation stage.

<tr>
<td><code>spark.kubernetes.pyspark.pythonVersion</code></td>
<td><code>"3"</code></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,23 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)

// Memory settings
private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
// Resolved driver memory overhead factor: the new spark.driver.memoryOverheadFactor
// wins when explicitly set; otherwise fall back to the deprecated shared
// spark.kubernetes.memoryOverheadFactor for backwards compatibility.
private val memoryOverheadFactor =
  if (conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)
  else conf.get(MEMORY_OVERHEAD_FACTOR)

// The memory overhead factor to use. If the user has not set it, then use a different
// value for non-JVM apps. This value is propagated to executors.
private val overheadFactor =
if (conf.mainAppResource.isInstanceOf[NonJVMResource]) {
if (conf.contains(MEMORY_OVERHEAD_FACTOR)) {
conf.get(MEMORY_OVERHEAD_FACTOR)
if (conf.contains(MEMORY_OVERHEAD_FACTOR) || conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) {
memoryOverheadFactor
} else {
NON_JVM_MEMORY_OVERHEAD_FACTOR
}
} else {
conf.get(MEMORY_OVERHEAD_FACTOR)
memoryOverheadFactor
}

private val memoryOverheadMiB = conf
Expand Down Expand Up @@ -163,7 +168,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
KUBERNETES_DRIVER_POD_NAME.key -> driverPodName,
"spark.app.id" -> conf.appId,
KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
DRIVER_MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
// try upload local, resolvable files to a hadoop compatible file system
Seq(JARS, FILES, ARCHIVES, SUBMIT_PYTHON_FILES).foreach { key =>
val uris = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,16 @@ private[spark] class BasicExecutorFeatureStep(
private val isDefaultProfile = resourceProfile.id == ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
private val isPythonApp = kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON)
private val disableConfigMap = kubernetesConf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP)
// Resolved executor memory overhead factor: prefer the new
// spark.executor.memoryOverheadFactor when explicitly set, falling back to the
// deprecated shared factor (which the driver may have propagated) otherwise.
private val memoryOverheadFactor =
  if (kubernetesConf.contains(EXECUTOR_MEMORY_OVERHEAD_FACTOR)) {
    kubernetesConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
  } else kubernetesConf.get(MEMORY_OVERHEAD_FACTOR)
Comment on lines +62 to +66
Copy link
Member

@Yikun Yikun Mar 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason should be in here: before this change, kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) was used as the default factor, and it's 0.4 according to my real debug watch on the IT "Run PySpark on simple pi.py example".

But now EXECUTOR_MEMORY_OVERHEAD_FACTOR has higher priority, so MEMORY_OVERHEAD_FACTOR is overridden (0.1 by default). As a result, the default behavior changed.

But I haven't found why kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) is 0.4 yet; I couldn't find any code in the IT that sets this explicitly.

cc @Kimahriman @tgravescs

Copy link
Member

@Yikun Yikun Mar 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// The memory overhead factor to use. If the user has not set it, then use a different
// value for non-JVM apps. This value is propagated to executors.

I found it, it is propagated to executors from driver

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah thanks this is what I was also just looking at but I'm not sure how it was propagated to the executors. I was looking at through the KubernetesDriverconf somehow or possible through the pod system properties:
MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString.

If you find it let me know, still investigating.

Copy link
Contributor

@tgravescs tgravescs Mar 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I think I see how this is happening:

 val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)
    val configMapName = KubernetesClientUtils.configMapNameDriver
    val confFilesMap = KubernetesClientUtils.buildSparkConfDirFilesMap(configMapName,
      conf.sparkConf, resolvedDriverSpec.systemProperties)

We build the driver spec, which includes the added system properties:

spec.systemProperties ++ addedSystemProperties

Added system properties in driver feature steps add the memory overhead setting there:


val additionalProps = mutable.Map(
      KUBERNETES_DRIVER_POD_NAME.key -> driverPodName,
      "spark.app.id" -> conf.appId,
      KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
      MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)

Then the KubernetesClientUtils.buildSparkConfDirFilesMap is called which propagates it to the executors (I think)

Copy link
Member

@Yikun Yikun Mar 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I think so.

loadedConfFilesMap ++ Map(Constants.SPARK_CONF_FILE_NAME -> resolvedProperties)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should also state that if people don't want in 3.3, I'm personally fine with it just need input from anyone interested in the feature.

Thank you. Then, it's simpler because this PR was backported manually after feature freeze. :)
We are currently discussing the whitelisting of late arrivals like this PR, aren't we, @tgravescs?
We can discuss there together to get those input you need.

Copy link
Member

@dongjoon-hyun dongjoon-hyun Mar 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please unblock Apache Spark 3.3 K8s module QA period by reverting this. We can land it back after having a healthy commit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay I think I get it, those "system properties" end up as default spark configs on the executor. Clear as mud

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#35900 revert pr

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your decision, @tgravescs .


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as in BasicDriverFeatureStep - change order of config query.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Went with if (conf.contains... to more easily handle the type conversions and default value of the backup setting

val execResources = ResourceProfile.getResourcesForClusterManager(
resourceProfile.id,
resourceProfile.executorResources,
kubernetesConf.get(MEMORY_OVERHEAD_FACTOR),
memoryOverheadFactor,
kubernetesConf.sparkConf,
isPythonApp,
Map.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
"spark.app.id" -> KubernetesTestConf.APP_ID,
"spark.kubernetes.submitInDriver" -> "true",
MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
DRIVER_MEMORY_OVERHEAD_FACTOR.key -> DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf)
}

Expand Down Expand Up @@ -188,21 +188,21 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
// Memory overhead tests. Tuples are:
// test name, main resource, overhead factor, expected factor
Seq(
("java", JavaMainAppResource(None), None, MEMORY_OVERHEAD_FACTOR.defaultValue.get),
("java", JavaMainAppResource(None), None, DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get),
("python default", PythonMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR),
("python w/ override", PythonMainAppResource(null), Some(0.9d), 0.9d),
("r default", RMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR)
).foreach { case (name, resource, factor, expectedFactor) =>
test(s"memory overhead factor: $name") {
// Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
val driverMem =
ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2

// main app resource, overhead factor
val sparkConf = new SparkConf(false)
.set(CONTAINER_IMAGE, "spark-driver:latest")
.set(DRIVER_MEMORY.key, s"${driverMem.toInt}m")
factor.foreach { value => sparkConf.set(MEMORY_OVERHEAD_FACTOR, value) }
factor.foreach { value => sparkConf.set(DRIVER_MEMORY_OVERHEAD_FACTOR, value) }
val conf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
mainAppResource = resource)
Expand All @@ -213,10 +213,63 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
assert(mem === s"${expected}Mi")

val systemProperties = step.getAdditionalPodSystemProperties()
assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
}
}

// Fix: test name was misspelled ("precendence").
test("SPARK-38194: memory overhead factor precedence") {
  // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB,
  // so the factor (not the minimum floor) determines the overhead.
  val driverMem =
    ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2

  // Base driver conf; the competing overhead factors are set below.
  val sparkConf = new SparkConf(false)
    .set(CONTAINER_IMAGE, "spark-driver:latest")
    .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m")

  // The new per-driver config should take precedence over the deprecated shared one.
  val expectedFactor = 0.2
  sparkConf.set(DRIVER_MEMORY_OVERHEAD_FACTOR, expectedFactor)
  sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3)

  val conf = KubernetesTestConf.createDriverConf(
    sparkConf = sparkConf)
  val step = new BasicDriverFeatureStep(conf)
  val pod = step.configurePod(SparkPod.initialPod())
  val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
  val expected = (driverMem + driverMem * expectedFactor).toInt
  assert(mem === s"${expected}Mi")

  // The resolved factor is republished as a pod system property for executors.
  val systemProperties = step.getAdditionalPodSystemProperties()
  assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
}

test(s"SPARK-38194: old memory factor settings is applied if new one isn't given") {
  // Pick a driver memory large enough that the computed overhead exceeds
  // MEMORY_OVERHEAD_MIN_MIB, so the factor (not the floor) drives the result.
  val defaultFactor = DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get
  val driverMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2

  // Only the deprecated shared factor is set; it must still be honored.
  val expectedFactor = 0.3
  val sparkConf = new SparkConf(false)
    .set(CONTAINER_IMAGE, "spark-driver:latest")
    .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m")
    .set(MEMORY_OVERHEAD_FACTOR, expectedFactor)

  val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
  val step = new BasicDriverFeatureStep(conf)
  val pod = step.configurePod(SparkPod.initialPod())

  val requestedMem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
  val expectedMem = (driverMem + driverMem * expectedFactor).toInt
  assert(requestedMem === s"${expectedMem}Mi")

  // The resolved (old) factor is republished under the new driver key for executors.
  val systemProperties = step.getAdditionalPodSystemProperties()
  assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
}

test("SPARK-35493: make spark.blockManager.port be able to be fallen back to in driver pod") {
val initPod = SparkPod.initialPod()
val sparkConf = new SparkConf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,60 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
))
}

// Fix: test name was misspelled ("precendence").
test("SPARK-38194: memory overhead factor precedence") {
  // Choose an executor memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB,
  // so the factor (not the 384 MiB floor) determines the overhead.
  val defaultFactor = EXECUTOR_MEMORY_OVERHEAD_FACTOR.defaultValue.get
  val executorMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2

  val sparkConf = new SparkConf(false)
    .set(CONTAINER_IMAGE, "spark-driver:latest")
    .set(EXECUTOR_MEMORY.key, s"${executorMem.toInt}m")

  // The new executor-specific config should take precedence over the deprecated one.
  val expectedFactor = 0.2
  sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, expectedFactor)
  sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3)

  val conf = KubernetesTestConf.createExecutorConf(
    sparkConf = sparkConf)
  // Reset the cached default profile so it is rebuilt from this test's conf.
  ResourceProfile.clearDefaultProfile()
  val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
  val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf),
    resourceProfile)
  val pod = step.configurePod(SparkPod.initialPod())
  val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
  val expected = (executorMem + executorMem * expectedFactor).toInt
  assert(mem === s"${expected}Mi")
}

test(s"SPARK-38194: old memory factor settings is applied if new one isn't given") {
// Choose an executor memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
val defaultFactor = EXECUTOR_MEMORY_OVERHEAD_FACTOR.defaultValue.get
val executorMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2

// main app resource, overhead factor
val sparkConf = new SparkConf(false)
.set(CONTAINER_IMAGE, "spark-driver:latest")
.set(EXECUTOR_MEMORY.key, s"${executorMem.toInt}m")

// Old config still works if the new config isn't given (only the deprecated
// shared factor is set here; EXECUTOR_MEMORY_OVERHEAD_FACTOR is left unset)
val expectedFactor = 0.3
sparkConf.set(MEMORY_OVERHEAD_FACTOR, expectedFactor)

val conf = KubernetesTestConf.createExecutorConf(
sparkConf = sparkConf)
// Reset the cached default profile so it is rebuilt from this test's conf.
ResourceProfile.clearDefaultProfile()
val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf),
resourceProfile)
val pod = step.configurePod(SparkPod.initialPod())
val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
val expected = (executorMem + executorMem * expectedFactor).toInt
assert(mem === s"${expected}Mi")
}


// There is always exactly one controller reference, and it points to the driver pod.
private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
assert(executor.getMetadata.getOwnerReferences.size() === 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ private[mesos] class MesosSubmitRequestServlet(
val superviseDriver = sparkProperties.get(config.DRIVER_SUPERVISE.key)
val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key)
val driverMemoryOverhead = sparkProperties.get(config.DRIVER_MEMORY_OVERHEAD.key)
val driverMemoryOverheadFactor = sparkProperties.get(config.DRIVER_MEMORY_OVERHEAD_FACTOR.key)
val driverCores = sparkProperties.get(config.DRIVER_CORES.key)
val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)

Expand All @@ -121,8 +122,10 @@ private[mesos] class MesosSubmitRequestServlet(
mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
val actualDriverMemoryFactor = driverMemoryOverheadFactor.map(_.toDouble).getOrElse(
MEMORY_OVERHEAD_FACTOR)
val actualDriverMemoryOverhead = driverMemoryOverhead.map(_.toInt).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * actualDriverMemory).toInt, MEMORY_OVERHEAD_MIN))
math.max((actualDriverMemoryFactor * actualDriverMemory).toInt, MEMORY_OVERHEAD_MIN))
val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES)
val submitDate = new Date()
val submissionId = newDriverId(submitDate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,7 @@ trait MesosSchedulerUtils extends Logging {
}
}

// These defaults copied from YARN
private val MEMORY_OVERHEAD_FRACTION = 0.10
// This default copied from YARN
private val MEMORY_OVERHEAD_MINIMUM = 384

/**
Expand All @@ -400,8 +399,9 @@ trait MesosSchedulerUtils extends Logging {
* (whichever is larger)
*/
def executorMemory(sc: SparkContext): Int = {
val memoryOverheadFactor = sc.conf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
sc.conf.get(mesosConfig.EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
math.max(memoryOverheadFactor * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
sc.executorMemory
}

Expand All @@ -415,7 +415,8 @@ trait MesosSchedulerUtils extends Logging {
* `MEMORY_OVERHEAD_FRACTION (=0.1) * driverMemory`
*/
def driverContainerMemory(driverDesc: MesosDriverDescription): Int = {
val defaultMem = math.max(MEMORY_OVERHEAD_FRACTION * driverDesc.mem, MEMORY_OVERHEAD_MINIMUM)
val memoryOverheadFactor = driverDesc.conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)
val defaultMem = math.max(memoryOverheadFactor * driverDesc.mem, MEMORY_OVERHEAD_MINIMUM)
driverDesc.conf.get(mesosConfig.DRIVER_MEMORY_OVERHEAD).getOrElse(defaultMem.toInt) +
driverDesc.mem
}
Expand Down
Loading