From 56cc7fbcaf04a5aab88296d20da2cfc5b84a7651 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 28 Mar 2014 14:45:46 -0700 Subject: [PATCH] First cut implementation of Streaming UI. --- .../spark/streaming/StreamingContext.scala | 4 + .../spark/streaming/ui/StreamingUI.scala | 131 ++++++++++++++++++ .../apache/spark/streaming/ui/UIUtils.scala | 72 ++++++++++ 3 files changed, 207 insertions(+) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index e198c69470c1f..d45cdac5bef41 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -40,6 +40,7 @@ import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receivers._ import org.apache.spark.streaming.scheduler._ import org.apache.hadoop.conf.Configuration +import org.apache.spark.streaming.ui.StreamingUI /** * Main entry point for Spark Streaming functionality. It provides methods used to create @@ -158,6 +159,9 @@ class StreamingContext private[streaming] ( private[streaming] val waiter = new ContextWaiter + private[streaming] val ui = new StreamingUI(this) + ui.bind() + /** * Return the associated Spark context */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala new file mode 100644 index 0000000000000..e9f8d21faab45 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.ui + +import scala.collection.mutable.SynchronizedQueue +import scala.xml.Node + +import javax.servlet.http.HttpServletRequest +import org.eclipse.jetty.servlet.ServletContextHandler + +import org.apache.spark.Logging +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.scheduler.{BatchInfo, StreamingListener, StreamingListenerBatchCompleted} +import org.apache.spark.ui.{ServerInfo, SparkUI} +import org.apache.spark.ui.JettyUtils._ +import org.apache.spark.util.{Distribution, Utils} + +private[spark] class StreamingUIListener() extends StreamingListener { + + private val batchInfos = new SynchronizedQueue[BatchInfo] + private val maxBatchInfos = 100 + + override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted) { + batchInfos.enqueue(batchStarted.batchInfo) + if (batchInfos.size > maxBatchInfos) batchInfos.dequeue() + } + + def processingDelayDistribution = extractDistribution(_.processingDelay) + + def schedulingDelayDistribution = extractDistribution(_.schedulingDelay) + + def totalDelay = extractDistribution(_.totalDelay) + + def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = { + Distribution(batchInfos.flatMap(getMetric(_)).map(_.toDouble)) + } + + def numBatchInfos = batchInfos.size +} + +private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { + + private val sc = ssc.sparkContext + private val conf = sc.conf + private val appName = sc.appName + private val bindHost = Utils.localHostName() + private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost) + private val port = conf.getInt("spark.streaming.ui.port", StreamingUI.DEFAULT_PORT) + private val securityManager = sc.env.securityManager + private val listener = new StreamingUIListener() + private val handlers: Seq[ServletContextHandler] = { + Seq( + createServletHandler("/", + (request: HttpServletRequest) => render(request), securityManager), + createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static") + ) + } + + private var serverInfo: Option[ServerInfo] = None + + ssc.addStreamingListener(listener) + + def bind() { + try { + serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf)) + logInfo("Started Spark Streaming Web UI at http://%s:%d".format(publicHost, boundPort)) + } catch { + case e: Exception => + logError("Failed to create Spark JettyUtils", e) + System.exit(1) + } + } + + def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) + + private def render(request: HttpServletRequest): Seq[Node] = { + val batchStatsTable = generateBatchStatsTable() + val content = batchStatsTable + UIUtils.headerStreamingPage(content, "", appName, "Spark Streaming Overview") + } + + private def generateBatchStatsTable(): Seq[Node] = { + def getQuantiles(timeDistributionOption: Option[Distribution]) = { + timeDistributionOption.get.getQuantiles().map { ms => Utils.msDurationToString(ms.toLong) } + } + val numBatches = listener.numBatchInfos + val table = if (numBatches > 0) { + val processingDelayQuantilesRow = + "Processing Times" +: getQuantiles(listener.processingDelayDistribution) + val schedulingDelayQuantilesRow = + "Scheduling Delay:" +: getQuantiles(listener.processingDelayDistribution) + val totalDelayQuantilesRow = + "End-to-end Delay:" +: getQuantiles(listener.totalDelay) + + val headerRow = Seq("Metric", "Min", "25th percentile", + "Median", "75th percentile", "Max") + val dataRows: Seq[Seq[String]] = Seq( + processingDelayQuantilesRow, + schedulingDelayQuantilesRow, + totalDelayQuantilesRow + ) + Some(UIUtils.listingTable(headerRow, dataRows, fixedWidth = true)) + } else { + None + } + + val content = +

Batch Processing Statistics

++ +
{table.getOrElse("No statistics have been generated yet.")}
+ content + } +} + +object StreamingUI { + val DEFAULT_PORT = 6060 +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala new file mode 100644 index 0000000000000..62e95135fa5c5 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala @@ -0,0 +1,72 @@ +package org.apache.spark.streaming.ui + +import scala.xml.Node +import org.apache.spark.ui.Page + +private[spark] object UIUtils { + + import org.apache.spark.ui.UIUtils.prependBaseUri + + def headerStreamingPage( + content: => Seq[Node], + basePath: String, + appName: String, + title: String): Seq[Node] = { + val overview = { +
  • Overview
  • + } + + + + + + + + {appName} - {title} + + + + +
    +
    +
    +

    + {title} +

    +
    +
    + {content} +
    + + + } + + def listingTable[T]( + headers: Seq[String], + makeRow: T => Seq[Node], + rows: Seq[T], + fixedWidth: Boolean = false): Seq[Node] = { + org.apache.spark.ui.UIUtils.listingTable(headers, makeRow, rows, fixedWidth) + } + + def listingTable[T]( + headers: Seq[String], + rows: Seq[Seq[String]], + fixedWidth: Boolean = false + ): Seq[Node] = { + def makeRow(data: Seq[String]): Seq[Node] = {data.map(d => {d})} + org.apache.spark.ui.UIUtils.listingTable(headers, makeRow, rows, fixedWidth) + } +}