merge master
dorx committed Jun 17, 2014
2 parents 9e74ab5 + b2ebf42 commit 90d94c0
Showing 121 changed files with 2,154 additions and 1,110 deletions.
2 changes: 2 additions & 0 deletions core/pom.xml
@@ -70,6 +70,8 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
21 changes: 21 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -87,3 +87,24 @@ span.kill-link {
span.kill-link a {
color: gray;
}

span.expand-details {
font-size: 10pt;
cursor: pointer;
color: grey;
float: right;
}

.stage-details {
max-height: 100px;
overflow-y: auto;
margin: 0;
transition: max-height 0.5s ease-out, padding 0.5s ease-out;
}

.stage-details.collapsed {
max-height: 0;
padding-top: 0;
padding-bottom: 0;
border: none;
}
20 changes: 12 additions & 8 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -32,10 +32,14 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
private val loading = new HashSet[RDDBlockId]()

/** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext,
def getOrCompute[T](
rdd: RDD[T],
split: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {

val key = RDDBlockId(rdd.id, split.index)
logDebug("Looking for partition " + key)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
case Some(values) =>
// Partition is already materialized, so just return its values
@@ -45,7 +49,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
// Mark the split as loading (unless someone else marks it first)
loading.synchronized {
if (loading.contains(key)) {
logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
logInfo(s"Another thread is loading $key, waiting for it to finish...")
while (loading.contains(key)) {
try {
loading.wait()
@@ -54,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
logWarning(s"Got an exception while waiting for another thread to load $key", e)
}
}
logInfo("Finished waiting for %s".format(key))
logInfo(s"Finished waiting for $key")
/* See whether someone else has successfully loaded it. The main way this would fail
* is for the RDD-level cache eviction policy if someone else has loaded the same RDD
* partition but we didn't want to make space for it. However, that case is unlikely
@@ -64,7 +68,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
case Some(values) =>
return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
logInfo(s"Whoever was loading $key failed; we'll try it ourselves")
loading.add(key)
}
} else {
@@ -73,7 +77,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}
try {
// If we got here, we have to load the split
logInfo("Partition %s not found, computing it".format(key))
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(split, context)

// Persist the result, so long as the task is not running locally
@@ -97,8 +101,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
case Some(values) =>
values.asInstanceOf[Iterator[T]]
case None =>
logInfo("Failure to store %s".format(key))
throw new Exception("Block manager failed to return persisted valued")
logInfo(s"Failure to store $key")
throw new SparkException("Block manager failed to return persisted value")
}
} else {
// In this case the RDD is cached to an array buffer. This will save the results
31 changes: 21 additions & 10 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -49,7 +49,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -434,12 +434,21 @@ class SparkContext(config: SparkConf) extends Logging {

// Methods for creating RDDs

/** Distribute a local Scala collection to form an RDD. */
/** Distribute a local Scala collection to form an RDD.
*
* @note Parallelize acts lazily. If `seq` is a mutable collection and is
* altered after the call to parallelize and before the first action on the
* RDD, the resultant RDD will reflect the modified collection. Pass a copy of
* the argument to avoid this.
*/
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

/** Distribute a local Scala collection to form an RDD. */
/** Distribute a local Scala collection to form an RDD.
*
* This method is identical to `parallelize`.
*/
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
parallelize(seq, numSlices)
}
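
The @note added above can be demonstrated with a short standalone program. This is a minimal sketch, assuming a local master and the Spark 1.0-era API shown in this diff; the object and variable names are made up for illustration.

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkConf, SparkContext}

object ParallelizeMutationDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("parallelize-note-demo").setMaster("local[2]"))

    val data = ArrayBuffer(1, 2, 3)
    val live   = sc.parallelize(data)         // keeps a reference to the mutable buffer
    val frozen = sc.parallelize(data.toList)  // defensive copy taken at call time

    data += 4                                 // mutate before the first action

    println(live.collect().mkString(","))     // 1,2,3,4 -- the RDD reflects the mutation
    println(frozen.collect().mkString(","))   // 1,2,3   -- the copy is unaffected

    sc.stop()
  }
}
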
@@ -1027,9 +1036,11 @@ class SparkContext(config: SparkConf) extends Logging {
* Capture the current user callsite and return a formatted version for printing. If the user
* has overridden the call site, this will return the user's version.
*/
private[spark] def getCallSite(): String = {
val defaultCallSite = Utils.getCallSiteInfo
Option(getLocalProperty("externalCallSite")).getOrElse(defaultCallSite.toString)
private[spark] def getCallSite(): CallSite = {
Option(getLocalProperty("externalCallSite")) match {
case Some(callSite) => CallSite(callSite, long = "")
case None => Utils.getCallSite
}
}

/**
@@ -1049,11 +1060,11 @@
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite)
logInfo("Starting job: " + callSite.short)
val start = System.nanoTime
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
}

@@ -1134,11 +1145,11 @@ class SparkContext(config: SparkConf) extends Logging {
evaluator: ApproximateEvaluator[U, R],
timeout: Long): PartialResult[R] = {
val callSite = getCallSite
logInfo("Starting job: " + callSite)
logInfo("Starting job: " + callSite.short)
val start = System.nanoTime
val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
result
}

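
The getCallSite change above keeps honoring the externalCallSite local property, and the job-submission logs now print callSite.short. A minimal sketch of overriding the call site from user code, assuming a local master; the property value is arbitrary and only for illustration.

import org.apache.spark.{SparkConf, SparkContext}

object CallSiteOverrideDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("callsite-demo").setMaster("local[2]"))

    // Jobs submitted on this thread report the overridden call site; the
    // "Starting job:" / "Job finished:" log lines above print callSite.short,
    // so they should show this string instead of the captured stack location.
    sc.setLocalProperty("externalCallSite", "nightly ETL at Pipeline.scala:42")
    sc.parallelize(1 to 10).count()

    sc.setLocalProperty("externalCallSite", null)  // remove the override again
    sc.stop()
  }
}
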
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -17,10 +17,13 @@

package org.apache.spark.api.java

import java.util.Comparator

import scala.language.implicitConversions
import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -172,6 +175,19 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
rdd.setName(name)
this
}

/**
* Return this RDD sorted by the given key function.
*/
def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.call(x)
import com.google.common.collect.Ordering // shadows scala.math.Ordering
implicit val ordering = Ordering.natural().asInstanceOf[Ordering[S]]
implicit val ctag: ClassTag[S] = fakeClassTag
wrapRDD(rdd.sortBy(fn, ascending, numPartitions))
}

}

object JavaRDD {
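
The new JavaRDD.sortBy can also be exercised from Scala. A minimal sketch, assuming a local master, with a made-up key function that sorts strings by length; class and object names are illustrative only.

import java.util.Arrays

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.function.{Function => JFunction}

// Key extractor for sortBy: order strings by their length.
class LengthKey extends JFunction[String, java.lang.Integer] {
  override def call(s: String): java.lang.Integer = s.length
}

object JavaSortByDemo {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext(new SparkConf().setAppName("sortBy-demo").setMaster("local[2]"))

    val words = jsc.parallelize(Arrays.asList("banana", "fig", "apple", "pear"))
    val sorted = words.sortBy(new LengthKey, true, 2)  // ascending, 2 partitions

    println(sorted.collect())  // [fig, pear, apple, banana]
    jsc.stop()
  }
}
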
core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -19,9 +19,9 @@ package org.apache.spark.deploy

private[spark] object ExecutorState extends Enumeration {

val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value

type ExecutorState = Value

def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
}
core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
import java.util.Date

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import akka.actor.ActorRef

@@ -36,6 +37,7 @@ private[spark] class ApplicationInfo(

@transient var state: ApplicationState.Value = _
@transient var executors: mutable.HashMap[Int, ExecutorInfo] = _
@transient var removedExecutors: ArrayBuffer[ExecutorInfo] = _
@transient var coresGranted: Int = _
@transient var endTime: Long = _
@transient var appSource: ApplicationSource = _
@@ -51,6 +53,7 @@
endTime = -1L
appSource = new ApplicationSource(this)
nextExecutorId = 0
removedExecutors = new ArrayBuffer[ExecutorInfo]
}

private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -74,6 +77,7 @@

def removeExecutor(exec: ExecutorInfo) {
if (executors.contains(exec.id)) {
removedExecutors += executors(exec.id)
executors -= exec.id
coresGranted -= exec.cores
}
core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
@@ -34,4 +34,19 @@ private[spark] class ExecutorInfo(
}

def fullId: String = application.id + "/" + id

override def equals(other: Any): Boolean = {
other match {
case info: ExecutorInfo =>
fullId == info.fullId &&
worker.id == info.worker.id &&
cores == info.cores &&
memory == info.memory
case _ => false
}
}

override def toString: String = fullId

override def hashCode: Int = toString.hashCode()
}
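
The ApplicationPage change later in this commit merges running and removed executors with .toSet, which deduplicates via exactly this equals/hashCode pair. A minimal self-contained sketch of that behavior, using a simplified stand-in class rather than the real master.ExecutorInfo:

// Simplified stand-in for master.ExecutorInfo, reproducing only its equality contract.
class ExecStub(val fullId: String, val workerId: String, val cores: Int, val memory: Int) {
  override def equals(other: Any): Boolean = other match {
    case e: ExecStub =>
      fullId == e.fullId && workerId == e.workerId && cores == e.cores && memory == e.memory
    case _ => false
  }
  override def toString: String = fullId
  override def hashCode: Int = toString.hashCode()
}

object ExecutorDedupDemo {
  def main(args: Array[String]): Unit = {
    val running = Seq(new ExecStub("app-1/0", "worker-1", 4, 1024))
    val removed = Seq(new ExecStub("app-1/0", "worker-1", 4, 1024))  // same executor, different object
    println((running ++ removed).toSet.size)  // 1 -- duplicates collapse thanks to value-based equality
  }
}
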
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -303,10 +303,11 @@ private[spark] class Master(
appInfo.removeExecutor(exec)
exec.worker.removeExecutor(exec)

val normalExit = exitStatus.exists(_ == 0)
// Only retry certain number of times so we don't go into an infinite loop.
if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
schedule()
} else {
} else if (!normalExit) {
logError("Application %s with ID %s failed %d times, removing it".format(
appInfo.desc.name, appInfo.id, appInfo.retryCount))
removeApplication(appInfo, ApplicationState.FAILED)
core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -25,7 +25,7 @@ import scala.xml.Node
import akka.pattern.ask
import org.json4s.JValue

import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.{ExecutorState, JsonProtocol}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorInfo
import org.apache.spark.ui.{WebUIPage, UIUtils}
@@ -57,43 +57,55 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
})

val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs")
val executors = app.executors.values.toSeq
val executorTable = UIUtils.listingTable(executorHeaders, executorRow, executors)
val allExecutors = (app.executors.values ++ app.removedExecutors).toSet.toSeq
// This includes executors that are either still running or have exited cleanly
val executors = allExecutors.filter { exec =>
!ExecutorState.isFinished(exec.state) || exec.state == ExecutorState.EXITED
}
val removedExecutors = allExecutors.diff(executors)
val executorsTable = UIUtils.listingTable(executorHeaders, executorRow, executors)
val removedExecutorsTable = UIUtils.listingTable(executorHeaders, executorRow, removedExecutors)

val content =
<div class="row-fluid">
<div class="span12">
<ul class="unstyled">
<li><strong>ID:</strong> {app.id}</li>
<li><strong>Name:</strong> {app.desc.name}</li>
<li><strong>User:</strong> {app.desc.user}</li>
<li><strong>Cores:</strong>
{
if (app.desc.maxCores.isEmpty) {
"Unlimited (%s granted)".format(app.coresGranted)
} else {
"%s (%s granted, %s left)".format(
app.desc.maxCores.get, app.coresGranted, app.coresLeft)
}
}
</li>
<li>
<strong>Executor Memory:</strong>
{Utils.megabytesToString(app.desc.memoryPerSlave)}
</li>
<li><strong>Submit Date:</strong> {app.submitDate}</li>
<li><strong>State:</strong> {app.state}</li>
<li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
</ul>
</div>
<div class="row-fluid">
<div class="span12">
<ul class="unstyled">
<li><strong>ID:</strong> {app.id}</li>
<li><strong>Name:</strong> {app.desc.name}</li>
<li><strong>User:</strong> {app.desc.user}</li>
<li><strong>Cores:</strong>
{
if (app.desc.maxCores.isEmpty) {
"Unlimited (%s granted)".format(app.coresGranted)
} else {
"%s (%s granted, %s left)".format(
app.desc.maxCores.get, app.coresGranted, app.coresLeft)
}
}
</li>
<li>
<strong>Executor Memory:</strong>
{Utils.megabytesToString(app.desc.memoryPerSlave)}
</li>
<li><strong>Submit Date:</strong> {app.submitDate}</li>
<li><strong>State:</strong> {app.state}</li>
<li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
</ul>
</div>
</div>

<div class="row-fluid"> <!-- Executors -->
<div class="span12">
<h4> Executor Summary </h4>
{executorTable}
</div>
</div>;
<div class="row-fluid"> <!-- Executors -->
<div class="span12">
<h4> Executor Summary </h4>
{executorsTable}
{
if (removedExecutors.nonEmpty) {
<h4> Removed Executors </h4> ++
removedExecutorsTable
}
}
</div>
</div>;
UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
}

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -154,11 +154,10 @@ private[spark] class ExecutorRunner(
Files.write(header, stderr, Charsets.UTF_8)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

// Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
// long-lived processes only. However, in the future, we might restart the executor a few
// times on the same machine.
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
// or with nonzero exit code
val exitCode = process.waitFor()
state = ExecutorState.FAILED
state = ExecutorState.EXITED
val message = "Command exited with code " + exitCode
worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
} catch {