diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 58197f9d84dfc..e6a4aaf8ecec9 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -18,10 +18,10 @@
 package org.apache.spark
 
 import akka.actor.Actor
-import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace}
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.scheduler.TaskScheduler
-import org.apache.spark.util.ActorLogReceive
+import org.apache.spark.util.{ActorLogReceive, ThreadStackTrace}
 
 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e8fdfff04390d..7e7dca5a6811a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -21,9 +21,8 @@ import scala.language.implicitConversions
 
 import java.io._
 import java.net.URI
-import java.util.Arrays
+import java.util.{Arrays, Properties, Timer, TimerTask, UUID}
 import java.util.concurrent.atomic.AtomicInteger
-import java.util.{Properties, UUID}
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
 import scala.collection.generic.Growable
@@ -242,6 +241,18 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   // the bound port to the cluster manager properly
   ui.foreach(_.bind())
 
+  // If we are not running in local mode, then start a new timer thread for capturing driver thread
+  // dumps for display in the web UI (in local mode, this is handled by the local Executor):
+  private val threadDumpTimer = new Timer("Driver thread dump timer", true)
+  if (!isLocal) {
+    val threadDumpInterval = conf.getInt("spark.executor.heartbeatInterval", 10000)
+    threadDumpTimer.scheduleAtFixedRate(new TimerTask {
+      override def run(): Unit = {
+        listenerBus.post(SparkListenerExecutorThreadDump("<driver>", Utils.getThreadDump()))
+      }
+    }, threadDumpInterval, threadDumpInterval)
+  }
+
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
 
@@ -960,6 +971,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   def stop() {
     postApplicationEnd()
     ui.foreach(_.stop())
+    threadDumpTimer.cancel()
    // Do this only if not stopped already - best case effort.
    // prevent NPE if stopped more than once.
     val dagSchedulerCopy = dagScheduler
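The SparkContext hunk above carries the driver-side behavior: a daemon `java.util.Timer` periodically posts a `SparkListenerExecutorThreadDump` event for the driver, reusing `spark.executor.heartbeatInterval` (default 10000 ms) as its period, and `stop()` now cancels the timer. A rough standalone sketch of the same fixed-rate pattern (the timer name, interval, and println are illustrative, not from the patch):

```scala
import java.util.{Timer, TimerTask}

object ThreadDumpTimerSketch {
  def main(args: Array[String]): Unit = {
    // `true` marks the timer thread as a daemon, mirroring
    // `new Timer("Driver thread dump timer", true)`, so it cannot keep the JVM alive.
    val timer = new Timer("thread dump timer sketch", true)
    val intervalMs = 1000L // stand-in for conf.getInt("spark.executor.heartbeatInterval", 10000)
    timer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = {
        // Stand-in for listenerBus.post(SparkListenerExecutorThreadDump(...))
        println(s"tick: would post a driver thread dump at ${System.currentTimeMillis()}")
      }
    }, intervalMs, intervalMs)
    Thread.sleep(3500) // let a few ticks fire
    timer.cancel()     // mirrors the threadDumpTimer.cancel() added to SparkContext.stop()
  }
}
```

Note that `scheduleAtFixedRate` receives the interval as both the initial delay and the period, exactly as in the hunk, so the first driver dump appears one interval after startup rather than immediately.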
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d36b08572590a..91f036595d048 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -379,12 +379,7 @@
         }
       }
     }
-    val threadDump = Thread.getAllStackTraces.toArray.sortBy(_._1.getId).map {
-      case (thread, stackElements) =>
-        val stackTrace = stackElements.map(_.toString).mkString("\n")
-        ThreadStackTrace(thread.getId, thread.getName, thread.getState.toString, stackTrace)
-    }
-    val message = Heartbeat(executorId, threadDump, tasksMetrics.toArray,
+    val message = Heartbeat(executorId, Utils.getThreadDump(), tasksMetrics.toArray,
       env.blockManager.blockManagerId)
     try {
       val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 5f2d3195500d4..14f26daa3665b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -35,11 +35,11 @@ import akka.util.Timeout
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace}
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
-import org.apache.spark.util.{CallSite, SystemClock, Clock, Utils}
+import org.apache.spark.util._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index e2b7e1f34c02b..71e72fe2b97f3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -24,9 +24,9 @@ import scala.collection.mutable
 
 import org.apache.spark.{Logging, TaskEndReason}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace}
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{Distribution, Utils}
+import org.apache.spark.util.{Distribution, Utils, ThreadStackTrace}
 
 @DeveloperApi
 sealed trait SparkListenerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index e79ffd7a3587d..11b4f4e44ae51 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -70,6 +70,8 @@
         foreachListener(_.onApplicationEnd(applicationEnd))
       case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
         foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
+      case threadDump: SparkListenerExecutorThreadDump =>
+        foreachListener(_.onExecutorThreadDump(threadDump))
       case SparkListenerShutdown =>
     }
   }
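Neither `SparkListenerExecutorThreadDump` nor the `onExecutorThreadDump` callback is defined anywhere in this diff, so the following is an inferred sketch only: the event's two fields are read off the call site in the SparkContext hunk (an executor/driver identifier string plus the `Array[ThreadStackTrace]` from `Utils.getThreadDump()`), and the field names are guesses. A no-op default on the callback would keep existing `SparkListener` implementations source-compatible with the new dispatch case above.

```scala
// Stub for the payload type; the real class lives in org.apache.spark.util (see the rename below).
case class ThreadStackTrace(
    threadId: Long, threadName: String, threadState: String, stackTrace: String)

sealed trait SparkListenerEvent

// Inferred shape: matches listenerBus.post(SparkListenerExecutorThreadDump("<driver>",
// Utils.getThreadDump())) in the SparkContext hunk above.
case class SparkListenerExecutorThreadDump(
    execId: String,
    threadDump: Array[ThreadStackTrace]) extends SparkListenerEvent

trait ThreadDumpListener {
  // Counterpart of the new `case threadDump: SparkListenerExecutorThreadDump =>`
  // dispatch in SparkListenerBus; defaulting to a no-op means listeners that
  // ignore thread dumps need no changes.
  def onExecutorThreadDump(threadDump: SparkListenerExecutorThreadDump): Unit = { }
}
```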
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index fcdf621bf8b66..54a11d3ae35f3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -18,8 +18,9 @@
 package org.apache.spark.scheduler
 
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
-import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace}
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.ThreadStackTrace
 
 /**
  * Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 789717ee42f6a..41a663b31d37c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -31,8 +31,8 @@ import scala.util.Random
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
-import org.apache.spark.util.Utils
-import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace}
+import org.apache.spark.util.{ThreadStackTrace, Utils}
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index b089cfaf2670d..a1a0b1c0d7fee 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -21,11 +21,12 @@ import scala.collection.mutable.{HashMap, ListBuffer}
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace}
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.ui.jobs.UIData._
+import org.apache.spark.util.ThreadStackTrace
 
 /**
  * :: DeveloperApi ::
diff --git a/core/src/main/scala/org/apache/spark/executor/ThreadStackTrace.scala b/core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala
similarity index 96%
rename from core/src/main/scala/org/apache/spark/executor/ThreadStackTrace.scala
rename to core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala
index 610c11c699971..f9514097f2b89 100644
--- a/core/src/main/scala/org/apache/spark/executor/ThreadStackTrace.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.executor
+package org.apache.spark.util
 
 /**
  * Used for shipping per-thread stacktraces from the executors to driver.
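The rename keeps 96% similarity, so only the package declaration of `ThreadStackTrace` changes and its body never appears in this diff. Based on how `Utils.getThreadDump()` constructs it (`thread.getId`, `thread.getName`, `thread.getState.toString`, and a newline-joined stack), it is presumably a plain case class along these lines (field names inferred, not confirmed by the patch):

```scala
package org.apache.spark.util

/**
 * Used for shipping per-thread stacktraces from the executors to driver.
 */
private[spark] case class ThreadStackTrace(
    threadId: Long,      // Thread.getId
    threadName: String,  // Thread.getName
    threadState: String, // Thread.getState.toString
    stackTrace: String)  // stack frames joined with "\n"
```

Moving the class from org.apache.spark.executor to org.apache.spark.util is what forces the long tail of import churn in the scheduler and UI files above: the type is now produced on the driver as well as on executors, so a neutral utility package seems intended to avoid a scheduler-to-executor dependency.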
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index e1dc49238733c..7d21c6fd24925 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1568,6 +1568,15 @@
     s"$className: $desc\n$st"
   }
 
+  /** Return a thread dump of all threads' stacktraces.  Used to capture dumps for the web UI */
+  def getThreadDump(): Array[ThreadStackTrace] = {
+    Thread.getAllStackTraces.toArray.sortBy(_._1.getId).map {
+      case (thread, stackElements) =>
+        val stackTrace = stackElements.map(_.toString).mkString("\n")
+        ThreadStackTrace(thread.getId, thread.getName, thread.getState.toString, stackTrace)
+    }
+  }
+
   /**
    * Convert all spark properties set in the given SparkConf to a sequence of java options.
    */
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index a3e9318e7e0a9..14124fe998ebe 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -31,8 +31,8 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
-import org.apache.spark.util.CallSite
-import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace}
+import org.apache.spark.util.{CallSite, ThreadStackTrace}
+import org.apache.spark.executor.TaskMetrics
 
 class BuggyDAGEventProcessActor extends Actor {
   val state = 0
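To see what the deduplicated `Utils.getThreadDump()` returns, here is a self-contained version of the same logic. One deviation is deliberate: inside Utils.scala the `java.util.Map` returned by `Thread.getAllStackTraces` is converted to a Scala collection implicitly (Utils imports the JavaConversions wrappers, which is why the hunk can call `.toArray` directly), whereas a standalone file needs the explicit `.asScala` below. The local `ThreadStackTrace` stands in for the real class in org.apache.spark.util.

```scala
import scala.collection.JavaConverters._

// Local stand-in for org.apache.spark.util.ThreadStackTrace (fields inferred).
case class ThreadStackTrace(
    threadId: Long, threadName: String, threadState: String, stackTrace: String)

object ThreadDumpDemo {
  // Same logic as the getThreadDump() added to Utils above: snapshot every live
  // thread, sort by thread ID for stable display, and flatten each stack to text.
  def getThreadDump(): Array[ThreadStackTrace] = {
    Thread.getAllStackTraces.asScala.toArray.sortBy(_._1.getId).map {
      case (thread, stackElements) =>
        val stackTrace = stackElements.map(_.toString).mkString("\n")
        ThreadStackTrace(thread.getId, thread.getName, thread.getState.toString, stackTrace)
    }
  }

  def main(args: Array[String]): Unit = {
    // Prints one line per live thread, e.g. "1 main RUNNABLE".
    getThreadDump().foreach(t => println(s"${t.threadId} ${t.threadName} ${t.threadState}"))
  }
}
```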