SPARK-2450 Adds executor log links to Web UI
Adds links to stderr/stdout in the executor tab of the web UI for:
1) Standalone
2) Yarn client
3) Yarn cluster

This adds log URL support in a general way so that it is easy to extend to all
cluster managers. The log URLs are passed to the executor through environment variables
using the SPARK_LOG_URL_ prefix, so additional logs besides stderr/stdout can also be added.

To propagate this information to the UI, we use the onExecutorAdded Spark listener event.

Although this commit doesn't add log URLs when running on a Mesos cluster, it should be possible to add them using the same mechanism.
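
As an illustration of the listener-based propagation, here is a minimal sketch (modeled on the SaveExecutorInfo helper in the new LogUrlsStandaloneSuite; the class name here is made up) of a listener that records each executor's log URLs as executors register:

```scala
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}

// Collects the log URL map of every executor that registers with the driver.
class LogUrlCollector extends SparkListener {
  val executorLogUrls = mutable.Map[String, Map[String, String]]()

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) {
    // logUrlMap is keyed by log name ("stderr", "stdout", ...); values are full URLs.
    executorLogUrls(executorAdded.executorId) = executorAdded.executorInfo.logUrlMap
  }
}
```

Register it with sc.addSparkListener(new LogUrlCollector) before running a job.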

Author: Kostas Sakellis <[email protected]>
Author: Josh Rosen <[email protected]>

Closes #3486 from ksakellis/kostas-spark-2450 and squashes the following commits:

d190936 [Josh Rosen] Fix a few minor style / formatting nits. Reset listener after each test. Don't null listener out at end of main().
8673fe1 [Kostas Sakellis] CR feedback. Hide the log column if there are no logs available
5bf6952 [Kostas Sakellis] [SPARK-2450] [CORE] Adds executor log links to Web UI

(cherry picked from commit 32e964c)
Signed-off-by: Josh Rosen <[email protected]>
Kostas Sakellis authored and JoshRosen committed Feb 6, 2015
1 parent 3feb798 commit e74dd04
Showing 18 changed files with 178 additions and 30 deletions.
@@ -43,6 +43,7 @@ private[spark] class ExecutorRunner(
val worker: ActorRef,
val workerId: String,
val host: String,
val webUiPort: Int,
val sparkHome: File,
val executorDir: File,
val workerUrl: String,
@@ -134,6 +135,12 @@ private[spark] class ExecutorRunner(
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

// Add webUI log urls
val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
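
With this change, each executor launched by a standalone worker sees one environment variable per log. A hedged sketch of what the executor environment might contain (host, port, and IDs below are made up for illustration):

```scala
// Hypothetical values for a worker on 192.168.1.10 whose web UI runs on port 8081:
//   SPARK_LOG_URL_STDERR=http://192.168.1.10:8081/logPage/?appId=app-20150206-0001&executorId=3&logType=stderr
//   SPARK_LOG_URL_STDOUT=http://192.168.1.10:8081/logPage/?appId=app-20150206-0001&executorId=3&logType=stdout
sys.env.filterKeys(_.startsWith("SPARK_LOG_URL_")).foreach(println)
```
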
@@ -362,6 +362,7 @@ private[spark] class Worker(
self,
workerId,
host,
webUiPort,
sparkHome,
executorDir,
akkaUrl,
@@ -49,10 +49,16 @@ private[spark] class CoarseGrainedExecutorBackend(
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
driver ! RegisterExecutor(executorId, hostPort, cores)
driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}

def extractLogUrls: Map[String, String] = {
val prefix = "SPARK_LOG_URL_"
sys.env.filterKeys(_.startsWith(prefix))
.map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
}

override def receiveWithLogging = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
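
extractLogUrls strips the SPARK_LOG_URL_ prefix and lower-cases the remainder, turning those environment variables into a log-name-to-URL map. A self-contained sketch of the same transformation over a plain Map (the entries are assumed values):

```scala
val prefix = "SPARK_LOG_URL_"
val env = Map(
  "SPARK_LOG_URL_STDERR" -> "http://host:8081/logPage/?appId=app-1&executorId=0&logType=stderr",
  "SPARK_LOG_URL_STDOUT" -> "http://host:8081/logPage/?appId=app-1&executorId=0&logType=stdout",
  "SPARK_HOME"           -> "/opt/spark")  // unrelated variables are filtered out

val logUrls = env.filterKeys(_.startsWith(prefix))
  .map { case (k, v) => (k.substring(prefix.length).toLowerCase, v) }
// logUrls == Map("stderr" -> ".../logType=stderr", "stdout" -> ".../logType=stdout")
```
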
@@ -39,7 +39,11 @@ private[spark] object CoarseGrainedClusterMessages {
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage

// Executors to driver
case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
case class RegisterExecutor(
executorId: String,
hostPort: String,
cores: Int,
logUrls: Map[String, String])
extends CoarseGrainedClusterMessage {
Utils.checkHostPort(hostPort, "Expected host port")
}
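
An executor backend therefore registers with the driver by sending a message like the following. This sketch uses made-up IDs and URLs, and note that CoarseGrainedClusterMessages is private[spark], so it is only constructible from Spark's own code:

```scala
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor

val msg = RegisterExecutor(
  executorId = "3",
  hostPort = "192.168.1.10:50123",
  cores = 4,
  logUrls = Map(
    "stderr" -> "http://192.168.1.10:8081/logPage/?appId=app-1&executorId=3&logType=stderr",
    "stdout" -> "http://192.168.1.10:8081/logPage/?appId=app-1&executorId=3&logType=stdout"))
```
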
@@ -86,7 +86,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
}

def receiveWithLogging = {
case RegisterExecutor(executorId, hostPort, cores) =>
case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
if (executorDataMap.contains(executorId)) {
sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
@@ -98,7 +98,7 @@
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val (host, _) = Utils.parseHostPort(hostPort)
val data = new ExecutorData(sender, sender.path.address, host, cores, cores)
val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
@@ -33,5 +33,6 @@ private[cluster] class ExecutorData(
val executorAddress: Address,
override val executorHost: String,
var freeCores: Int,
override val totalCores: Int
) extends ExecutorInfo(executorHost, totalCores)
override val totalCores: Int,
override val logUrlMap: Map[String, String]
) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
@@ -25,21 +25,22 @@ import org.apache.spark.annotation.DeveloperApi
@DeveloperApi
class ExecutorInfo(
val executorHost: String,
val totalCores: Int
) {
val totalCores: Int,
val logUrlMap: Map[String, String]) {

def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]

override def equals(other: Any): Boolean = other match {
case that: ExecutorInfo =>
(that canEqual this) &&
executorHost == that.executorHost &&
totalCores == that.totalCores
totalCores == that.totalCores &&
logUrlMap == that.logUrlMap
case _ => false
}

override def hashCode(): Int = {
val state = Seq(executorHost, totalCores)
val state = Seq(executorHost, totalCores, logUrlMap)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}
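
Because logUrlMap now participates in equals and hashCode, two ExecutorInfo instances are equal only when their log URL maps match as well. A small sketch (URLs are placeholders):

```scala
import org.apache.spark.scheduler.cluster.ExecutorInfo

val a = new ExecutorInfo("host1", 4, Map("stderr" -> "http://host1:8081/stderr"))
val b = new ExecutorInfo("host1", 4, Map("stderr" -> "http://host1:8081/stderr"))
val c = new ExecutorInfo("host1", 4, Map.empty)

assert(a == b)  // same host, cores, and log URLs
assert(a != c)  // differs only in logUrlMap
```
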
@@ -270,7 +270,8 @@ private[spark] class MesosSchedulerBackend(
mesosTasks.foreach { case (slaveId, tasks) =>
slaveIdToWorkerOffer.get(slaveId).foreach(o =>
listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
new ExecutorInfo(o.host, o.cores)))
// TODO: Add support for log urls for Mesos
new ExecutorInfo(o.host, o.cores, Map.empty)))
)
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}
31 changes: 26 additions & 5 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -26,7 +26,8 @@ import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.util.Utils

/** Summary information about an executor to display in the UI. */
private case class ExecutorSummaryInfo(
// Needs to be private[ui] because of a false positive MiMa failure.
private[ui] case class ExecutorSummaryInfo(
id: String,
hostPort: String,
rddBlocks: Int,
@@ -40,7 +41,8 @@ private case class ExecutorSummaryInfo(
totalInputBytes: Long,
totalShuffleRead: Long,
totalShuffleWrite: Long,
maxMemory: Long)
maxMemory: Long,
executorLogs: Map[String, String])

private[ui] class ExecutorsPage(
parent: ExecutorsTab,
@@ -55,6 +57,7 @@ private[ui] class ExecutorsPage(
val diskUsed = storageStatusList.map(_.diskUsed).sum
val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
val execInfoSorted = execInfo.sortBy(_.id)
val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty

val execTable =
<table class={UIUtils.TABLE_CLASS_STRIPED}>
@@ -79,10 +82,11 @@
Shuffle Write
</span>
</th>
{if (logsExist) <th class="sorttable_nosort">Logs</th> else Seq.empty}
{if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
</thead>
<tbody>
{execInfoSorted.map(execRow)}
{execInfoSorted.map(execRow(_, logsExist))}
</tbody>
</table>

@@ -107,7 +111,7 @@
}

/** Render an HTML row representing an executor */
private def execRow(info: ExecutorSummaryInfo): Seq[Node] = {
private def execRow(info: ExecutorSummaryInfo, logsExist: Boolean): Seq[Node] = {
val maximumMemory = info.maxMemory
val memoryUsed = info.memoryUsed
val diskUsed = info.diskUsed
@@ -138,6 +142,21 @@
<td sorttable_customkey={info.totalShuffleWrite.toString}>
{Utils.bytesToString(info.totalShuffleWrite)}
</td>
{
if (logsExist) {
<td>
{
info.executorLogs.map { case (logName, logUrl) =>
<div>
<a href={logUrl}>
{logName}
</a>
</div>
}
}
</td>
}
}
{
if (threadDumpEnabled) {
val encodedId = URLEncoder.encode(info.id, "UTF-8")
@@ -168,6 +187,7 @@
val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)

new ExecutorSummaryInfo(
execId,
@@ -183,7 +203,8 @@
totalInputBytes,
totalShuffleRead,
totalShuffleWrite,
maxMem
maxMem,
executorLogs
)
}
}
@@ -51,9 +51,15 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
val executorToOutputBytes = HashMap[String, Long]()
val executorToShuffleRead = HashMap[String, Long]()
val executorToShuffleWrite = HashMap[String, Long]()
val executorToLogUrls = HashMap[String, Map[String, String]]()

def storageStatusList = storageStatusListener.storageStatusList

override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) = synchronized {
val eid = executorAdded.executorId
executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
}

override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
val eid = taskStart.taskInfo.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -383,7 +383,8 @@ private[spark] object JsonProtocol {

def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
("Host" -> executorInfo.executorHost) ~
("Total Cores" -> executorInfo.totalCores)
("Total Cores" -> executorInfo.totalCores) ~
("Log Urls" -> mapToJson(executorInfo.logUrlMap))
}

/** ------------------------------ *
@@ -792,7 +793,8 @@
def executorInfoFromJson(json: JValue): ExecutorInfo = {
val executorHost = (json \ "Host").extract[String]
val totalCores = (json \ "Total Cores").extract[Int]
new ExecutorInfo(executorHost, totalCores)
val logUrls = mapFromJson(json \ "Log Urls").toMap
new ExecutorInfo(executorHost, totalCores, logUrls)
}

/** -------------------------------- *
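
These two methods round-trip the log URLs; a minimal sketch using the same values as the updated JsonProtocolSuite (note JsonProtocol is private[spark], so this only compiles inside Spark):

```scala
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.JsonProtocol

val info = new ExecutorInfo("Hostee.awesome.com", 11,
  Map("stderr" -> "mystderr", "stdout" -> "mystdout"))

val json = JsonProtocol.executorInfoToJson(info)   // includes a "Log Urls" object
val restored = JsonProtocol.executorInfoFromJson(json)
assert(restored == info)  // equality now covers logUrlMap
```
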
@@ -117,7 +117,7 @@ class JsonProtocolSuite extends FunSuite {
}

def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
new File("sparkHome"), new File("workDir"), "akka://worker",
new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
}
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy

import scala.collection.mutable

import org.scalatest.{BeforeAndAfter, FunSuite}

import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
import org.apache.spark.{SparkContext, LocalSparkContext}

class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {

/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000

before {
sc = new SparkContext("local-cluster[2,1,512]", "test")
}

test("verify log urls get propagated from workers") {
val listener = new SaveExecutorInfo
sc.addSparkListener(listener)

val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = rdd1.map(_.toString)
rdd2.setName("Target RDD")
rdd2.count()

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.addedExecutorInfos.values.foreach { info =>
assert(info.logUrlMap.nonEmpty)
}
}

private class SaveExecutorInfo extends SparkListener {
val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()

override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
addedExecutorInfos(executor.executorId) = executor.executorInfo
}
}
}
@@ -32,7 +32,7 @@ class ExecutorRunnerTest extends FunSuite {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val appDesc = new ApplicationDescription("app name", Some(8), 500,
Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
ExecutorState.RUNNING)
val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
@@ -43,7 +43,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
conf.set("spark.mesos.executor.home" , "/mesos-home")

val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2)))
listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
EasyMock.replay(listenerBus)

val sc = EasyMock.createMock(classOf[SparkContext])
@@ -88,7 +88,7 @@
val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])

val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2)))
listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
EasyMock.replay(listenerBus)

val sc = EasyMock.createMock(classOf[SparkContext])
@@ -76,8 +76,9 @@ class JsonProtocolSuite extends FunSuite {
val unpersistRdd = SparkListenerUnpersistRDD(12345)
val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
val applicationEnd = SparkListenerApplicationEnd(42L)
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
new ExecutorInfo("Hostee.awesome.com", 11))
new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")

testEvent(stageSubmitted, stageSubmittedJsonString)
@@ -100,13 +101,14 @@
}

test("Dependent Classes") {
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
testTaskMetrics(makeTaskMetrics(
33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false))
testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
testExecutorInfo(new ExecutorInfo("host", 43))
testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap))

// StorageLevel
testStorageLevel(StorageLevel.NONE)
@@ -1463,7 +1465,11 @@
| "Executor ID": "exec1",
| "Executor Info": {
| "Host": "Hostee.awesome.com",
| "Total Cores": 11
| "Total Cores": 11,
| "Log Urls" : {
| "stderr" : "mystderr",
| "stdout" : "mystdout"
| }
| }
|}
"""