Merge remote-tracking branch 'upstream/master' into expr_bin
viirya committed Jun 18, 2015
2 parents 0cf20f2 + 9db73ec commit cf62b95
Showing 99 changed files with 1,819 additions and 696 deletions.
2 changes: 1 addition & 1 deletion R/log4j.properties
@@ -19,7 +19,7 @@
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=R-unit-tests.log
log4j.appender.file.file=R/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

35 changes: 13 additions & 22 deletions build/mvn
@@ -69,11 +69,14 @@ install_app() {

# Install maven under the build/ folder
install_mvn() {
local MVN_VERSION="3.3.3"

install_app \
"http://archive.apache.org/dist/maven/maven-3/3.2.5/binaries" \
"apache-maven-3.2.5-bin.tar.gz" \
"apache-maven-3.2.5/bin/mvn"
MVN_BIN="${_DIR}/apache-maven-3.2.5/bin/mvn"
"http://archive.apache.org/dist/maven/maven-3/${MVN_VERSION}/binaries" \
"apache-maven-${MVN_VERSION}-bin.tar.gz" \
"apache-maven-${MVN_VERSION}/bin/mvn"

MVN_BIN="${_DIR}/apache-maven-${MVN_VERSION}/bin/mvn"
}

# Install zinc under the build/ folder
@@ -105,28 +108,16 @@ install_scala() {
SCALA_LIBRARY="$(cd "$(dirname ${scala_bin})/../lib" && pwd)/scala-library.jar"
}

# Determines if a given application is already installed. If not, will attempt
# to install
## Arg1 - application name
## Arg2 - Alternate path to local install under build/ dir
check_and_install_app() {
# create the local environment variable in uppercase
local app_bin="`echo $1 | awk '{print toupper(\$0)}'`_BIN"
# some black magic to set the generated app variable (i.e. MVN_BIN) into the
# environment
eval "${app_bin}=`which $1 2>/dev/null`"

if [ -z "`which $1 2>/dev/null`" ]; then
install_$1
fi
}

# Setup healthy defaults for the Zinc port if none were provided from
# the environment
ZINC_PORT=${ZINC_PORT:-"3030"}

# Check and install all applications necessary to build Spark
check_and_install_app "mvn"
# Install Maven if necessary
MVN_BIN="$(command -v mvn)"

if [ ! "$MVN_BIN" ]; then
install_mvn
fi

# Install the proper version of Scala and Zinc for the build
install_zinc
17 changes: 14 additions & 3 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -192,7 +192,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
// key used to store the spark secret in the Hadoop UGI
private val sparkSecretLookupKey = "sparkCookie"

private val authOn = sparkConf.getBoolean("spark.authenticate", false)
private val authOn = sparkConf.getBoolean(SecurityManager.SPARK_AUTH_CONF, false)
// keep spark.ui.acls.enable for backwards compatibility with 1.0
private var aclsOn =
sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false))
@@ -365,10 +365,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
cookie
} else {
// user must have set spark.authenticate.secret config
sparkConf.getOption("spark.authenticate.secret") match {
// For Master/Worker, auth secret is in conf; for Executors, it is in env variable
sys.env.get(SecurityManager.ENV_AUTH_SECRET)
.orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match {
case Some(value) => value
case None => throw new Exception("Error: a secret key must be specified via the " +
"spark.authenticate.secret config")
SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
}
}
sCookie
@@ -449,3 +451,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
override def getSaslUser(appId: String): String = getSaslUser()
override def getSecretKey(appId: String): String = getSecretKey()
}

private[spark] object SecurityManager {

val SPARK_AUTH_CONF: String = "spark.authenticate"
val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret"
// This is used to set the auth secret in an executor's env variable. It should have the same
// value as SPARK_AUTH_SECRET_CONF set in SparkConf
val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"
}
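The new comments describe the lookup order for the auth secret: an executor reads it from the _SPARK_AUTH_SECRET environment variable, while Master and Worker read it from spark.authenticate.secret in the conf. A minimal standalone sketch of that fallback, assuming a hypothetical helper resolveSecret that is not part of SecurityManager:

// Prefer the executor's env variable, fall back to the conf entry, otherwise fail.
def resolveSecret(env: Map[String, String], conf: Map[String, String]): String =
  env.get("_SPARK_AUTH_SECRET")
    .orElse(conf.get("spark.authenticate.secret"))
    .getOrElse(throw new IllegalArgumentException(
      "Error: a secret key must be specified via the spark.authenticate.secret config"))

resolveSecret(Map.empty, Map("spark.authenticate.secret" -> "s3cr3t"))  // Master/Worker path
resolveSecret(Map("_SPARK_AUTH_SECRET" -> "s3cr3t"), Map.empty)         // Executor path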
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -557,7 +557,7 @@ private[spark] object SparkConf extends Logging {
def isExecutorStartupConf(name: String): Boolean = {
isAkkaConf(name) ||
name.startsWith("spark.akka") ||
name.startsWith("spark.auth") ||
(name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
name.startsWith("spark.ssl") ||
isSparkPortConf(name)
}
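The added condition keeps the secret itself out of the configs that are forwarded when executors start. A simplified sketch of just the auth clause (the real predicate also matches Akka, SSL and port settings):

def authForwardedAtStartup(name: String): Boolean =
  name.startsWith("spark.auth") && name != "spark.authenticate.secret"

authForwardedAtStartup("spark.authenticate")        // true: forwarded at executor launch
authForwardedAtStartup("spark.authenticate.secret") // false: kept off the launch command line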
22 changes: 15 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -35,7 +35,8 @@ import org.apache.ivy.core.resolve.ResolveOptions
import org.apache.ivy.core.retrieve.RetrieveOptions
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
import org.apache.spark.SPARK_VERSION
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@@ -735,8 +736,14 @@ private[spark] object SparkSubmitUtils {
}

/** Path of the local Maven cache. */
private[spark] def m2Path: File = new File(System.getProperty("user.home"),
".m2" + File.separator + "repository" + File.separator)
private[spark] def m2Path: File = {
if (Utils.isTesting) {
// test builds delete the maven cache, and this can cause flakiness
new File("dummy", ".m2" + File.separator + "repository")
} else {
new File(System.getProperty("user.home"), ".m2" + File.separator + "repository")
}
}

/**
* Extracts maven coordinates from a comma-delimited string
@@ -756,12 +763,13 @@ private[spark] object SparkSubmitUtils {
localM2.setName("local-m2-cache")
cr.add(localM2)

val localIvy = new IBiblioResolver
localIvy.setRoot(new File(ivySettings.getDefaultIvyUserDir,
"local" + File.separator).toURI.toString)
val localIvy = new FileSystemResolver
val localIvyRoot = new File(ivySettings.getDefaultIvyUserDir, "local")
localIvy.setLocal(true)
localIvy.setRepository(new FileRepository(localIvyRoot))
val ivyPattern = Seq("[organisation]", "[module]", "[revision]", "[type]s",
"[artifact](-[classifier]).[ext]").mkString(File.separator)
localIvy.setPattern(ivyPattern)
localIvy.addIvyPattern(localIvyRoot.getAbsolutePath + File.separator + ivyPattern)
localIvy.setName("local-ivy-cache")
cr.add(localIvy)

@@ -160,7 +160,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
replayBus.addListener(appListener)
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)

ui.setAppName(s"${appInfo.name} ($appId)")
appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") }

val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled)
@@ -282,8 +282,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val newAttempts = logs.flatMap { fileStatus =>
try {
val res = replay(fileStatus, bus)
logInfo(s"Application log ${res.logPath} loaded successfully.")
Some(res)
res match {
case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
"The application may have not started.")
}
res
} catch {
case e: Exception =>
logError(
@@ -429,9 +433,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

/**
* Replays the events in the specified log file and returns information about the associated
* application.
* application. Return `None` if the application ID cannot be located.
*/
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
private def replay(
eventLog: FileStatus,
bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
val logInput =
@@ -445,16 +451,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val appCompleted = isApplicationCompleted(eventLog)
bus.addListener(appListener)
bus.replay(logInput, logPath.toString, !appCompleted)
new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
appListener.appId.getOrElse(logPath.getName()),
appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
appCompleted)
appListener.appId.map { appId =>
new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
appId,
appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
appCompleted)
}
} finally {
logInput.close()
}
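Since replay now returns an Option, callers flatMap over the results so that event logs which never recorded an application start are skipped rather than treated as errors. A tiny self-contained model of that contract; the type and file names are stand-ins, not Spark's:

case class AttemptSketch(logPath: String, appId: String)

def replaySketch(logPath: String): Option[AttemptSketch] =
  // Pretend only logs named "*started*" ever logged an application start event.
  if (logPath.contains("started")) Some(AttemptSketch(logPath, "app-0001")) else None

val loaded = Seq("app-started.log", "never-started.log").flatMap(replaySketch)
// loaded keeps only the first log; the real code logs a warning for the second.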
@@ -24,6 +24,7 @@ import scala.collection.JavaConversions._
import scala.collection.Map

import org.apache.spark.Logging
import org.apache.spark.SecurityManager
import org.apache.spark.deploy.Command
import org.apache.spark.launcher.WorkerCommandBuilder
import org.apache.spark.util.Utils
@@ -40,12 +41,14 @@
*/
def buildProcessBuilder(
command: Command,
securityMgr: SecurityManager,
memory: Int,
sparkHome: String,
substituteArguments: String => String,
classPaths: Seq[String] = Seq[String](),
env: Map[String, String] = sys.env): ProcessBuilder = {
val localCommand = buildLocalCommand(command, substituteArguments, classPaths, env)
val localCommand = buildLocalCommand(
command, securityMgr, substituteArguments, classPaths, env)
val commandSeq = buildCommandSeq(localCommand, memory, sparkHome)
val builder = new ProcessBuilder(commandSeq: _*)
val environment = builder.environment()
@@ -69,27 +72,34 @@
*/
private def buildLocalCommand(
command: Command,
securityMgr: SecurityManager,
substituteArguments: String => String,
classPath: Seq[String] = Seq[String](),
env: Map[String, String]): Command = {
val libraryPathName = Utils.libraryPathEnvName
val libraryPathEntries = command.libraryPathEntries
val cmdLibraryPath = command.environment.get(libraryPathName)

val newEnvironment = if (libraryPathEntries.nonEmpty && libraryPathName.nonEmpty) {
var newEnvironment = if (libraryPathEntries.nonEmpty && libraryPathName.nonEmpty) {
val libraryPaths = libraryPathEntries ++ cmdLibraryPath ++ env.get(libraryPathName)
command.environment + ((libraryPathName, libraryPaths.mkString(File.pathSeparator)))
} else {
command.environment
}

// set auth secret to env variable if needed
if (securityMgr.isAuthenticationEnabled) {
newEnvironment += (SecurityManager.ENV_AUTH_SECRET -> securityMgr.getSecretKey)
}

Command(
command.mainClass,
command.arguments.map(substituteArguments),
newEnvironment,
command.classPathEntries ++ classPath,
Seq[String](), // library path already captured in environment variable
command.javaOpts)
// filter out auth secret from java options
command.javaOpts.filterNot(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF)))
}
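Together, the env variable added above and this filter keep the secret off the child process's command line, where it would be visible to other local users, and pass it through the environment instead. A standalone sketch of the filtering step with made-up options:

val javaOpts = Seq("-Xmx1g", "-Dspark.authenticate.secret=s3cr3t", "-Dspark.ui.port=4040")
val filtered = javaOpts.filterNot(_.startsWith("-Dspark.authenticate.secret"))
// filtered == Seq("-Xmx1g", "-Dspark.ui.port=4040"); the secret now travels
// only via the _SPARK_AUTH_SECRET environment variable set in buildLocalCommand.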

/** Spawn a thread that will redirect a given stream to a file */
@@ -85,8 +85,8 @@ private[deploy] class DriverRunner(
}

// TODO: If we add ability to submit multiple jars they should also be added here
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
sparkHome.getAbsolutePath, substituteVariables)
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
launchDriver(builder, driverDir, driverDesc.supervise)
}
catch {
@@ -25,7 +25,7 @@ import akka.actor.ActorRef
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files

import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.{SecurityManager, SparkConf, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
import org.apache.spark.util.Utils
@@ -125,8 +125,8 @@ private[deploy] class ExecutorRunner(
private def fetchAndRunExecutor() {
try {
// Launch the process
val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
sparkHome.getAbsolutePath, substituteVariables)
val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
memory, sparkHome.getAbsolutePath, substituteVariables)
val command = builder.command()
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))

@@ -83,8 +83,10 @@ private[spark] class BlockManager(
private var externalBlockStoreInitialized = false
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
private[spark] lazy val externalBlockStore: ExternalBlockStore =
private[spark] lazy val externalBlockStore: ExternalBlockStore = {
externalBlockStoreInitialized = true
new ExternalBlockStore(this, executorId)
}

private[spark]
val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
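Wrapping the initializer in braces ties externalBlockStoreInitialized to the first, and only, evaluation of the lazy val. A minimal illustration of that behaviour with generic names, not Spark's:

object LazyInitSketch {
  var initialized = false
  lazy val store: String = {
    initialized = true   // side effect runs exactly once, on first access
    "external-store"
  }
}
assert(!LazyInitSketch.initialized)  // nothing forced yet
LazyInitSketch.store                 // first access runs the block
assert(LazyInitSketch.initialized)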
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -362,7 +362,7 @@ private[spark] object UIUtils extends Logging {
{ g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } }
{ g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } }
{
g.rootCluster.getAllNodes.filter(_.cached).map { n =>
g.rootCluster.getCachedNodes.map { n =>
<div class="cached-rdd">{n.id}</div>
}
}
@@ -66,9 +66,9 @@ private[ui] class RDDOperationCluster(val id: String, private var _name: String)
_childClusters += childCluster
}

/** Return all the nodes contained in this cluster, including ones nested in other clusters. */
def getAllNodes: Seq[RDDOperationNode] = {
_childNodes ++ _childClusters.flatMap(_.childNodes)
/** Return all the nodes which are cached. */
def getCachedNodes: Seq[RDDOperationNode] = {
_childNodes.filter(_.cached) ++ _childClusters.flatMap(_.getCachedNodes)
}
}
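Unlike the removed getAllNodes, which only descended one level into child clusters, getCachedNodes filters for cached nodes and recurses through every nested cluster. A tiny model of that traversal with stand-in types:

case class NodeSketch(id: Int, cached: Boolean)
case class ClusterSketch(nodes: Seq[NodeSketch], children: Seq[ClusterSketch]) {
  def getCachedNodes: Seq[NodeSketch] =
    nodes.filter(_.cached) ++ children.flatMap(_.getCachedNodes)
}

val root = ClusterSketch(
  Seq(NodeSketch(1, cached = true), NodeSketch(2, cached = false)),
  Seq(ClusterSketch(Seq(NodeSketch(3, cached = true)), Nil)))
root.getCachedNodes.map(_.id)  // Seq(1, 3): the nested cached node is included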

(remaining changed files not shown)
