Merge branch 'master' into SPARK-5493
Conflicts:
	core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Marcelo Vanzin committed Feb 6, 2015
2 parents 0540d38 + 80f3bcb commit b6c947d
Showing 132 changed files with 6,236 additions and 1,236 deletions.
8 changes: 4 additions & 4 deletions assembly/pom.xml
@@ -39,7 +39,7 @@
<deb.pkg.name>spark</deb.pkg.name>
<deb.install.path>/usr/share/spark</deb.install.path>
<deb.user>root</deb.user>
-    <deb.bin.filemode>744</deb.bin.filemode>
+    <deb.bin.filemode>755</deb.bin.filemode>
</properties>

<dependencies>
@@ -280,7 +280,7 @@
<user>${deb.user}</user>
<group>${deb.user}</group>
<prefix>${deb.install.path}/conf</prefix>
-            <filemode>744</filemode>
+            <filemode>${deb.bin.filemode}</filemode>
</mapper>
</data>
<data>
@@ -302,7 +302,7 @@
<user>${deb.user}</user>
<group>${deb.user}</group>
<prefix>${deb.install.path}/sbin</prefix>
-            <filemode>744</filemode>
+            <filemode>${deb.bin.filemode}</filemode>
</mapper>
</data>
<data>
@@ -313,7 +313,7 @@
<user>${deb.user}</user>
<group>${deb.user}</group>
<prefix>${deb.install.path}/python</prefix>
-            <filemode>744</filemode>
+            <filemode>${deb.bin.filemode}</filemode>
</mapper>
</data>
</dataSet>
Empty file modified: bin/spark-shell.cmd (file mode changed 100755 → 100644)
2 changes: 1 addition & 1 deletion bin/spark-submit2.cmd
@@ -25,7 +25,7 @@ set ORIG_ARGS=%*
rem Reset the values of all variables used
set SPARK_SUBMIT_DEPLOY_MODE=client

- if not defined %SPARK_CONF_DIR% (
+ if [%SPARK_CONF_DIR%] == [] (
set SPARK_CONF_DIR=%SPARK_HOME%\conf
)
set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_CONF_DIR%\spark-defaults.conf
10 changes: 5 additions & 5 deletions build/mvn
@@ -34,14 +34,14 @@ install_app() {
local binary="${_DIR}/$3"

# setup `curl` and `wget` silent options if we're running on Jenkins
- local curl_opts=""
+ local curl_opts="-L"
local wget_opts=""
if [ -n "$AMPLAB_JENKINS" ]; then
-   curl_opts="-s"
-   wget_opts="--quiet"
+   curl_opts="-s ${curl_opts}"
+   wget_opts="--quiet ${wget_opts}"
else
-   curl_opts="--progress-bar"
-   wget_opts="--progress=bar:force"
+   curl_opts="--progress-bar ${curl_opts}"
+   wget_opts="--progress=bar:force ${wget_opts}"
fi

if [ -z "$3" -o ! -f "$binary" ]; then
7 changes: 7 additions & 0 deletions core/pom.xml
@@ -132,6 +132,13 @@
<artifactId>jetty-servlet</artifactId>
<scope>compile</scope>
</dependency>
+    <!-- Because we mark jetty as provided and shade it, its dependency
+         orbit is ignored, so we explicitly list it here (see SPARK-5557).-->
+    <dependency>
+      <groupId>org.eclipse.jetty.orbit</groupId>
+      <artifactId>javax.servlet</artifactId>
+      <version>${orbit.version}</version>
+    </dependency>

<dependency>
<groupId>org.apache.commons</groupId>
@@ -76,15 +76,15 @@ private[spark] class ExecutorAllocationManager(
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
Integer.MAX_VALUE)

-   // How long there must be backlogged tasks for before an addition is triggered
+   // How long there must be backlogged tasks for before an addition is triggered (seconds)
private val schedulerBacklogTimeout = conf.getLong(
"spark.dynamicAllocation.schedulerBacklogTimeout", 60)
"spark.dynamicAllocation.schedulerBacklogTimeout", 5)

// Same as above, but used only after `schedulerBacklogTimeout` is exceeded
private val sustainedSchedulerBacklogTimeout = conf.getLong(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)

-   // How long an executor must be idle for before it is removed
+   // How long an executor must be idle for before it is removed (seconds)
private val executorIdleTimeout = conf.getLong(
"spark.dynamicAllocation.executorIdleTimeout", 600)

@@ -486,8 +486,8 @@ private[spark] class ExecutorAllocationManager(
}
}

-     override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
-       val executorId = blockManagerAdded.blockManagerId.executorId
+     override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
+       val executorId = executorAdded.executorId
if (executorId != SparkContext.DRIVER_IDENTIFIER) {
// This guards against the race condition in which the `SparkListenerTaskStart`
// event is posted before the `SparkListenerBlockManagerAdded` event, which is
@@ -498,9 +498,8 @@
}
}

-     override def onBlockManagerRemoved(
-         blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
-       allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
+     override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
+       allocationManager.onExecutorRemoved(executorRemoved.executorId)
}

/**
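Note: the allocation manager now keys off executor lifecycle events rather than block manager events. As a hedged illustration (not part of this commit), a standalone SparkListener tracking executors the same way might look like this, assuming the SparkListenerExecutorAdded/Removed events introduced around this release:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}
    import scala.collection.mutable

    class ExecutorTracker extends SparkListener {
      private val executors = mutable.Set[String]()

      // Fires when the executor registers with the driver, before its block
      // manager is reported, avoiding the race noted in the comment above.
      override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit =
        executors += event.executorId

      override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit =
        executors -= event.executorId
    }

Such a listener would be registered on a live SparkContext via sc.addSparkListener(new ExecutorTracker).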
34 changes: 27 additions & 7 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -43,11 +43,20 @@ private[spark] object TestUtils {
* Note: if this is used during class loader tests, class names should be unique
* in order to avoid interference between tests.
*/
-   def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
+   def createJarWithClasses(
+       classNames: Seq[String],
+       toStringValue: String = "",
+       classNamesWithBase: Seq[(String, String)] = Seq(),
+       classpathUrls: Seq[URL] = Seq()): URL = {
val tempDir = Utils.createTempDir()
-     val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
+     val files1 = for (name <- classNames) yield {
+       createCompiledClass(name, tempDir, toStringValue, classpathUrls = classpathUrls)
+     }
+     val files2 = for ((childName, baseName) <- classNamesWithBase) yield {
+       createCompiledClass(childName, tempDir, toStringValue, baseName, classpathUrls)
+     }
val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
-     createJar(files, jarFile)
+     createJar(files1 ++ files2, jarFile)
}


@@ -85,15 +94,26 @@ private[spark] object TestUtils {
}

/** Creates a compiled class with the given name. Class file will be placed in destDir. */
-   def createCompiledClass(className: String, destDir: File, value: String = ""): File = {
+   def createCompiledClass(
+       className: String,
+       destDir: File,
+       toStringValue: String = "",
+       baseClass: String = null,
+       classpathUrls: Seq[URL] = Seq()): File = {
val compiler = ToolProvider.getSystemJavaCompiler
+     val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
val sourceFile = new JavaSourceFromString(className,
"public class " + className + " implements java.io.Serializable {" +
" @Override public String toString() { return \"" + value + "\"; }}")
"public class " + className + extendsText + " implements java.io.Serializable {" +
" @Override public String toString() { return \"" + toStringValue + "\"; }}")

// Calling this outputs a class file in pwd. It's easier to just rename the file than
// build a custom FileManager that controls the output location.
-     compiler.getTask(null, null, null, null, null, Seq(sourceFile)).call()
+     val options = if (classpathUrls.nonEmpty) {
+       Seq("-classpath", classpathUrls.map { _.getFile }.mkString(File.pathSeparator))
+     } else {
+       Seq()
+     }
+     compiler.getTask(null, null, null, options, null, Seq(sourceFile)).call()

val fileName = className + ".class"
val result = new File(fileName)
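A hedged usage sketch (class names are illustrative, not from this commit): the new parameters let a test compile a child class against a base class delivered on a separate classpath, something the old single-parameter signature could not express.

    // Build a jar holding the base class, then a jar whose class extends it.
    val baseJar = TestUtils.createJarWithClasses(classNames = Seq("FakeBase"))
    val childJar = TestUtils.createJarWithClasses(
      classNames = Seq(),
      classNamesWithBase = Seq(("FakeChild", "FakeBase")),  // FakeChild extends FakeBase
      classpathUrls = Seq(baseJar))                         // so javac can resolve FakeBase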
@@ -21,7 +21,7 @@ import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.fs.{FileSystem, Path}
+ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
@@ -186,6 +186,21 @@ class SparkHadoopUtil extends Logging {
val method = context.getClass.getMethod("getConfiguration")
method.invoke(context).asInstanceOf[Configuration]
}

+   /**
+    * Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
+    * given path points to a file, return a single-element collection containing [[FileStatus]] of
+    * that file.
+    */
+   def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
+     def recurse(path: Path) = {
+       val (directories, leaves) = fs.listStatus(path).partition(_.isDir)
+       leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath))
+     }
+
+     val baseStatus = fs.getFileStatus(basePath)
+     if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
+   }
}

object SparkHadoopUtil {
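A hedged usage sketch of the new helper, assuming the SparkHadoopUtil.get accessor on the companion object (the path and configuration are illustrative):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val fs = FileSystem.get(new Configuration())
    // Recursively collects FileStatus for every file under the base path;
    // passing a file path yields a single-element result.
    val leaves = SparkHadoopUtil.get.listLeafStatuses(fs, new Path("/tmp/spark-events"))
    leaves.foreach(status => println(status.getPath))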
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -36,9 +36,10 @@ import org.apache.ivy.core.retrieve.RetrieveOptions
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}

import org.apache.spark.executor.ExecutorURLClassLoader
import org.apache.spark.util.Utils
+ import org.apache.spark.executor.ChildExecutorURLClassLoader
+ import org.apache.spark.executor.MutableURLClassLoader

/**
* Main gateway of launching a Spark application.
@@ -390,8 +391,14 @@
printStream.println("\n")
}

-     val loader = new ExecutorURLClassLoader(new Array[URL](0),
-       Thread.currentThread.getContextClassLoader)
+     val loader =
+       if (sysProps.getOrElse("spark.files.userClassPathFirst", "false").toBoolean) {
+         new ChildExecutorURLClassLoader(new Array[URL](0),
+           Thread.currentThread.getContextClassLoader)
+       } else {
+         new ExecutorURLClassLoader(new Array[URL](0),
+           Thread.currentThread.getContextClassLoader)
+       }
Thread.currentThread.setContextClassLoader(loader)

for (jar <- childClasspath) {
@@ -444,7 +451,7 @@
}
}

-   private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
+   private def addJarToClasspath(localJar: String, loader: MutableURLClassLoader) {
val uri = Utils.resolveURI(localJar)
uri.getScheme match {
case "file" | "local" =>
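A hedged sketch of how an application would opt in to the child-first loader; the property name is taken from the diff above, everything else is illustrative:

    import org.apache.spark.SparkConf

    // In practice the flag is passed at submit time, e.g.
    //   spark-submit --conf spark.files.userClassPathFirst=true ...
    // so that SparkSubmit picks ChildExecutorURLClassLoader before user code runs.
    val conf = new SparkConf().set("spark.files.userClassPathFirst", "true")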
@@ -173,9 +173,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val logInfos = statusList
.filter { entry =>
try {
-           val modTime = getModificationTime(entry)
-           newLastModifiedTime = math.max(newLastModifiedTime, modTime)
-           modTime >= lastModifiedTime
+           getModificationTime(entry).map { time =>
+             newLastModifiedTime = math.max(newLastModifiedTime, time)
+             time >= lastModifiedTime
+           }.getOrElse(false)
} catch {
case e: AccessControlException =>
// Do not use "logInfo" since these messages can get pretty noisy if printed on
@@ -251,7 +252,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
appListener.appName.getOrElse(NOT_STARTED),
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
-         getModificationTime(eventLog),
+         getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
isApplicationCompleted(eventLog))
} finally {
@@ -310,11 +311,16 @@
*/
private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDir()

-   private def getModificationTime(fsEntry: FileStatus): Long = {
-     if (fsEntry.isDir) {
-       fs.listStatus(fsEntry.getPath).map(_.getModificationTime()).max
+   /**
+    * Returns the modification time of the given event log. If the status points at an empty
+    * directory, `None` is returned, indicating that there isn't an event log at that location.
+    */
+   private def getModificationTime(fsEntry: FileStatus): Option[Long] = {
+     if (isLegacyLogDirectory(fsEntry)) {
+       val statusList = fs.listStatus(fsEntry.getPath)
+       if (!statusList.isEmpty) Some(statusList.map(_.getModificationTime()).max) else None
    } else {
-       fsEntry.getModificationTime()
+       Some(fsEntry.getModificationTime())
}
}

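The refactor threads Option through the scan so that entries without a usable event log silently drop out of the filter instead of throwing. A standalone sketch of the same pattern (the data is illustrative):

    val lastModifiedTime = 100L
    val modTimes: Seq[Option[Long]] = Seq(Some(150L), None, Some(50L))
    // None (no event log) fails the predicate via getOrElse(false),
    // mirroring the filter in the hunk above.
    val fresh = modTimes.filter(_.map(_ >= lastModifiedTime).getOrElse(false))
    // fresh == Seq(Some(150L))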
@@ -43,6 +43,7 @@ private[spark] class ExecutorRunner(
val worker: ActorRef,
val workerId: String,
val host: String,
+     val webUiPort: Int,
val sparkHome: File,
val executorDir: File,
val workerUrl: String,
@@ -134,6 +135,12 @@
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

+     // Add webUI log urls
+     val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+     builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
+     builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
@@ -362,6 +362,7 @@ private[spark] class Worker(
self,
workerId,
host,
+           webUiPort,
sparkHome,
executorDir,
akkaUrl,
@@ -49,10 +49,16 @@ private[spark] class CoarseGrainedExecutorBackend(
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
-     driver ! RegisterExecutor(executorId, hostPort, cores)
+     driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}

+   def extractLogUrls: Map[String, String] = {
+     val prefix = "SPARK_LOG_URL_"
+     sys.env.filterKeys(_.startsWith(prefix))
+       .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
+   }

override def receiveWithLogging = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
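These two hunks form a pair: the worker's ExecutorRunner publishes SPARK_LOG_URL_* environment variables, and the executor backend reads them back into a map sent with registration. A standalone sketch of the extraction, runnable outside Spark:

    // With SPARK_LOG_URL_STDOUT and SPARK_LOG_URL_STDERR set in the environment,
    // this yields Map("stdout" -> <url>, "stderr" -> <url>).
    val prefix = "SPARK_LOG_URL_"
    val logUrls: Map[String, String] = sys.env
      .filterKeys(_.startsWith(prefix))
      .map { case (key, url) => (key.stripPrefix(prefix).toLowerCase, url) }
      .toMap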
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -75,6 +75,9 @@ private[spark] class Executor(
Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)
}

+   // Start worker thread pool
+   val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")

val executorSource = new ExecutorSource(this, executorId)

if (!isLocal) {
@@ -101,9 +104,6 @@
// Limit of bytes for total size of results (default is 1GB)
private val maxResultSize = Utils.getMaxResultSize(conf)

-   // Start worker thread pool
-   val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")

// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

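The thread pool declaration moves above the ExecutorSource construction. A plausible reason (hedged; the commit itself doesn't say): Scala vals initialize in declaration order, so any field read while a later field is being initialized must already be assigned. A minimal illustration:

    class InitOrder {
      val greeting = name + "!"  // name is still null here: it is declared later
      val name = "executor"
    }
    // new InitOrder().greeting == "null!"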
@@ -39,7 +39,17 @@ private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: Class
super.addURL(url)
}
override def findClass(name: String): Class[_] = {
-     super.findClass(name)
+     val loaded = super.findLoadedClass(name)
+     if (loaded != null) {
+       return loaded
+     }
+     try {
+       super.findClass(name)
+     } catch {
+       case e: ClassNotFoundException => {
+         parentClassLoader.loadClass(name)
+       }
+     }
}
}

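The fix gives findClass the standard child-first delegation order: return the class if this loader already defined it, otherwise search the child's own URLs, and only then fall back to the parent. A generic sketch of the same pattern using only the standard URLClassLoader API (names are illustrative, not Spark's):

    import java.net.{URL, URLClassLoader}

    class ChildFirstLoader(urls: Array[URL], fallback: ClassLoader)
      extends URLClassLoader(urls, null) {  // null parent: don't delegate upward first

      override def findClass(name: String): Class[_] = {
        val loaded = super.findLoadedClass(name)    // 1. already defined here?
        if (loaded != null) return loaded
        try super.findClass(name)                   // 2. search the child URLs
        catch {
          case _: ClassNotFoundException => fallback.loadClass(name)  // 3. fall back
        }
      }
    }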
@@ -39,7 +39,11 @@ private[spark] object CoarseGrainedClusterMessages {
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage

// Executors to driver
-   case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
+   case class RegisterExecutor(
+       executorId: String,
+       hostPort: String,
+       cores: Int,
+       logUrls: Map[String, String])
extends CoarseGrainedClusterMessage {
Utils.checkHostPort(hostPort, "Expected host port")
}
(Diff truncated; the remaining changed files are not shown.)
