From d78672aaf070ae3e4d4571b7016a2bdebba932c0 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 23 Apr 2015 18:09:24 +0800 Subject: [PATCH] Make the X axis use the same range --- .../apache/spark/ui/static/streaming-page.js | 14 ++- .../spark/streaming/ui/StreamingPage.scala | 101 ++++++++++-------- 2 files changed, 68 insertions(+), 47 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js index e689583ab404c..5557a6288534c 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js +++ b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js @@ -16,6 +16,10 @@ */ +var globalMinX = 0; +var globalMaxX = 0; +var binCount = 10; + // An invisible div to show details of a point in the graph var graphTooltip = d3.select("body").append("div") .style("position", "absolute") @@ -137,14 +141,14 @@ function drawDistribution(id, values, minY, maxY, unitY) { var height = 150 - margin.top - margin.bottom; //var binCount = values.length > 100 ? 100 : values.length; - var binCount = 10; var formatBinValue = d3.format(",.2f"); var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]); var data = d3.layout.histogram().range([minY, maxY]).bins(binCount)(values); var x = d3.scale.linear() - .domain([0, d3.max(data, function(d) { return d.y; })]) + .domain([globalMinX, globalMaxX]) + //.domain([0, d3.max(data, function(d) { return d.y; })]) .range([0, width]); var xAxis = d3.svg.axis().scale(x).orient("bottom").ticks(5); @@ -197,3 +201,9 @@ function drawDistribution(id, values, minY, maxY, unitY) { hideGraphTooltip(); }); } + +function prepareDistribution(values, minY, maxY) { + var data = d3.layout.histogram().range([minY, maxY]).bins(binCount)(values); + var maxBarSize = d3.max(data, function(d) { return d.y; }); + globalMaxX = maxBarSize > globalMaxX? maxBarSize : globalMaxX; +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 2baccafa00993..e4ad4b25bb53f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -37,13 +37,14 @@ import org.apache.spark.util.Distribution private[ui] case class TimelineUIData(divId: String, data: Seq[(Long, _)], minX: Long, maxX: Long, minY: Long, maxY: Long, unitY: String) { - def toHtmlAndJs: (Seq[Node], String) = { + def toHtml(jsCollector: JsCollector): Seq[Node] = { val jsForData = data.map { case (x, y) => s"""{"x": $x, "y": $y}""" }.mkString("[", ",", "]") - - (
, + jsCollector.addStatement( s"drawTimeline('#$divId', $jsForData, $minX, $maxX, $minY, $maxY, '$unitY');") + +
} } @@ -57,11 +58,12 @@ private[ui] case class TimelineUIData(divId: String, data: Seq[(Long, _)], minX: private[ui] case class DistributionUIData( divId: String, data: Seq[_], minY: Long, maxY: Long, unitY: String) { - def toHtmlAndJs: (Seq[Node], String) = { + def toHtml(jsCollector: JsCollector): Seq[Node] = { val jsForData = data.mkString("[", ",", "]") + jsCollector.addPreparedStatement(s"prepareDistribution($jsForData, $minY, $maxY);") + jsCollector.addStatement(s"drawDistribution('#$divId', $jsForData, $minY, $maxY, '$unitY');") - (
, - s"drawDistribution('#$divId', $jsForData, $minY, $maxY, '$unitY');") +
} } @@ -149,7 +151,7 @@ private[ui] class StreamingPage(parent: StreamingTab) batchInfo.totalDelay.map(batchInfo.batchTime.milliseconds -> _) }) - val jsCollector = ArrayBuffer[String]() + val jsCollector = new JsCollector // Use the max value of "schedulingDelay", "processingTime", and "totalDelay" to make the // Y axis ranges same. @@ -179,8 +181,7 @@ private[ui] class StreamingPage(parent: StreamingTab) maxBatchTime, minEventRate, maxEventRate, - "events/sec").toHtmlAndJs - jsCollector += timelineDataForEventRateOfAllReceivers._2 + "events/sec").toHtml(jsCollector) val distributionDataForEventRateOfAllReceivers = DistributionUIData( @@ -188,8 +189,7 @@ private[ui] class StreamingPage(parent: StreamingTab) eventRateForAllReceivers.data.map(_._2), minEventRate, maxEventRate, - "events/sec").toHtmlAndJs - jsCollector += distributionDataForEventRateOfAllReceivers._2 + "events/sec").toHtml(jsCollector) val timelineDataForSchedulingDelay = TimelineUIData( @@ -199,8 +199,7 @@ private[ui] class StreamingPage(parent: StreamingTab) maxBatchTime, minTime, maxTime, - "ms").toHtmlAndJs - jsCollector += timelineDataForSchedulingDelay._2 + "ms").toHtml(jsCollector) val distributionDataForSchedulingDelay = DistributionUIData( @@ -208,8 +207,7 @@ private[ui] class StreamingPage(parent: StreamingTab) schedulingDelay.data.map(_._2), minTime, maxTime, - "ms").toHtmlAndJs - jsCollector += distributionDataForSchedulingDelay._2 + "ms").toHtml(jsCollector) val timelineDataForProcessingTime = TimelineUIData( @@ -219,8 +217,7 @@ private[ui] class StreamingPage(parent: StreamingTab) maxBatchTime, minTime, maxTime, - "ms").toHtmlAndJs - jsCollector += timelineDataForProcessingTime._2 + "ms").toHtml(jsCollector) val distributionDataForProcessingTime = DistributionUIData( @@ -228,8 +225,7 @@ private[ui] class StreamingPage(parent: StreamingTab) processingTime.data.map(_._2), minTime, maxTime, - "ms").toHtmlAndJs - jsCollector += distributionDataForProcessingTime._2 + "ms").toHtml(jsCollector) val timelineDataForTotalDelay = TimelineUIData( @@ -239,8 +235,7 @@ private[ui] class StreamingPage(parent: StreamingTab) maxBatchTime, minTime, maxTime, - "ms").toHtmlAndJs - jsCollector += timelineDataForTotalDelay._2 + "ms").toHtml(jsCollector) val distributionDataForTotalDelay = DistributionUIData( @@ -248,8 +243,7 @@ private[ui] class StreamingPage(parent: StreamingTab) totalDelay.data.map(_._2), minTime, maxTime, - "ms").toHtmlAndJs - jsCollector += distributionDataForTotalDelay._2 + "ms").toHtml(jsCollector) val table = // scalastyle:off @@ -266,8 +260,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
Avg: {eventRateForAllReceivers.avg.map(_.formatted("%.2f")).getOrElse(emptyCell)} events/sec
- {timelineDataForEventRateOfAllReceivers._1} - {distributionDataForEventRateOfAllReceivers._1} + {timelineDataForEventRateOfAllReceivers} + {distributionDataForEventRateOfAllReceivers} @@ -279,39 +273,34 @@ private[ui] class StreamingPage(parent: StreamingTab)
Scheduling Delay
Avg: {formatDurationOption(schedulingDelay.avg)}
- {timelineDataForSchedulingDelay._1} - {distributionDataForSchedulingDelay._1} + {timelineDataForSchedulingDelay} + {distributionDataForSchedulingDelay}
Processing Time
Avg: {formatDurationOption(processingTime.avg)}
- {timelineDataForProcessingTime._1} - {distributionDataForProcessingTime._1} + {timelineDataForProcessingTime} + {distributionDataForProcessingTime}
Total Delay
Avg: {formatDurationOption(totalDelay.avg)}
- {timelineDataForTotalDelay._1} - {distributionDataForTotalDelay._1} + {timelineDataForTotalDelay} + {distributionDataForTotalDelay} // scalastyle:on - val js = - s""" - |$$(document).ready(function(){ - | ${jsCollector.mkString("\n")} - |});""".stripMargin - table ++ + table ++ jsCollector.toHtml } private def generateInputReceiversTable( - jsCollector: ArrayBuffer[String], + jsCollector: JsCollector, minX: Long, maxX: Long, minY: Long, @@ -337,7 +326,7 @@ private[ui] class StreamingPage(parent: StreamingTab) } private def generateInputReceiverRow( - jsCollector: ArrayBuffer[String], + jsCollector: JsCollector, receiverId: Int, distribution: Option[Distribution], minX: Long, @@ -367,8 +356,7 @@ private[ui] class StreamingPage(parent: StreamingTab) maxX, minY, maxY, - "events/sec").toHtmlAndJs - jsCollector += timelineForEventRate._2 + "events/sec").toHtml(jsCollector) val distributionForEventsRate = DistributionUIData( @@ -376,8 +364,7 @@ private[ui] class StreamingPage(parent: StreamingTab) receivedRecords.map(_._2), minY, maxY, - "events/sec").toHtmlAndJs - jsCollector += distributionForEventsRate._2 + "events/sec").toHtml(jsCollector) // scalastyle:off @@ -394,9 +381,9 @@ private[ui] class StreamingPage(parent: StreamingTab) - {timelineForEventRate._1} + {timelineForEventRate} - {distributionForEventsRate._1} + {distributionForEventsRate} // scalastyle:on } @@ -435,3 +422,27 @@ private object StreamingPage { val BLACK_DOWN_TRIANGLE_HTML = "▼" } +private[ui] class JsCollector { + private val preparedStatements = ArrayBuffer[String]() + private val statements = ArrayBuffer[String]() + + def addPreparedStatement(js: String): Unit = { + preparedStatements += js + } + + def addStatement(js: String): Unit = { + statements += js + } + + def toHtml: Seq[Node] = { + val js = + s""" + |$$(document).ready(function(){ + | ${preparedStatements.mkString("\n")} + | ${statements.mkString("\n")} + |});""".stripMargin + + + } +} +