Skip to content

Commit

Permalink
Fix the addressed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryshao committed Nov 27, 2014
1 parent c7a9376 commit 02dd44f
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,6 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)

// Gauge for last received batch records and total received batch records.
private var totalReceivedBatchRecords: Long = 0L
def getTotalReceivedBatchRecords(listener: StreamingJobProgressListener): Long = {
totalReceivedBatchRecords += listener.lastReceivedBatchRecords.values.sum
totalReceivedBatchRecords
}

registerGauge("lastReceivedBatchRecords", _.lastReceivedBatchRecords.values.sum, 0L)
registerGauge("totalReceivedBatchRecords", getTotalReceivedBatchRecords, 0L)
registerGauge("totalReceivedBatchRecords", _.numTotalReceivedBatchRecords, 0L)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
import org.apache.spark.util.Distribution
import org.apache.spark.Logging


private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
Expand All @@ -37,6 +36,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
private var totalCompletedBatches = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]
private var totalReceivedBatchRecords = 0L

val batchDuration = ssc.graph.batchDuration.milliseconds

Expand Down Expand Up @@ -65,6 +65,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)

batchStarted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
totalReceivedBatchRecords += infos.map(_.numRecords).sum
}
}

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
Expand All @@ -83,6 +87,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
totalCompletedBatches
}

def numTotalReceivedBatchRecords: Long = synchronized {
totalReceivedBatchRecords
}

def numUnprocessedBatches: Long = synchronized {
waitingBatchInfos.size + runningBatchInfos.size
}
Expand Down

0 comments on commit 02dd44f

Please sign in to comment.