Merge branch 'master' of git://git.apache.org/spark into timeline-viewer-feature
sarutak committed Apr 3, 2015
2 parents 09cce97 + 26b415e commit dab7cc1
Showing 92 changed files with 1,032 additions and 406 deletions.
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -17,10 +17,12 @@

package org.apache.spark

import java.util.concurrent.{Executors, TimeUnit}

import scala.collection.mutable

import org.apache.spark.scheduler._
import org.apache.spark.util.{SystemClock, Clock}
import org.apache.spark.util.{Clock, SystemClock, Utils}

/**
* An agent that dynamically allocates and removes executors based on the workload.
@@ -129,6 +131,10 @@ private[spark] class ExecutorAllocationManager(
// Listener for Spark events that impact the allocation policy
private val listener = new ExecutorAllocationListener

// Executor that handles the scheduling task.
private val executor = Executors.newSingleThreadScheduledExecutor(
Utils.namedThreadFactory("spark-dynamic-executor-allocation"))

/**
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
@@ -173,32 +179,24 @@ private[spark] class ExecutorAllocationManager(
}

/**
* Register for scheduler callbacks to decide when to add and remove executors.
* Register for scheduler callbacks to decide when to add and remove executors, and start
* the scheduling task.
*/
def start(): Unit = {
listenerBus.addListener(listener)
startPolling()

val scheduleTask = new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions(schedule())
}
executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}

/**
* Start the main polling thread that keeps track of when to add and remove executors.
* Stop the allocation manager.
*/
private def startPolling(): Unit = {
val t = new Thread {
override def run(): Unit = {
while (true) {
try {
schedule()
} catch {
case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
}
Thread.sleep(intervalMillis)
}
}
}
t.setName("spark-dynamic-executor-allocation")
t.setDaemon(true)
t.start()
def stop(): Unit = {
executor.shutdown()
executor.awaitTermination(10, TimeUnit.SECONDS)
}

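For context on this file's change: the old startPolling() spun up a bare thread that looped forever around Thread.sleep and had no shutdown path, while the new code hands the periodic schedule() call to a single-threaded ScheduledExecutorService, which also gives stop() a clean shutdown. A minimal sketch of that pattern, with illustrative names rather than Spark's actual code:

import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

class Poller(intervalMillis: Long) {
  // One named daemon thread, analogous to Utils.namedThreadFactory in the diff.
  private val executor: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "poller")
        t.setDaemon(true)
        t
      }
    })

  def start(): Unit = {
    val task = new Runnable {
      // Catch everything: an exception escaping a scheduled task silently
      // cancels all of its future runs (the hazard Utils.logUncaughtExceptions guards).
      override def run(): Unit =
        try schedule() catch { case e: Exception => e.printStackTrace() }
    }
    executor.scheduleAtFixedRate(task, 0L, intervalMillis, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = {
    executor.shutdown()
    executor.awaitTermination(10L, TimeUnit.SECONDS)
  }

  // Placeholder for the periodic allocation logic.
  private def schedule(): Unit = ()
}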
32 changes: 25 additions & 7 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -37,16 +37,24 @@ private[spark] case class Heartbeat(
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
blockManagerId: BlockManagerId)

/**
* An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
* created.
*/
private[spark] case object TaskSchedulerIsSet

private[spark] case object ExpireDeadHosts

private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)

/**
* Lives in the driver to receive heartbeats from executors.
*/
private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
private[spark] class HeartbeatReceiver(sc: SparkContext)
extends Actor with ActorLogReceive with Logging {

private var scheduler: TaskScheduler = null

// executor ID -> timestamp of when the last heartbeat from this executor was received
private val executorLastSeen = new mutable.HashMap[String, Long]

@@ -71,12 +79,22 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
}

override def receiveWithLogging: PartialFunction[Any, Unit] = {
case Heartbeat(executorId, taskMetrics, blockManagerId) =>
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, taskMetrics, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
executorLastSeen(executorId) = System.currentTimeMillis()
sender ! response
case TaskSchedulerIsSet =>
scheduler = sc.taskScheduler
case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
if (scheduler != null) {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, taskMetrics, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
executorLastSeen(executorId) = System.currentTimeMillis()
sender ! response
} else {
// Because the Executor sleeps for several seconds before sending its first "Heartbeat",
// this case rarely happens. If it does, log the dropped heartbeat and ask the
// executor to re-register itself.
logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
sender ! HeartbeatResponse(reregisterBlockManager = true)
}
case ExpireDeadHosts =>
expireDeadHosts()
}
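The shape of this change: HeartbeatReceiver no longer takes the TaskScheduler in its constructor; SparkContext creates the actor first and sends TaskSchedulerIsSet once the scheduler exists, and until then heartbeats are answered defensively. A stripped-down, illustrative Akka sketch of that late-binding handshake (names are invented; this is not Spark's code):

import akka.actor.{Actor, ActorSystem, Props}

case object DependencyIsSet
final case class Ping(executorId: String)
final case class Pong(reregister: Boolean)

class LateBoundReceiver extends Actor {
  // The dependency is injected by message after the actor has started.
  private var ready = false

  def receive: Receive = {
    case DependencyIsSet =>
      ready = true
    case Ping(_) =>
      // Before the dependency arrives, mirror the diff's fallback: drop the
      // work but tell the caller to re-register and retry.
      sender() ! Pong(reregister = !ready)
  }
}

object HandshakeDemo extends App {
  val system = ActorSystem("demo")
  val receiver = system.actorOf(Props[LateBoundReceiver], "receiver")
  receiver ! DependencyIsSet // analogue of heartbeatReceiver ! TaskSchedulerIsSet
}

SparkContext's constructor plays HandshakeDemo's role below: create the actor, build the scheduler, then send the ready message.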
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -356,11 +356,14 @@ private[spark] object MapOutputTracker extends Logging {
def serializeMapStatuses(statuses: Array[MapStatus]): Array[Byte] = {
val out = new ByteArrayOutputStream
val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
// Since statuses can be modified in parallel, sync on it
statuses.synchronized {
objOut.writeObject(statuses)
Utils.tryWithSafeFinally {
// Since statuses can be modified in parallel, sync on it
statuses.synchronized {
objOut.writeObject(statuses)
}
} {
objOut.close()
}
objOut.close()
out.toByteArray
}

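Why this hunk matters: previously objOut.close() ran only on the success path, so a failure inside writeObject leaked the stream. Utils.tryWithSafeFinally closes it on every path without letting a cleanup failure mask the original error. A standalone sketch of the idiom, modeled on (but not identical to) Spark's helper:

object SafeFinally {
  // If both the body and the cleanup throw, the body's exception wins and the
  // cleanup failure is attached as a suppressed exception instead of masking it.
  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
    var original: Throwable = null
    try {
      block
    } catch {
      case t: Throwable =>
        original = t
        throw t
    } finally {
      try {
        finallyBlock
      } catch {
        // Only swallow the cleanup failure when the body already failed;
        // otherwise (original == null) this case doesn't match and it propagates.
        case t: Throwable if original != null =>
          original.addSuppressed(t)
      }
    }
  }
}

In the hunk above, the body is the synchronized writeObject and the cleanup is objOut.close().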
76 changes: 43 additions & 33 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -23,7 +23,7 @@ import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.UUID.randomUUID

import scala.collection.{Map, Set}
@@ -95,10 +95,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

val startTime = System.currentTimeMillis()

@volatile private var stopped: Boolean = false
private val stopped: AtomicBoolean = new AtomicBoolean(false)

private def assertNotStopped(): Unit = {
if (stopped) {
if (stopped.get()) {
throw new IllegalStateException("Cannot call methods on a stopped SparkContext")
}
}
@@ -227,9 +227,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val appName = conf.get("spark.app.name")

private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
private[spark] val eventLogDir: Option[String] = {
private[spark] val eventLogDir: Option[URI] = {
if (isEventLogEnabled) {
Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
.stripSuffix("/")
Some(Utils.resolveURI(unresolvedDir))
} else {
None
}
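With this change eventLogDir is a resolved URI rather than a raw string, so a bare local path and an explicit hdfs:// URI are normalized the same way up front. A rough sketch of that kind of normalization (illustrative; the real Utils.resolveURI handles more cases):

object UriResolution {
  import java.io.File
  import java.net.URI

  // URIs that already carry a scheme (hdfs://, file://, ...) pass through;
  // bare local paths are anchored to the filesystem and get a file:// scheme.
  def resolveURI(path: String): URI = {
    val uri = new URI(path)
    if (uri.getScheme != null) uri
    else new File(path).getAbsoluteFile.toURI
  }
}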
@@ -356,11 +358,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val sparkUser = Utils.getCurrentUserName()
executorEnvs("SPARK_USER") = sparkUser

// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
private val heartbeatReceiver = env.actorSystem.actorOf(
Props(new HeartbeatReceiver(this)), "HeartbeatReceiver")

// Create and start the scheduler
private[spark] var (schedulerBackend, taskScheduler) =
SparkContext.createTaskScheduler(this, master)
private val heartbeatReceiver = env.actorSystem.actorOf(
Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")

heartbeatReceiver ! TaskSchedulerIsSet

@volatile private[spark] var dagScheduler: DAGScheduler = _
try {
dagScheduler = new DAGScheduler(this)
@@ -1136,7 +1144,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Return whether dynamically adjusting the amount of resources allocated to
* this application is supported. This is currently only available for YARN.
*/
private[spark] def supportDynamicAllocation =
master.contains("yarn") || dynamicAllocationTesting

/**
@@ -1390,32 +1398,34 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
addedJars.clear()
}

/** Shut down the SparkContext. */
// Shut down the SparkContext.
def stop() {
SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
if (!stopped) {
stopped = true
postApplicationEnd()
ui.foreach(_.stop())
env.metricsSystem.report()
metadataCleaner.cancel()
cleaner.foreach(_.stop())
dagScheduler.stop()
dagScheduler = null
listenerBus.stop()
eventLogger.foreach(_.stop())
env.actorSystem.stop(heartbeatReceiver)
progressBar.foreach(_.stop())
taskScheduler = null
// TODO: Cache.stop()?
env.stop()
SparkEnv.set(null)
logInfo("Successfully stopped SparkContext")
SparkContext.clearActiveContext()
} else {
logInfo("SparkContext already stopped")
}
// Use compareAndSet on `stopped` so that concurrent calls to stop() do not
// contend: only the first caller runs the shutdown sequence below. The flag
// is still read elsewhere in the code (e.g. assertNotStopped()).

if (!stopped.compareAndSet(false, true)) {
logInfo("SparkContext already stopped.")
return
}

postApplicationEnd()
ui.foreach(_.stop())
env.metricsSystem.report()
metadataCleaner.cancel()
cleaner.foreach(_.stop())
executorAllocationManager.foreach(_.stop())
dagScheduler.stop()
dagScheduler = null
listenerBus.stop()
eventLogger.foreach(_.stop())
env.actorSystem.stop(heartbeatReceiver)
progressBar.foreach(_.stop())
taskScheduler = null
// TODO: Cache.stop()?
env.stop()
SparkEnv.set(null)
SparkContext.clearActiveContext()
logInfo("Successfully stopped SparkContext")
}
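The rewrite above drops the synchronized block around the whole shutdown sequence in favor of a compareAndSet guard: exactly one caller flips stopped from false to true and performs the cleanup, and every later or concurrent caller returns immediately. A minimal sketch of the idiom (illustrative, not Spark's code):

import java.util.concurrent.atomic.AtomicBoolean

class Service {
  private val stopped = new AtomicBoolean(false)

  def stop(): Unit = {
    // Atomic: however many threads race here, compareAndSet(false, true)
    // returns true for exactly one of them.
    if (!stopped.compareAndSet(false, true)) {
      return // already stopped, or another thread is stopping right now
    }
    // ... release resources exactly once, in dependency order ...
  }

  def assertNotStopped(): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("Cannot call methods on a stopped Service")
    }
  }
}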


@@ -1477,7 +1487,7 @@
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
if (stopped) {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -661,7 +661,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
*/
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
import scala.collection.JavaConverters._
def fn = (x: V) => f.call(x).asScala
def fn: (V) => Iterable[U] = (x: V) => f.call(x).asScala
implicit val ctag: ClassTag[U] = fakeClassTag
fromRDD(rdd.flatMapValues(fn))
}
core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -192,7 +192,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
*/
def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.call(x)
def fn: (T) => S = (x: T) => f.call(x)
import com.google.common.collect.Ordering // shadows scala.math.Ordering
implicit val ordering = Ordering.natural().asInstanceOf[Ordering[S]]
implicit val ctag: ClassTag[S] = fakeClassTag
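Both Java API hunks give the fn adapters explicit Scala function types instead of inferred ones. Whatever compiler quirk motivated the change, the adaptation itself is small; here is a self-contained sketch, where JFunction is a simplified stand-in for Spark's Java function interface rather than the real one:

object JavaApiAdapters {
  import java.lang.{Iterable => JIterable}
  import scala.collection.JavaConverters._

  // Simplified stand-in for Spark's Java function interface (an assumption,
  // not the real org.apache.spark.api.java.function.Function).
  trait JFunction[T, R] { def call(t: T): R }

  // As in the diff: the adapter's type is spelled out as a plain Scala
  // function type instead of being left to inference.
  def adapt[V, U](f: JFunction[V, JIterable[U]]): V => Iterable[U] =
    (x: V) => f.call(x).asScala
}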
(diff truncated: the remaining 86 of 92 changed files are not shown)
