Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new

witgo committed May 15, 2014
2 parents 926bd6a + 2f63995 commit 3b6d48c
Showing 18 changed files with 267 additions and 82 deletions.
5 changes: 5 additions & 0 deletions .rat-excludes
@@ -43,3 +43,8 @@ test.out/*
 .*iml
 service.properties
 db.lck
+build/*
+dist/*
+.*out
+.*ipr
+.*iws
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -66,6 +66,12 @@ class SparkContext(config: SparkConf) extends Logging {
   // contains a map from hostname to a list of input format splits on the host.
   private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()
 
+  /**
+   * Create a SparkContext that loads settings from system properties (for instance, when
+   * launching with ./bin/spark-submit).
+   */
+  def this() = this(new SparkConf())
+
   /**
    * :: DeveloperApi ::
    * Alternative constructor for setting preferred locations where Spark will create executors.
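The added zero-argument constructor lets a driver rely entirely on configuration injected through system properties, which is how bin/spark-submit passes settings. A minimal sketch of a driver using it (the object name and property values below are illustrative, not part of this commit):

import org.apache.spark.SparkContext

object ZeroArgContextDemo {
  def main(args: Array[String]): Unit = {
    // Assumes the launcher has already set the required properties, e.g.
    //   -Dspark.master=local[2] -Dspark.app.name=ZeroArgContextDemo
    val sc = new SparkContext()  // equivalent to new SparkContext(new SparkConf())
    try {
      println(sc.parallelize(1 to 100).reduce(_ + _))  // prints 5050
    } finally {
      sc.stop()
    }
  }
}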
18 changes: 8 additions & 10 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -278,10 +278,11 @@ object SparkEnv extends Logging {
       addedJars: Seq[String],
       addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {
 
+    import Properties._
     val jvmInformation = Seq(
-      ("Java Version", "%s (%s)".format(Properties.javaVersion, Properties.javaVendor)),
-      ("Java Home", Properties.javaHome),
-      ("Scala Version", Properties.versionString)
+      ("Java Version", s"$javaVersion ($javaVendor)"),
+      ("Java Home", javaHome),
+      ("Scala Version", versionString)
     ).sorted
 
     // Spark properties
@@ -296,18 +297,15 @@
 
     // System properties that are not java classpaths
     val systemProperties = System.getProperties.iterator.toSeq
-    val otherProperties = systemProperties.filter { case (k, v) =>
+    val otherProperties = systemProperties.filter { case (k, _) =>
       k != "java.class.path" && !k.startsWith("spark.")
     }.sorted
 
     // Class paths including all added jars and files
-    val classPathProperty = systemProperties.find { case (k, v) =>
-      k == "java.class.path"
-    }.getOrElse(("", ""))
-    val classPathEntries = classPathProperty._2
+    val classPathEntries = javaClassPath
       .split(File.pathSeparator)
-      .filterNot(e => e.isEmpty)
-      .map(e => (e, "System Classpath"))
+      .filterNot(_.isEmpty)
+      .map((_, "System Classpath"))
     val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
     val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted
 
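Both hunks lean on scala.util.Properties, whose members become available unqualified through the wildcard import; javaClassPath in particular replaces the manual System.getProperties lookup for "java.class.path". A standalone sketch of the same standard-library calls (illustrative only):

import scala.util.Properties._
import java.io.File

object JvmInfoDemo {
  def main(args: Array[String]): Unit = {
    println(s"Java Version: $javaVersion ($javaVendor)")
    println(s"Java Home: $javaHome")
    println(s"Scala Version: $versionString")
    // Split the same way SparkEnv now does, dropping empty entries.
    val entries = javaClassPath.split(File.pathSeparator).filterNot(_.isEmpty)
    println(s"System classpath has ${entries.length} entries")
  }
}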
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -68,7 +68,7 @@ object SparkSubmit {
 
   /**
    * @return a tuple containing the arguments for the child, a list of classpath
-   *         entries for the child, a list of system propertes, a list of env vars
+   *         entries for the child, a list of system properties, a list of env vars
    *         and the main class for the child
    */
   private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String],
18 changes: 10 additions & 8 deletions core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{Utils, AkkaUtils}
 
 /**
  * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,

@@ -88,13 +88,15 @@ private[spark] class AppClient(
     var retries = 0
     registrationRetryTimer = Some {
       context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-        retries += 1
-        if (registered) {
-          registrationRetryTimer.foreach(_.cancel())
-        } else if (retries >= REGISTRATION_RETRIES) {
-          markDead("All masters are unresponsive! Giving up.")
-        } else {
-          tryRegisterAllMasters()
+        Utils.tryOrExit {
+          retries += 1
+          if (registered) {
+            registrationRetryTimer.foreach(_.cancel())
+          } else if (retries >= REGISTRATION_RETRIES) {
+            markDead("All masters are unresponsive! Giving up.")
+          } else {
+            tryRegisterAllMasters()
+          }
         }
       }
     }
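The reason for the wrapper: a Throwable thrown inside a task handed to a scheduler is typically captured by the scheduler machinery rather than propagated, so the retry loop could die without any trace. A self-contained sketch of that failure mode using java.util.concurrent (chosen for brevity; the code above uses the Akka scheduler, which can similarly swallow failures):

import java.util.concurrent.{Executors, TimeUnit}

object SilentFailureDemo {
  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    // The exception below never reaches the console or an uncaught-exception
    // handler: the executor stores it in the ScheduledFuture and simply
    // stops rescheduling the task.
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = throw new RuntimeException("registration failed")
    }, 0, 1, TimeUnit.SECONDS)

    Thread.sleep(3000)  // no stack trace appears during this time
    scheduler.shutdown()
    // Wrapping the body in something like Utils.tryOrExit forwards the
    // Throwable to an explicit handler instead of losing it.
  }
}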
18 changes: 10 additions & 8 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -166,14 +166,16 @@ private[spark] class Worker(
     var retries = 0
     registrationRetryTimer = Some {
       context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-        retries += 1
-        if (registered) {
-          registrationRetryTimer.foreach(_.cancel())
-        } else if (retries >= REGISTRATION_RETRIES) {
-          logError("All masters are unresponsive! Giving up.")
-          System.exit(1)
-        } else {
-          tryRegisterAllMasters()
+        Utils.tryOrExit {
+          retries += 1
+          if (registered) {
+            registrationRetryTimer.foreach(_.cancel())
+          } else if (retries >= REGISTRATION_RETRIES) {
+            logError("All masters are unresponsive! Giving up.")
+            System.exit(1)
+          } else {
+            tryRegisterAllMasters()
+          }
         }
       }
     }
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -31,6 +31,7 @@ import scala.util.Random
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.util.Utils
 
 /**
  * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.

@@ -139,7 +140,7 @@ private[spark] class TaskSchedulerImpl(
       import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
         SPECULATION_INTERVAL milliseconds) {
-        checkSpeculatableTasks()
+        Utils.tryOrExit { checkSpeculatableTasks() }
       }
     }
   }
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -155,7 +155,7 @@ private[spark] class BlockManager(
     BlockManagerWorker.startBlockManagerWorker(this)
     if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
       heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
-        heartBeat()
+        Utils.tryOrExit { heartBeat() }
       }
     }
   }
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -36,7 +36,13 @@ private[spark] object UIUtils extends Logging {
   def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
 
   def formatDuration(milliseconds: Long): String = {
+    if (milliseconds < 100) {
+      return "%d ms".format(milliseconds)
+    }
     val seconds = milliseconds.toDouble / 1000
+    if (seconds < 1) {
+      return "%.1f s".format(seconds)
+    }
     if (seconds < 60) {
       return "%.0f s".format(seconds)
     }
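With the new early returns, sub-second durations are no longer rendered as "0 s". A standalone sketch of the resulting behavior (reimplemented here for illustration; only the branches visible in this hunk are included, so the minute/hour branches are stubbed out):

object FormatDurationDemo {
  // Mirrors the branches shown in the hunk above; longer durations fall
  // through to minute/hour logic that this diff does not show.
  def formatDuration(milliseconds: Long): String = {
    if (milliseconds < 100) {
      return "%d ms".format(milliseconds)
    }
    val seconds = milliseconds.toDouble / 1000
    if (seconds < 1) {
      return "%.1f s".format(seconds)
    }
    if (seconds < 60) {
      return "%.0f s".format(seconds)
    }
    "%.1f min".format(seconds / 60)  // placeholder for the elided branches
  }

  def main(args: Array[String]): Unit = {
    println(formatDuration(42))    // "42 ms" (was "0 s" before this change)
    println(formatDuration(640))   // "0.6 s"
    println(formatDuration(5300))  // "5 s"
  }
}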
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -40,6 +40,7 @@ import tachyon.client.{TachyonFile,TachyonFS}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 
 /**

@@ -780,6 +781,18 @@ private[spark] object Utils extends Logging {
     output.toString
   }
 
+  /**
+   * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
+   * default UncaughtExceptionHandler
+   */
+  def tryOrExit(block: => Unit) {
+    try {
+      block
+    } catch {
+      case t: Throwable => ExecutorUncaughtExceptionHandler.uncaughtException(t)
+    }
+  }
+
   /**
    * A regular expression to match classes of the "core" Spark API that we want to skip when
    * finding the call site of a method.
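Despite the name, tryOrExit does not call System.exit itself; it hands the Throwable to ExecutorUncaughtExceptionHandler, which logs it and then terminates the JVM. A simplified, self-contained approximation of the pattern (the inlined handler and exit code are illustrative; the real handler also special-cases OutOfMemoryError and a JVM already shutting down):

object TryOrExitSketch {
  // Simplified stand-in for ExecutorUncaughtExceptionHandler.
  private def handleUncaught(t: Throwable): Unit = {
    System.err.println(s"Uncaught exception, exiting: $t")
    System.exit(50)  // illustrative exit code, not Spark's actual constant
  }

  def tryOrExit(block: => Unit): Unit = {
    try {
      block
    } catch {
      case t: Throwable => handleUncaught(t)
    }
  }

  def main(args: Array[String]): Unit = {
    tryOrExit { println("timer tick") }           // runs normally
    tryOrExit { throw new RuntimeException("x") } // logs and exits the JVM
  }
}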
2 changes: 1 addition & 1 deletion docs/_config.yml
@@ -7,6 +7,6 @@ SPARK_VERSION: 1.0.0-SNAPSHOT
 SPARK_VERSION_SHORT: 1.0.0
 SCALA_BINARY_VERSION: "2.10"
 SCALA_VERSION: "2.10.4"
-MESOS_VERSION: 0.13.0
+MESOS_VERSION: 0.18.1
 SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
 SPARK_GITHUB_URL: https://github.com/apache/spark