Switch to on-demand collection of thread dumps
Uses a (hacky) driver -> executor RPC path that I built.
JoshRosen committed Oct 29, 2014
1 parent dfec08b commit f4ac1c1
Showing 20 changed files with 86 additions and 94 deletions.
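The mechanism this commit switches to is pull-based: the web UI calls SparkContext.getExecutorThreadDump, the driver resolves the executor's actor-system host and port through the BlockManagerMaster, builds a reference to that executor's ExecutorActor, and sends a TriggerThreadDump message, blocking on the reply. Below is a minimal, self-contained sketch of that ask-based round trip (not the commit's code): it uses plain Akka in place of Spark's AkkaUtils.makeExecutorRef and askWithReply helpers, a single local actor system in place of a remote one, and a simplified stand-in for ThreadStackTrace.

import java.lang.management.ManagementFactory

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

// Stand-ins for Spark's TriggerThreadDump message and ThreadStackTrace result type.
case class TriggerThreadDump()
case class ThreadStackTrace(threadId: Long, threadName: String, stackTrace: String)

// Executor side: an actor that dumps this JVM's threads when asked.
class ExecutorActor extends Actor {
  def receive = {
    case TriggerThreadDump() =>
      val threadInfos = ManagementFactory.getThreadMXBean.dumpAllThreads(true, true)
      val traces = threadInfos.map { info =>
        ThreadStackTrace(info.getThreadId, info.getThreadName,
          info.getStackTrace.mkString("\n"))
      }
      sender() ! traces
  }
}

// Driver side: resolve the actor and block on the reply (the "ask" pattern).
object OnDemandThreadDumpDemo extends App {
  val system = ActorSystem("demo")  // in Spark this is the executor's remote actor system
  val executorActor = system.actorOf(Props[ExecutorActor], "ExecutorActor")

  implicit val timeout: Timeout = Timeout(30.seconds)
  val future = (executorActor ? TriggerThreadDump()).mapTo[Array[ThreadStackTrace]]
  val dump = Await.result(future, timeout.duration)

  dump.take(3).foreach(t => println(s"${t.threadName}:\n${t.stackTrace}\n"))
  system.shutdown()  // Akka 2.3-era API, as used by Spark at the time
}

In the diff below, the driver half of this round trip is SparkContext.getExecutorThreadDump; the executor half is the new ExecutorActor that Executor.scala now registers.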
13 changes: 3 additions & 10 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -21,7 +21,7 @@ import akka.actor.Actor
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.scheduler.TaskScheduler
-import org.apache.spark.util.{ActorLogReceive, ThreadStackTrace}
+import org.apache.spark.util.ActorLogReceive
 
 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -35,22 +35,15 @@ private[spark] case class Heartbeat(
 private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
 
-/**
- * A thread dump sent from executors to the driver.
- */
-private[spark] case class ThreadDump(executorId: String, threadStackTraces: Array[ThreadStackTrace])
-
 /**
- * Lives in the driver to receive heartbeats from executors.
+ * Lives in the driver to receive heartbeats from executors..
  */
 private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
   extends Actor with ActorLogReceive with Logging {
 
   override def receiveWithLogging = {
     case Heartbeat(executorId, taskMetrics, blockManagerId) =>
       val response = HeartbeatResponse(
-        ! scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
+        !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
       sender ! response
-    case ThreadDump(executorId, stackTraces: Array[ThreadStackTrace]) =>
-      scheduler.executorThreadDumpReceived(executorId, stackTraces)
   }
 }
30 changes: 15 additions & 15 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -21,7 +21,7 @@ import scala.language.implicitConversions
 
 import java.io._
 import java.net.URI
-import java.util.{Arrays, Properties, Timer, TimerTask, UUID}
+import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
@@ -40,6 +40,7 @@ import akka.actor.Props
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import org.apache.spark.executor.TriggerThreadDump
 import org.apache.spark.input.WholeTextFileInputFormat
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -50,7 +51,7 @@ import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.ui.jobs.JobProgressListener
-import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
+import org.apache.spark.util._
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -241,18 +242,6 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   // the bound port to the cluster manager properly
   ui.foreach(_.bind())
 
-  // If we are not running in local mode, then start a new timer thread for capturing driver thread
-  // dumps for display in the web UI (in local mode, this is handled by the local Executor):
-  private val threadDumpTimer = new Timer("Driver thread dump timer", true)
-  if (!isLocal && conf.getBoolean("spark.executor.sendThreadDumps", true)) {
-    val threadDumpInterval = conf.getInt("spark.executor.heartbeatInterval", 10000)
-    threadDumpTimer.scheduleAtFixedRate(new TimerTask {
-      override def run(): Unit = {
-        listenerBus.post(SparkListenerExecutorThreadDump("<driver>", Utils.getThreadDump()))
-      }
-    }, threadDumpInterval, threadDumpInterval)
-  }
-
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
 
@@ -362,6 +351,18 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
     override protected def childValue(parent: Properties): Properties = new Properties(parent)
   }
 
+  /** Called by the web UI to obtain executor thread dumps */
+  private[spark] def getExecutorThreadDump(executorId: String): Array[ThreadStackTrace] = {
+    if (executorId == "<driver>") {
+      Utils.getThreadDump()
+    } else {
+      val (host, port) = env.blockManager.master.getActorSystemHostPortForExecutor(executorId).get
+      val actorRef = AkkaUtils.makeExecutorRef("ExecutorActor", conf, host, port, env.actorSystem)
+      AkkaUtils.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump(), actorRef,
+        AkkaUtils.numRetries(conf), AkkaUtils.retryWaitMs(conf), AkkaUtils.askTimeout(conf))
+    }
+  }
+
   private[spark] def getLocalProperties: Properties = localProperties.get()
 
   private[spark] def setLocalProperties(props: Properties) {
@@ -971,7 +972,6 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   def stop() {
     postApplicationEnd()
     ui.foreach(_.stop())
-    threadDumpTimer.cancel()
     // Do this only if not stopped already - best case effort.
     // prevent NPE if stopped more than once.
     val dagSchedulerCopy = dagScheduler
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -131,7 +131,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       // Create a new ActorSystem using driver's Spark properties to run the backend.
       val driverConf = new SparkConf().setAll(props)
       val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
-        "sparkExecutor", hostname, port, driverConf, new SecurityManager(driverConf))
+        SparkEnv.executorActorSystemName,
+        hostname, port, driverConf, new SecurityManager(driverConf))
       // set it
       val sparkHostPort = hostname + ":" + boundPort
       actorSystem.actorOf(
14 changes: 8 additions & 6 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.util.control.NonFatal
 
-import akka.actor.ActorSystem
+import akka.actor.{Props, ActorSystem}
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -92,6 +92,10 @@ private[spark] class Executor(
     }
   }
 
+  // Create an actor for receiving RPCs from the driver
+  private val executorActor = env.actorSystem.actorOf(
+    Props(new ExecutorActor(executorId)), "ExecutorActor")
+
   // Create our ClassLoader
   // do this after SparkEnv creation so can access the SecurityManager
   private val urlClassLoader = createClassLoader()
@@ -128,6 +132,7 @@ private[spark] class Executor(
 
   def stop() {
     env.metricsSystem.report()
+    env.actorSystem.stop(executorActor)
     isStopped = true
     threadPool.shutdown()
     if (!isLocal) {
@@ -355,17 +360,13 @@
     val retryAttempts = AkkaUtils.numRetries(conf)
     val retryIntervalMs = AkkaUtils.retryWaitMs(conf)
     val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem)
-    val threadDumpsEnabled = conf.getBoolean("spark.executor.sendThreadDumps", true)
+
     val t = new Thread() {
       override def run() {
         // Sleep a random interval so the heartbeats don't end up in sync
        Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])
 
        while (!isStopped) {
-          if (threadDumpsEnabled) {
-            // Send the thread-dump as a fire-and-forget, best-effort message:
-            heartbeatReceiverRef ! ThreadDump(executorId, Utils.getThreadDump())
-          }
          val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
          for (taskRunner <- runningTasks.values()) {
            if (!taskRunner.attemptedTask.isEmpty) {
@@ -384,6 +385,7 @@
              }
            }
          }
+
          val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
          try {
            val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
9 changes: 1 addition & 8 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -39,7 +39,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
-import org.apache.spark.util._
+import org.apache.spark.util.{CallSite, SystemClock, Clock, Utils}
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
 
 /**
@@ -174,13 +174,6 @@ class DAGScheduler(
       timeout.duration).asInstanceOf[Boolean]
   }
 
-  /**
-   * Called by the TaskScheduler when a thread dump is received from an executor.
-   */
-  def executorThreadDumpReceived(execId: String, stackTraces: Array[ThreadStackTrace]) {
-    listenerBus.post(SparkListenerExecutorThreadDump(execId, stackTraces))
-  }
-
   // Called by TaskScheduler when an executor fails.
   def executorLost(execId: String) {
     eventProcessActor ! ExecutorLost(execId)
13 changes: 1 addition & 12 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{Logging, TaskEndReason}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{Distribution, Utils, ThreadStackTrace}
+import org.apache.spark.util.{Distribution, Utils}
 
 @DeveloperApi
 sealed trait SparkListenerEvent
@@ -77,12 +77,6 @@ case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockMan
 @DeveloperApi
 case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
 
-@DeveloperApi
-case class SparkListenerExecutorThreadDump(
-    execId: String,
-    threadStackTraces: Array[ThreadStackTrace])
-  extends SparkListenerEvent
-
 /**
  * Periodic updates from executors.
  * @param execId executor id
@@ -178,11 +172,6 @@ trait SparkListener {
    */
  def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { }
 
-  /**
-   * Called when the driver receives thread dumps from an executor in a heartbeat.
-   */
-  def onExecutorThreadDump(executorThreadDump: SparkListenerExecutorThreadDump) {}
-
  /**
   * Called when the driver receives task metrics from an executor in a heartbeat.
   */
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -70,8 +70,6 @@ private[spark] trait SparkListenerBus extends Logging {
        foreachListener(_.onApplicationEnd(applicationEnd))
      case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
        foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
-      case threadDump: SparkListenerExecutorThreadDump =>
-        foreachListener(_.onExecutorThreadDump(threadDump))
      case SparkListenerShutdown =>
    }
  }
8 changes: 1 addition & 7 deletions core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.ThreadStackTrace
 
 /**
  * Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl.
@@ -66,12 +65,7 @@ private[spark] trait TaskScheduler {
    * indicating that the block manager should re-register.
    */
   def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
-    blockManagerId: BlockManagerId): Boolean
-
-  /**
-   * Called when a thread dump has been received from an executor.
-   */
-  def executorThreadDumpReceived(execId: String, stackTraces: Array[ThreadStackTrace])
+      blockManagerId: BlockManagerId): Boolean
 
   /**
    * Get an application ID associated with the job.
7 changes: 2 additions & 5 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -31,9 +31,10 @@ import scala.util.Random
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
-import org.apache.spark.util.{ThreadStackTrace, Utils}
+import org.apache.spark.util.Utils
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
+import akka.actor.Props
 
 /**
  * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
@@ -341,10 +342,6 @@ private[spark] class TaskSchedulerImpl(
     dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
   }
 
-  override def executorThreadDumpReceived(execId: String, stackTraces: Array[ThreadStackTrace]) {
-    dagScheduler.executorThreadDumpReceived(execId, stackTraces)
-  }
-
   def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
     taskSetManager.handleTaskGettingResult(tid)
   }
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -47,7 +47,7 @@ private[spark] class LocalActor(
 
   private var freeCores = totalCores
 
-  private val localExecutorId = "<driver>"
+  private val localExecutorId = "localhost"
   private val localExecutorHostname = "localhost"
 
   val executor = new Executor(
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -88,6 +88,10 @@ class BlockManagerMaster(
     askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
   }
 
+  def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = {
+    askDriverWithReply[Option[(String, Int)]](GetActorSystemHostPortForExecutor(executorId))
+  }
+
   /**
    * Remove a block from the slaves that have it. This can only be used to remove
    * blocks that the driver knows about.
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -86,6 +86,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     case GetPeers(blockManagerId) =>
       sender ! getPeers(blockManagerId)
 
+    case GetActorSystemHostPortForExecutor(executorId) =>
+      sender ! getActorSystemHostPortForExecutor(executorId)
+
     case GetMemoryStatus =>
       sender ! memoryStatus
 
@@ -412,6 +415,17 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
       Seq.empty
     }
   }
+
+  private def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = {
+    for (
+      blockManagerId <- blockManagerIdByExecutor.get(executorId);
+      info <- blockManagerInfo.get(blockManagerId);
+      host <- info.slaveActor.path.address.host;
+      port <- info.slaveActor.path.address.port
+    ) yield {
+      (host, port)
+    }
+  }
 }
 
 @DeveloperApi
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -92,6 +92,8 @@ private[spark] object BlockManagerMessages {
 
   case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
 
+  case class GetActorSystemHostPortForExecutor(executorId: String) extends ToBlockManagerMaster
+
   case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
 
   case object StopBlockManagerMaster extends ToBlockManagerMaster
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -28,7 +28,8 @@ import org.apache.spark.ui.{SparkUI, SparkUITab}
 private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
   val executorsListener = parent.executorsListener
   val jobProgressListener = parent.jobProgressListener
-  val threadDumpEnabled = parent.conf.getBoolean("spark.executor.sendThreadDumps", true)
+  val threadDumpEnabled = parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true)
+  val sc = parent.sc
 
   attachPage(new ExecutorsPage(this, threadDumpEnabled))
   if (threadDumpEnabled) {
13 changes: 8 additions & 5 deletions core/src/main/scala/org/apache/spark/ui/exec/ThreadDumpPage.scala
@@ -19,20 +19,23 @@ package org.apache.spark.ui.exec
 
 import javax.servlet.http.HttpServletRequest
 
+import scala.util.Try
 import scala.xml.{Text, Node}
 
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 
 class ThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") {
 
+  private val sc = parent.sc
+
   def render(request: HttpServletRequest): Seq[Node] = {
     val executorId = Option(request.getParameter("executorId")).getOrElse {
       return Text(s"Missing executorId parameter")
     }
-    val maybeThreadDump = parent.jobProgressListener synchronized {
-      parent.jobProgressListener.executorIdToLastThreadDump.get(executorId)
-    }
-    val content = maybeThreadDump.map { case (time, threadDump) =>
+    val time = System.currentTimeMillis()
+    val maybeThreadDump = Try(sc.get.getExecutorThreadDump(executorId))
+
+    val content = maybeThreadDump.map { threadDump =>
       val dumpRows = threadDump.map { thread =>
         <div class="accordion-group">
           <div class="accordion-heading" onclick="$(this).next().toggleClass('hidden')">
@@ -62,7 +65,7 @@ class ThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") {
       }
      <div class="accordion">{dumpRows}</div>
    </div>
-    }.getOrElse(Text("No thread dump to display"))
+    }.getOrElse(Text("Error fetching thread dump"))
     UIUtils.headerSparkPage(s"Thread dump for executor $executorId", content, parent)
   }
 }
[Diffs for the remaining changed files were not loaded.]
