diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index e97c45a7d31e3..906d4067a14eb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -17,30 +17,28 @@
package org.apache.spark.streaming
-import scala.collection.mutable.Queue
-import scala.collection.Map
-import scala.reflect.ClassTag
-
import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger
-import akka.actor.Props
-import akka.actor.SupervisorStrategy
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.Text
+import scala.collection.Map
+import scala.collection.mutable.Queue
+import scala.reflect.ClassTag
+
+import akka.actor.{Props, SupervisorStrategy}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-import org.apache.hadoop.fs.Path
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.MetadataCleaner
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receivers._
import org.apache.spark.streaming.scheduler._
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.streaming.ui.StreamingUI
+import org.apache.spark.streaming.ui.StreamingTab
+import org.apache.spark.util.MetadataCleaner
/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -159,8 +157,8 @@ class StreamingContext private[streaming] (
private[streaming] val waiter = new ContextWaiter
- private[streaming] val ui = new StreamingUI(this)
- ui.bind()
+ private[streaming] val ui = new StreamingTab(this)
+ ui.start()
/** Enumeration to identify current state of the StreamingContext */
private[streaming] object StreamingContextState extends Enumeration {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index cd37fb6380819..5a249706b4d2f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -31,7 +31,7 @@ import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
import org.apache.spark.streaming._
-import org.apache.spark.streaming.scheduler.{AddBlocks, DeregisterReceiver, ReceivedBlockInfo, RegisterReceiver}
+import org.apache.spark.streaming.scheduler.{AddBlock, DeregisterReceiver, ReceivedBlockInfo, RegisterReceiver}
import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -237,7 +237,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
level: StorageLevel
) {
env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
- trackerActor ! AddBlocks(ReceivedBlockInfo(streamId, blockId, arrayBuffer.size, metadata))
+ trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, arrayBuffer.size, metadata))
logDebug("Pushed block " + blockId)
}
@@ -251,7 +251,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
level: StorageLevel
) {
env.blockManager.putBytes(blockId, bytes, level)
- trackerActor ! AddBlocks(ReceivedBlockInfo(streamId, blockId, -1, metadata))
+ trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, -1, metadata))
}
/** Set the ID of the DStream that this receiver is associated with */
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index 70da495863e8a..4fe4a10163c31 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -52,7 +52,7 @@ private[streaming] case class RegisterReceiver(
host: String,
receiverActor: ActorRef
) extends NetworkInputTrackerMessage
-private[streaming] case class AddBlocks(receivedBlockInfo: ReceivedBlockInfo)
+private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
extends NetworkInputTrackerMessage
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
extends NetworkInputTrackerMessage
@@ -153,7 +153,7 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
case RegisterReceiver(streamId, typ, host, receiverActor) =>
registerReceiver(streamId, typ, host, receiverActor, sender)
sender ! true
- case AddBlocks(receivedBlockInfo) =>
+ case AddBlock(receivedBlockInfo) =>
addBlocks(receivedBlockInfo)
case DeregisterReceiver(streamId, message) =>
deregisterReceiver(streamId, message)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingProgressListener.scala
new file mode 100644
index 0000000000000..6db1af5245ce2
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingProgressListener.scala
@@ -0,0 +1,131 @@
+package org.apache.spark.streaming.ui
+
+import scala.collection.mutable.{HashMap, Queue}
+
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.scheduler._
+import org.apache.spark.util.Distribution
+
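+/**
+ * A StreamingListener that tracks information about the batches and receivers of a
+ * StreamingContext, for display in the streaming UI.
+ */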
+private[ui] class StreamingProgressListener(ssc: StreamingContext) extends StreamingListener {
+
+ private val waitingBatchInfos = new HashMap[Time, BatchInfo]
+ private val runningBatchInfos = new HashMap[Time, BatchInfo]
+  private val completedBatchInfos = new Queue[BatchInfo]
+  private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.maxBatches", 100)
+ private var totalCompletedBatches = 0L
+ private val receiverInfos = new HashMap[Int, ReceiverInfo]
+
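+  /** Duration of a batch of the streaming context, in milliseconds. */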
+ val batchDuration = ssc.graph.batchDuration.milliseconds
+
+ override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) = {
+ synchronized {
+ receiverInfos.put(receiverStarted.receiverInfo.streamId, receiverStarted.receiverInfo)
+ }
+ }
+
+ override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized {
+ runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
+ }
+
+ override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
+ runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
+ waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
+ }
+
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
+ waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
+ runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
+    completedBatchInfos.enqueue(batchCompleted.batchInfo)
+    if (completedBatchInfos.size > batchInfoLimit) completedBatchInfos.dequeue()
+ totalCompletedBatches += 1L
+ }
+
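+  /** Number of network input streams (receivers) in the streaming graph. */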
+ def numNetworkReceivers = synchronized {
+ ssc.graph.getNetworkInputStreams().size
+ }
+
+ def numTotalCompletedBatches: Long = synchronized {
+ totalCompletedBatches
+ }
+
+ def numUnprocessedBatches: Long = synchronized {
+ waitingBatchInfos.size + runningBatchInfos.size
+ }
+
+ def waitingBatches: Seq[BatchInfo] = synchronized {
+ waitingBatchInfos.values.toSeq
+ }
+
+ def runningBatches: Seq[BatchInfo] = synchronized {
+ runningBatchInfos.values.toSeq
+ }
+
+ def completedBatches: Seq[BatchInfo] = synchronized {
+    completedBatchInfos.toSeq
+ }
+
+ def processingDelayDistribution: Option[Distribution] = synchronized {
+ extractDistribution(_.processingDelay)
+ }
+
+ def schedulingDelayDistribution: Option[Distribution] = synchronized {
+ extractDistribution(_.schedulingDelay)
+ }
+
+ def totalDelayDistribution: Option[Distribution] = synchronized {
+ extractDistribution(_.totalDelay)
+ }
+
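+  /**
+   * For each receiver, the distribution of its record rate (records per second)
+   * across the most recently retained batches.
+   */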
+ def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
+ val latestBatchInfos = allBatches.reverse.take(batchInfoLimit)
+ val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
+ (0 until numNetworkReceivers).map { receiverId =>
+ val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
+ batchInfo.get(receiverId).getOrElse(Array.empty)
+ }
+ val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo =>
+ // calculate records per second for each batch
+ blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration
+ }
+ val distributionOption = Distribution(recordsOfParticularReceiver)
+ (receiverId, distributionOption)
+ }.toMap
+ }
+
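+  /** For each receiver, the number of records received in the most recent batch (0 if none). */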
+ def lastReceivedBatchRecords: Map[Int, Long] = {
+ val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
+ lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
+ (0 until numNetworkReceivers).map { receiverId =>
+ (receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum)
+ }.toMap
+ }.getOrElse {
+ (0 until numNetworkReceivers).map(receiverId => (receiverId, 0L)).toMap
+ }
+ }
+
+ def receiverInfo(receiverId: Int): Option[ReceiverInfo] = {
+ receiverInfos.get(receiverId)
+ }
+
+ def lastCompletedBatch: Option[BatchInfo] = {
+    completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
+ }
+
+ def lastReceivedBatch: Option[BatchInfo] = {
+ allBatches.lastOption
+ }
+
+ private def allBatches: Seq[BatchInfo] = synchronized {
+ (waitingBatchInfos.values.toSeq ++
+      runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
+ }
+
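+  /** Distribution of a given batch metric across the completed batches retained so far. */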
+ private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
+    Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala
index 9a3cd8058e338..81f883dadc798 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala
@@ -17,159 +17,43 @@
package org.apache.spark.streaming.ui
-import scala.collection.mutable.{HashMap, Queue}
-import scala.xml.Node
-
-import javax.servlet.http.HttpServletRequest
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.streaming.{StreamingContext, Time}
-import org.apache.spark.streaming.scheduler._
-import org.apache.spark.ui.{ServerInfo, SparkUI}
-import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.{Distribution, Utils}
import java.util.{Calendar, Locale}
+import javax.servlet.http.HttpServletRequest
-private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingListener {
-
- private val waitingBatchInfos = new HashMap[Time, BatchInfo]
- private val runningBatchInfos = new HashMap[Time, BatchInfo]
- private val completedaBatchInfos = new Queue[BatchInfo]
- private val batchInfoLimit = ssc.conf.getInt("spark.steaming.ui.maxBatches", 100)
- private var totalCompletedBatches = 0L
- private val receiverInfos = new HashMap[Int, ReceiverInfo]
-
- val batchDuration = ssc.graph.batchDuration.milliseconds
-
- override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) = {
- synchronized {
- receiverInfos.put(receiverStarted.receiverInfo.streamId, receiverStarted.receiverInfo)
- }
- }
-
- override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized {
- runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
- }
-
- override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
- runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
- waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
- }
-
- override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
- waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
- runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
- completedaBatchInfos.enqueue(batchCompleted.batchInfo)
- if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
- totalCompletedBatches += 1L
- }
-
- def numNetworkReceivers = synchronized {
- ssc.graph.getNetworkInputStreams().size
- }
-
- def numTotalCompletedBatches: Long = synchronized {
- totalCompletedBatches
- }
-
- def numUnprocessedBatches: Long = synchronized {
- waitingBatchInfos.size + runningBatchInfos.size
- }
-
- def waitingBatches: Seq[BatchInfo] = synchronized {
- waitingBatchInfos.values.toSeq
- }
-
- def runningBatches: Seq[BatchInfo] = synchronized {
- runningBatchInfos.values.toSeq
- }
-
- def completedBatches: Seq[BatchInfo] = synchronized {
- completedaBatchInfos.toSeq
- }
-
- def processingDelayDistribution: Option[Distribution] = synchronized {
- extractDistribution(_.processingDelay)
- }
-
- def schedulingDelayDistribution: Option[Distribution] = synchronized {
- extractDistribution(_.schedulingDelay)
- }
-
- def totalDelayDistribution: Option[Distribution] = synchronized {
- extractDistribution(_.totalDelay)
- }
-
- def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
- val latestBatchInfos = allBatches.reverse.take(batchInfoLimit)
- val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
- (0 until numNetworkReceivers).map { receiverId =>
- val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
- batchInfo.get(receiverId).getOrElse(Array.empty)
- }
- val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo =>
- // calculate records per second for each batch
- blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration
- }
- val distributionOption = Distribution(recordsOfParticularReceiver)
- (receiverId, distributionOption)
- }.toMap
- }
-
- def lastReceivedBatchRecords: Map[Int, Long] = {
- val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
- lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
- (0 until numNetworkReceivers).map { receiverId =>
- (receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum)
- }.toMap
- }.getOrElse {
- (0 until numNetworkReceivers).map(receiverId => (receiverId, 0L)).toMap
- }
- }
-
- def receiverInfo(receiverId: Int): Option[ReceiverInfo] = {
- receiverInfos.get(receiverId)
- }
-
- def lastCompletedBatch: Option[BatchInfo] = {
- completedaBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
- }
-
- def lastReceivedBatch: Option[BatchInfo] = {
- allBatches.lastOption
- }
-
- private def allBatches: Seq[BatchInfo] = synchronized {
- (waitingBatchInfos.values.toSeq ++
- runningBatchInfos.values.toSeq ++ completedaBatchInfos).sortBy(_.batchTime)(Time.ordering)
- }
-
- private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
- Distribution(completedaBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
- }
-}
+import scala.xml.Node
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.ui._
+import org.apache.spark.util.Distribution
-private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
+private[ui] class StreamingPage(parent: StreamingTab)
+ extends UIPage("") with Logging {
- private val listener = parent.listener
+ private val ssc = parent.ssc
+ private val sc = ssc.sparkContext
+ private val sparkUI = sc.ui
+ private val listener = new StreamingProgressListener(ssc)
private val calendar = Calendar.getInstance()
private val startTime = calendar.getTime()
private val emptyCellTest = "-"
- def render(request: HttpServletRequest): Seq[Node] = {
+ ssc.addStreamingListener(listener)
+ parent.attachPage(this)
+ /** Render the page */
+ override def render(request: HttpServletRequest): Seq[Node] = {
val content =
- generateBasicStats() ++
-      <h4>Statistics over last {listener.completedBatches.size} processed batches</h4> ++
- generateNetworkStatsTable() ++
- generateBatchStatsTable()
- UIUtils.headerStreamingPage(content, "", parent.appName, "Spark Streaming Overview")
+ generateBasicStats() ++
+        <h4>Statistics over last {listener.completedBatches.size} processed batches</h4> ++
+ generateNetworkStatsTable() ++
+ generateBatchStatsTable()
+ UIUtils.headerSparkPage(
+ content, sparkUI.basePath, sc.appName, "Streaming", sparkUI.getTabs, parent, Some(5000))
}
+ /** Generate basic stats of the streaming program */
private def generateBasicStats(): Seq[Node] = {
-
val timeSinceStart = System.currentTimeMillis() - startTime.getTime
@@ -193,6 +77,7 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
}
+  /** Generate stats of data received over the network by the streaming program */
private def generateNetworkStatsTable(): Seq[Node] = {
val receivedRecordDistributions = listener.receivedRecordsDistributions
val lastBatchReceivedRecord = listener.lastReceivedBatchRecords
@@ -201,11 +86,11 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
"Receiver",
"Location",
s"Records in last batch",
- "Minimum rate [records/sec]",
- "25th percentile rate [records/sec]",
- "Median rate [records/sec]",
- "75th percentile rate [records/sec]",
- "Maximum rate [records/sec]"
+ "Minimum rate\n[records/sec]",
+ "25th percentile rate\n[records/sec]",
+ "Median rate\n[records/sec]",
+ "75th percentile rate\n[records/sec]",
+ "Maximum rate\n[records/sec]"
)
val dataRows = (0 until listener.numNetworkReceivers).map { receiverId =>
val receiverInfo = listener.receiverInfo(receiverId)
@@ -220,7 +105,7 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++
receivedRecordStats
}
- Some(UIUtils.listingTable(headerRow, dataRows, fixedWidth = true))
+ Some(listingTable(headerRow, dataRows, fixedWidth = true))
} else {
None
}
@@ -232,6 +117,7 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
content
}
+ /** Generate stats of batch jobs of the streaming program */
private def generateBatchStatsTable(): Seq[Node] = {
val numBatches = listener.completedBatches.size
val lastCompletedBatch = listener.lastCompletedBatch
@@ -261,7 +147,7 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
schedulingDelayQuantilesRow,
totalDelayQuantilesRow
)
- Some(UIUtils.listingTable(headerRow, dataRows, fixedWidth = true))
+ Some(listingTable(headerRow, dataRows, fixedWidth = true))
} else {
None
}
@@ -277,10 +163,9 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
content
}
- private def getQuantiles(timeDistributionOption: Option[Distribution]) = {
- timeDistributionOption.get.getQuantiles().map { ms => msDurationToString(ms.toLong) }
- }
-
+ /**
+ * Returns a human-readable string representing a number
+ */
private def numberToString(records: Double): String = {
val trillion = 1e12
val billion = 1e9
@@ -353,50 +238,61 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
}
}
+ /**
+ * Returns a human-readable string representing a duration such as "5 second 35 ms"
+ */
private def msDurationToString(msOption: Option[Long]): String = {
msOption.map(msDurationToString).getOrElse(emptyCellTest)
}
-}
-
-private[spark] class StreamingUI(val ssc: StreamingContext) extends Logging {
-
- val sc = ssc.sparkContext
- val conf = sc.conf
- val appName = sc.appName
- val listener = new StreamingUIListener(ssc)
- val overviewPage = new StreamingPage(this)
-
- private val bindHost = Utils.localHostName()
- private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
- private val port = conf.getInt("spark.streaming.ui.port", StreamingUI.DEFAULT_PORT)
- private val securityManager = sc.env.securityManager
- private val handlers: Seq[ServletContextHandler] = {
- Seq(
- createServletHandler("/",
- (request: HttpServletRequest) => overviewPage.render(request), securityManager),
- createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")
- )
+ /** Get quantiles for any time distribution */
+ private def getQuantiles(timeDistributionOption: Option[Distribution]) = {
+ timeDistributionOption.get.getQuantiles().map { ms => msDurationToString(ms.toLong) }
}
- private var serverInfo: Option[ServerInfo] = None
+ /** Generate an HTML table constructed by generating a row for each object in a sequence. */
+ def listingTable[T](
+ headerRow: Seq[String],
+ dataRows: Seq[Seq[String]],
+ fixedWidth: Boolean = false
+ ): Seq[Node] = {
+
+ val colWidth = 100.toDouble / headerRow.size
+ val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
+ var tableClass = "table table-bordered table-striped table-condensed sortable"
+ if (fixedWidth) {
+ tableClass += " table-fixed"
+ }
- ssc.addStreamingListener(listener)
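+    // Render one header cell per column, splitting multi-line header text on "\n"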
+ def generateHeaderRow(header: Seq[String]): Seq[Node] = {
+ headerRow.map { case h =>
+        <th width={colWidthAttr}>
+          <div>
+            { h.split("\n").map { case t => <div>{t}</div> } }
+          </div>
+        </th>
+ }
+ }
- def bind() {
- try {
- serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf))
- logInfo("Started Spark Streaming Web UI at http://%s:%d".format(publicHost, boundPort))
- } catch {
- case e: Exception =>
- logError("Failed to create Spark JettyUtils", e)
- System.exit(1)
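+    // Render a table row with one cell per data value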
+ def generateDataRow(data: Seq[String]): Seq[Node] = {
+      <tr> {data.map(d => <td>{d}</td>)} </tr>
+    }
- }
- private def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
+    <table class={tableClass}>
+      <thead>{generateHeaderRow(headerRow)}</thead>
+      <tbody>
+        {dataRows.map(r => generateDataRow(r))}
+      </tbody>
+    </table>
+ }
}
-object StreamingUI {
- val DEFAULT_PORT = 6060
+private[spark] class StreamingTab(val ssc: StreamingContext)
+ extends UITab("streaming") with Logging {
+
+ val streamingPage = new StreamingPage(this)
+ ssc.sc.ui.attachTab(this)
+
+  /** No-op for now; the tab and its page attach themselves when constructed. */
+ def start() { }
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
deleted file mode 100644
index 4063ce3d7ca44..0000000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.ui
-
-import scala.xml.Node
-
-private[spark] object UIUtils {
-
- import org.apache.spark.ui.UIUtils.prependBaseUri
-
- def headerStreamingPage(
- content: => Seq[Node],
- basePath: String,
- appName: String,
- title: String): Seq[Node] = {
    [~30 deleted lines of page markup: the "Overview" navigation link, an HTML head titled
    "{appName} - {title}", and an "{appName} application UI" page header around the rendered {content}]
-  }
-
- def listingTable[T](
- headers: Seq[String],
- makeRow: T => Seq[Node],
- rows: Seq[T],
- fixedWidth: Boolean = false): Seq[Node] = {
- org.apache.spark.ui.UIUtils.listingTable(headers, makeRow, rows, fixedWidth)
- }
-
- def listingTable[T](
- headers: Seq[String],
- rows: Seq[Seq[String]],
- fixedWidth: Boolean = false
- ): Seq[Node] = {
-    def makeRow(data: Seq[String]): Seq[Node] =
-      <tr>{data.map(d => <td>{d}</td>)}</tr>
- org.apache.spark.ui.UIUtils.listingTable(headers, makeRow, rows, fixedWidth)
- }
-}