Merge branch 'master' of github.com:apache/spark into them-rdd-memories

andrewor14 committed Jun 27, 2014
2 parents 87aa75c + f1f7385 commit e1b8b25
Showing 111 changed files with 2,150 additions and 1,030 deletions.
2 changes: 2 additions & 0 deletions .rat-excludes
@@ -4,6 +4,8 @@ target
 .classpath
 .mima-excludes
 .generated-mima-excludes
+.generated-mima-class-excludes
+.generated-mima-member-excludes
 .rat-excludes
 .*md
 derby.log
5 changes: 5 additions & 0 deletions bin/spark-class
@@ -130,6 +130,11 @@ else
 fi

 if [[ "$1" =~ org.apache.spark.tools.* ]]; then
+  if test -z "$SPARK_TOOLS_JAR"; then
+    echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SCALA_VERSION/" 1>&2
+    echo "You need to build spark before running $1." 1>&2
+    exit 1
+  fi
   CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
 fi

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -127,7 +127,7 @@ class Accumulable[R, T] (
     Accumulators.register(this, false)
   }

-  override def toString = value_.toString
+  override def toString = if (value_ == null) "null" else value_.toString
 }

 /**
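
The one-line fix above makes Accumulable.toString safe to call before the accumulator's value is initialized. A minimal standalone sketch of the same pattern (Box and ToStringDemo are hypothetical names, not part of the commit):

class Box[T](private var value_ : T) {
  // Mirror the fix: render "null" instead of dereferencing a null value.
  override def toString = if (value_ == null) "null" else value_.toString
}

object ToStringDemo extends App {
  println(new Box[String](null))  // prints "null" rather than throwing an NPE
  println(new Box[String]("42"))  // prints "42"
}
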
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/Dependency.scala
@@ -61,7 +61,8 @@ class ShuffleDependency[K, V, C](
     val partitioner: Partitioner,
     val serializer: Option[Serializer] = None,
     val keyOrdering: Option[Ordering[K]] = None,
-    val aggregator: Option[Aggregator[K, V, C]] = None)
+    val aggregator: Option[Aggregator[K, V, C]] = None,
+    val mapSideCombine: Boolean = false)
   extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {

   val shuffleId: Int = rdd.context.newShuffleId()
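
The new mapSideCombine flag lets a shuffle declare whether values sharing a key should be pre-aggregated on the map side before being shuffled. A toy illustration of what that buys, using plain Scala collections rather than the actual Spark shuffle path (MapSideCombineDemo is hypothetical):

object MapSideCombineDemo extends App {
  // Records emitted by one imaginary map task.
  val records = Seq(("a", 1), ("b", 2), ("a", 3))

  // With map-side combine, values sharing a key are merged locally,
  // so only two records (instead of three) would cross the network.
  val combined = records
    .groupBy { case (key, _) => key }
    .map { case (key, pairs) => (key, pairs.map(_._2).sum) }

  println(combined)  // Map(a -> 4, b -> 2)
}
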
68 changes: 61 additions & 7 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -30,51 +30,105 @@ import org.apache.spark.storage.BlockManagerId
 @DeveloperApi
 sealed trait TaskEndReason

+/**
+ * :: DeveloperApi ::
+ * Task succeeded.
+ */
 @DeveloperApi
 case object Success extends TaskEndReason

+/**
+ * :: DeveloperApi ::
+ * Various possible reasons why a task failed.
+ */
+@DeveloperApi
+sealed trait TaskFailedReason extends TaskEndReason {
+  /** Error message displayed in the web UI. */
+  def toErrorString: String
+}
+
+/**
+ * :: DeveloperApi ::
+ * A [[org.apache.spark.scheduler.ShuffleMapTask]] that completed successfully earlier, but we
+ * lost the executor before the stage completed. This means Spark needs to reschedule the task
+ * to be re-executed on a different executor.
+ */
 @DeveloperApi
-case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
+case object Resubmitted extends TaskFailedReason {
+  override def toErrorString: String = "Resubmitted (resubmitted due to lost executor)"
+}

+/**
+ * :: DeveloperApi ::
+ * Task failed to fetch shuffle data from a remote node. Probably means we have lost the remote
+ * executors the task is trying to fetch from, and thus need to rerun the previous stage.
+ */
 @DeveloperApi
 case class FetchFailed(
     bmAddress: BlockManagerId,
     shuffleId: Int,
     mapId: Int,
     reduceId: Int)
-  extends TaskEndReason
+  extends TaskFailedReason {
+  override def toErrorString: String = {
+    val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
+    s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId)"
+  }
+}

+/**
+ * :: DeveloperApi ::
+ * Task failed due to a runtime exception. This is the most common failure case and also captures
+ * user program exceptions.
+ */
 @DeveloperApi
 case class ExceptionFailure(
     className: String,
     description: String,
     stackTrace: Array[StackTraceElement],
     metrics: Option[TaskMetrics])
-  extends TaskEndReason
+  extends TaskFailedReason {
+  override def toErrorString: String = {
+    val stackTraceString = if (stackTrace == null) "null" else stackTrace.mkString("\n")
+    s"$className ($description)\n$stackTraceString"
+  }
+}

 /**
  * :: DeveloperApi ::
  * The task finished successfully, but the result was lost from the executor's block manager before
  * it was fetched.
  */
 @DeveloperApi
-case object TaskResultLost extends TaskEndReason
+case object TaskResultLost extends TaskFailedReason {
+  override def toErrorString: String = "TaskResultLost (result lost from block manager)"
+}

 /**
  * :: DeveloperApi ::
  * Task was killed intentionally and needs to be rescheduled.
  */
 @DeveloperApi
-case object TaskKilled extends TaskEndReason
+case object TaskKilled extends TaskFailedReason {
+  override def toErrorString: String = "TaskKilled (killed intentionally)"
+}

 /**
  * :: DeveloperApi ::
  * The task failed because the executor that it was running on was lost. This may happen because
  * the task crashed the JVM.
  */
 @DeveloperApi
-case object ExecutorLostFailure extends TaskEndReason
+case object ExecutorLostFailure extends TaskFailedReason {
+  override def toErrorString: String = "ExecutorLostFailure (executor lost)"
+}

 /**
  * :: DeveloperApi ::
  * We don't know why the task ended -- for example, because of a ClassNotFound exception when
  * deserializing the task result.
  */
 @DeveloperApi
-case object UnknownReason extends TaskEndReason
+case object UnknownReason extends TaskFailedReason {
+  override def toErrorString: String = "UnknownReason"
+}
59 changes: 59 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala

@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import org.apache.spark.ui.SparkUI
+
+private[spark] case class ApplicationHistoryInfo(
+    id: String,
+    name: String,
+    startTime: Long,
+    endTime: Long,
+    lastUpdated: Long,
+    sparkUser: String)
+
+private[spark] abstract class ApplicationHistoryProvider {
+
+  /**
+   * Returns a list of applications available for the history server to show.
+   *
+   * @return List of all known applications.
+   */
+  def getListing(): Seq[ApplicationHistoryInfo]
+
+  /**
+   * Returns the Spark UI for a specific application.
+   *
+   * @param appId The application ID.
+   * @return The application's UI, or null if the application is not found.
+   */
+  def getAppUI(appId: String): SparkUI
+
+  /**
+   * Called when the server is shutting down.
+   */
+  def stop(): Unit = { }
+
+  /**
+   * Returns configuration data to be shown in the History Server home page.
+   *
+   * @return A map with the configuration data. Data is shown in the order returned by the map.
+   */
+  def getConfig(): Map[String, String] = Map()
+
+}
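
To make the contract concrete, here is a hypothetical minimal subclass (InMemoryHistoryProvider is an invented name; the commit ships only the abstract class). It stubs out getAppUI, since reconstructing a SparkUI from event logs is beyond a sketch, but the stub still honors the documented "null if not found" behavior:

// Assumes the private[spark] types above are visible, i.e. this sketch
// lives somewhere under the org.apache.spark package.
import org.apache.spark.deploy.history.{ApplicationHistoryInfo, ApplicationHistoryProvider}
import org.apache.spark.ui.SparkUI

class InMemoryHistoryProvider extends ApplicationHistoryProvider {
  // One canned application record; a real provider would scan event logs.
  private val apps = Seq(
    ApplicationHistoryInfo("app-001", "WordCount", 0L, 1000L, 1000L, "alice"))

  override def getListing(): Seq[ApplicationHistoryInfo] = apps

  // A real provider would rebuild the application's SparkUI here.
  override def getAppUI(appId: String): SparkUI = null

  override def getConfig(): Map[String, String] = Map("Provider" -> "in-memory demo")
}
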