
Commit

Enable thread dumps from the driver when running in non-local mode.
JoshRosen committed Oct 28, 2014
1 parent cc3e6b3 commit 2b8bdf3
Showing 12 changed files with 41 additions and 21 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -18,10 +18,10 @@
package org.apache.spark

import akka.actor.Actor
-import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace}
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.TaskScheduler
-import org.apache.spark.util.ActorLogReceive
+import org.apache.spark.util.{ActorLogReceive, ThreadStackTrace}

/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
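The import swap above reflects ThreadStackTrace moving from org.apache.spark.executor to org.apache.spark.util (see the package change further down). For orientation, here is a minimal sketch of the heartbeat message that carries the dumps, inferred only from the call site in Executor.scala below; the field names and the metrics element type are assumptions, not part of this commit:

case class Heartbeat(
    executorId: String,
    threadDump: Array[ThreadStackTrace],     // produced by Utils.getThreadDump()
    taskMetrics: Array[(Long, TaskMetrics)], // assumed (taskId, metrics) pairing
    blockManagerId: BlockManagerId)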
16 changes: 14 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -21,9 +21,8 @@ import scala.language.implicitConversions

import java.io._
import java.net.URI
-import java.util.Arrays
+import java.util.{Arrays, Properties, Timer, TimerTask, UUID}
import java.util.concurrent.atomic.AtomicInteger
-import java.util.{Properties, UUID}
import java.util.UUID.randomUUID
import scala.collection.{Map, Set}
import scala.collection.generic.Growable
@@ -242,6 +241,18 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
// the bound port to the cluster manager properly
ui.foreach(_.bind())

+  // If we are not running in local mode, then start a new timer thread for capturing driver thread
+  // dumps for display in the web UI (in local mode, this is handled by the local Executor):
+  private val threadDumpTimer = new Timer("Driver thread dump timer", true)
+  if (!isLocal) {
+    val threadDumpInterval = conf.getInt("spark.executor.heartbeatInterval", 10000)
+    threadDumpTimer.scheduleAtFixedRate(new TimerTask {
+      override def run(): Unit = {
+        listenerBus.post(SparkListenerExecutorThreadDump("<driver>", Utils.getThreadDump()))
+      }
+    }, threadDumpInterval, threadDumpInterval)
+  }

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)

@@ -960,6 +971,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
def stop() {
postApplicationEnd()
ui.foreach(_.stop())
+    threadDumpTimer.cancel()
// Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once.
val dagSchedulerCopy = dagScheduler
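The new block schedules a daemon Timer that, outside of local mode, posts a SparkListenerExecutorThreadDump event for the driver (executor id "<driver>") on every spark.executor.heartbeatInterval tick (default 10000 ms), and stop() cancels the timer. As a rough illustration of how such events could be observed, here is a hypothetical listener; the onExecutorThreadDump callback comes from the SparkListenerBus change below, while the event's field names (execId, stackTraces) are assumptions:

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorThreadDump}

// Hypothetical listener: log how many threads each posted dump contains.
class ThreadDumpLogger extends SparkListener {
  override def onExecutorThreadDump(threadDump: SparkListenerExecutorThreadDump): Unit = {
    // execId / stackTraces are assumed field names
    println(s"Thread dump from ${threadDump.execId}: ${threadDump.stackTraces.length} threads")
  }
}

// Usage (inside Spark or a driver program): sc.addSparkListener(new ThreadDumpLogger)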
7 changes: 1 addition & 6 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -379,12 +379,7 @@ private[spark] class Executor(
}
}
}
-    val threadDump = Thread.getAllStackTraces.toArray.sortBy(_._1.getId).map {
-      case (thread, stackElements) =>
-        val stackTrace = stackElements.map(_.toString).mkString("\n")
-        ThreadStackTrace(thread.getId, thread.getName, thread.getState.toString, stackTrace)
-    }
-    val message = Heartbeat(executorId, threadDump, tasksMetrics.toArray,
+    val message = Heartbeat(executorId, Utils.getThreadDump(), tasksMetrics.toArray,
env.blockManager.blockManagerId)
try {
val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
@@ -35,11 +35,11 @@ import akka.util.Timeout

import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace}
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
-import org.apache.spark.util.{CallSite, SystemClock, Clock, Utils}
+import org.apache.spark.util._
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat

/**
@@ -24,9 +24,9 @@ import scala.collection.mutable

import org.apache.spark.{Logging, TaskEndReason}
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace}
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{Distribution, Utils}
+import org.apache.spark.util.{Distribution, Utils, ThreadStackTrace}

@DeveloperApi
sealed trait SparkListenerEvent
@@ -70,6 +70,8 @@ private[spark] trait SparkListenerBus extends Logging {
foreachListener(_.onApplicationEnd(applicationEnd))
case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
+      case threadDump: SparkListenerExecutorThreadDump =>
+        foreachListener(_.onExecutorThreadDump(threadDump))
case SparkListenerShutdown =>
}
}
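A sketch of the event being dispatched here, inferred from how it is constructed in SparkContext above (an executor id string plus the Array[ThreadStackTrace] from Utils.getThreadDump()); the parameter names and the exact trait mix-in are assumptions:

case class SparkListenerExecutorThreadDump(
    execId: String,
    stackTraces: Array[ThreadStackTrace])
  extends SparkListenerEvent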
@@ -18,8 +18,9 @@
package org.apache.spark.scheduler

import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
-import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace}
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.ThreadStackTrace

/**
* Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl.
@@ -31,8 +31,8 @@ import scala.util.Random
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
-import org.apache.spark.util.Utils
-import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace}
+import org.apache.spark.util.{ThreadStackTrace, Utils}
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId

/**
@@ -21,11 +21,12 @@ import scala.collection.mutable.{HashMap, ListBuffer}

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace}
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.ui.jobs.UIData._
+import org.apache.spark.util.ThreadStackTrace

/**
* :: DeveloperApi ::
@@ -15,7 +15,7 @@
* limitations under the License.
*/

-package org.apache.spark.executor
+package org.apache.spark.util

/**
* Used for shipping per-thread stacktraces from the executors to driver.
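Only the package declaration of this file changes: the class moves from org.apache.spark.executor to org.apache.spark.util so that both the driver (via Utils.getThreadDump()) and the executors can construct it. Judging from the constructor call in Utils.getThreadDump() below, the class is shaped roughly like the following sketch; the field names are assumptions:

case class ThreadStackTrace(
    threadId: Long,      // Thread.getId
    threadName: String,  // Thread.getName
    threadState: String, // Thread.getState.toString
    stackTrace: String)  // newline-joined stack trace elements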
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1568,6 +1568,15 @@ private[spark] object Utils extends Logging {
s"$className: $desc\n$st"
}

+  /** Return a thread dump of all threads' stacktraces. Used to capture dumps for the web UI */
+  def getThreadDump(): Array[ThreadStackTrace] = {
+    Thread.getAllStackTraces.toArray.sortBy(_._1.getId).map {
+      case (thread, stackElements) =>
+        val stackTrace = stackElements.map(_.toString).mkString("\n")
+        ThreadStackTrace(thread.getId, thread.getName, thread.getState.toString, stackTrace)
+    }
+  }

/**
* Convert all spark properties set in the given SparkConf to a sequence of java options.
*/
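A small, purely illustrative example of consuming the array returned by getThreadDump(); note that Utils is private[spark], so this only applies inside Spark itself, and the ThreadStackTrace field names follow the sketch above and are assumptions:

import org.apache.spark.util.Utils

val dump = Utils.getThreadDump()  // Array[ThreadStackTrace], sorted by thread id
dump.foreach { t =>
  println(s"#${t.threadId} ${t.threadName} (${t.threadState})")  // assumed field names
  println(t.stackTrace)
}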
@@ -31,8 +31,8 @@ import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
-import org.apache.spark.util.CallSite
-import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace}
+import org.apache.spark.util.{CallSite, ThreadStackTrace}
+import org.apache.spark.executor.TaskMetrics

class BuggyDAGEventProcessActor extends Actor {
val state = 0
