Skip to content

Commit

Permalink
Refactor BatchInfo to store only necessary fields
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Apr 29, 2015
1 parent cb62e4f commit 087ba98
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,4 @@ case class BatchInfo(
*/
def totalDelay: Option[Long] = schedulingDelay.zip(processingDelay)
.map(x => x._1 + x._2).headOption

/**
* The number of records received by the receivers in this batch.
*/
def numRecords: Long = receivedBlockInfo.map { case (_, infos) =>
infos.map(_.numRecords).sum
}.sum
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.streaming.ui

import scala.xml.Node

import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.ui.UIUtils

private[ui] abstract class BatchTableBase(tableId: String) {
Expand All @@ -31,12 +30,10 @@ private[ui] abstract class BatchTableBase(tableId: String) {
<th>Processing Time</th>
}

protected def baseRow(batch: BatchInfo): Seq[Node] = {
protected def baseRow(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
val formattedBatchTime = UIUtils.formatDate(batch.batchTime.milliseconds)
val eventCount = batch.receivedBlockInfo.values.map {
receivers => receivers.map(_.numRecords).sum
}.sum
val eventCount = batch.numRecords
val schedulingDelay = batch.schedulingDelay
val formattedSchedulingDelay = schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
val processingTime = batch.processingDelay
Expand Down Expand Up @@ -77,8 +74,9 @@ private[ui] abstract class BatchTableBase(tableId: String) {
protected def renderRows: Seq[Node]
}

private[ui] class ActiveBatchTable(runningBatches: Seq[BatchInfo], waitingBatches: Seq[BatchInfo])
extends BatchTableBase("active-batches-table") {
private[ui] class ActiveBatchTable(
runningBatches: Seq[BatchUIData],
waitingBatches: Seq[BatchUIData]) extends BatchTableBase("active-batches-table") {

override protected def columns: Seq[Node] = super.columns ++ <th>Status</th>

Expand All @@ -89,16 +87,16 @@ private[ui] class ActiveBatchTable(runningBatches: Seq[BatchInfo], waitingBatche
runningBatches.flatMap(batch => <tr>{runningBatchRow(batch)}</tr>)
}

private def runningBatchRow(batch: BatchInfo): Seq[Node] = {
private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
baseRow(batch) ++ <td>processing</td>
}

private def waitingBatchRow(batch: BatchInfo): Seq[Node] = {
private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
baseRow(batch) ++ <td>queued</td>
}
}

private[ui] class CompletedBatchTable(batches: Seq[BatchInfo])
private[ui] class CompletedBatchTable(batches: Seq[BatchUIData])
extends BatchTableBase("completed-batches-table") {

override protected def columns: Seq[Node] = super.columns ++ <th>Total Delay</th>
Expand All @@ -107,7 +105,7 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchInfo])
batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
}

private def completedBatchRow(batch: BatchInfo): Seq[Node] = {
private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
val totalDelay = batch.totalDelay
val formattedTotalDelay = totalDelay.map(UIUtils.formatDuration).getOrElse("-")
baseRow(batch) ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ package org.apache.spark.streaming.ui

import javax.servlet.http.HttpServletRequest

import scala.collection.mutable.{ArrayBuffer, Map}
import scala.xml.{NodeSeq, Node}

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
import org.apache.spark.ui.jobs.UIData.JobUIData
Expand Down Expand Up @@ -182,11 +180,11 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
* Generate the job table for the batch.
*/
private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = {
val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_._1).toSeq.
val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).toSeq.
sortBy(_._1). // sorted by OutputOpId
map { case (outputOpId, outputOpIdAndSparkJobIdPairs) =>
map { case (outputOpId, outputOpIdAndSparkJobIds) =>
// sort SparkJobIds for each OutputOpId
(outputOpId, outputOpIdAndSparkJobIdPairs.map(_._2).sorted)
(outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
}
sparkListener.synchronized {
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] =
Expand Down Expand Up @@ -219,13 +217,12 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse {
throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
}
val batchInfo = batchUIData.batchInfo

val formattedSchedulingDelay =
batchInfo.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
batchUIData.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
val formattedProcessingTime =
batchInfo.processingDelay.map(UIUtils.formatDuration).getOrElse("-")
val formattedTotalDelay = batchInfo.totalDelay.map(UIUtils.formatDuration).getOrElse("-")
batchUIData.processingDelay.map(UIUtils.formatDuration).getOrElse("-")
val formattedTotalDelay = batchUIData.totalDelay.map(UIUtils.formatDuration).getOrElse("-")

val summary: NodeSeq =
<div>
Expand All @@ -236,7 +233,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
</li>
<li>
<strong>Input data size: </strong>
{batchInfo.numRecords} records
{batchUIData.numRecords} records
</li>
<li>
<strong>Scheduling delay: </strong>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,79 @@

package org.apache.spark.streaming.ui

import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.streaming.ui.StreamingJobProgressListener._

private[ui] case class BatchUIData(
batchInfo: BatchInfo,
outputOpIdSparkJobIdPairs: Seq[(OutputOpId, SparkJobId)]) {
/**
 * Pairs an output operation id with the id of a Spark job.
 *
 * @param outputOpId id of the output operation
 * @param sparkJobId id of the Spark job
 */
private[ui] case class OutputOpIdAndSparkJobId(
    outputOpId: OutputOpId,
    sparkJobId: SparkJobId)

/**
 * Per-batch data kept for the streaming UI: the batch time, the number of records per receiver
 * (keyed by an `Int` id) and the submission / processing timestamps. All delay values are derived
 * from the stored timestamps on demand.
 *
 * Equality and hashing take every constructor field into account as well as the mutable
 * `outputOpIdSparkJobIdPairs`, so an instance's hash changes when that field is reassigned.
 */
private[ui] class BatchUIData(
    val batchTime: Time,
    val receiverNumRecords: Map[Int, Long],
    val submissionTime: Long,
    val processingStartTime: Option[Long],
    val processingEndTime: Option[Long]) {

  /** Output operation id / Spark job id pairs of this batch; empty until assigned. */
  var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty

  /**
   * Time between the submission of this batch and the start of its processing, i.e.
   * `processingStartTime` - `submissionTime`. `None` while `processingStartTime` is undefined.
   */
  def schedulingDelay: Option[Long] =
    for (started <- processingStartTime) yield started - submissionTime

  /**
   * Time between the start and the end of processing, i.e.
   * `processingEndTime` - `processingStartTime`. `None` unless both timestamps are defined.
   */
  def processingDelay: Option[Long] =
    processingStartTime.flatMap { started =>
      processingEndTime.map(finished => finished - started)
    }

  /**
   * Time between the submission of this batch and the end of its processing, computed as
   * `processingEndTime` - `submissionTime`. `None` while `processingEndTime` is undefined.
   */
  def totalDelay: Option[Long] =
    for (finished <- processingEndTime) yield finished - submissionTime

  /**
   * The number of records received by the receivers in this batch, i.e. the sum of all values
   * of `receiverNumRecords`.
   */
  def numRecords: Long = receiverNumRecords.values.sum

  /** Whether `other` is a `BatchUIData` and may therefore be compared with this instance. */
  def canEqual(other: Any): Boolean = other match {
    case _: BatchUIData => true
    case _ => false
  }

  /** Field-by-field equality, including the mutable `outputOpIdSparkJobIdPairs`. */
  override def equals(other: Any): Boolean = other match {
    case that: BatchUIData =>
      that.canEqual(this) &&
        this.outputOpIdSparkJobIdPairs == that.outputOpIdSparkJobIdPairs &&
        this.batchTime == that.batchTime &&
        this.receiverNumRecords == that.receiverNumRecords &&
        this.submissionTime == that.submissionTime &&
        this.processingStartTime == that.processingStartTime &&
        this.processingEndTime == that.processingEndTime
    case _ => false
  }

  /** Combines the hash codes of the same fields `equals` compares, using the 31-multiplier fold. */
  override def hashCode(): Int = {
    val fields: Seq[Any] = Seq(
      outputOpIdSparkJobIdPairs,
      batchTime,
      receiverNumRecords,
      submissionTime,
      processingStartTime,
      processingEndTime)
    fields.foldLeft(0)((acc, field) => 31 * acc + field.hashCode())
  }
}

/**
 * Factory for [[BatchUIData]].
 */
private[ui] object BatchUIData {

  /**
   * Builds a `BatchUIData` from the given `BatchInfo`, copying its batch time and timestamps and
   * reducing `receivedBlockInfo` to one summed record count per key.
   *
   * The counts are materialized eagerly with `map`. `mapValues` would only return a lazy view
   * that keeps a reference to the whole `receivedBlockInfo` map and re-computes each sum on
   * every access, so the block infos would stay reachable for as long as the returned
   * `BatchUIData` is retained.
   *
   * @param batchInfo the batch information to summarize
   * @return the UI data holding only the fields read from `batchInfo`
   */
  def apply(batchInfo: BatchInfo): BatchUIData = {
    val receiverNumRecords = batchInfo.receivedBlockInfo.map { case (id, blockInfos) =>
      id -> blockInfos.map(_.numRecords).sum
    }
    new BatchUIData(
      batchInfo.batchTime,
      receiverNumRecords,
      batchInfo.submissionTime,
      batchInfo.processingStartTime,
      batchInfo.processingEndTime
    )
  }
}
Loading

0 comments on commit 087ba98

Please sign in to comment.