diff --git a/LICENSE b/LICENSE
index 21c42e9a20fa3..b2001f029a4f0 100644
+For d3 (core/src/main/resources/org/apache/spark/ui/static/d3.min.js):
+Copyright (c) 2010-2015, Michael Bostock
+All rights reserved.
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+* Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+* Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+* The name Michael Bostock may not be used to endorse or promote products
+ derived from this software without specific prior written permission.
For Scala Interpreter classes (all .scala files in repl/src/main/scala
diff --git a/core/src/main/resources/org/apache/spark/ui/static/bootstrap-tooltip.js b/core/src/main/resources/org/apache/spark/ui/static/bootstrap-tooltip.js
index 2934181c1006a..acd6096e6743e 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/bootstrap-tooltip.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/bootstrap-tooltip.js
@@ -1,9 +1,9 @@
/* ===========================================================
- * bootstrap-tooltip.js v2.2.2
- * http://twitter.github.com/bootstrap/javascript.html#tooltips
+ * bootstrap-tooltip.js v2.3.2
+ * http://getbootstrap.com/2.3.2/javascript.html#tooltips
* Inspired by the original jQuery.tipsy by Jason Frame
* ===========================================================
- * Copyright 2012 Twitter, Inc.
+ * Copyright 2013 Twitter, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,19 +38,27 @@
, init: function (type, element, options) {
var eventIn
, eventOut
+ , triggers
+ , trigger
+ , i
this.type = type
this.$element = $(element)
this.options = this.getOptions(options)
this.enabled = true
- if (this.options.trigger == 'click') {
- this.$element.on('click.' + this.type, this.options.selector, $.proxy(this.toggle, this))
- } else if (this.options.trigger != 'manual') {
- eventIn = this.options.trigger == 'hover' ? 'mouseenter' : 'focus'
- eventOut = this.options.trigger == 'hover' ? 'mouseleave' : 'blur'
- this.$element.on(eventIn + '.' + this.type, this.options.selector, $.proxy(this.enter, this))
- this.$element.on(eventOut + '.' + this.type, this.options.selector, $.proxy(this.leave, this))
+ triggers = this.options.trigger.split(' ')
+ for (i = triggers.length; i--;) {
+ trigger = triggers[i]
+ if (trigger == 'click') {
+ this.$element.on('click.' + this.type, this.options.selector, $.proxy(this.toggle, this))
+ } else if (trigger != 'manual') {
+ eventIn = trigger == 'hover' ? 'mouseenter' : 'focus'
+ eventOut = trigger == 'hover' ? 'mouseleave' : 'blur'
+ this.$element.on(eventIn + '.' + this.type, this.options.selector, $.proxy(this.enter, this))
+ this.$element.on(eventOut + '.' + this.type, this.options.selector, $.proxy(this.leave, this))
+ }
this.options.selector ?
@@ -59,7 +67,7 @@
, getOptions: function (options) {
- options = $.extend({}, $.fn[this.type].defaults, options, this.$element.data())
+ options = $.extend({}, $.fn[this.type].defaults, this.$element.data(), options)
if (options.delay && typeof options.delay == 'number') {
options.delay = {
@@ -72,7 +80,15 @@
, enter: function (e) {
- var self = $(e.currentTarget)[this.type](this._options).data(this.type)
+ var defaults = $.fn[this.type].defaults
+ , options = {}
+ , self
+ this._options && $.each(this._options, function (key, value) {
+ if (defaults[key] != value) options[key] = value
+ }, this)
+ self = $(e.currentTarget)[this.type](options).data(this.type)
if (!self.options.delay || !self.options.delay.show) return self.show()
@@ -97,14 +113,16 @@
, show: function () {
var $tip
- , inside
, pos
, actualWidth
, actualHeight
, placement
, tp
+ , e = $.Event('show')
if (this.hasContent() && this.enabled) {
+ this.$element.trigger(e)
+ if (e.isDefaultPrevented()) return
$tip = this.tip()
@@ -116,19 +134,18 @@
this.options.placement.call(this, $tip[0], this.$element[0]) :
- inside = /in/.test(placement)
.css({ top: 0, left: 0, display: 'block' })
- .insertAfter(this.$element)
- pos = this.getPosition(inside)
+ this.options.container ? $tip.appendTo(this.options.container) : $tip.insertAfter(this.$element)
+ pos = this.getPosition()
actualWidth = $tip[0].offsetWidth
actualHeight = $tip[0].offsetHeight
- switch (inside ? placement.split(' ')[1] : placement) {
+ switch (placement) {
case 'bottom':
tp = {top: pos.top + pos.height, left: pos.left + pos.width / 2 - actualWidth / 2}
@@ -143,11 +160,56 @@
- $tip
- .offset(tp)
- .addClass(placement)
- .addClass('in')
+ this.applyPlacement(tp, placement)
+ this.$element.trigger('shown')
+ }
+ }
+ , applyPlacement: function(offset, placement){
+ var $tip = this.tip()
+ , width = $tip[0].offsetWidth
+ , height = $tip[0].offsetHeight
+ , actualWidth
+ , actualHeight
+ , delta
+ , replace
+ $tip
+ .offset(offset)
+ .addClass(placement)
+ .addClass('in')
+ actualWidth = $tip[0].offsetWidth
+ actualHeight = $tip[0].offsetHeight
+ if (placement == 'top' && actualHeight != height) {
+ offset.top = offset.top + height - actualHeight
+ replace = true
+ }
+ if (placement == 'bottom' || placement == 'top') {
+ delta = 0
+ if (offset.left < 0){
+ delta = offset.left * -2
+ offset.left = 0
+ $tip.offset(offset)
+ actualWidth = $tip[0].offsetWidth
+ actualHeight = $tip[0].offsetHeight
+ }
+ this.replaceArrow(delta - width + actualWidth, actualWidth, 'left')
+ } else {
+ this.replaceArrow(actualHeight - height, actualHeight, 'top')
+ if (replace) $tip.offset(offset)
+ }
+ , replaceArrow: function(delta, dimension, position){
+ this
+ .arrow()
+ .css(position, delta ? (50 * (1 - delta / dimension) + "%") : '')
, setContent: function () {
@@ -161,6 +223,10 @@
, hide: function () {
var that = this
, $tip = this.tip()
+ , e = $.Event('hide')
+ this.$element.trigger(e)
+ if (e.isDefaultPrevented()) return
@@ -179,6 +245,8 @@
removeWithAnimation() :
+ this.$element.trigger('hidden')
return this
@@ -193,11 +261,12 @@
return this.getTitle()
- , getPosition: function (inside) {
- return $.extend({}, (inside ? {top: 0, left: 0} : this.$element.offset()), {
- width: this.$element[0].offsetWidth
- , height: this.$element[0].offsetHeight
- })
+ , getPosition: function () {
+ var el = this.$element[0]
+ return $.extend({}, (typeof el.getBoundingClientRect == 'function') ? el.getBoundingClientRect() : {
+ width: el.offsetWidth
+ , height: el.offsetHeight
+ }, this.$element.offset())
, getTitle: function () {
@@ -215,6 +284,10 @@
return this.$tip = this.$tip || $(this.options.template)
+ , arrow: function(){
+ return this.$arrow = this.$arrow || this.tip().find(".tooltip-arrow")
+ }
, validate: function () {
if (!this.$element[0].parentNode) {
@@ -236,8 +309,8 @@
, toggle: function (e) {
- var self = $(e.currentTarget)[this.type](this._options).data(this.type)
- self[self.tip().hasClass('in') ? 'hide' : 'show']()
+ var self = e ? $(e.currentTarget)[this.type](this._options).data(this.type) : this
+ self.tip().hasClass('in') ? self.hide() : self.show()
, destroy: function () {
@@ -269,10 +342,11 @@
, placement: 'top'
, selector: false
, template: '
- , trigger: 'hover'
+ , trigger: 'hover focus'
, title: ''
, delay: 0
, html: false
+ , container: false
@@ -285,4 +359,3 @@
diff --git a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css
new file mode 100644
index 0000000000000..5da9d631ad124
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css
@@ -0,0 +1,58 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+.graph {
+ font: 10px sans-serif;
+.axis path, .axis line {
+ fill: none;
+ stroke: gray;
+ shape-rendering: crispEdges;
+.axis text {
+ fill: gray;
+.tooltip-inner {
+ max-width: 500px !important; // Make sure we only have one line tooltip
+.line {
+ fill: none;
+ stroke: #0088cc;
+ stroke-width: 1.5px;
+.bar rect {
+ fill: #0088cc;
+ shape-rendering: crispEdges;
+.bar rect:hover {
+ fill: #00c2ff;
+.timeline {
+ width: 500px;
+.histogram {
+ width: auto;
diff --git a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
new file mode 100644
index 0000000000000..a4e03b156f13e
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
@@ -0,0 +1,274 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// timeFormat: StreamingPage.scala will generate a global "timeFormat" dictionary to store the time
+// and its formatted string. Because we cannot specify a timezone in JavaScript, to make sure the
+// server and client use the same timezone, we use the "timeFormat" dictionary to format all time
+// values used in the graphs.
+// A global margin left for all timeline graphs. It will be set in "registerTimeline". This will be
+// used to align all timeline graphs.
+var maxMarginLeftForTimeline = 0;
+// The max X values for all histograms. It will be set in "registerHistogram".
+var maxXForHistogram = 0;
+var histogramBinCount = 10;
+var yValueFormat = d3.format(",.2f");
+// Show a tooltip "text" for "node"
+function showBootstrapTooltip(node, text) {
+ $(node).tooltip({title: text, trigger: "manual", container: "body"});
+ $(node).tooltip("show");
+// Hide the tooltip for "node"
+function hideBootstrapTooltip(node) {
+ $(node).tooltip("destroy");
+// Register a timeline graph. All timeline graphs should be register before calling any
+// "drawTimeline" so that we can determine the max margin left for all timeline graphs.
+function registerTimeline(minY, maxY) {
+ var numOfChars = yValueFormat(maxY).length;
+ // A least width for "maxY" in the graph
+ var pxForMaxY = numOfChars * 8 + 10;
+ // Make sure we have enough space to show the ticks in the y axis of timeline
+ maxMarginLeftForTimeline = pxForMaxY > maxMarginLeftForTimeline? pxForMaxY : maxMarginLeftForTimeline;
+// Register a histogram graph. All histogram graphs should be register before calling any
+// "drawHistogram" so that we can determine the max X value for histograms.
+function registerHistogram(values, minY, maxY) {
+ var data = d3.layout.histogram().range([minY, maxY]).bins(histogramBinCount)(values);
+ // d.x is the y values while d.y is the x values
+ var maxX = d3.max(data, function(d) { return d.y; });
+ maxXForHistogram = maxX > maxXForHistogram ? maxX : maxXForHistogram;
+// Draw a line between (x1, y1) and (x2, y2)
+function drawLine(svg, xFunc, yFunc, x1, y1, x2, y2) {
+ var line = d3.svg.line()
+ .x(function(d) { return xFunc(d.x); })
+ .y(function(d) { return yFunc(d.y); });
+ var data = [{x: x1, y: y1}, {x: x2, y: y2}];
+ svg.append("path")
+ .datum(data)
+ .style("stroke-dasharray", ("6, 6"))
+ .style("stroke", "lightblue")
+ .attr("class", "line")
+ .attr("d", line);
+ * @param id the `id` used in the html `div` tag
+ * @param data the data for the timeline graph
+ * @param minX the min value of X axis
+ * @param maxX the max value of X axis
+ * @param minY the min value of Y axis
+ * @param maxY the max value of Y axis
+ * @param unitY the unit of Y axis
+ * @param batchInterval if "batchInterval" is specified, we will draw a line for "batchInterval" in the graph
+ */
+function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchInterval) {
+ // Hide the right border of "
". We cannot use "css" directly, or "sorttable.js" will override them.
+ d3.select(d3.select(id).node().parentNode)
+ .style("padding", "8px 0 8px 8px")
+ .style("border-right", "0px solid white");
+ var margin = {top: 20, right: 27, bottom: 30, left: maxMarginLeftForTimeline};
+ var width = 500 - margin.left - margin.right;
+ var height = 150 - margin.top - margin.bottom;
+ var x = d3.scale.linear().domain([minX, maxX]).range([0, width]);
+ var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]);
+ var xAxis = d3.svg.axis().scale(x).orient("bottom").tickFormat(function(d) { return timeFormat[d]; });
+ var formatYValue = d3.format(",.2f");
+ var yAxis = d3.svg.axis().scale(y).orient("left").ticks(5).tickFormat(formatYValue);
+ var line = d3.svg.line()
+ .x(function(d) { return x(d.x); })
+ .y(function(d) { return y(d.y); });
+ var svg = d3.select(id).append("svg")
+ .attr("width", width + margin.left + margin.right)
+ .attr("height", height + margin.top + margin.bottom)
+ .append("g")
+ .attr("transform", "translate(" + margin.left + "," + margin.top + ")");
+ // Only show the first and last time in the graph
+ xAxis.tickValues(x.domain());
+ svg.append("g")
+ .attr("class", "x axis")
+ .attr("transform", "translate(0," + height + ")")
+ .call(xAxis)
+ svg.append("g")
+ .attr("class", "y axis")
+ .call(yAxis)
+ .append("text")
+ .attr("transform", "translate(0," + (-3) + ")")
+ .text(unitY);
+ if (batchInterval && batchInterval <= maxY) {
+ drawLine(svg, x, y, minX, batchInterval, maxX, batchInterval);
+ }
+ svg.append("path")
+ .datum(data)
+ .attr("class", "line")
+ .attr("d", line);
+ // Add points to the line. However, we make it invisible at first. But when the user moves mouse
+ // over a point, it will be displayed with its detail.
+ svg.selectAll(".point")
+ .data(data)
+ .enter().append("circle")
+ .attr("stroke", "white") // white and opacity = 0 make it invisible
+ .attr("fill", "white")
+ .attr("opacity", "0")
+ .attr("cx", function(d) { return x(d.x); })
+ .attr("cy", function(d) { return y(d.y); })
+ .attr("r", function(d) { return 3; })
+ .on('mouseover', function(d) {
+ var tip = formatYValue(d.y) + " " + unitY + " at " + timeFormat[d.x];
+ showBootstrapTooltip(d3.select(this).node(), tip);
+ // show the point
+ d3.select(this)
+ .attr("stroke", "steelblue")
+ .attr("fill", "steelblue")
+ .attr("opacity", "1");
+ })
+ .on('mouseout', function() {
+ hideBootstrapTooltip(d3.select(this).node());
+ // hide the point
+ d3.select(this)
+ .attr("stroke", "white")
+ .attr("fill", "white")
+ .attr("opacity", "0");
+ })
+ .on("click", function(d) {
+ window.location.href = "batch/?id=" + d.x;
+ });
+ * @param id the `id` used in the html `div` tag
+ * @param values the data for the histogram graph
+ * @param minY the min value of Y axis
+ * @param maxY the max value of Y axis
+ * @param unitY the unit of Y axis
+ * @param batchInterval if "batchInterval" is specified, we will draw a line for "batchInterval" in the graph
+ */
+function drawHistogram(id, values, minY, maxY, unitY, batchInterval) {
+ // Hide the left border of "
". We cannot use "css" directly, or "sorttable.js" will override them.
+ d3.select(d3.select(id).node().parentNode)
+ .style("padding", "8px 8px 8px 0")
+ .style("border-left", "0px solid white");
+ var margin = {top: 20, right: 30, bottom: 30, left: 10};
+ var width = 300 - margin.left - margin.right;
+ var height = 150 - margin.top - margin.bottom;
+ var x = d3.scale.linear().domain([0, maxXForHistogram]).range([0, width]);
+ var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]);
+ var xAxis = d3.svg.axis().scale(x).orient("top").ticks(5);
+ var yAxis = d3.svg.axis().scale(y).orient("left").ticks(0).tickFormat(function(d) { return ""; });
+ var data = d3.layout.histogram().range([minY, maxY]).bins(histogramBinCount)(values);
+ var svg = d3.select(id).append("svg")
+ .attr("width", width + margin.left + margin.right)
+ .attr("height", height + margin.top + margin.bottom)
+ .append("g")
+ .attr("transform", "translate(" + margin.left + "," + margin.top + ")");
+ if (batchInterval && batchInterval <= maxY) {
+ drawLine(svg, x, y, 0, batchInterval, maxXForHistogram, batchInterval);
+ }
+ svg.append("g")
+ .attr("class", "x axis")
+ .call(xAxis)
+ svg.append("g")
+ .attr("class", "y axis")
+ .call(yAxis)
+ var bar = svg.selectAll(".bar")
+ .data(data)
+ .enter()
+ .append("g")
+ .attr("transform", function(d) { return "translate(0," + (y(d.x) - height + y(d.dx)) + ")";})
+ .attr("class", "bar").append("rect")
+ .attr("width", function(d) { return x(d.y); })
+ .attr("height", function(d) { return height - y(d.dx); })
+ .on('mouseover', function(d) {
+ var percent = yValueFormat(d.y * 100.0 / values.length) + "%";
+ var tip = d.y + " batches (" + percent + ") between " + yValueFormat(d.x) + " and " + yValueFormat(d.x + d.dx) + " " + unitY;
+ showBootstrapTooltip(d3.select(this).node(), tip);
+ })
+ .on('mouseout', function() {
+ hideBootstrapTooltip(d3.select(this).node());
+ });
+ if (batchInterval && batchInterval <= maxY) {
+ // Add the "stable" text to the graph below the batch interval line.
+ var stableXOffset = x(maxXForHistogram) - 20;
+ var stableYOffset = y(batchInterval) + 15;
+ svg.append("text")
+ .style("fill", "lightblue")
+ .attr("class", "stable-text")
+ .attr("text-anchor", "middle")
+ .attr("transform", "translate(" + stableXOffset + "," + stableYOffset + ")")
+ .text("stable")
+ .on('mouseover', function(d) {
+ var tip = "Processing Time <= Batch Interval (" + yValueFormat(batchInterval) +" " + unitY +")";
+ showBootstrapTooltip(d3.select(this).node(), tip);
+ })
+ .on('mouseout', function() {
+ hideBootstrapTooltip(d3.select(this).node());
+ });
+ }
+$(function() {
+ function getParameterFromURL(param)
+ {
+ var parameters = window.location.search.substring(1); // Remove "?"
+ var keyValues = parameters.split('&');
+ for (var i = 0; i < keyValues.length; i++)
+ {
+ var paramKeyValue = keyValues[i].split('=');
+ if (paramKeyValue[0] == param)
+ {
+ return paramKeyValue[1];
+ }
+ }
+ }
+ if (getParameterFromURL("show-streams-detail") == "true") {
+ // Show the details for all InputDStream
+ $('#inputs-table').toggle('collapsed');
+ $('#triangle').html('▼');
+ }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 175140481e5ae..9c7f698840778 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -110,6 +110,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
+ def getInputStreamName(streamId: Int): Option[String] = synchronized {
+ inputStreams.find(_.id == streamId).map(_.name)
+ }
def generateJobs(time: Time): Seq[Job] = {
logDebug("Generating jobs for time " + time)
val jobs = this.synchronized {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index e4ad4b509d8d8..9716adb62817c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -44,6 +44,11 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
/** This is an unique identifier for the input stream. */
val id = ssc.getNewInputStreamId()
+ /**
+ * The name of this InputDStream. By default, it's the class name with its id.
+ */
+ private[streaming] def name: String = s"${getClass.getSimpleName}-$id"
* Checks whether the 'time' is valid wrt slideDuration for generating RDD.
* Additionally it also ensures valid times are in strictly increasing order.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
index 52f08b9c9de68..de85f24dd988d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
@@ -32,6 +32,7 @@ case class ReceiverInfo(
active: Boolean,
location: String,
lastErrorMessage: String = "",
- lastError: String = ""
+ lastError: String = "",
+ lastErrorTime: Long = -1L
) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 3c341390eda39..f73f7e705ee0d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -155,10 +155,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
private def deregisterReceiver(streamId: Int, message: String, error: String) {
val newReceiverInfo = receiverInfo.get(streamId) match {
case Some(oldInfo) =>
- oldInfo.copy(endpoint = null, active = false, lastErrorMessage = message, lastError = error)
+ val lastErrorTime =
+ if (error == null || error == "") -1 else ssc.scheduler.clock.getTimeMillis()
+ oldInfo.copy(endpoint = null, active = false, lastErrorMessage = message,
+ lastError = error, lastErrorTime = lastErrorTime)
case None =>
logWarning("No prior receiver info")
- ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
+ val lastErrorTime =
+ if (error == null || error == "") -1 else ssc.scheduler.clock.getTimeMillis()
+ ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message,
+ lastError = error, lastErrorTime = lastErrorTime)
receiverInfo -= streamId
@@ -182,7 +188,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
oldInfo.copy(lastErrorMessage = message, lastError = error)
case None =>
logWarning("No prior receiver info")
- ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
+ ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message,
+ lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis())
receiverInfo(streamId) = newReceiverInfo
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index e219e27785533..2960b528d4c5e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.ui
import scala.xml.Node
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{UIUtils => SparkUIUtils}
private[ui] abstract class BatchTableBase(tableId: String) {
@@ -32,12 +32,12 @@ private[ui] abstract class BatchTableBase(tableId: String) {
protected def baseRow(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
- val formattedBatchTime = UIUtils.formatDate(batch.batchTime.milliseconds)
+ val formattedBatchTime = SparkUIUtils.formatDate(batch.batchTime.milliseconds)
val eventCount = batch.numRecords
val schedulingDelay = batch.schedulingDelay
- val formattedSchedulingDelay = schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
+ val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
val processingTime = batch.processingDelay
- val formattedProcessingTime = processingTime.map(UIUtils.formatDuration).getOrElse("-")
+ val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 2da9a29e2529e..3f1cab69068dc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -24,7 +24,7 @@ import scala.xml.{NodeSeq, Node}
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.streaming.Time
-import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
import org.apache.spark.ui.jobs.UIData.JobUIData
@@ -73,8 +73,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
flatMap(info => info.failureReason).headOption.getOrElse("")
- val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-")
- val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${sparkJob.jobId}"
+ val formattedDuration = duration.map(d => SparkUIUtils.formatDuration(d)).getOrElse("-")
+ val detailUrl = s"${SparkUIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${sparkJob.jobId}"
// In the first row, output op id and its information needs to be shown. In other rows, these
// cells will be taken up due to "rowspan".
@@ -110,7 +110,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
- UIUtils.makeProgressBar(
+ SparkUIUtils.makeProgressBar(
started = sparkJob.numActiveTasks,
completed = sparkJob.numCompletedTasks,
failed = sparkJob.numFailedTasks,
@@ -135,7 +135,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
// If any job does not finish, set "formattedOutputOpDuration" to "-"
} else {
- UIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
+ SparkUIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
sparkJobs.tail.map { sparkJob =>
@@ -212,24 +212,24 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse {
throw new IllegalArgumentException(s"Missing id parameter")
- val formattedBatchTime = UIUtils.formatDate(batchTime.milliseconds)
+ val formattedBatchTime = SparkUIUtils.formatDate(batchTime.milliseconds)
val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse {
throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
val formattedSchedulingDelay =
- batchUIData.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
+ batchUIData.schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
val formattedProcessingTime =
- batchUIData.processingDelay.map(UIUtils.formatDuration).getOrElse("-")
- val formattedTotalDelay = batchUIData.totalDelay.map(UIUtils.formatDuration).getOrElse("-")
+ batchUIData.processingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
+ val formattedTotalDelay = batchUIData.totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
val summary: NodeSeq =
// scalastyle:on
- /** Generate stats of data received by the receivers in the streaming program */
- private def generateReceiverStats(): Seq[Node] = {
- val receivedRecordDistributions = listener.receivedRecordsDistributions
- val lastBatchReceivedRecord = listener.lastReceivedBatchRecords
- val table = if (receivedRecordDistributions.size > 0) {
- val headerRow = Seq(
- "Receiver",
- "Status",
- "Location",
- "Events in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]",
- "Minimum rate\n[events/sec]",
- "Median rate\n[events/sec]",
- "Maximum rate\n[events/sec]",
- "Last Error"
- )
- val dataRows = listener.receiverIds().map { receiverId =>
- val receiverInfo = listener.receiverInfo(receiverId)
- val receiverName = receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId")
- val receiverActive = receiverInfo.map { info =>
- if (info.active) "ACTIVE" else "INACTIVE"
- }.getOrElse(emptyCell)
- val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
- val receiverLastBatchRecords = formatNumber(lastBatchReceivedRecord(receiverId))
- val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
- d.getQuantiles(Seq(0.0, 0.5, 1.0)).map(r => formatNumber(r.toLong))
- }.getOrElse {
- Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
- }
- val receiverLastError = listener.receiverInfo(receiverId).map { info =>
- val msg = s"${info.lastErrorMessage} - ${info.lastError}"
- if (msg.size > 100) msg.take(97) + "..." else msg
- }.getOrElse(emptyCell)
- Seq(receiverName, receiverActive, receiverLocation, receiverLastBatchRecords) ++
- receivedRecordStats ++ Seq(receiverLastError)
- }.toSeq
- Some(listingTable(headerRow, dataRows))
- } else {
- None
- }
+ /** Generate basic information of the streaming program */
+ private def generateBasicInfo(): Seq[Node] = {
+ val timeSinceStart = System.currentTimeMillis() - startTime
Running batches of
+ {SparkUIUtils.formatDurationVerbose(listener.batchDuration)}
+ for
+ {SparkUIUtils.formatDurationVerbose(timeSinceStart)}
+ since
+ {SparkUIUtils.formatDate(startTime)}
+ }
- val content =
Receiver Statistics
{table.getOrElse("No receivers")}
+ /**
+ * Generate a global "timeFormat" dictionary in the JavaScript to store the time and its formatted
+ * string. Because we cannot specify a timezone in JavaScript, to make sure the server and client
+ * use the same timezone, we use the "timeFormat" dictionary to format all time values used in the
+ * graphs.
+ *
+ * @param times all time values that will be used in the graphs.
+ */
+ private def generateTimeMap(times: Seq[Long]): Seq[Node] = {
+ val dateFormat = new SimpleDateFormat("HH:mm:ss")
+ val js = "var timeFormat = {};\n" + times.map { time =>
+ val formattedTime = dateFormat.format(new Date(time))
+ s"timeFormat[$time] = '$formattedTime';"
+ }.mkString("\n")
- content
- /** Generate stats of batch jobs of the streaming program */
- private def generateBatchStatsTable(): Seq[Node] = {
- val numBatches = listener.retainedCompletedBatches.size
- val lastCompletedBatch = listener.lastCompletedBatch
- val table = if (numBatches > 0) {
- val processingDelayQuantilesRow = {
- Seq(
- "Processing Time",
- formatDurationOption(lastCompletedBatch.flatMap(_.processingDelay))
- ) ++ getQuantiles(listener.processingDelayDistribution)
- }
- val schedulingDelayQuantilesRow = {
- Seq(
- "Scheduling Delay",
- formatDurationOption(lastCompletedBatch.flatMap(_.schedulingDelay))
- ) ++ getQuantiles(listener.schedulingDelayDistribution)
- }
- val totalDelayQuantilesRow = {
- Seq(
- "Total Delay",
- formatDurationOption(lastCompletedBatch.flatMap(_.totalDelay))
- ) ++ getQuantiles(listener.totalDelayDistribution)
- }
- val headerRow = Seq("Metric", "Last batch", "Minimum", "25th percentile",
- "Median", "75th percentile", "Maximum")
- val dataRows: Seq[Seq[String]] = Seq(
- processingDelayQuantilesRow,
- schedulingDelayQuantilesRow,
- totalDelayQuantilesRow
- )
- Some(listingTable(headerRow, dataRows))
- } else {
- None
- }
+ private def generateStatTable(): Seq[Node] = {
+ val batches = listener.retainedBatches
- val content =
Batch Processing Statistics
- {table.getOrElse("No statistics have been generated yet.")}
+ val batchTimes = batches.map(_.batchTime.milliseconds)
+ val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min
+ val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max
- content
- }
+ val eventRateForAllStreams = new EventRateUIData(batches.map { batchInfo =>
+ (batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration)
+ })
+ val schedulingDelay = new MillisecondsStatUIData(batches.flatMap { batchInfo =>
+ batchInfo.schedulingDelay.map(batchInfo.batchTime.milliseconds -> _)
+ })
+ val processingTime = new MillisecondsStatUIData(batches.flatMap { batchInfo =>
+ batchInfo.processingDelay.map(batchInfo.batchTime.milliseconds -> _)
+ })
+ val totalDelay = new MillisecondsStatUIData(batches.flatMap { batchInfo =>
+ batchInfo.totalDelay.map(batchInfo.batchTime.milliseconds -> _)
+ })
- /**
- * Returns a human-readable string representing a duration such as "5 second 35 ms"
- */
- private def formatDurationOption(msOption: Option[Long]): String = {
- msOption.map(formatDurationVerbose).getOrElse(emptyCell)
+ // Use the max value of "schedulingDelay", "processingTime", and "totalDelay" to make the
+ // Y axis ranges same.
+ val _maxTime =
+ (for (m1 <- schedulingDelay.max; m2 <- processingTime.max; m3 <- totalDelay.max) yield
+ m1 max m2 max m3).getOrElse(0L)
+ // Should start at 0
+ val minTime = 0L
+ val (maxTime, normalizedUnit) = UIUtils.normalizeDuration(_maxTime)
+ val formattedUnit = UIUtils.shortTimeUnitString(normalizedUnit)
+ // Use the max input rate for all InputDStreams' graphs to make the Y axis ranges same.
+ // If it's not an integral number, just use its ceil integral number.
+ val maxEventRate = eventRateForAllStreams.max.map(_.ceil.toLong).getOrElse(0L)
+ val minEventRate = 0L
+ // JavaScript to show/hide the InputDStreams sub table.
+ val triangleJs =
+ s"""$$('#inputs-table').toggle('collapsed');
+ |var status = false;
+ |if ($$(this).html() == '$BLACK_RIGHT_TRIANGLE_HTML') {
+ |$$(this).html('$BLACK_DOWN_TRIANGLE_HTML');status = true;}
+ |else {$$(this).html('$BLACK_RIGHT_TRIANGLE_HTML');status = false;}
+ |window.history.pushState('',
+ | document.title, window.location.pathname + '?show-streams-detail=' + status);"""
+ .stripMargin.replaceAll("\\n", "") // it must be only one single line
+ val batchInterval = UIUtils.convertToTimeUnit(listener.batchDuration, normalizedUnit)
+ val jsCollector = new JsCollector
+ val graphUIDataForEventRateOfAllStreams =
+ new GraphUIData(
+ "all-stream-events-timeline",
+ "all-stream-events-histogram",
+ eventRateForAllStreams.data,
+ minBatchTime,
+ maxBatchTime,
+ minEventRate,
+ maxEventRate,
+ "events/sec")
+ graphUIDataForEventRateOfAllStreams.generateDataJs(jsCollector)
+ val graphUIDataForSchedulingDelay =
+ new GraphUIData(
+ "scheduling-delay-timeline",
+ "scheduling-delay-histogram",
+ schedulingDelay.timelineData(normalizedUnit),
+ minBatchTime,
+ maxBatchTime,
+ minTime,
+ maxTime,
+ formattedUnit)
+ graphUIDataForSchedulingDelay.generateDataJs(jsCollector)
+ val graphUIDataForProcessingTime =
+ new GraphUIData(
+ "processing-time-timeline",
+ "processing-time-histogram",
+ processingTime.timelineData(normalizedUnit),
+ minBatchTime,
+ maxBatchTime,
+ minTime,
+ maxTime,
+ formattedUnit, Some(batchInterval))
+ graphUIDataForProcessingTime.generateDataJs(jsCollector)
+ val graphUIDataForTotalDelay =
+ new GraphUIData(
+ "total-delay-timeline",
+ "total-delay-histogram",
+ totalDelay.timelineData(normalizedUnit),
+ minBatchTime,
+ maxBatchTime,
+ minTime,
+ maxTime,
+ formattedUnit)
+ graphUIDataForTotalDelay.generateDataJs(jsCollector)
+ // It's false before the user registers the first InputDStream
+ val hasStream = listener.streamIds.nonEmpty
+ val numCompletedBatches = listener.retainedCompletedBatches.size
+ val numActiveBatches = batchTimes.length - numCompletedBatches
+ val table =
+ // scalastyle:off
private def generateBatchListTables(): Seq[Node] = {
@@ -216,3 +489,67 @@ private[ui] class StreamingPage(parent: StreamingTab)
+private[ui] object StreamingPage {
+ val emptyCell = "-"
+ /**
+ * Returns a human-readable string representing a duration such as "5 second 35 ms"
+ */
+ def formatDurationOption(msOption: Option[Long]): String = {
+ msOption.map(SparkUIUtils.formatDurationVerbose).getOrElse(emptyCell)
+ }
+ * A helper class that allows the user to add JavaScript statements which will be executed when the
+ * DOM has finished loading.
+ */
+private[ui] class JsCollector {
+ private var variableId = 0
+ /**
+ * Return the next unused JavaScript variable name
+ */
+ def nextVariableName: String = {
+ variableId += 1
+ "v" + variableId
+ }
+ /**
+ * JavaScript statements that will execute before `statements`
+ */
+ private val preparedStatements = ArrayBuffer[String]()
+ /**
+ * JavaScript statements that will execute after `preparedStatements`
+ */
+ private val statements = ArrayBuffer[String]()
+ def addPreparedStatement(js: String): Unit = {
+ preparedStatements += js
+ }
+ def addStatement(js: String): Unit = {
+ statements += js
+ }
+ /**
+ * Generate a html snippet that will execute all scripts when the DOM has finished loading.
+ */
+ def toHtml: Seq[Node] = {
+ val js =
+ s"""
+ |$$(document).ready(function(){
+ | ${preparedStatements.mkString("\n")}
+ | ${statements.mkString("\n")}
+ |});""".stripMargin
+ }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
new file mode 100644
index 0000000000000..c206f973b2c66
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
@@ -0,0 +1,74 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.ui
+import java.util.concurrent.TimeUnit
+object UIUtils {
+ /**
+ * Return the short string for a `TimeUnit`.
+ */
+ def shortTimeUnitString(unit: TimeUnit): String = unit match {
+ case TimeUnit.NANOSECONDS => "ns"
+ case TimeUnit.MICROSECONDS => "us"
+ case TimeUnit.MILLISECONDS => "ms"
+ case TimeUnit.SECONDS => "sec"
+ case TimeUnit.MINUTES => "min"
+ case TimeUnit.HOURS => "hrs"
+ case TimeUnit.DAYS => "days"
+ }
+ /**
+ * Find the best `TimeUnit` for converting milliseconds to a friendly string. Return the value
+ * after converting, also with its TimeUnit.
+ */
+ def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = {
+ if (milliseconds < 1000) {
+ return (milliseconds, TimeUnit.MILLISECONDS)
+ }
+ val seconds = milliseconds.toDouble / 1000
+ if (seconds < 60) {
+ return (seconds, TimeUnit.SECONDS)
+ }
+ val minutes = seconds / 60
+ if (minutes < 60) {
+ return (minutes, TimeUnit.MINUTES)
+ }
+ val hours = minutes / 60
+ if (hours < 24) {
+ return (hours, TimeUnit.HOURS)
+ }
+ val days = hours / 24
+ (days, TimeUnit.DAYS)
+ }
+ /**
+ * Convert `milliseconds` to the specified `unit`. We cannot use `TimeUnit.convert` because it
+ * will discard the fractional part.
+ */
+ def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match {
+ case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000
+ case TimeUnit.MICROSECONDS => milliseconds * 1000
+ case TimeUnit.MILLISECONDS => milliseconds
+ case TimeUnit.SECONDS => milliseconds / 1000.0
+ case TimeUnit.MINUTES => milliseconds / 1000.0 / 60.0
+ case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0
+ case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0
+ }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 8de43baabc21d..2211f62383ce8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -94,19 +94,34 @@ class UISeleniumSuite
eventually(timeout(10 seconds), interval(50 milliseconds)) {
// check whether streaming page exists
go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming")
- val statisticText = findAll(cssSelector("li strong")).map(_.text).toSeq
- statisticText should contain("Network receivers:")
- statisticText should contain("Batch interval:")
+ val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
+ h3Text should contain("Streaming Statistics")
+ // Check stat table
+ val statTableHeaders = findAll(cssSelector("#stat-table th")).map(_.text).toSeq
+ statTableHeaders.exists(
+ _.matches("Timelines \\(Last \\d+ batches, \\d+ active, \\d+ completed\\)")) should be
+ (true)
+ statTableHeaders should contain ("Histograms")
+ val statTableCells = findAll(cssSelector("#stat-table td")).map(_.text).toSeq
+ statTableCells.exists(_.contains("Input Rate")) should be (true)
+ statTableCells.exists(_.contains("Scheduling Delay")) should be (true)
+ statTableCells.exists(_.contains("Processing Time")) should be (true)
+ statTableCells.exists(_.contains("Total Delay")) should be (true)
+ // Check batch tables
val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq
h4Text.exists(_.matches("Active Batches \\(\\d+\\)")) should be (true)
h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true)
findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be {
- List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Status")
+ List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time",
+ "Status")
findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
- List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Total Delay")
+ List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time",
+ "Total Delay")
val batchLinks =
@@ -176,9 +191,8 @@ class UISeleniumSuite
eventually(timeout(10 seconds), interval(50 milliseconds)) {
go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming")
- val statisticText = findAll(cssSelector("li strong")).map(_.text).toSeq
- statisticText should not contain ("Network receivers:")
- statisticText should not contain ("Batch interval:")
+ val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
+ h3Text should not contain("Streaming Statistics")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index e874536e63518..2a0f45830e03c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -94,7 +94,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
batchUIData.get.schedulingDelay should be (batchInfoStarted.schedulingDelay)
batchUIData.get.processingDelay should be (batchInfoStarted.processingDelay)
batchUIData.get.totalDelay should be (batchInfoStarted.totalDelay)
- batchUIData.get.receiverNumRecords should be (Map(0 -> 300L, 1 -> 300L))
+ batchUIData.get.streamIdToNumRecords should be (Map(0 -> 300L, 1 -> 300L))
batchUIData.get.numRecords should be(600)
batchUIData.get.outputOpIdSparkJobIdPairs should be
Seq(OutputOpIdAndSparkJobId(0, 0),
@@ -138,7 +138,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
test("Remove the old completed batches when exceeding the limit") {
val ssc = setupStreams(input, operation)
- val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+ val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
val listener = new StreamingJobProgressListener(ssc)
val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
@@ -155,7 +155,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
test("out-of-order onJobStart and onBatchXXX") {
val ssc = setupStreams(input, operation)
- val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+ val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
val listener = new StreamingJobProgressListener(ssc)
// fulfill completedBatchInfos
@@ -182,7 +182,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
batchUIData.get.schedulingDelay should be (batchInfoSubmitted.schedulingDelay)
batchUIData.get.processingDelay should be (batchInfoSubmitted.processingDelay)
batchUIData.get.totalDelay should be (batchInfoSubmitted.totalDelay)
- batchUIData.get.receiverNumRecords should be (Map.empty)
+ batchUIData.get.streamIdToNumRecords should be (Map.empty)
batchUIData.get.numRecords should be (0)
batchUIData.get.outputOpIdSparkJobIdPairs should be (Seq(OutputOpIdAndSparkJobId(0, 0)))
@@ -203,4 +203,48 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
(listener.waitingBatches.size + listener.runningBatches.size +
listener.retainedCompletedBatches.size + 10)
+ test("detect memory leak") {
+ val ssc = setupStreams(input, operation)
+ val listener = new StreamingJobProgressListener(ssc)
+ val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
+ for (_ <- 0 until 2 * limit) {
+ val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
+ // onBatchSubmitted
+ val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, None, None)
+ listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
+ // onBatchStarted
+ val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
+ listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
+ // onJobStart
+ val jobStart1 = createJobStart(Time(1000), outputOpId = 0, jobId = 0)
+ listener.onJobStart(jobStart1)
+ val jobStart2 = createJobStart(Time(1000), outputOpId = 0, jobId = 1)
+ listener.onJobStart(jobStart2)
+ val jobStart3 = createJobStart(Time(1000), outputOpId = 1, jobId = 0)
+ listener.onJobStart(jobStart3)
+ val jobStart4 = createJobStart(Time(1000), outputOpId = 1, jobId = 1)
+ listener.onJobStart(jobStart4)
+ // onBatchCompleted
+ val batchInfoCompleted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
+ listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
+ }
+ listener.waitingBatches.size should be (0)
+ listener.runningBatches.size should be (0)
+ listener.retainedCompletedBatches.size should be (limit)
+ listener.batchTimeToOutputOpIdSparkJobIdPair.size() should be <=
+ (listener.waitingBatches.size + listener.runningBatches.size +
+ listener.retainedCompletedBatches.size + 10)
+ }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
new file mode 100644
index 0000000000000..6df1a63ab2e37
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
@@ -0,0 +1,67 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.ui
+import java.util.concurrent.TimeUnit
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
+class UIUtilsSuite extends FunSuite with Matchers{
+ test("shortTimeUnitString") {
+ assert("ns" === UIUtils.shortTimeUnitString(TimeUnit.NANOSECONDS))
+ assert("us" === UIUtils.shortTimeUnitString(TimeUnit.MICROSECONDS))
+ assert("ms" === UIUtils.shortTimeUnitString(TimeUnit.MILLISECONDS))
+ assert("sec" === UIUtils.shortTimeUnitString(TimeUnit.SECONDS))
+ assert("min" === UIUtils.shortTimeUnitString(TimeUnit.MINUTES))
+ assert("hrs" === UIUtils.shortTimeUnitString(TimeUnit.HOURS))
+ assert("days" === UIUtils.shortTimeUnitString(TimeUnit.DAYS))
+ }
+ test("normalizeDuration") {
+ verifyNormalizedTime(900, TimeUnit.MILLISECONDS, 900)
+ verifyNormalizedTime(1.0, TimeUnit.SECONDS, 1000)
+ verifyNormalizedTime(1.0, TimeUnit.MINUTES, 60 * 1000)
+ verifyNormalizedTime(1.0, TimeUnit.HOURS, 60 * 60 * 1000)
+ verifyNormalizedTime(1.0, TimeUnit.DAYS, 24 * 60 * 60 * 1000)
+ }
+ private def verifyNormalizedTime(
+ expectedTime: Double, expectedUnit: TimeUnit, input: Long): Unit = {
+ val (time, unit) = UIUtils.normalizeDuration(input)
+ time should be (expectedTime +- 1E-6)
+ unit should be (expectedUnit)
+ }
+ test("convertToTimeUnit") {
+ verifyConvertToTimeUnit(60.0 * 1000 * 1000 * 1000, 60 * 1000, TimeUnit.NANOSECONDS)
+ verifyConvertToTimeUnit(60.0 * 1000 * 1000, 60 * 1000, TimeUnit.MICROSECONDS)
+ verifyConvertToTimeUnit(60 * 1000, 60 * 1000, TimeUnit.MILLISECONDS)
+ verifyConvertToTimeUnit(60, 60 * 1000, TimeUnit.SECONDS)
+ verifyConvertToTimeUnit(1, 60 * 1000, TimeUnit.MINUTES)
+ verifyConvertToTimeUnit(1.0 / 60, 60 * 1000, TimeUnit.HOURS)
+ verifyConvertToTimeUnit(1.0 / 60 / 24, 60 * 1000, TimeUnit.DAYS)
+ }
+ private def verifyConvertToTimeUnit(
+ expectedTime: Double, milliseconds: Long, unit: TimeUnit): Unit = {
+ val convertedTime = UIUtils.convertToTimeUnit(milliseconds, unit)
+ convertedTime should be (expectedTime +- 1E-6)
+ }