[SPARK-2098] All Spark processes should support spark-defaults.conf, config file #2379

Closed · wants to merge 3 commits
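This patch moves properties-file parsing out of SparkSubmitArguments into Utils and teaches the standalone daemons (Master, Worker, HistoryServer) to accept a --properties-file flag. Below is a minimal sketch of the initialization order the daemons adopt; the wrapper object is hypothetical, but the Utils call and the ordering constraint match the diff that follows:

```scala
package org.apache.spark.deploy // Utils is private[spark], so stay inside the spark namespace

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

// Hypothetical condensation of what MasterArguments, WorkerArguments, and
// HistoryServerArguments each do after this patch.
object DaemonArgsSketch {
  def main(argv: Array[String]): Unit = {
    val conf = new SparkConf
    var propertiesFile: String = null

    // 1. Parse command-line flags; a --properties-file argument would set
    //    propertiesFile here (actual parsing elided).

    // 2. Load defaults from that file, or from spark-defaults.conf if none was
    //    given. This mutates conf and sys.props, so it must run before any reads.
    propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

    // 3. Only now is it safe to consult the conf.
    val webUiPort = conf.getInt("spark.master.ui.port", 8080)
    println(s"properties file: $propertiesFile, web UI port: $webUiPort")
  }
}
```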
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -17,14 +17,11 @@

package org.apache.spark.deploy

import java.io.{File, FileInputStream, IOException}
import java.util.Properties
import java.util.jar.JarFile

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.spark.SparkException
import org.apache.spark.util.Utils

/**
@@ -63,9 +60,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
val defaultProperties = new HashMap[String, String]()
if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
Option(propertiesFile).foreach { filename =>
val file = new File(filename)
SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
if (k.startsWith("spark")) {
Utils.getPropertiesFromFile(filename).foreach { case (k, v) =>
if (k.startsWith("spark.")) {
defaultProperties(k) = v
if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
} else {
@@ -90,19 +86,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
*/
private def mergeSparkProperties(): Unit = {
// Use common defaults file, if not specified by user
if (propertiesFile == null) {
val sep = File.separator
val sparkHomeConfig = env.get("SPARK_HOME").map(sparkHome => s"${sparkHome}${sep}conf")
val confDir = env.get("SPARK_CONF_DIR").orElse(sparkHomeConfig)

confDir.foreach { sparkConfDir =>
val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf"
val file = new File(defaultPath)
if (file.exists()) {
propertiesFile = file.getAbsolutePath
}
}
}
propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultPropertiesFile(env))

val properties = HashMap[String, String]()
properties.putAll(defaultSparkProperties)
@@ -397,23 +381,3 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
SparkSubmit.exitFn()
}
}

object SparkSubmitArguments {
/** Load properties present in the given file. */
def getPropertiesFromFile(file: File): Seq[(String, String)] = {
require(file.exists(), s"Properties file $file does not exist")
require(file.isFile(), s"Properties file $file is not a normal file")
val inputStream = new FileInputStream(file)
try {
val properties = new Properties()
properties.load(inputStream)
properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
} catch {
case e: IOException =>
val message = s"Failed when loading Spark properties file $file"
throw new SparkException(message, e)
} finally {
inputStream.close()
}
}
}
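Note the tightened filter in the first hunk: default keys must now start with "spark." rather than merely "spark", so look-alike keys no longer sneak into the defaults. A quick illustration with made-up keys:

```scala
val keys = Seq("spark.master", "sparkles.color", "spark.executor.memory")

keys.filter(_.startsWith("spark"))
// List(spark.master, sparkles.color, spark.executor.memory) -- too permissive

keys.filter(_.startsWith("spark."))
// List(spark.master, spark.executor.memory) -- the check used after this patch
```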
core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -68,7 +68,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set")

// Parse the properties file for the equivalent spark.driver.* configs
val properties = SparkSubmitArguments.getPropertiesFromFile(new File(propertiesFile)).toMap
val properties = Utils.getPropertiesFromFile(propertiesFile)
val confDriverMemory = properties.get("spark.driver.memory")
val confLibraryPath = properties.get("spark.driver.extraLibraryPath")
val confClasspath = properties.get("spark.driver.extraClassPath")
core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -18,12 +18,14 @@
package org.apache.spark.deploy.history

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

/**
* Command-line parser for the master.
*/
private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
private var logDir: String = null
private var propertiesFile: String = null

parse(args.toList)

@@ -32,22 +34,34 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
case ("--dir" | "-d") :: value :: tail =>
logDir = value
conf.set("spark.history.fs.logDirectory", value)
System.setProperty("spark.history.fs.logDirectory", value)
Contributor: Is this needed since you're setting the conf above?

Contributor: I had the same thought and almost made the same comment, but then I figured it might be better to keep whatever is set through the conf consistent with the sys props. Not a huge deal either way.
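For reference, elsewhere in this patch (Utils.loadDefaultSparkProperties, below) the same conf/sys-props consistency is kept with non-clobbering writes. A sketch, assuming a SparkConf named conf and a key/value pair k, v:

```scala
conf.setIfMissing(k, v)          // no-op if the conf already has k
sys.props.getOrElseUpdate(k, v)  // no-op if the JVM property is already set
```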

parse(tail)

case ("--help" | "-h") :: tail =>
printUsageAndExit(0)

case ("--properties-file") :: value :: tail =>
propertiesFile = value
parse(tail)

case Nil =>

case _ =>
printUsageAndExit(1)
}
}

// This mutates the SparkConf, so all accesses to it must be made after this line
Utils.loadDefaultSparkProperties(conf, propertiesFile)

private def printUsageAndExit(exitCode: Int) {
System.err.println(
"""
|Usage: HistoryServer
|Usage: HistoryServer [options]
|
|Options:
| --properties-file FILE Path to a custom Spark properties file.
| Default is conf/spark-defaults.conf.
|
|Configuration options can be set by setting the corresponding JVM system property.
|History Server options are always available; additional options depend on the provider.
core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -27,6 +27,7 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
var host = Utils.localHostName()
var port = 7077
var webUiPort = 8080
var propertiesFile: String = null

// Check for settings in environment variables
if (System.getenv("SPARK_MASTER_HOST") != null) {
@@ -38,12 +39,16 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
}

parse(args.toList)

// This mutates the SparkConf, so all accesses to it must be made after this line
propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

if (conf.contains("spark.master.ui.port")) {
webUiPort = conf.get("spark.master.ui.port").toInt
}

parse(args.toList)

def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
@@ -63,7 +68,11 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
webUiPort = value
parse(tail)

case ("--help" | "-h") :: tail =>
case ("--properties-file") :: value :: tail =>
propertiesFile = value
parse(tail)

case ("--help") :: tail =>
printUsageAndExit(0)

case Nil => {}
@@ -83,7 +92,9 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
" -i HOST, --ip HOST Hostname to listen on (deprecated, please use --host or -h) \n" +
" -h HOST, --host HOST Hostname to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: 7077)\n" +
" --webui-port PORT Port for web UI (default: 8080)")
" --webui-port PORT Port for web UI (default: 8080)\n" +
" --properties-file FILE Path to a custom Spark properties file.\n" +
" Default is conf/spark-defaults.conf.")
System.exit(exitCode)
}
}
core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -33,6 +33,7 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
var memory = inferDefaultMemory()
var masters: Array[String] = null
var workDir: String = null
var propertiesFile: String = null

// Check for settings in environment variables
if (System.getenv("SPARK_WORKER_PORT") != null) {
@@ -47,15 +48,19 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
}
if (conf.contains("spark.worker.ui.port")) {
webUiPort = conf.get("spark.worker.ui.port").toInt
}
if (System.getenv("SPARK_WORKER_DIR") != null) {
workDir = System.getenv("SPARK_WORKER_DIR")
}

parse(args.toList)

// This mutates the SparkConf, so all accesses to it must be made after this line
propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

if (conf.contains("spark.worker.ui.port")) {
webUiPort = conf.get("spark.worker.ui.port").toInt
}

checkWorkerMemory()

def parse(args: List[String]): Unit = args match {
Expand Down Expand Up @@ -89,7 +94,11 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
webUiPort = value
parse(tail)

case ("--help" | "-h") :: tail =>
case ("--properties-file") :: value :: tail =>
propertiesFile = value
parse(tail)

case ("--help") :: tail =>
printUsageAndExit(0)

case value :: tail =>
@@ -124,7 +133,9 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
" -i HOST, --ip IP Hostname to listen on (deprecated, please use --host or -h)\n" +
" -h HOST, --host HOST Hostname to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: random)\n" +
" --webui-port PORT Port for web UI (default: 8081)")
" --webui-port PORT Port for web UI (default: 8081)\n" +
" --properties-file FILE Path to a custom Spark properties file.\n" +
" Default is conf/spark-defaults.conf.")
System.exit(exitCode)
}

48 changes: 48 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1410,6 +1410,54 @@ private[spark] object Utils extends Logging {
}
}

/**
* Load default Spark properties from the given file. If no file is provided,
* use the common defaults file. This mutates state in the given SparkConf and
* in this JVM's system properties if the config specified in the file is not
* already set. Return the path of the properties file used.
*/
def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = {
val path = Option(filePath).getOrElse(getDefaultPropertiesFile())
Option(path).foreach { confFile =>
getPropertiesFromFile(confFile).filter { case (k, v) =>
k.startsWith("spark.")
}.foreach { case (k, v) =>
conf.setIfMissing(k, v)
sys.props.getOrElseUpdate(k, v)
}
}
path
}

/** Load properties present in the given file. */
def getPropertiesFromFile(filename: String): Map[String, String] = {
val file = new File(filename)
require(file.exists(), s"Properties file $file does not exist")
require(file.isFile(), s"Properties file $file is not a normal file")

val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8")
try {
val properties = new Properties()
properties.load(inReader)
properties.stringPropertyNames().map(k => (k, properties(k).trim)).toMap
} catch {
case e: IOException =>
throw new SparkException(s"Failed when loading Spark properties from $filename", e)
} finally {
inReader.close()
}
}

/** Return the path of the default Spark properties file. */
def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
env.get("SPARK_CONF_DIR")
.orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
.map { t => new File(s"$t${File.separator}spark-defaults.conf")}
.filter(_.isFile)
.map(_.getAbsolutePath)
.orNull
}

/** Return a nice string representation of the exception, including the stack trace. */
def exceptionString(e: Exception): String = {
if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
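A hedged usage sketch of the two lower-level helpers added above; the wrapper object is hypothetical, and note that getPropertiesFromFile throws unless the path names an existing regular file:

```scala
package org.apache.spark.deploy // Utils is private[spark]

import org.apache.spark.util.Utils

object PropertiesSketch {
  def main(args: Array[String]): Unit = {
    // Resolve the conventional defaults file: $SPARK_CONF_DIR/spark-defaults.conf,
    // else $SPARK_HOME/conf/spark-defaults.conf, else null.
    val defaultPath: String = Utils.getDefaultPropertiesFile()

    Option(defaultPath).foreach { path =>
      // Parse the file into a Map[String, String] with trimmed values.
      val props = Utils.getPropertiesFromFile(path)
      props.filterKeys(_.startsWith("spark.")).foreach { case (k, v) =>
        println(s"$k = $v")
      }
    }
  }
}
```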
19 changes: 19 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -27,6 +27,8 @@ import com.google.common.base.Charsets
import com.google.common.io.Files
import org.scalatest.FunSuite

import org.apache.spark.SparkConf

class UtilsSuite extends FunSuite {

test("bytesToString") {
@@ -332,4 +334,21 @@ class UtilsSuite extends FunSuite {
assert(!tempFile2.exists())
}

test("loading properties from file") {
val outFile = File.createTempFile("test-load-spark-properties", "test")
try {
System.setProperty("spark.test.fileNameLoadB", "2")
Files.write("spark.test.fileNameLoadA true\n" +
"spark.test.fileNameLoadB 1\n", outFile, Charsets.UTF_8)
val properties = Utils.getPropertiesFromFile(outFile.getAbsolutePath)
properties
.filter { case (k, v) => k.startsWith("spark.")}
.foreach { case (k, v) => sys.props.getOrElseUpdate(k, v)}
val sparkConf = new SparkConf
assert(sparkConf.getBoolean("spark.test.fileNameLoadA", false) === true)
assert(sparkConf.getInt("spark.test.fileNameLoadB", 1) === 2)
} finally {
outFile.delete()
}
}
}
7 changes: 7 additions & 0 deletions docs/monitoring.md
@@ -77,6 +77,13 @@ follows:
one implementation, provided by Spark, which looks for application logs stored in the
file system.</td>
</tr>
<tr>
<td>spark.history.fs.logDirectory</td>
<td>(none)</td>
<td>
Directory that contains application event logs to be loaded by the history server
</td>
</tr>
<tr>
<td>spark.history.fs.updateInterval</td>
<td>10</td>
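Connecting the docs to the code: with this patch the history server can pick spark.history.fs.logDirectory up from a properties file instead of a JVM flag. A sketch with a hypothetical HDFS path; the spark-defaults.conf line in the comment is the file-based equivalent:

```scala
import org.apache.spark.SparkConf

// Equivalent of this conf/spark-defaults.conf line (path is hypothetical):
//   spark.history.fs.logDirectory  hdfs://namenode/shared/spark-logs
val conf = new SparkConf()
  .set("spark.history.fs.logDirectory", "hdfs://namenode/shared/spark-logs")
```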