Skip to content

Commit

Permalink
More debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Dec 3, 2015
1 parent 34a52f9 commit 96e95ab
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ class CheckpointWriter(
val bytes = Checkpoint.serialize(checkpoint, conf)
executor.execute(new CheckpointWriteHandler(
checkpoint.checkpointTime, bytes, clearCheckpointDataLater))
logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
} catch {
case rej: RejectedExecutionException =>
logError("Could not submit checkpoint task to the thread pool executor", rej)
Expand Down
1 change: 1 addition & 0 deletions streaming/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{

# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.spark-project.jetty=WARN
log4j.appender.org.apache.spark.streaming=DEBUG

Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,13 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
expectedOutput.take(numBatchesBeforeRestart), stopSparkContextAfterTest)

// Restart and complete the computation from checkpoint file
logInfo(
// scalastyle:off println
print(
"\n-------------------------------------------\n" +
" Restarting stream computation " +
"\n-------------------------------------------\n"
)
// scalastyle:on println
val restartedSsc = new StreamingContext(checkpointDir)
generateAndAssertOutput[V](restartedSsc, batchDuration, checkpointDir, nextNumBatches,
expectedOutput.takeRight(nextNumExpectedOutputs), stopSparkContextAfterTest)
Expand All @@ -125,9 +127,11 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
ssc.start()
val numBatches = expectedOutput.size
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logDebug("Manual clock before advancing = " + clock.getTimeMillis())
// scalastyle:off println
logInfo("Manual clock before advancing = " + clock.getTimeMillis())
clock.advance((batchDuration * numBatches).milliseconds)
logDebug("Manual clock after advancing = " + clock.getTimeMillis())
logInfo("Manual clock after advancing = " + clock.getTimeMillis())
// scalastyle:on println

val outputStream = ssc.graph.getOutputStreams().filter { dstream =>
dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
Expand Down

0 comments on commit 96e95ab

Please sign in to comment.