diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 6218cf03d25e0..34b55717a1db2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -35,9 +35,9 @@ import org.apache.spark.util.Distribution
 private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   extends StreamingListener with SparkListener {
 
-  private val waitingBatchUIDatas = new HashMap[Time, BatchUIData]
-  private val runningBatchUIDatas = new HashMap[Time, BatchUIData]
-  private val completedBatchUIDatas = new Queue[BatchUIData]
+  private val waitingBatchUIData = new HashMap[Time, BatchUIData]
+  private val runningBatchUIData = new HashMap[Time, BatchUIData]
+  private val completedBatchUIData = new Queue[BatchUIData]
   private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
   private var totalCompletedBatches = 0L
   private var totalReceivedRecords = 0L
@@ -64,7 +64,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
       // batches temporarily, so here we use "10" to handle such case. This is not a perfect
       // solution, but at least it can handle most of cases.
       size() >
-        waitingBatchUIDatas.size + runningBatchUIDatas.size + completedBatchUIDatas.size + 10
+        waitingBatchUIData.size + runningBatchUIData.size + completedBatchUIData.size + 10
     }
   }
 
@@ -91,27 +91,27 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
 
   override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
     synchronized {
-      waitingBatchUIDatas(batchSubmitted.batchInfo.batchTime) =
+      waitingBatchUIData(batchSubmitted.batchInfo.batchTime) =
         BatchUIData(batchSubmitted.batchInfo)
     }
   }
 
   override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = synchronized {
     val batchUIData = BatchUIData(batchStarted.batchInfo)
-    runningBatchUIDatas(batchStarted.batchInfo.batchTime) = BatchUIData(batchStarted.batchInfo)
-    waitingBatchUIDatas.remove(batchStarted.batchInfo.batchTime)
+    runningBatchUIData(batchStarted.batchInfo.batchTime) = BatchUIData(batchStarted.batchInfo)
+    waitingBatchUIData.remove(batchStarted.batchInfo.batchTime)
 
     totalReceivedRecords += batchUIData.numRecords
   }
 
   override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
     synchronized {
-      waitingBatchUIDatas.remove(batchCompleted.batchInfo.batchTime)
-      runningBatchUIDatas.remove(batchCompleted.batchInfo.batchTime)
+      waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
+      runningBatchUIData.remove(batchCompleted.batchInfo.batchTime)
       val batchUIData = BatchUIData(batchCompleted.batchInfo)
-      completedBatchUIDatas.enqueue(batchUIData)
-      if (completedBatchUIDatas.size > batchUIDataLimit) {
-        val removedBatch = completedBatchUIDatas.dequeue()
+      completedBatchUIData.enqueue(batchUIData)
+      if (completedBatchUIData.size > batchUIDataLimit) {
+        val removedBatch = completedBatchUIData.dequeue()
         batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
       }
       totalCompletedBatches += 1L
@@ -160,19 +160,19 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def numUnprocessedBatches: Long = synchronized {
-    waitingBatchUIDatas.size + runningBatchUIDatas.size
+    waitingBatchUIData.size + runningBatchUIData.size
   }
 
   def waitingBatches: Seq[BatchUIData] = synchronized {
-    waitingBatchUIDatas.values.toSeq
+    waitingBatchUIData.values.toSeq
   }
 
   def runningBatches: Seq[BatchUIData] = synchronized {
-    runningBatchUIDatas.values.toSeq
+    runningBatchUIData.values.toSeq
   }
 
   def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
-    completedBatchUIDatas.toSeq
+    completedBatchUIData.toSeq
   }
 
   def processingDelayDistribution: Option[Distribution] = synchronized {
@@ -215,7 +215,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def lastCompletedBatch: Option[BatchUIData] = synchronized {
-    completedBatchUIDatas.sortBy(_.batchTime)(Time.ordering).lastOption
+    completedBatchUIData.sortBy(_.batchTime)(Time.ordering).lastOption
   }
 
   def lastReceivedBatch: Option[BatchUIData] = synchronized {
@@ -223,18 +223,18 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   private def retainedBatches: Seq[BatchUIData] = {
-    (waitingBatchUIDatas.values.toSeq ++
-      runningBatchUIDatas.values.toSeq ++ completedBatchUIDatas).sortBy(_.batchTime)(Time.ordering)
+    (waitingBatchUIData.values.toSeq ++
+      runningBatchUIData.values.toSeq ++ completedBatchUIData).sortBy(_.batchTime)(Time.ordering)
   }
 
   private def extractDistribution(getMetric: BatchUIData => Option[Long]): Option[Distribution] = {
-    Distribution(completedBatchUIDatas.flatMap(getMetric(_)).map(_.toDouble))
+    Distribution(completedBatchUIData.flatMap(getMetric(_)).map(_.toDouble))
  }
 
   def getBatchUIData(batchTime: Time): Option[BatchUIData] = synchronized {
-    val batchUIData = waitingBatchUIDatas.get(batchTime).orElse {
-      runningBatchUIDatas.get(batchTime).orElse {
-        completedBatchUIDatas.find(batch => batch.batchTime == batchTime)
+    val batchUIData = waitingBatchUIData.get(batchTime).orElse {
+      runningBatchUIData.get(batchTime).orElse {
+        completedBatchUIData.find(batch => batch.batchTime == batchTime)
       }
     }
     batchUIData.foreach { _batchUIData =>
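Note for reviewers: the rename above touches every use of the listener's bounded retention scheme for completed batches, so it may help to see that pattern in isolation. Below is a minimal, self-contained sketch of the same idea: keep at most `limit` completed entries (the patch reads this from `spark.streaming.ui.retainedBatches`, default 100) in a FIFO queue, and drop the side-map bookkeeping for whatever falls off the front, as `onBatchCompleted` does with `batchTimeToOutputOpIdSparkJobIdPair`. The names `RetainedBatchBuffer`, `BatchRecord`, and `BatchTime` are illustrative stand-ins, not types from this patch.

```scala
import scala.collection.mutable

// Simplified stand-ins for Spark's Time and BatchUIData (illustrative only).
case class BatchTime(millis: Long)
case class BatchRecord(batchTime: BatchTime)

// Sketch of the eviction pattern used for completedBatchUIData: a bounded FIFO
// queue plus a side map that must be trimmed in lockstep with the queue.
class RetainedBatchBuffer(limit: Int) {
  private val completed = new mutable.Queue[BatchRecord]
  private val jobIdsByBatch = new mutable.HashMap[BatchTime, Seq[Int]]

  def onCompleted(batch: BatchRecord, jobIds: Seq[Int]): Unit = synchronized {
    completed.enqueue(batch)
    jobIdsByBatch(batch.batchTime) = jobIds
    if (completed.size > limit) {
      val removed = completed.dequeue()        // evict the oldest completed batch
      jobIdsByBatch.remove(removed.batchTime)  // keep the side map in sync
    }
  }

  def retained: Seq[BatchRecord] = synchronized { completed.toSeq }
}

object RetainedBatchBufferDemo extends App {
  val buf = new RetainedBatchBuffer(limit = 2)
  (1 to 3).foreach(i => buf.onCompleted(BatchRecord(BatchTime(i * 1000L)), Seq(i)))
  // Only the two most recent batches survive eviction.
  assert(buf.retained.map(_.batchTime.millis) == Seq(2000L, 3000L))
  println(buf.retained)
}
```

The point of trimming both structures in the same synchronized block is that the side map would otherwise grow without bound once the queue starts evicting; the patch handles this with the `batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)` call visible in `onBatchCompleted`.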