diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 3a2dadb5661a9..2ddc55d56567f 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -35,43 +35,6 @@ private[spark] object UIUtils extends Logging {
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
}
- /**
- * Return the short string for a `TimeUnit`.
- */
- def shortTimeUnitString(unit: TimeUnit): String = unit match {
- case TimeUnit.NANOSECONDS => "ns"
- case TimeUnit.MICROSECONDS => "us"
- case TimeUnit.MILLISECONDS => "ms"
- case TimeUnit.SECONDS => "sec"
- case TimeUnit.MINUTES => "min"
- case TimeUnit.HOURS => "hrs"
- case TimeUnit.DAYS => "days"
- }
-
- /**
- * Find the best `TimeUnit` for converting milliseconds to a friendly string. Return the value
- * after converting, also with its TimeUnit.
- */
- def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = {
- if (milliseconds < 1000) {
- return (milliseconds, TimeUnit.MILLISECONDS)
- }
- val seconds = milliseconds.toDouble / 1000
- if (seconds < 60) {
- return (seconds, TimeUnit.SECONDS)
- }
- val minutes = seconds / 60
- if (minutes < 60) {
- return (minutes, TimeUnit.MINUTES)
- }
- val hours = minutes / 60
- if (hours < 24) {
- return (hours, TimeUnit.HOURS)
- }
- val days = hours / 24
- (days, TimeUnit.DAYS)
- }
-
def formatDate(date: Date): String = dateFormat.get.format(date)
def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 5526c85c181c0..f8ed28addabb0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -27,7 +27,7 @@ import scala.xml.{Node, Unparsed}
import org.apache.spark.Logging
import org.apache.spark.ui._
-import org.apache.spark.ui.UIUtils._
+import org.apache.spark.ui.{UIUtils => SparkUIUtils}
/**
* @param timelineDivId the timeline `id` used in the html `div` tag
@@ -103,13 +103,13 @@ private[ui] class MillisecondsStatUIData(data: Seq[(Long, Long)]) {
* Converting the original data as per `unit`.
*/
def timelineData(unit: TimeUnit): Seq[(Long, Double)] =
- data.map(x => x._1 -> StreamingPage.convertToTimeUnit(x._2, unit))
+ data.map(x => x._1 -> UIUtils.convertToTimeUnit(x._2, unit))
/**
* Converting the original data as per `unit`.
*/
def histogramData(unit: TimeUnit): Seq[Double] =
- data.map(x => StreamingPage.convertToTimeUnit(x._2, unit))
+ data.map(x => UIUtils.convertToTimeUnit(x._2, unit))
val avg: Option[Long] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size)
@@ -149,7 +149,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
generateStatTable() ++
generateBatchListTables()
}
- UIUtils.headerSparkPage("Streaming Statistics", content, parent, Some(5000))
+ SparkUIUtils.headerSparkPage("Streaming Statistics", content, parent, Some(5000))
}
/**
@@ -157,9 +157,9 @@ private[ui] class StreamingPage(parent: StreamingTab)
*/
private def generateLoadResources(): Seq[Node] = {
// scalastyle:off
-
-
-
+
+
+
// scalastyle:on
}
@@ -168,15 +168,15 @@ private[ui] class StreamingPage(parent: StreamingTab)
val timeSinceStart = System.currentTimeMillis() - startTime
Running batches of
- {formatDurationVerbose(listener.batchDuration)}
+ {SparkUIUtils.formatDurationVerbose(listener.batchDuration)}
for
- {formatDurationVerbose(timeSinceStart)}
+ {SparkUIUtils.formatDurationVerbose(timeSinceStart)}
since
- {UIUtils.formatDate(startTime)}
+ {SparkUIUtils.formatDate(startTime)}
@@ -247,7 +247,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
| document.title, window.location.pathname + '?show-streams-detail=' + status);"""
.stripMargin.replaceAll("\\n", "") // it must be only one single line
- val batchInterval = StreamingPage.convertToTimeUnit(listener.batchDuration, normalizedUnit)
+ val batchInterval = UIUtils.convertToTimeUnit(listener.batchDuration, normalizedUnit)
val jsCollector = new JsCollector
@@ -423,7 +423,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
if (msg.size > 100) msg.take(97) + "..." else msg
}.getOrElse(emptyCell)
val receiverLastErrorTime = receiverInfo.map {
- r => if (r.lastErrorTime < 0) "-" else UIUtils.formatDate(r.lastErrorTime)
+ r => if (r.lastErrorTime < 0) "-" else SparkUIUtils.formatDate(r.lastErrorTime)
}.getOrElse(emptyCell)
val receivedRecords = new EventRateUIData(eventRates)
@@ -491,22 +491,9 @@ private[ui] object StreamingPage {
* Returns a human-readable string representing a duration such as "5 second 35 ms"
*/
def formatDurationOption(msOption: Option[Long]): String = {
- msOption.map(formatDurationVerbose).getOrElse(emptyCell)
+ msOption.map(SparkUIUtils.formatDurationVerbose).getOrElse(emptyCell)
}
- /**
- * Convert `milliseconds` to the specified `unit`. We cannot use `TimeUnit.convert` because it
- * will discard the fractional part.
- */
- def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match {
- case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000
- case TimeUnit.MICROSECONDS => milliseconds * 1000
- case TimeUnit.MILLISECONDS => milliseconds
- case TimeUnit.SECONDS => milliseconds / 1000.0
- case TimeUnit.MINUTES => milliseconds / 1000.0 / 60.0
- case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0
- case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0
- }
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
new file mode 100644
index 0000000000000..c206f973b2c66
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import java.util.concurrent.TimeUnit
+
+object UIUtils {
+
+ /**
+ * Return the short string for a `TimeUnit`.
+ */
+ def shortTimeUnitString(unit: TimeUnit): String = unit match {
+ case TimeUnit.NANOSECONDS => "ns"
+ case TimeUnit.MICROSECONDS => "us"
+ case TimeUnit.MILLISECONDS => "ms"
+ case TimeUnit.SECONDS => "sec"
+ case TimeUnit.MINUTES => "min"
+ case TimeUnit.HOURS => "hrs"
+ case TimeUnit.DAYS => "days"
+ }
+
+ /**
+   * Find the best `TimeUnit` to represent `milliseconds` as a friendly string, and return the
+   * converted value together with that `TimeUnit`.
+ */
+ def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = {
+ if (milliseconds < 1000) {
+ return (milliseconds, TimeUnit.MILLISECONDS)
+ }
+ val seconds = milliseconds.toDouble / 1000
+ if (seconds < 60) {
+ return (seconds, TimeUnit.SECONDS)
+ }
+ val minutes = seconds / 60
+ if (minutes < 60) {
+ return (minutes, TimeUnit.MINUTES)
+ }
+ val hours = minutes / 60
+ if (hours < 24) {
+ return (hours, TimeUnit.HOURS)
+ }
+ val days = hours / 24
+ (days, TimeUnit.DAYS)
+ }
+
+ /**
+ * Convert `milliseconds` to the specified `unit`. We cannot use `TimeUnit.convert` because it
+ * will discard the fractional part.
+ */
+ def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match {
+ case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000
+ case TimeUnit.MICROSECONDS => milliseconds * 1000
+ case TimeUnit.MILLISECONDS => milliseconds
+ case TimeUnit.SECONDS => milliseconds / 1000.0
+ case TimeUnit.MINUTES => milliseconds / 1000.0 / 60.0
+ case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0
+ case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
similarity index 71%
rename from core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
rename to streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
index 636b2c389e05e..6df1a63ab2e37 100644
--- a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
@@ -15,8 +15,7 @@
* limitations under the License.
*/
-
-package org.apache.spark.ui
+package org.apache.spark.streaming.ui
import java.util.concurrent.TimeUnit
@@ -49,4 +48,20 @@ class UIUtilsSuite extends FunSuite with Matchers{
time should be (expectedTime +- 1E-6)
unit should be (expectedUnit)
}
+
+ test("convertToTimeUnit") {
+ verifyConvertToTimeUnit(60.0 * 1000 * 1000 * 1000, 60 * 1000, TimeUnit.NANOSECONDS)
+ verifyConvertToTimeUnit(60.0 * 1000 * 1000, 60 * 1000, TimeUnit.MICROSECONDS)
+ verifyConvertToTimeUnit(60 * 1000, 60 * 1000, TimeUnit.MILLISECONDS)
+ verifyConvertToTimeUnit(60, 60 * 1000, TimeUnit.SECONDS)
+ verifyConvertToTimeUnit(1, 60 * 1000, TimeUnit.MINUTES)
+ verifyConvertToTimeUnit(1.0 / 60, 60 * 1000, TimeUnit.HOURS)
+ verifyConvertToTimeUnit(1.0 / 60 / 24, 60 * 1000, TimeUnit.DAYS)
+ }
+
+ private def verifyConvertToTimeUnit(
+ expectedTime: Double, milliseconds: Long, unit: TimeUnit): Unit = {
+ val convertedTime = UIUtils.convertToTimeUnit(milliseconds, unit)
+ convertedTime should be (expectedTime +- 1E-6)
+ }
}