
Commit

Merge branch 'master' of github.com:apache/spark into SPARK-23922
mgaido91 committed Apr 17, 2018
2 parents 682bc73 + 05ae747 commit 876cd93
Showing 181 changed files with 6,226 additions and 3,817 deletions.
30 changes: 28 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
@@ -25,9 +25,10 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.{MutableURLClassLoader, Utils}

private[deploy] object DependencyUtils {
private[deploy] object DependencyUtils extends Logging {

def resolveMavenDependencies(
packagesExclusions: String,
@@ -75,7 +76,7 @@ private[deploy] object DependencyUtils {
def addJarsToClassPath(jars: String, loader: MutableURLClassLoader): Unit = {
if (jars != null) {
for (jar <- jars.split(",")) {
SparkSubmit.addJarToClasspath(jar, loader)
addJarToClasspath(jar, loader)
}
}
}
@@ -151,6 +152,31 @@ private[deploy] object DependencyUtils {
}.mkString(",")
}

def addJarToClasspath(localJar: String, loader: MutableURLClassLoader): Unit = {
val uri = Utils.resolveURI(localJar)
uri.getScheme match {
case "file" | "local" =>
val file = new File(uri.getPath)
if (file.exists()) {
loader.addURL(file.toURI.toURL)
} else {
logWarning(s"Local jar $file does not exist, skipping.")
}
case _ =>
logWarning(s"Skip remote jar $uri.")
}
}

/**
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
*/
def mergeFileLists(lists: String*): String = {
val merged = lists.filterNot(StringUtils.isBlank)
.flatMap(Utils.stringToSeq)
if (merged.nonEmpty) merged.mkString(",") else null
}

private def splitOnFragment(path: String): (URI, Option[String]) = {
val uri = Utils.resolveURI(path)
val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
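For orientation (this note is not part of the commit): the helpers added above were previously reached through SparkSubmit (the old call site above uses SparkSubmit.addJarToClasspath) and now live in DependencyUtils. Below is a minimal sketch of how they behave, assuming only the signatures shown in this diff. DependencyUtils is private[deploy], so code like this would have to sit in the org.apache.spark.deploy package (for example a test); the object name, jar paths, and loader are made-up values for illustration.

package org.apache.spark.deploy   // required: DependencyUtils is private[deploy]

import java.net.URL

import org.apache.spark.util.MutableURLClassLoader

object DependencyUtilsSketch {
  def main(args: Array[String]): Unit = {
    val loader = new MutableURLClassLoader(Array.empty[URL], getClass.getClassLoader)

    // "file"/"local" jars that exist are added to the classloader; missing files
    // and remote schemes (e.g. hdfs://) are only logged as warnings and skipped.
    DependencyUtils.addJarsToClassPath("/tmp/a.jar,hdfs:///jars/b.jar", loader)

    // Blank and null lists collapse away; if nothing survives, the result is null.
    println(DependencyUtils.mergeFileLists("a.jar,b.jar", null, "", "c.jar")) // a.jar,b.jar,c.jar
    println(DependencyUtils.mergeFileLists(null, ""))                         // null
  }
}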
318 changes: 161 additions & 157 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Large diffs are not rendered by default.

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -29,7 +29,9 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.io.Source
import scala.util.Try

import org.apache.spark.{SparkException, SparkUserAppException}
import org.apache.spark.deploy.SparkSubmitAction._
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkSubmitArgumentsParser
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.Utils
@@ -40,7 +42,7 @@ import org.apache.spark.util.Utils
* The env argument is used for testing.
*/
private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
extends SparkSubmitArgumentsParser {
extends SparkSubmitArgumentsParser with Logging {
var master: String = null
var deployMode: String = null
var executorMemory: String = null
@@ -85,8 +87,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
/** Default properties present in the currently defined defaults file. */
lazy val defaultSparkProperties: HashMap[String, String] = {
val defaultProperties = new HashMap[String, String]()
// scalastyle:off println
if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
if (verbose) {
logInfo(s"Using properties file: $propertiesFile")
}
Option(propertiesFile).foreach { filename =>
val properties = Utils.getPropertiesFromFile(filename)
properties.foreach { case (k, v) =>
@@ -95,21 +98,16 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
// Property files may contain sensitive information, so redact before printing
if (verbose) {
Utils.redact(properties).foreach { case (k, v) =>
SparkSubmit.printStream.println(s"Adding default property: $k=$v")
logInfo(s"Adding default property: $k=$v")
}
}
}
// scalastyle:on println
defaultProperties
}

// Set parameters from command line arguments
try {
parse(args.asJava)
} catch {
case e: IllegalArgumentException =>
SparkSubmit.printErrorAndExit(e.getMessage())
}
parse(args.asJava)

// Populate `sparkProperties` map from properties file
mergeDefaultSparkProperties()
// Remove keys that don't start with "spark." from `sparkProperties`.
@@ -141,7 +139,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
sparkProperties.foreach { case (k, v) =>
if (!k.startsWith("spark.")) {
sparkProperties -= k
SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
logWarning(s"Ignoring non-spark config property: $k=$v")
}
}
}
@@ -215,10 +213,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}
} catch {
case _: Exception =>
SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
error(s"Cannot load main class from JAR $primaryResource")
}
case _ =>
SparkSubmit.printErrorAndExit(
error(
s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
"Please specify a class through --class.")
}
@@ -248,6 +246,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case SUBMIT => validateSubmitArguments()
case KILL => validateKillArguments()
case REQUEST_STATUS => validateStatusRequestArguments()
case PRINT_VERSION =>
}
}

@@ -256,62 +255,61 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
printUsageAndExit(-1)
}
if (primaryResource == null) {
SparkSubmit.printErrorAndExit("Must specify a primary resource (JAR or Python or R file)")
error("Must specify a primary resource (JAR or Python or R file)")
}
if (mainClass == null && SparkSubmit.isUserJar(primaryResource)) {
SparkSubmit.printErrorAndExit("No main class set in JAR; please specify one with --class")
error("No main class set in JAR; please specify one with --class")
}
if (driverMemory != null
&& Try(JavaUtils.byteStringAsBytes(driverMemory)).getOrElse(-1L) <= 0) {
SparkSubmit.printErrorAndExit("Driver Memory must be a positive number")
error("Driver memory must be a positive number")
}
if (executorMemory != null
&& Try(JavaUtils.byteStringAsBytes(executorMemory)).getOrElse(-1L) <= 0) {
SparkSubmit.printErrorAndExit("Executor Memory cores must be a positive number")
error("Executor memory must be a positive number")
}
if (executorCores != null && Try(executorCores.toInt).getOrElse(-1) <= 0) {
SparkSubmit.printErrorAndExit("Executor cores must be a positive number")
error("Executor cores must be a positive number")
}
if (totalExecutorCores != null && Try(totalExecutorCores.toInt).getOrElse(-1) <= 0) {
SparkSubmit.printErrorAndExit("Total executor cores must be a positive number")
error("Total executor cores must be a positive number")
}
if (numExecutors != null && Try(numExecutors.toInt).getOrElse(-1) <= 0) {
SparkSubmit.printErrorAndExit("Number of executors must be a positive number")
error("Number of executors must be a positive number")
}
if (pyFiles != null && !isPython) {
SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
error("--py-files given but primary resource is not a Python script")
}

if (master.startsWith("yarn")) {
val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR")
if (!hasHadoopEnv && !Utils.isTesting) {
throw new Exception(s"When running with master '$master' " +
error(s"When running with master '$master' " +
"either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
}
}

if (proxyUser != null && principal != null) {
SparkSubmit.printErrorAndExit("Only one of --proxy-user or --principal can be provided.")
error("Only one of --proxy-user or --principal can be provided.")
}
}

private def validateKillArguments(): Unit = {
if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
SparkSubmit.printErrorAndExit(
"Killing submissions is only supported in standalone or Mesos mode!")
error("Killing submissions is only supported in standalone or Mesos mode!")
}
if (submissionToKill == null) {
SparkSubmit.printErrorAndExit("Please specify a submission to kill.")
error("Please specify a submission to kill.")
}
}

private def validateStatusRequestArguments(): Unit = {
if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
SparkSubmit.printErrorAndExit(
error(
"Requesting submission statuses is only supported in standalone or Mesos mode!")
}
if (submissionToRequestStatusFor == null) {
SparkSubmit.printErrorAndExit("Please specify a submission to request status for.")
error("Please specify a submission to request status for.")
}
}

@@ -368,7 +366,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S

case DEPLOY_MODE =>
if (value != "client" && value != "cluster") {
SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
error("--deploy-mode must be either \"client\" or \"cluster\"")
}
deployMode = value

@@ -405,14 +403,14 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case KILL_SUBMISSION =>
submissionToKill = value
if (action != null) {
SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $KILL.")
error(s"Action cannot be both $action and $KILL.")
}
action = KILL

case STATUS =>
submissionToRequestStatusFor = value
if (action != null) {
SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $REQUEST_STATUS.")
error(s"Action cannot be both $action and $REQUEST_STATUS.")
}
action = REQUEST_STATUS

@@ -444,7 +442,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
repositories = value

case CONF =>
val (confName, confValue) = SparkSubmit.parseSparkConfProperty(value)
val (confName, confValue) = SparkSubmitUtils.parseSparkConfProperty(value)
sparkProperties(confName) = confValue

case PROXY_USER =>
@@ -463,15 +461,15 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
verbose = true

case VERSION =>
SparkSubmit.printVersionAndExit()
action = SparkSubmitAction.PRINT_VERSION

case USAGE_ERROR =>
printUsageAndExit(1)

case _ =>
throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
error(s"Unexpected argument '$opt'.")
}
true
action != SparkSubmitAction.PRINT_VERSION
}

/**
@@ -482,7 +480,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
*/
override protected def handleUnknown(opt: String): Boolean = {
if (opt.startsWith("-")) {
SparkSubmit.printErrorAndExit(s"Unrecognized option '$opt'.")
error(s"Unrecognized option '$opt'.")
}

primaryResource =
@@ -501,20 +499,18 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}

private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
// scalastyle:off println
val outStream = SparkSubmit.printStream
if (unknownParam != null) {
outStream.println("Unknown/unsupported param " + unknownParam)
logInfo("Unknown/unsupported param " + unknownParam)
}
val command = sys.env.get("_SPARK_CMD_USAGE").getOrElse(
"""Usage: spark-submit [options] <app jar | python file | R file> [app arguments]
|Usage: spark-submit --kill [submission ID] --master [spark://...]
|Usage: spark-submit --status [submission ID] --master [spark://...]
|Usage: spark-submit run-example [options] example-class [example args]""".stripMargin)
outStream.println(command)
logInfo(command)

val mem_mb = Utils.DEFAULT_DRIVER_MEM_MB
outStream.println(
logInfo(
s"""
|Options:
| --master MASTER_URL spark://host:port, mesos://host:port, yarn,
@@ -596,12 +592,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
)

if (SparkSubmit.isSqlShell(mainClass)) {
outStream.println("CLI options:")
outStream.println(getSqlShellOptions())
logInfo("CLI options:")
logInfo(getSqlShellOptions())
}
// scalastyle:on println

SparkSubmit.exitFn(exitCode)
throw new SparkUserAppException(exitCode)
}

/**
@@ -655,4 +650,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
System.setErr(currentErr)
}
}

private def error(msg: String): Unit = throw new SparkException(msg)

}
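A side note on the overall shape of this file's changes (not part of the diff): argument handling no longer prints to SparkSubmit.printStream and calls exitFn directly. Instead, error(...) throws SparkException, printUsageAndExit throws SparkUserAppException(exitCode), and informational output goes through the Logging trait. Turning those exceptions back into exit codes is the caller's job; that code belongs to the SparkSubmit.scala diff, which is not rendered above. The following is only a rough sketch under that assumption; parseArguments and SubmitArgsSketch are made-up names, and the sketch must live in org.apache.spark.deploy because SparkSubmitArguments is private[deploy].

package org.apache.spark.deploy

import org.apache.spark.{SparkException, SparkUserAppException}

object SubmitArgsSketch {
  // Hypothetical caller: translate the new exceptions into process exit codes.
  def parseArguments(args: Array[String]): SparkSubmitArguments =
    try {
      new SparkSubmitArguments(args)
    } catch {
      case e: SparkUserAppException =>   // thrown by printUsageAndExit(exitCode)
        sys.exit(e.exitCode)
      case e: SparkException =>          // thrown by error(...)
        Console.err.println(e.getMessage)
        sys.exit(1)
    }
}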