[DCOS-54813] Base tech update from 2.4.0 to 2.4.3 (#62)
* Support for DSCOS_SERVICE_ACCOUNT_CREDENTIAL environment variable in MesosClusterScheduler

* File Based Secrets support
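
As a rough usage sketch (not part of the commit): a secret configured via spark.authenticate.secret can be either a literal value or a path to a mounted secret file, mirroring the getFileBasedSecret helper added to SecurityManager in the diff below. The object and method names here are illustrative, and HashCode.fromBytes stands in for the HashCodes call used in the actual change.

import java.nio.file.{Files, Paths}

import com.google.common.hash.HashCode

object FileBasedSecretSketch {
  // If the configured value points to an existing file (e.g. a DC/OS file-based
  // secret mounted into the sandbox), hex-encode its bytes and use that as the
  // shared secret; otherwise treat the value itself as the secret.
  def resolve(configuredValue: String): String = {
    val path = Paths.get(configuredValue)
    if (Files.exists(path)) {
      HashCode.fromBytes(Files.readAllBytes(path)).toString
    } else {
      configuredValue
    }
  }
}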

* [SPARK-723][SPARK-740] Add Metrics to Dispatcher and Driver

- Counters: the total number of times that submissions have entered each state
- Timers: the duration from submit or launch until a submission entered a given state
- Histogram: the distribution of retry counts at the time of retry
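
For illustration only, a minimal sketch of how such metrics could be registered with the Dropwizard MetricRegistry that Spark's MetricsSystem is built on; the class and metric names are hypothetical, not the ones this commit introduces.

import java.util.concurrent.TimeUnit

import com.codahale.metrics.{Counter, Histogram, MetricRegistry, Timer}

// Hypothetical dispatcher metrics source; names are illustrative only.
class DispatcherMetricsSketch(registry: MetricRegistry) {
  // Counter: how many times submissions have entered the "finished" state.
  val finishedCounter: Counter =
    registry.counter(MetricRegistry.name("driver", "finished", "count"))

  // Timer: duration from submission until the driver entered the "running" state.
  val submitToRunningTimer: Timer =
    registry.timer(MetricRegistry.name("driver", "submit-to-running"))

  // Histogram: distribution of retry counts observed at the time of retry.
  val retryHistogram: Histogram =
    registry.histogram(MetricRegistry.name("driver", "retry-counts"))

  def recordFinished(elapsedNanos: Long): Unit = {
    finishedCounter.inc()
    submitToRunningTimer.update(elapsedNanos, TimeUnit.NANOSECONDS)
  }

  def recordRetry(retryCount: Int): Unit = retryHistogram.update(retryCount)
}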

* Fixes to handling finished drivers

- Rename 'failed' case to 'exception'
- When a driver is 'finished', record its final MesosTaskState
- Fix naming consistency after seeing how they look in practice

* Register "finished" counters up-front

Otherwise their values are never published.

* [SPARK-692] Added spark.mesos.executor.gpus to specify the number of Executor GPUs

* [SPARK-23941][MESOS] Mesos task failed on specific spark app name (#33)

* [SPARK-23941][MESOS] Mesos task failed on specific spark app name

Port from SPARK#21014

** edit: not a direct port from upstream Spark. Changes were needed because we saw PySpark jobs fail to launch when (1) run with Docker and (2) submitted with --py-files

==============

* Shell escape only appName, mainClass, default and driverConf

Specifically, we do not want to shell-escape the --py-files. In practice, for Spark
jobs that use Docker images together with Python files, the $MESOS_SANDBOX path gets
escaped, which results in FileNotFoundErrors during py4j.SparkSession.getOrCreate.
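
A hedged sketch of that rule (the helper below is illustrative, not the scheduler's actual implementation): only a small whitelist of option values is shell-escaped, so --py-files values containing $MESOS_SANDBOX paths pass through untouched.

object SelectiveEscapingSketch {
  // Only these option keys get shell-escaped; everything else is left as-is.
  private val escapedKeys = Set("appName", "mainClass", "default", "driverConf")

  // Minimal double-quote escaping for illustration; not Spark's shellEscape.
  private def shellEscape(value: String): String =
    "\"" + value.replace("\"", "\\\"") + "\""

  def render(key: String, value: String): String =
    if (escapedKeys.contains(key)) shellEscape(value) else value
}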

* [DCOS-39150][SPARK] Support unique Executor IDs in cluster managers (#36)

Using incrementing integers as executor IDs means that executors launched by different drivers can end up with identical IDs, which in turn makes the Mesos task IDs of multiple Spark executors identical as well. This PR prepends a UUID, unique per CoarseGrainedSchedulerBackend instance, to the numeric ID so that executors belonging to different drivers can be distinguished.

This PR reverts commit ebe3c7f "[SPARK-12864][YARN] initialize executorIdCounter after ApplicationMaster killed for max n…)"
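
A sketch of the ID scheme this change implies (the exact formatting is illustrative): each scheduler backend instance draws one UUID and prepends it to an incrementing counter, so executors launched by different drivers can never share a Mesos task ID.

import java.util.UUID
import java.util.concurrent.atomic.AtomicLong

// Per-backend prefix plus an incrementing counter; sketch only.
class ExecutorIdGeneratorSketch {
  private val backendId = UUID.randomUUID().toString
  private val counter = new AtomicLong(0L)

  // Two drivers can both hand out counter value 42, but their UUID prefixes
  // differ, so the derived Mesos task IDs never collide.
  def nextExecutorId(): String = s"$backendId-${counter.incrementAndGet()}"
}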

* Upgrade of Hadoop, ZooKeeper, and Jackson libraries to fix CVEs. Updates for JSON-related tests. (#43)

Upgrades of third-party libraries with known CVEs:

- Hadoop: 2.7.3 -> 2.7.7. Fixes: CVE-2016-6811, CVE-2017-3166, CVE-2017-3162, CVE-2018-8009
- Jackson 2.6.5 -> 2.9.6. Fixes: CVE-2017-15095, CVE-2017-17485, CVE-2017-7525, CVE-2018-7489, CVE-2016-3720
- ZooKeeper 3.4.6 -> 3.4.13 (https://zookeeper.apache.org/doc/r3.4.13/releasenotes.html)

# Conflicts:
#	dev/deps/spark-deps-hadoop-2.6
#	dev/deps/spark-deps-hadoop-2.7
#	dev/deps/spark-deps-hadoop-3.1
#	pom.xml

* CNI Support for Docker containerizer, binding to SPARK_LOCAL_IP instead of 0.0.0.0 to properly advertise executors during shuffle (#44)
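
As a rough illustration of the bind-address behavior described above (assuming the executor advertises whatever address it binds to): prefer SPARK_LOCAL_IP when it is set and fall back to the wildcard address otherwise.

// Sketch only: choose the address executors bind to and advertise for shuffle.
object BindAddressSketch {
  def resolve(env: Map[String, String] = sys.env): String =
    env.get("SPARK_LOCAL_IP").filter(_.nonEmpty).getOrElse("0.0.0.0")
}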

* Spark Dispatcher support for launching applications in the same virtual network by default (#45)

* [DCOS-46585] Fix supervised driver retry logic for outdated tasks (#46)

This commit fixes a bug where `--supervised` drivers would be relaunched after receiving an outdated status update from a restarted or crashed agent, even if they had already been relaunched and were running elsewhere. In those scenarios, the previous logic would leave two identical jobs running, while ZK state kept a record of only the latest one, effectively orphaning the first job.
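
A hedged sketch of the guard this implies (field and method names are illustrative, not the dispatcher's actual state): the dispatcher remembers the most recently launched task ID for each supervised driver and ignores status updates that refer to an older task, so a replayed update from a recovered agent cannot trigger a second relaunch.

object SupervisedRetryGuardSketch {
  // Illustrative state: the task ID of the driver's latest launch attempt.
  final case class SupervisedDriverState(submissionId: String, currentTaskId: String)

  // Act on the update only if it refers to the current task; an outdated update
  // from a restarted/crashed agent concerns a task that was already replaced,
  // and relaunching for it would orphan the running copy.
  def shouldHandleUpdate(state: SupervisedDriverState, updateTaskId: String): Boolean =
    updateTaskId == state.currentTaskId
}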

* Revert "[SPARK-25088][CORE][MESOS][DOCS] Update Rest Server docs & defaults."

This reverts commit 1024875.

The change introduced in the reverted commit is breaking:
- it breaks the semantics of `spark.master.rest.enabled`, which belongs to the Spark Standalone Master only, not to SparkSubmit
- it reverts the default submission behavior for Spark Standalone from REST back to legacy RPC
- it contains misleading messages in `require` assertion blocks
- it prevents users from running jobs without explicitly specifying `spark.master.rest.enabled`

* [DCOS-49020] Specify user in CommandInfo for Spark Driver launched on Mesos (#49)

* [DCOS-40974] Mesos checkpointing support for Spark Drivers (#51)

* [DCOS-51158] Improved Task ID assignment for Executor tasks (#52)

* [DCOS-51454] Remove irrelevant Mesos REPL test (#54)

* [DCOS-51453] Added Hadoop 2.9 profile (#53)

* [DCOS-34235] spark.mesos.executor.memoryOverhead equivalent for the Driver when running on Mesos (#55)

* Refactoring of metrics naming to add Mesos semantics and avoid clashing with existing Spark metrics (#58)

* [DCOS-34549] Mesos label NPE fix (#60)
alembiewski authored Aug 19, 2019
1 parent c3e32bf commit 401fd7b
Showing 40 changed files with 1,300 additions and 296 deletions.
4 changes: 4 additions & 0 deletions core/pom.xml
@@ -284,6 +284,10 @@
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-xml_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
19 changes: 18 additions & 1 deletion core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -20,6 +20,8 @@ package org.apache.spark
import java.net.{Authenticator, PasswordAuthentication}
import java.nio.charset.StandardCharsets.UTF_8

import com.google.common.hash.HashCodes
import java.nio.file.{Files => jFiles, Paths => jPaths}
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

@@ -327,7 +329,7 @@ private[spark] class SecurityManager(
// in different context.
.orElse(Option(secretKey))
.orElse(Option(sparkConf.getenv(ENV_AUTH_SECRET)))
.orElse(sparkConf.getOption(SPARK_AUTH_SECRET_CONF))
.orElse(getFileBasedSecret())
.getOrElse {
throw new IllegalArgumentException(
s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config")
@@ -337,6 +339,21 @@
}
}

/**
 * Tries to resolve a file-based secret from the path specified in SPARK_AUTH_SECRET_CONF.
 */
def getFileBasedSecret(): Option[String] = {
sparkConf
.getOption(SPARK_AUTH_SECRET_CONF)
.map { value =>
if (jFiles.exists(jPaths.get(value))) {
HashCodes.fromBytes(jFiles.readAllBytes(jPaths.get(value))).toString
} else {
value
}
}
}

/**
* Initialize the authentication secret.
*
@@ -82,7 +82,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var driverCores: String = null
var submissionToKill: String = null
var submissionToRequestStatusFor: String = null
var useRest: Boolean = false // used internally
var useRest: Boolean = true // used internally

/** Default properties present in the currently defined defaults file. */
lazy val defaultSparkProperties: HashMap[String, String] = {
@@ -115,8 +115,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
// Use `sparkProperties` map along with env vars to fill in any missing parameters
loadEnvironmentArguments()

useRest = sparkProperties.getOrElse("spark.master.rest.enabled", "false").toBoolean

validateArguments()

/**
10 changes: 1 addition & 9 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -121,18 +121,10 @@ private[deploy] class Master(
}

// Alternative application submission gateway that is stable across Spark versions
private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", false)
private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", true)
private var restServer: Option[StandaloneRestServer] = None
private var restServerBoundPort: Option[Int] = None

{
val authKey = SecurityManager.SPARK_AUTH_SECRET_CONF
require(conf.getOption(authKey).isEmpty || !restServerEnabled,
s"The RestSubmissionServer does not support authentication via ${authKey}. Either turn " +
"off the RestSubmissionServer with spark.master.rest.enabled=false, or do not use " +
"authentication.")
}

override def onStart(): Unit = {
logInfo("Starting Spark master at " + masterUrl)
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
@@ -51,7 +51,6 @@ private[spark] abstract class RestSubmissionServer(
val host: String,
val requestedPort: Int,
val masterConf: SparkConf) extends Logging {

protected val submitRequestServlet: SubmitRequestServlet
protected val killRequestServlet: KillRequestServlet
protected val statusRequestServlet: StatusRequestServlet
@@ -19,13 +19,16 @@ package org.apache.spark.executor

import java.net.URL
import java.nio.ByteBuffer
import java.nio.file.{Files, Paths}
import java.util.Locale
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.mutable
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

import com.google.common.hash.HashCodes

import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
@@ -191,6 +194,16 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {

// Bootstrap to fetch the driver's Spark properties.
val executorConf = new SparkConf

if (System.getenv(SecurityManager.ENV_AUTH_SECRET) != null) {
executorConf.set("spark.authenticate", "true")
val secret = System.getenv(SecurityManager.ENV_AUTH_SECRET)
if (Files.exists(Paths.get(secret))) {
val s = HashCodes.fromBytes(Files.readAllBytes(Paths.get(secret))).toString
executorConf.set(SecurityManager.SPARK_AUTH_SECRET_CONF, s)
}
}

val fetcher = RpcEnv.create(
"driverPropsFetcher",
hostname,
@@ -273,7 +286,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}
}

if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
if (hostname == null) {
hostname = Utils.localHostName()
log.info(s"Executor hostname is not provided, will use '$hostname' to advertise itself")
}

if (driverUrl == null || executorId == null || cores <= 0 ||
appId == null) {
printUsageAndExit()
}
@@ -129,13 +129,18 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
var is: InputStream = null
try {
is = path match {
case Some(f) => new FileInputStream(f)
case None => Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME)
case Some(f) =>
logInfo(s"Loading metrics properties from file $f")
new FileInputStream(f)
case None =>
logInfo(s"Loading metrics properties from resource $DEFAULT_METRICS_CONF_FILENAME")
Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME)
}

if (is != null) {
properties.load(is)
}
logInfo(s"Metrics properties: " + properties.toString)
} catch {
case e: Exception =>
val file = path.getOrElse(DEFAULT_METRICS_CONF_FILENAME)
@@ -157,6 +157,7 @@ private[spark] class MetricsSystem private (
sources += source
try {
val regName = buildRegistryName(source)
logInfo(s"Registering source: $regName")
registry.register(regName, source.metricRegistry)
} catch {
case e: IllegalArgumentException => logInfo("Metrics already registered", e)
@@ -166,6 +167,7 @@
def removeSource(source: Source) {
sources -= source
val regName = buildRegistryName(source)
logInfo(s"Removing source: $regName")
registry.removeMatching(new MetricFilter {
def matches(name: String, metric: Metric): Boolean = name.startsWith(regName)
})
@@ -194,6 +196,7 @@
sinkConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
if (null != classPath) {
logInfo(s"Initializing sink: $classPath")
try {
val sink = Utils.classForName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
@@ -205,7 +208,7 @@
}
} catch {
case e: Exception =>
logError("Sink class " + classPath + " cannot be instantiated")
logError(s"Sink class $classPath cannot be instantiated")
throw e
}
}
@@ -41,7 +41,7 @@ import org.apache.spark.internal.Logging
* There is no particular relationship between an operation scope and a stage or a job.
* A scope may live inside one stage (e.g. map) or span across multiple jobs (e.g. take).
*/
@JsonInclude(Include.NON_NULL)
@JsonInclude(Include.NON_ABSENT)
@JsonPropertyOrder(Array("id", "name", "parent"))
private[spark] class RDDOperationScope(
val name: String,
@@ -36,8 +36,6 @@ private[spark] object CoarseGrainedClusterMessages {
hadoopDelegationCreds: Option[Array[Byte]])
extends CoarseGrainedClusterMessage

case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage

// Driver to executors
case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage

@@ -92,9 +92,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
@GuardedBy("CoarseGrainedSchedulerBackend.this")
protected var localityAwareTasks = 0

// The num of current max ExecutorId used to re-register appMaster
@volatile protected var currentExecutorIdCounter = 0

private val reviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")

@@ -195,9 +192,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (currentExecutorIdCounter < executorId.toInt) {
currentExecutorIdCounter = executorId.toInt
}
if (numPendingExecutors > 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
@@ -49,7 +49,7 @@ private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{
}
mapper.registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
mapper.enable(SerializationFeature.INDENT_OUTPUT)
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
mapper.setSerializationInclusion(JsonInclude.Include.NON_ABSENT)
mapper.setDateFormat(JacksonMessageWriter.makeISODateFormat)

override def isWriteable(
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/JsonTestUtils.scala
@@ -21,7 +21,7 @@ import org.json4s.jackson.JsonMethods

trait JsonTestUtils {
def assertValidDataInJson(validateJson: JValue, expectedJson: JValue) {
val Diff(c, a, d) = validateJson.diff(expectedJson)
val Diff(c, a, d) = expectedJson.diff(validateJson)
val validatePretty = JsonMethods.pretty(validateJson)
val expectedPretty = JsonMethods.pretty(expectedJson)
val errorMessage = s"Expected:\n$expectedPretty\nFound:\n$validatePretty"
21 changes: 11 additions & 10 deletions core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -135,8 +135,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B

val storageJson = getJson(ui, "storage/rdd")
storageJson.children.length should be (1)
(storageJson.children.head \ "storageLevel").extract[String] should be (
StorageLevels.DISK_ONLY.description)

(storageJson.children.head \ "storageLevel").extract[String] should be (StorageLevels.DISK_ONLY.description)
val rddJson = getJson(ui, "storage/rdd/0")
(rddJson \ "storageLevel").extract[String] should be (StorageLevels.DISK_ONLY.description)

@@ -330,11 +330,12 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
find(cssSelector(".progress-cell .progress")).get.text should be ("2/2 (1 failed)")
}
val jobJson = getJson(sc.ui.get, "jobs")
(jobJson \\ "numTasks").extract[Int]should be (2)
(jobJson \\ "numCompletedTasks").extract[Int] should be (3)
(jobJson \\ "numFailedTasks").extract[Int] should be (1)
(jobJson \\ "numCompletedStages").extract[Int] should be (2)
(jobJson \\ "numFailedStages").extract[Int] should be (1)
(jobJson.children.head \ "numTasks").extract[Int]should be (2)
(jobJson.children.head \ "numCompletedTasks").extract[Int] should be (3)
(jobJson.children.head \ "numFailedTasks").extract[Int] should be (1)
(jobJson.children.head \ "numCompletedStages").extract[Int] should be (2)
(jobJson.children.head \ "numFailedStages").extract[Int] should be (1)

val stageJson = getJson(sc.ui.get, "stages")

for {
@@ -663,9 +664,9 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
appListJsonAst.children.length should be (1)
val attempts = (appListJsonAst.children.head \ "attempts").children
attempts.size should be (1)
(attempts(0) \ "completed").extract[Boolean] should be (false)
parseDate(attempts(0) \ "startTime") should be (sc.startTime)
parseDate(attempts(0) \ "endTime") should be (-1)
(attempts.head \ "completed").extract[Boolean] should be (false)
parseDate(attempts.head \ "startTime") should be (sc.startTime)
parseDate(attempts.head \ "endTime") should be (-1)
val oneAppJsonAst = getJson(sc.ui.get, "")
oneAppJsonAst should be (appListJsonAst.children(0))
}
33 changes: 17 additions & 16 deletions dev/deps/spark-deps-hadoop-2.6
@@ -17,6 +17,7 @@ arpack_combined_all-0.1.jar
arrow-format-0.10.0.jar
arrow-memory-0.10.0.jar
arrow-vector-0.10.0.jar
audience-annotations-0.5.0.jar
automaton-1.11-8.jar
avro-1.8.2.jar
avro-ipc-1.8.2.jar
@@ -86,16 +87,16 @@ htrace-core-3.0.4.jar
httpclient-4.5.6.jar
httpcore-4.4.10.jar
ivy-2.4.0.jar
jackson-annotations-2.6.7.jar
jackson-core-2.6.7.jar
jackson-annotations-2.9.6.jar
jackson-core-2.9.6.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.6.7.1.jar
jackson-dataformat-yaml-2.6.7.jar
jackson-databind-2.9.6.jar
jackson-dataformat-yaml-2.9.6.jar
jackson-jaxrs-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
jackson-module-jaxb-annotations-2.6.7.jar
jackson-module-paranamer-2.7.9.jar
jackson-module-scala_2.11-2.6.7.1.jar
jackson-module-jaxb-annotations-2.9.6.jar
jackson-module-paranamer-2.9.6.jar
jackson-module-scala_2.11-2.9.6.jar
jackson-xc-1.9.13.jar
janino-3.0.9.jar
javassist-3.18.1-GA.jar
@@ -121,10 +122,10 @@ jline-2.14.6.jar
joda-time-2.9.3.jar
jodd-core-3.5.2.jar
jpam-1.1.jar
json4s-ast_2.11-3.5.3.jar
json4s-core_2.11-3.5.3.jar
json4s-jackson_2.11-3.5.3.jar
json4s-scalap_2.11-3.5.3.jar
json4s-ast_2.11-3.6.2.jar
json4s-core_2.11-3.6.2.jar
json4s-jackson_2.11-3.6.2.jar
json4s-scalap_2.11-3.6.2.jar
jsr305-1.3.9.jar
jta-1.1.jar
jtransforms-2.4.0.jar
@@ -171,16 +172,16 @@ py4j-0.10.7.jar
pyrolite-4.13.jar
scala-compiler-2.11.12.jar
scala-library-2.11.12.jar
scala-parser-combinators_2.11-1.1.0.jar
scala-parser-combinators_2.11-1.1.1.jar
scala-reflect-2.11.12.jar
scala-xml_2.11-1.0.5.jar
scala-xml_2.11-1.1.1.jar
shapeless_2.11-2.3.2.jar
shims-0.7.45.jar
slf4j-api-1.7.16.jar
slf4j-log4j12-1.7.16.jar
snakeyaml-1.15.jar
snakeyaml-1.18.jar
snappy-0.2.jar
snappy-java-1.1.7.3.jar
snappy-java-1.1.2.6.jar
spire-macros_2.11-0.13.0.jar
spire_2.11-0.13.0.jar
stax-api-1.0-2.jar
@@ -195,5 +196,5 @@ xercesImpl-2.9.1.jar
xmlenc-0.52.jar
xz-1.5.jar
zjsonpatch-0.3.0.jar
zookeeper-3.4.6.jar
zookeeper-3.4.13.jar
zstd-jni-1.3.2-2.jar