
Commit

Merge branch 'master' of github.com:apache/spark into them-rdd-memories
andrewor14 committed Jun 27, 2014
2 parents e1b8b25 + 18f29b9 commit a8704c1
Showing 24 changed files with 232 additions and 158 deletions.
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -25,9 +25,11 @@ import scala.concurrent.Await

import akka.actor._
import akka.pattern.ask

import org.apache.spark.util._
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util._

private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int)
@@ -168,8 +170,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
}
} else {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing all output locations for shuffle " + shuffleId))
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
}
} else {
statuses.synchronized {
@@ -371,8 +373,8 @@ private[spark] object MapOutputTracker {
statuses.map {
status =>
if (status == null) {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing an output location for shuffle " + shuffleId))
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
} else {
(status.location, decompressSize(status.compressedSizes(reduceId)))
}
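A minimal sketch (not part of this commit) of the new error path in isolation: when the tracker has no output locations there is no BlockManagerId or map ID to report, so the dedicated subclass carries only the shuffle and reduce IDs plus a message. The IDs below are placeholders.

```scala
import org.apache.spark.shuffle.MetadataFetchFailedException

// Placeholder ids for illustration only.
val shuffleId = 3
val reduceId = 7

// Replaces the old FetchFailedException(null, shuffleId, -1, reduceId, cause) pattern.
val err = new MetadataFetchFailedException(
  shuffleId, reduceId, s"Missing all output locations for shuffle $shuffleId")

assert(err.getMessage == s"Missing all output locations for shuffle $shuffleId")
```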
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -65,7 +65,7 @@ case object Resubmitted extends TaskFailedReason {
*/
@DeveloperApi
case class FetchFailed(
bmAddress: BlockManagerId,
bmAddress: BlockManagerId, // Note that bmAddress can be null
shuffleId: Int,
mapId: Int,
reduceId: Int)
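An illustrative, hypothetical consumer of the field annotated above: because bmAddress can be null, code that inspects FetchFailed should guard before dereferencing it. This helper is not part of the commit.

```scala
import org.apache.spark.{FetchFailed, TaskEndReason}

// Hypothetical helper: summarize a fetch failure while tolerating a null bmAddress.
def describeFailure(reason: TaskEndReason): String = reason match {
  case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
    val location = Option(bmAddress).map(_.toString).getOrElse("<unknown block manager>")
    s"fetch failed for shuffle $shuffleId (map $mapId, reduce $reduceId) at $location"
  case other =>
    other.toString
}
```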
@@ -26,8 +26,8 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util.{AkkaUtils, Utils}

@@ -75,9 +75,11 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
@DeveloperApi
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent

/** An event used in the listener to shutdown the listener daemon thread. */
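The two application lifecycle events above are now exposed as @DeveloperApi. A minimal sketch of a custom listener consuming them, assuming the standard SparkListener callbacks for these events; the class name and log lines are illustrative.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

// Sketch only: a listener that logs application start and end.
class AppLifecycleListener extends SparkListener {
  override def onApplicationStart(start: SparkListenerApplicationStart): Unit =
    println(s"Application '${start.appName}' started at ${start.time} by ${start.sparkUser}")

  override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit =
    println(s"Application ended at ${end.time}")
}
```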
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -106,6 +106,8 @@ private[spark] class Stage(
id
}

def attemptId: Int = nextAttemptId

val name = callSite.short
val details = callSite.long

@@ -21,6 +21,10 @@ import java.nio.ByteBuffer

import org.apache.spark.util.SerializableBuffer

/**
* Description of a task that gets passed onto executors to be executed, usually created by
* [[TaskSetManager.resourceOffer]].
*/
private[spark] class TaskDescription(
val taskId: Long,
val executorId: String,
@@ -27,10 +27,12 @@ import org.apache.spark.annotation.DeveloperApi
class TaskInfo(
val taskId: Long,
val index: Int,
val attempt: Int,
val launchTime: Long,
val executorId: String,
val host: String,
val taskLocality: TaskLocality.TaskLocality) {
val taskLocality: TaskLocality.TaskLocality,
val speculative: Boolean) {

/**
* The time when the task started remotely getting the result. Will not be set if the
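A sketch of constructing the expanded TaskInfo: the new attempt parameter sits third and speculative comes last. All values below are placeholders.

```scala
import org.apache.spark.scheduler.{TaskInfo, TaskLocality}

// Placeholder values; parameter names follow the constructor above.
val info = new TaskInfo(
  taskId = 42L,
  index = 0,
  attempt = 1,                          // new: which launch of this task index this is
  launchTime = System.currentTimeMillis(),
  executorId = "exec-1",
  host = "worker-1",
  taskLocality = TaskLocality.PROCESS_LOCAL,
  speculative = false)                  // new: true only for speculative copies

assert(info.attempt == 1 && !info.speculative)
```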
@@ -335,17 +335,19 @@ private[spark] class TaskSetManager(
/**
* Dequeue a pending task for a given node and return its index and locality level.
* Only search for tasks matching the given locality constraint.
*
* @return An option containing (task index within the task set, locality, is speculative?)
*/
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value)] =
: Option[(Int, TaskLocality.Value, Boolean)] =
{
for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL))
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}

if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL))
return Some((index, TaskLocality.NODE_LOCAL, false))
}
}

@@ -354,23 +356,25 @@
rack <- sched.getRackForHost(host)
index <- findTaskFromList(execId, getPendingTasksForRack(rack))
} {
return Some((index, TaskLocality.RACK_LOCAL))
return Some((index, TaskLocality.RACK_LOCAL, false))
}
}

// Look for no-pref tasks after rack-local tasks since they can run anywhere.
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL))
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}

if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
for (index <- findTaskFromList(execId, allPendingTasks)) {
return Some((index, TaskLocality.ANY))
return Some((index, TaskLocality.ANY, false))
}
}

// Finally, if all else has failed, find a speculative task
findSpeculativeTask(execId, host, locality)
findSpeculativeTask(execId, host, locality).map { case (taskIndex, allowedLocality) =>
(taskIndex, allowedLocality, true)
}
}

/**
@@ -391,7 +395,7 @@
}

findTask(execId, host, allowedLocality) match {
case Some((index, taskLocality)) => {
case Some((index, taskLocality, speculative)) => {
// Found a task; do some bookkeeping and return a task description
val task = tasks(index)
val taskId = sched.newTaskId()
@@ -400,7 +404,9 @@
taskSet.id, index, taskId, execId, host, taskLocality))
// Do various bookkeeping
copiesRunning(index) += 1
val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
val attemptNum = taskAttempts(index).size
val info = new TaskInfo(
taskId, index, attemptNum + 1, curTime, execId, host, taskLocality, speculative)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
// Update our locality level for delay scheduling
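A note on the bookkeeping above: the attempt number passed to TaskInfo is derived from how many attempts are already recorded for that task index. A simplified stand-alone model of that counting (not the actual TaskSetManager state):

```scala
// Simplified model: one entry per prior launch of a single task index,
// so the next attempt number is size + 1 (attempt numbers start at 1).
var priorAttempts: List[String] = Nil

def nextAttemptNumber: Int = priorAttempts.size + 1

assert(nextAttemptNumber == 1)          // first launch of this task index
priorAttempts = "attempt-1" :: priorAttempts
assert(nextAttemptNumber == 2)          // a retry or speculative copy
```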
@@ -15,31 +15,38 @@
* limitations under the License.
*/

package org.apache.spark
package org.apache.spark.shuffle

import org.apache.spark.storage.BlockManagerId
import org.apache.spark.{FetchFailed, TaskEndReason}

/**
* Failed to fetch a shuffle block. The executor catches this exception and propagates it
* back to DAGScheduler (through TaskEndReason) so we'd resubmit the previous stage.
*
* Note that bmAddress can be null.
*/
private[spark] class FetchFailedException(
taskEndReason: TaskEndReason,
message: String,
cause: Throwable)
bmAddress: BlockManagerId,
shuffleId: Int,
mapId: Int,
reduceId: Int)
extends Exception {

def this (bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int,
cause: Throwable) =
this(FetchFailed(bmAddress, shuffleId, mapId, reduceId),
"Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId),
cause)

def this (shuffleId: Int, reduceId: Int, cause: Throwable) =
this(FetchFailed(null, shuffleId, -1, reduceId),
"Unable to fetch locations from master: %d %d".format(shuffleId, reduceId), cause)

override def getMessage(): String = message
override def getMessage: String =
"Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)

def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId)
}

override def getCause(): Throwable = cause

def toTaskEndReason: TaskEndReason = taskEndReason
/**
* Failed to get shuffle metadata from [[org.apache.spark.MapOutputTracker]].
*/
private[spark] class MetadataFetchFailedException(
shuffleId: Int,
reduceId: Int,
message: String)
extends FetchFailedException(null, shuffleId, -1, reduceId) {

override def getMessage: String = message
}
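A minimal sketch, not part of the commit, exercising the refactored hierarchy. The BlockManagerId value and its factory signature are assumptions for illustration.

```scala
import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
import org.apache.spark.storage.BlockManagerId

// Assumed placeholder address; the BlockManagerId factory signature may differ.
val address = BlockManagerId("exec-1", "worker-1", 7337, 7338)

// Ordinary case: a concrete block could not be fetched from a known executor.
val blockFailure = new FetchFailedException(address, shuffleId = 2, mapId = 0, reduceId = 3)
// blockFailure.getMessage == s"Fetch failed: $address 2 0 3"

// Metadata case: the tracker had no locations at all, so the address is null and mapId is -1.
val metadataFailure = new MetadataFetchFailedException(
  shuffleId = 2, reduceId = 3, message = "Missing all output locations for shuffle 2")

// Both convert back to the TaskEndReason the DAGScheduler understands.
val reason = metadataFailure.toTaskEndReason   // FetchFailed(null, 2, -1, 3)
```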
@@ -20,11 +20,12 @@ package org.apache.spark.shuffle.hash
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap

import org.apache.spark._
import org.apache.spark.executor.ShuffleReadMetrics
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId}
import org.apache.spark.util.CompletionIterator
import org.apache.spark._

private[hash] object BlockStoreShuffleFetcher extends Logging {
def fetch[T](
@@ -63,7 +64,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
blockId match {
case ShuffleBlockId(shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null)
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId)
case _ =>
throw new SparkException(
"Failed to get block " + blockId + ", which is not a shuffle block")
@@ -79,6 +79,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
val maximumMemory = values("Maximum Memory")
val memoryUsed = values("Memory Used")
val diskUsed = values("Disk Used")
// scalastyle:off
<tr>
<td>{values("Executor ID")}</td>
<td>{values("Address")}</td>
@@ -94,10 +95,11 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
<td>{values("Failed Tasks")}</td>
<td>{values("Complete Tasks")}</td>
<td>{values("Total Tasks")}</td>
<td>{Utils.msDurationToString(values("Task Time").toLong)}</td>
<td>{Utils.bytesToString(values("Shuffle Read").toLong)}</td>
<td>{Utils.bytesToString(values("Shuffle Write").toLong)}</td>
<td sorttable_customkey={values("Task Time")}>{Utils.msDurationToString(values("Task Time").toLong)}</td>
<td sorttable_customkey={values("Shuffle Read")}>{Utils.bytesToString(values("Shuffle Read").toLong)}</td>
<td sorttable_customkey={values("Shuffle Write")} >{Utils.bytesToString(values("Shuffle Write").toLong)}</td>
</tr>
// scalastyle:on
}

/** Represent an executor's info as a map given a storage status index */
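The new sorttable_customkey attributes carry the raw numeric value so the table's JavaScript sorter orders rows numerically while the cell still displays a formatted string. A stand-alone sketch of the pattern, using a stand-in formatter in place of Spark's private Utils.bytesToString:

```scala
// Stand-in formatter for illustration; the page above uses Utils.bytesToString.
def bytesToString(bytes: Long): String = f"${bytes / 1024.0}%.1f KB"

val shuffleReadBytes = 1536L
val cell =
  <td sorttable_customkey={shuffleReadBytes.toString}>{bytesToString(shuffleReadBytes)}</td>
// Renders roughly as: <td sorttable_customkey="1536">1.5 KB</td>
```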
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -67,18 +67,20 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
executorIdToSummary match {
case Some(x) =>
x.toSeq.sortBy(_._1).map { case (k, v) => {
// scalastyle:off
<tr>
<td>{k}</td>
<td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
<td>{UIUtils.formatDuration(v.taskTime)}</td>
<td sorttable_customekey={v.taskTime.toString}>{UIUtils.formatDuration(v.taskTime)}</td>
<td>{v.failedTasks + v.succeededTasks}</td>
<td>{v.failedTasks}</td>
<td>{v.succeededTasks}</td>
<td>{Utils.bytesToString(v.shuffleRead)}</td>
<td>{Utils.bytesToString(v.shuffleWrite)}</td>
<td>{Utils.bytesToString(v.memoryBytesSpilled)}</td>
<td>{Utils.bytesToString(v.diskBytesSpilled)}</td>
<td sorttable_customekey={v.shuffleRead.toString}>{Utils.bytesToString(v.shuffleRead)}</td>
<td sorttable_customekey={v.shuffleWrite.toString}>{Utils.bytesToString(v.shuffleWrite)}</td>
<td sorttable_customekey={v.memoryBytesSpilled.toString} >{Utils.bytesToString(v.memoryBytesSpilled)}</td>
<td sorttable_customekey={v.diskBytesSpilled.toString} >{Utils.bytesToString(v.diskBytesSpilled)}</td>
</tr>
// scalastyle:on
}
}
case _ => Seq[Node]()
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -95,8 +95,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
</div>
// scalastyle:on
val taskHeaders: Seq[String] =
Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++
Seq("Duration", "GC Time", "Result Ser Time") ++
Seq(
"Index", "ID", "Attempt", "Status", "Locality Level", "Executor",
"Launch Time", "Duration", "GC Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
{if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
@@ -245,6 +246,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
<tr>
<td>{info.index}</td>
<td>{info.taskId}</td>
<td sorttable_customkey={info.attempt.toString}>{
if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString
}</td>
<td>{info.status}</td>
<td>{info.taskLocality}</td>
<td>{info.host}</td>
@@ -255,9 +259,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
<td sorttable_customkey={gcTime.toString}>
{if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
</td>
<!--
TODO: Add this back after we add support to hide certain columns.
<td sorttable_customkey={serializationTime.toString}>
{if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
</td>
-->
{if (shuffleRead) {
<td sorttable_customkey={shuffleReadSortable}>
{shuffleReadReadable}
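The new Attempt column labels speculative copies so they can be told apart from ordinary retries. The display rule above, pulled out as a tiny standalone helper for clarity:

```scala
// Mirrors the cell logic above: show the attempt number, tagging speculative copies.
def attemptLabel(attempt: Int, speculative: Boolean): String =
  if (speculative) s"$attempt (speculative)" else attempt.toString

assert(attemptLabel(1, speculative = false) == "1")
assert(attemptLabel(2, speculative = true) == "2 (speculative)")
```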
@@ -49,6 +49,7 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {

/** Render an HTML row representing an RDD */
private def rddRow(rdd: RDDInfo): Seq[Node] = {
// scalastyle:off
<tr>
<td>
<a href={"%s/storage/rdd?id=%s".format(UIUtils.prependBaseUri(basePath), rdd.id)}>
@@ -59,9 +60,10 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
</td>
<td>{rdd.numCachedPartitions}</td>
<td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
<td>{Utils.bytesToString(rdd.memSize)}</td>
<td>{Utils.bytesToString(rdd.tachyonSize)}</td>
<td>{Utils.bytesToString(rdd.diskSize)}</td>
<td sorttable_customekey={rdd.memSize.toString}>{Utils.bytesToString(rdd.memSize)}</td>
<td sorttable_customekey={rdd.tachyonSize.toString}>{Utils.bytesToString(rdd.tachyonSize)}</td>
<td sorttable_customekey={rdd.diskSize.toString} >{Utils.bytesToString(rdd.diskSize)}</td>
</tr>
// scalastyle:on
}
}
(The remaining changed files are not shown here.)
