Skip to content

Commit

Permalink
Merge branch 'master' of git://git.apache.org/spark into timeline-vie…
Browse files Browse the repository at this point in the history
…wer-feature
  • Loading branch information
sarutak committed Apr 14, 2015
2 parents 68b7540 + d7f2c19 commit 19815ae
Show file tree
Hide file tree
Showing 234 changed files with 4,593 additions and 1,306 deletions.
2 changes: 1 addition & 1 deletion R/pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ setMethod("unpersist",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' setCheckpointDir(sc, "checkpoints")
#' setCheckpointDir(sc, "checkpoint")
#' rdd <- parallelize(sc, 1:10, 2L)
#' checkpoint(rdd)
#'}
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/R/context.R
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ broadcast <- function(sc, object) {
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' setCheckpointDir(sc, "~/checkpoints")
#' setCheckpointDir(sc, "~/checkpoint")
#' rdd <- parallelize(sc, 1:2, 2L)
#' checkpoint(rdd)
#'}
Expand Down
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/test_binaryFile.R
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ test_that("saveAsObjectFile()/objectFile() following textFile() works", {
fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
writeLines(mockFile, fileName1)

rdd <- textFile(sc, fileName1)
rdd <- textFile(sc, fileName1, 1)
saveAsObjectFile(rdd, fileName2)
rdd <- objectFile(sc, fileName2)
expect_equal(collect(rdd), as.list(mockFile))
Expand All @@ -40,7 +40,7 @@ test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
fileName <- tempfile(pattern="spark-test", fileext=".tmp")

l <- list(1, 2, 3)
rdd <- parallelize(sc, l)
rdd <- parallelize(sc, l, 1)
saveAsObjectFile(rdd, fileName)
rdd <- objectFile(sc, fileName)
expect_equal(collect(rdd), l)
Expand Down
5 changes: 3 additions & 2 deletions R/pkg/inst/tests/test_rdd.R
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
unpersist(rdd2)
expect_false(rdd2@env$isCached)

setCheckpointDir(sc, "checkpoints")
tempDir <- tempfile(pattern = "checkpoint")
setCheckpointDir(sc, tempDir)
checkpoint(rdd2)
expect_true(rdd2@env$isCheckpointed)

Expand All @@ -152,7 +153,7 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
# make sure the data is collectable
collect(rdd2)

unlink("checkpoints")
unlink(tempDir)
})

test_that("reduce on RDD", {
Expand Down
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/test_textFile.R
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ test_that("textFile() followed by a saveAsTextFile() returns the same content",
fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
writeLines(mockFile, fileName1)

rdd <- textFile(sc, fileName1)
rdd <- textFile(sc, fileName1, 1L)
saveAsTextFile(rdd, fileName2)
rdd <- textFile(sc, fileName2)
expect_equal(collect(rdd), as.list(mockFile))
Expand All @@ -93,7 +93,7 @@ test_that("textFile() followed by a saveAsTextFile() returns the same content",
test_that("saveAsTextFile() on a parallelized list works as expected", {
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
l <- list(1, 2, 3)
rdd <- parallelize(sc, l)
rdd <- parallelize(sc, l, 1L)
saveAsTextFile(rdd, fileName)
rdd <- textFile(sc, fileName)
expect_equal(collect(rdd), lapply(l, function(x) {toString(x)}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,16 @@ private[spark] class ExecutorAllocationManager(
Integer.MAX_VALUE)

// How long there must be backlogged tasks for before an addition is triggered (seconds)
private val schedulerBacklogTimeout = conf.getLong(
"spark.dynamicAllocation.schedulerBacklogTimeout", 5)
private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.schedulerBacklogTimeout", "5s")

// Same as above, but used only after `schedulerBacklogTimeout` is exceeded
private val sustainedSchedulerBacklogTimeout = conf.getLong(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)
// Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded
private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", s"${schedulerBacklogTimeoutS}s")

// How long an executor must be idle for before it is removed (seconds)
private val executorIdleTimeout = conf.getLong(
"spark.dynamicAllocation.executorIdleTimeout", 600)
private val executorIdleTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.executorIdleTimeout", "600s")

// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
Expand Down Expand Up @@ -150,14 +150,14 @@ private[spark] class ExecutorAllocationManager(
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
}
if (schedulerBacklogTimeout <= 0) {
if (schedulerBacklogTimeoutS <= 0) {
throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
}
if (sustainedSchedulerBacklogTimeout <= 0) {
if (sustainedSchedulerBacklogTimeoutS <= 0) {
throw new SparkException(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
}
if (executorIdleTimeout <= 0) {
if (executorIdleTimeoutS <= 0) {
throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
}
// Require external shuffle service for dynamic allocation
Expand Down Expand Up @@ -262,8 +262,8 @@ private[spark] class ExecutorAllocationManager(
} else if (addTime != NOT_SET && now >= addTime) {
val delta = addExecutors(maxNeeded)
logDebug(s"Starting timer to add more executors (to " +
s"expire in $sustainedSchedulerBacklogTimeout seconds)")
addTime += sustainedSchedulerBacklogTimeout * 1000
s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
addTime += sustainedSchedulerBacklogTimeoutS * 1000
delta
} else {
0
Expand Down Expand Up @@ -351,7 +351,7 @@ private[spark] class ExecutorAllocationManager(
val removeRequestAcknowledged = testing || client.killExecutor(executorId)
if (removeRequestAcknowledged) {
logInfo(s"Removing executor $executorId because it has been idle for " +
s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})")
executorsPendingToRemove.add(executorId)
true
} else {
Expand Down Expand Up @@ -407,8 +407,8 @@ private[spark] class ExecutorAllocationManager(
private def onSchedulerBacklogged(): Unit = synchronized {
if (addTime == NOT_SET) {
logDebug(s"Starting timer to add executors because pending tasks " +
s"are building up (to expire in $schedulerBacklogTimeout seconds)")
addTime = clock.getTimeMillis + schedulerBacklogTimeout * 1000
s"are building up (to expire in $schedulerBacklogTimeoutS seconds)")
addTime = clock.getTimeMillis + schedulerBacklogTimeoutS * 1000
}
}

Expand All @@ -431,8 +431,8 @@ private[spark] class ExecutorAllocationManager(
if (executorIds.contains(executorId)) {
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
s"scheduled to run on the executor (to expire in $executorIdleTimeoutS seconds)")
removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeoutS * 1000
}
} else {
logWarning(s"Attempted to mark unknown executor $executorId idle")
Expand Down
15 changes: 9 additions & 6 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,17 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)

// "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
// "milliseconds"
private val executorTimeoutMs = sc.conf.getOption("spark.network.timeout").map(_.toLong * 1000).
getOrElse(sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120000))

private val slaveTimeoutMs =
sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", "120s")
private val executorTimeoutMs =
sc.conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms") * 1000

// "spark.network.timeoutInterval" uses "seconds", while
// "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
private val checkTimeoutIntervalMs =
sc.conf.getOption("spark.network.timeoutInterval").map(_.toLong * 1000).
getOrElse(sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000))
private val timeoutIntervalMs =
sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s")
private val checkTimeoutIntervalMs =
sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000

private var timeoutCheckingTask: ScheduledFuture[_] = null

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/HttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ private[spark] class HttpServer(
throw new ServerStateException("Server is not started")
} else {
val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http"
s"$scheme://${Utils.localIpAddress}:$port"
s"$scheme://${Utils.localHostNameForURI()}:$port"
}
}
}
36 changes: 36 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,42 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
getOption(key).getOrElse(defaultValue)
}

/**
* Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
* suffix is provided then seconds are assumed.
* @throws NoSuchElementException
*/
def getTimeAsSeconds(key: String): Long = {
Utils.timeStringAsSeconds(get(key))
}

/**
* Get a time parameter as seconds, falling back to a default if not set. If no
* suffix is provided then seconds are assumed.
*
*/
def getTimeAsSeconds(key: String, defaultValue: String): Long = {
Utils.timeStringAsSeconds(get(key, defaultValue))
}

/**
* Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
* suffix is provided then milliseconds are assumed.
* @throws NoSuchElementException
*/
def getTimeAsMs(key: String): Long = {
Utils.timeStringAsMs(get(key))
}

/**
* Get a time parameter as milliseconds, falling back to a default if not set. If no
* suffix is provided then milliseconds are assumed.
*/
def getTimeAsMs(key: String, defaultValue: String): Long = {
Utils.timeStringAsMs(get(key, defaultValue))
}


/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
Option(settings.get(key))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class LocalSparkCluster(
/* Start the Master */
val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf)
masterActorSystems += masterSystem
val masterUrl = "spark://" + localHostname + ":" + masterPort
val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + masterPort
val masters = Array(masterUrl)

/* Start the Workers */
Expand Down
38 changes: 34 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.{Logging, SparkContext, SparkConf, SparkException}
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -201,6 +200,37 @@ class SparkHadoopUtil extends Logging {
val baseStatus = fs.getFileStatus(basePath)
if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
}

private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored

/**
* Substitute variables by looking them up in Hadoop configs. Only variables that match the
* ${hadoopconf- .. } pattern are substituted.
*/
def substituteHadoopVariables(text: String, hadoopConf: Configuration): String = {
text match {
case HADOOP_CONF_PATTERN(matched) => {
logDebug(text + " matched " + HADOOP_CONF_PATTERN)
val key = matched.substring(13, matched.length() - 1) // remove ${hadoopconf- .. }
val eval = Option[String](hadoopConf.get(key))
.map { value =>
logDebug("Substituted " + matched + " with " + value)
text.replace(matched, value)
}
if (eval.isEmpty) {
// The variable was not found in Hadoop configs, so return text as is.
text
} else {
// Continue to substitute more variables.
substituteHadoopVariables(eval.get, hadoopConf)
}
}
case _ => {
logDebug(text + " didn't match " + HADOOP_CONF_PATTERN)
text
}
}
}
}

object SparkHadoopUtil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private[spark] object TestClient {
def main(args: Array[String]) {
val url = args(0)
val conf = new SparkConf
val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localHostName(), 0,
conf = conf, securityManager = new SecurityManager(conf))
val desc = new ApplicationDescription("TestClient", Some(1), 512,
Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[deploy] class ExecutorRunner(
val workerUrl: String,
conf: SparkConf,
val appLocalDirs: Seq[String],
var state: ExecutorState.Value)
@volatile var state: ExecutorState.Value)
extends Logging {

private val fullId = appId + "/" + execId
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -436,14 +436,14 @@ private[spark] class Executor(
* This thread stops running when the executor is stopped.
*/
private def startDriverHeartbeater(): Unit = {
val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)
val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
val thread = new Thread() {
override def run() {
// Sleep a random interval so the heartbeats don't end up in sync
Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])
Thread.sleep(intervalMs + (math.random * intervalMs).asInstanceOf[Int])
while (!isStopped) {
reportHeartBeat()
Thread.sleep(interval)
Thread.sleep(intervalMs)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ private[nio] class ConnectionManager(
new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor"))

private val ackTimeout =
conf.getInt("spark.core.connection.ack.wait.timeout", conf.getInt("spark.network.timeout", 120))
conf.getTimeAsSeconds("spark.core.connection.ack.wait.timeout",
conf.get("spark.network.timeout", "120s"))

// Get the thread counts from the Spark Configuration.
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.shuffle.ShuffleWriter
* See [[org.apache.spark.scheduler.Task]] for more information.
*
* @param stageId id of the stage this task belongs to
* @param taskBinary broadcast version of of the RDD and the ShuffleDependency. Once deserialized,
* @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized,
* the type should be (RDD[_], ShuffleDependency[_, _, _]).
* @param partition partition of the RDD this task is associated with
* @param locs preferred task execution locations for locality scheduling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ private[spark] class TaskSchedulerImpl(
val conf = sc.conf

// How often to check for speculative tasks
val SPECULATION_INTERVAL = conf.getLong("spark.speculation.interval", 100)
val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")

// Threshold above which we warn user initial TaskSet may be starved
val STARVATION_TIMEOUT = conf.getLong("spark.starvation.timeout", 15000)
val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")

// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
Expand Down Expand Up @@ -143,8 +143,8 @@ private[spark] class TaskSchedulerImpl(
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
import sc.env.actorSystem.dispatcher
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
SPECULATION_INTERVAL milliseconds) {
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL_MS milliseconds,
SPECULATION_INTERVAL_MS milliseconds) {
Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
}
}
Expand Down Expand Up @@ -173,7 +173,7 @@ private[spark] class TaskSchedulerImpl(
this.cancel()
}
}
}, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
Expand Down
Loading

0 comments on commit 19815ae

Please sign in to comment.