[STREAMING][MINOR] Scaladoc + logs
Found while doing code review

Author: Jacek Laskowski <[email protected]>

Closes #10878 from jaceklaskowski/streaming-scaladoc-logs-tiny-fixes.
jaceklaskowski authored and rxin committed Jan 23, 2016
1 parent 423783a commit cfdcef7
Showing 4 changed files with 5 additions and 6 deletions.

streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala

@@ -30,9 +30,8 @@ import org.apache.spark.util.ClosureCleaner
  * `mapWithState` operation of a
  * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair DStream]] (Scala) or a
  * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] (Java).
- * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or
- * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create instances of
- * this class.
+ * Use [[org.apache.spark.streaming.StateSpec.function() StateSpec.function]] factory methods
+ * to create instances of this class.
  *
  * Example in Scala:
  * {{{
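
Side note (not part of the commit): a minimal sketch of how the StateSpec.function factory named in the updated scaladoc is typically used together with mapWithState. The key/value types and the runningSum function are illustrative assumptions, not code from this repository.

import org.apache.spark.streaming.{State, StateSpec}

// Hypothetical mapping function: keeps a running sum per key and emits (key, sum).
val runningSum = (key: String, value: Option[Int], state: State[Int]) => {
  val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (key, sum)
}

// Assuming `pairs` is a DStream[(String, Int)]:
// val mapped = pairs.mapWithState(StateSpec.function(runningSum))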

streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala

@@ -90,7 +90,7 @@ private[streaming] class MapWithStateDStreamImpl[
 }
 
 /**
- * A DStream that allows per-key state to be maintains, and arbitrary records to be generated
+ * A DStream that allows per-key state to be maintained, and arbitrary records to be generated
  * based on updates to the state. This is the main DStream that implements the `mapWithState`
  * operation on DStreams.
  *

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala

@@ -166,7 +166,7 @@ private[streaming] class ReceivedBlockTracker(
   def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
     require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
     val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
-    logInfo("Deleting batches " + timesToCleanup)
+    logInfo(s"Deleting batches: ${timesToCleanup.mkString(" ")}")
     if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
       timeToAllocatedBlocks --= timesToCleanup
       writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
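
For context (illustrative, not from the commit): the old concatenation logged the collection's default toString, while the new interpolated string prints the batch times space-separated. A hypothetical example with plain Longs in place of the real Time values:

val timesToCleanup = Seq(1000L, 2000L)
"Deleting batches " + timesToCleanup                    // "Deleting batches List(1000, 2000)"
s"Deleting batches: ${timesToCleanup.mkString(" ")}"    // "Deleting batches: 1000 2000"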

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala

@@ -91,7 +91,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
 
   override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = synchronized {
     val batchUIData = BatchUIData(batchStarted.batchInfo)
-    runningBatchUIData(batchStarted.batchInfo.batchTime) = BatchUIData(batchStarted.batchInfo)
+    runningBatchUIData(batchStarted.batchInfo.batchTime) = batchUIData
     waitingBatchUIData.remove(batchStarted.batchInfo.batchTime)
 
     totalReceivedRecords += batchUIData.numRecords
