From 0c5fe55b1ff8da4cca28b91860afbcfbd28e7422 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 25 Nov 2015 17:54:44 -0800 Subject: [PATCH 1/9] Partition previous state RDD if partitioner not present --- .../streaming/dstream/TrackStateDStream.scala | 41 +++-- .../spark/streaming/CheckpointSuite.scala | 169 ++++++++++++------ .../streaming/TrackStateByKeySuite.scala | 77 ++++++-- 3 files changed, 209 insertions(+), 78 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala index 0ada1111ce30a..1a07929ee43e7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala @@ -132,22 +132,39 @@ class InternalTrackStateDStream[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassT /** Method that generates a RDD for the given time */ override def compute(validTime: Time): Option[RDD[TrackStateRDDRecord[K, S, E]]] = { // Get the previous state or create a new empty state RDD - val prevStateRDD = getOrCompute(validTime - slideDuration).getOrElse { - TrackStateRDD.createFromPairRDD[K, V, S, E]( - spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)), - partitioner, validTime - ) + val prevStateRDD = getOrCompute(validTime - slideDuration) match { + case Some(rdd) => + if (rdd.partitioner != Some(partitioner)) { + // If the RDD is not partitioned the right way, let us repartition it using the + // partition index as the key. This is to ensure that state RDD is always partitioned + // before creating another state RDD using it + val kvRDD = rdd.mapPartitions { iter => + iter.map { x => (TaskContext.get().partitionId(), x)} + } + kvRDD.partitionBy(partitioner).mapPartitions(iter => iter.map { _._2 }, + preservesPartitioning = true) + } else { + rdd + } + case None => + TrackStateRDD.createFromPairRDD[K, V, S, E]( + spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)), + partitioner, validTime + ) } + // Compute the new state RDD with previous state RDD and partitioned data RDD - parent.getOrCompute(validTime).map { dataRDD => - val partitionedDataRDD = dataRDD.partitionBy(partitioner) - val timeoutThresholdTime = spec.getTimeoutInterval().map { interval => - (validTime - interval).milliseconds - } - new TrackStateRDD( - prevStateRDD, partitionedDataRDD, trackingFunction, validTime, timeoutThresholdTime) + // Even if there is no data RDD, use an empty one to create a new state RDD + val dataRDD = parent.getOrCompute(validTime).getOrElse { + context.sparkContext.emptyRDD[(K, V)] + } + val partitionedDataRDD = dataRDD.partitionBy(partitioner) + val timeoutThresholdTime = spec.getTimeoutInterval().map { interval => + (validTime - interval).milliseconds } + Some(new TrackStateRDD( + prevStateRDD, partitionedDataRDD, trackingFunction, validTime, timeoutThresholdTime)) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index b1cbc7163bee3..bdab96cd95855 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -33,17 +33,133 @@ import org.mockito.Mockito.mock import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.apache.spark.TestUtils 
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, TestUtils}
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils}
+/**
+ * A trait that can be mixed in to get methods for testing DStream operations under
+ * DStream checkpointing. Note that implementations of this trait have to implement
+ * the `setupCheckpointOperation` method.
+ */
+trait DStreamCheckpointTester { self: SparkFunSuite =>
+
+  /**
+   * Tests a streaming operation under checkpointing, by restarting the operation
+   * from checkpoint file and verifying whether the final output is correct.
+   * The output is assumed to have come from a reliable queue which can replay
+   * data as required.
+   *
+   * NOTE: This takes into consideration that the last batch processed before
+   * master failure will be re-processed after restart/recovery.
+   */
+  protected def testCheckpointedOperation[U: ClassTag, V: ClassTag](
+      input: Seq[Seq[U]],
+      operation: DStream[U] => DStream[V],
+      expectedOutput: Seq[Seq[V]],
+      numBatchesBeforeRestart: Int,
+      batchDuration: Duration = Seconds(1),
+      stopSparkContextAfterTest: Boolean = true
+    ) {
+    require(numBatchesBeforeRestart < expectedOutput.size,
+      "Number of batches before context restart less than number of expected output " +
+        "(i.e. number of total batches to run)")
+    require(StreamingContext.getActive().isEmpty,
+      "Cannot run test with already active streaming context")
+
+    // Current code assumes that:
+    // number of inputs = number of outputs = number of batches to be run
+    val totalNumBatches = input.size
+    val nextNumBatches = totalNumBatches - numBatchesBeforeRestart
+    val initialNumExpectedOutputs = numBatchesBeforeRestart
+    val nextNumExpectedOutputs = expectedOutput.size - initialNumExpectedOutputs + 1
+    // because the last batch will be processed again
+
+    // Setup the stream computation
+    val checkpointDir = Utils.createTempDir(this.getClass.getSimpleName()).toString
+    logDebug(s"Using checkpoint directory $checkpointDir")
+    val ssc = createContextForCheckpointOperation(batchDuration)
+    require(ssc.conf.get("spark.streaming.clock") === classOf[ManualClock].getName,
+      "Cannot run test without manual clock in the conf")
+
+    val inputStream = new TestInputStream(ssc, input, numPartitions = 2)
+    val operatedStream = operation(inputStream)
+    val outputStream = new TestOutputStreamWithPartitions(operatedStream,
+      new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
+    outputStream.register()
+    ssc.checkpoint(checkpointDir)
+
+    // Do the computation for initial number of batches, create checkpoint file and quit
+    generateAndAssertOutput[V](ssc, batchDuration, checkpointDir,
+      expectedOutput.take(numBatchesBeforeRestart), stopSparkContextAfterTest)
+
+    // Restart and complete the computation from checkpoint file
+    logInfo(
+      "\n-------------------------------------------\n" +
+        " Restarting stream computation " +
+        "\n-------------------------------------------\n"
+    )
+    val restartedSsc = new StreamingContext(checkpointDir)
+    generateAndAssertOutput[V](restartedSsc, batchDuration, checkpointDir,
+      expectedOutput.takeRight(nextNumExpectedOutputs), stopSparkContextAfterTest)
+  }
+
+  protected def createContextForCheckpointOperation(batchDuration: Duration): StreamingContext = {
+    val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
+    conf.set("spark.streaming.clock", classOf[ManualClock].getName())
+
new StreamingContext(SparkContext.getOrCreate(conf), Seconds(1)) + } + + private def generateAndAssertOutput[V: ClassTag]( + ssc: StreamingContext, + batchDuration: Duration, + checkpointDir: String, + expectedOutput: Seq[Seq[V]], + stopSparkContext: Boolean + ) { + try { + ssc.start() + val numBatches = expectedOutput.size + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + logDebug("Manual clock before advancing = " + clock.getTimeMillis()) + clock.advance((batchDuration * numBatches).milliseconds) + logDebug("Manual clock after advancing = " + clock.getTimeMillis()) + + val outputStream = ssc.graph.getOutputStreams().filter { dstream => + dstream.isInstanceOf[TestOutputStreamWithPartitions[V]] + }.head.asInstanceOf[TestOutputStreamWithPartitions[V]] + + eventually(timeout(10 seconds)) { + ssc.awaitTerminationOrTimeout(10) + assert(outputStream.output.size === expectedOutput.size) + } + + eventually(timeout(10 seconds)) { + Checkpoint.getCheckpointFiles(checkpointDir).exists { + _.toString.contains(clock.getTimeMillis.toString) + } + } + + val output = outputStream.output.map(_.flatten) + assert( + output.zip(expectedOutput).forall { case (o, e) => o.toSet === e.toSet }, + s"Set comparison failed\n" + + s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" + + s"Generated output (${output.size} items): ${output.mkString("\n")}" + ) + } finally { + ssc.stop(stopSparkContext = stopSparkContext) + } + } +} + /** * This test suites tests the checkpointing functionality of DStreams - * the checkpointing of a DStream's RDDs as well as the checkpointing of * the whole DStream graph. */ -class CheckpointSuite extends TestSuiteBase { +class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester { var ssc: StreamingContext = null @@ -56,7 +172,7 @@ class CheckpointSuite extends TestSuiteBase { override def afterFunction() { super.afterFunction() - if (ssc != null) ssc.stop() + StreamingContext.getActive().foreach { _.stop() } Utils.deleteRecursively(new File(checkpointDir)) } @@ -634,53 +750,6 @@ class CheckpointSuite extends TestSuiteBase { checkpointWriter.stop() } - /** - * Tests a streaming operation under checkpointing, by restarting the operation - * from checkpoint file and verifying whether the final output is correct. - * The output is assumed to have come from a reliable queue which an replay - * data as required. - * - * NOTE: This takes into consideration that the last batch processed before - * master failure will be re-processed after restart/recovery. 
- */ - def testCheckpointedOperation[U: ClassTag, V: ClassTag]( - input: Seq[Seq[U]], - operation: DStream[U] => DStream[V], - expectedOutput: Seq[Seq[V]], - initialNumBatches: Int - ) { - - // Current code assumes that: - // number of inputs = number of outputs = number of batches to be run - val totalNumBatches = input.size - val nextNumBatches = totalNumBatches - initialNumBatches - val initialNumExpectedOutputs = initialNumBatches - val nextNumExpectedOutputs = expectedOutput.size - initialNumExpectedOutputs + 1 - // because the last batch will be processed again - - // Do the computation for initial number of batches, create checkpoint file and quit - ssc = setupStreams[U, V](input, operation) - ssc.start() - val output = advanceTimeWithRealDelay[V](ssc, initialNumBatches) - ssc.stop() - verifyOutput[V](output, expectedOutput.take(initialNumBatches), true) - Thread.sleep(1000) - - // Restart and complete the computation from checkpoint file - logInfo( - "\n-------------------------------------------\n" + - " Restarting stream computation " + - "\n-------------------------------------------\n" - ) - ssc = new StreamingContext(checkpointDir) - ssc.start() - val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches) - // the first element will be re-processed data of the last batch before restart - verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true) - ssc.stop() - ssc = null - } - /** * Advances the manual clock on the streaming scheduler by given number of batches. * It also waits for the expected amount of time for each batch. diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala index 58aef74c0040f..89eef2318fdff 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala @@ -25,31 +25,27 @@ import scala.reflect.ClassTag import org.scalatest.PrivateMethodTester._ import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} -import org.apache.spark.streaming.dstream.{InternalTrackStateDStream, TrackStateDStream, TrackStateDStreamImpl} +import org.apache.spark.streaming.dstream.{DStream, InternalTrackStateDStream, TrackStateDStream, TrackStateDStreamImpl} import org.apache.spark.util.{ManualClock, Utils} import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} -class TrackStateByKeySuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter { +class TrackStateByKeySuite extends SparkFunSuite + with DStreamCheckpointTester with BeforeAndAfterAll with BeforeAndAfter { private var sc: SparkContext = null - private var ssc: StreamingContext = null - private var checkpointDir: File = null - private val batchDuration = Seconds(1) + protected var checkpointDir: File = null + protected val batchDuration = Seconds(1) before { - StreamingContext.getActive().foreach { - _.stop(stopSparkContext = false) - } + StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) } checkpointDir = Utils.createTempDir("checkpoint") - - ssc = new StreamingContext(sc, batchDuration) - ssc.checkpoint(checkpointDir.toString) } after { - StreamingContext.getActive().foreach { - _.stop(stopSparkContext = false) + if (checkpointDir != null) { + Utils.deleteRecursively(checkpointDir) } + StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) } } override def beforeAll(): Unit = { @@ -242,7 +238,7 @@ class 
TrackStateByKeySuite extends SparkFunSuite with BeforeAndAfterAll with Bef assert(dstreamImpl.stateClass === classOf[Double]) assert(dstreamImpl.emittedClass === classOf[Long]) } - + val ssc = new StreamingContext(sc, batchDuration) val inputStream = new TestInputStream[(String, Int)](ssc, Seq.empty, numPartitions = 2) // Defining StateSpec inline with trackStateByKey and simple function implicitly gets the types @@ -451,8 +447,9 @@ class TrackStateByKeySuite extends SparkFunSuite with BeforeAndAfterAll with Bef expectedCheckpointDuration: Duration, explicitCheckpointDuration: Option[Duration] = None ): Unit = { + val ssc = new StreamingContext(sc, batchDuration) + try { - ssc = new StreamingContext(sc, batchDuration) val inputStream = new TestInputStream(ssc, Seq.empty[Seq[Int]], 2).map(_ -> 1) val dummyFunc = (value: Option[Int], state: State[Int]) => 0 val trackStateStream = inputStream.trackStateByKey(StateSpec.function(dummyFunc)) @@ -462,11 +459,12 @@ class TrackStateByKeySuite extends SparkFunSuite with BeforeAndAfterAll with Bef trackStateStream.checkpoint(d) } trackStateStream.register() + ssc.checkpoint(checkpointDir.toString) ssc.start() // should initialize all the checkpoint durations assert(trackStateStream.checkpointDuration === null) assert(internalTrackStateStream.checkpointDuration === expectedCheckpointDuration) } finally { - StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) } + ssc.stop(stopSparkContext = false) } } @@ -479,6 +477,50 @@ class TrackStateByKeySuite extends SparkFunSuite with BeforeAndAfterAll with Bef testCheckpointDuration(Seconds(10), Seconds(20), Some(Seconds(20))) } + + test("trackStateByKey - drivery failure recovery") { + val inputData = + Seq( + Seq(), + Seq("a"), + Seq("a", "b"), + Seq("a", "b", "c"), + Seq("a", "b"), + Seq("a"), + Seq() + ) + + val stateData = + Seq( + Seq(), + Seq(("a", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 3), ("b", 2), ("c", 1)), + Seq(("a", 4), ("b", 3), ("c", 1)), + Seq(("a", 5), ("b", 3), ("c", 1)), + Seq(("a", 5), ("b", 3), ("c", 1)) + ) + + def operation(dstream: DStream[String]): DStream[(String, Int)] = { + + val checkpointDuration = batchDuration * (stateData.size / 2) + + val runningCount = (value: Option[Int], state: State[Int]) => { + state.update(state.getOption().getOrElse(0) + value.getOrElse(0)) + state.get() + } + + val trackStateStream = dstream.map { _ -> 1 }.trackStateByKey( + StateSpec.function(runningCount)) + // Set internval make sure there is one RDD checkpointing + trackStateStream.checkpoint(checkpointDuration) + trackStateStream.stateSnapshots() + } + + testCheckpointedOperation(inputData, operation, stateData, inputData.size / 2, + batchDuration = batchDuration, stopSparkContextAfterTest = false) + } + private def testOperation[K: ClassTag, S: ClassTag, T: ClassTag]( input: Seq[Seq[K]], trackStateSpec: StateSpec[K, Int, S, T], @@ -500,6 +542,7 @@ class TrackStateByKeySuite extends SparkFunSuite with BeforeAndAfterAll with Bef ): (Seq[Seq[T]], Seq[Seq[(K, S)]]) = { // Setup the stream computation + val ssc = new StreamingContext(sc, Seconds(1)) val inputStream = new TestInputStream(ssc, input, numPartitions = 2) val trackeStateStream = inputStream.map(x => (x, 1)).trackStateByKey(trackStateSpec) val collectedOutputs = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]] @@ -511,12 +554,14 @@ class TrackStateByKeySuite extends SparkFunSuite with BeforeAndAfterAll with Bef stateSnapshotStream.register() val batchCounter = new BatchCounter(ssc) + 
ssc.checkpoint(checkpointDir.toString) ssc.start() val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] clock.advance(batchDuration.milliseconds * numBatches) batchCounter.waitUntilBatchesCompleted(numBatches, 10000) + ssc.stop(stopSparkContext = false) (collectedOutputs, collectedStateSnapshots) } From a1d3f753bd9ddb89ea74e67385760b0921af51e9 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 30 Nov 2015 18:35:55 -0800 Subject: [PATCH 2/9] Refactored to address PR comments --- .../streaming/dstream/TrackStateDStream.scala | 10 +++---- .../spark/streaming/rdd/TrackStateRDD.scala | 29 ++++++++++++++++--- .../streaming/TrackStateByKeySuite.scala | 2 +- 3 files changed, 30 insertions(+), 11 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala index 1a07929ee43e7..ea6213420e7ab 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala @@ -138,18 +138,16 @@ class InternalTrackStateDStream[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassT // If the RDD is not partitioned the right way, let us repartition it using the // partition index as the key. This is to ensure that state RDD is always partitioned // before creating another state RDD using it - val kvRDD = rdd.mapPartitions { iter => - iter.map { x => (TaskContext.get().partitionId(), x)} - } - kvRDD.partitionBy(partitioner).mapPartitions(iter => iter.map { _._2 }, - preservesPartitioning = true) + TrackStateRDD.createFromRDD[K, V, S, E]( + rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime) } else { rdd } case None => TrackStateRDD.createFromPairRDD[K, V, S, E]( spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)), - partitioner, validTime + partitioner, + validTime ) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/TrackStateRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/TrackStateRDD.scala index 7050378d0feb0..30aafcf1460e3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/TrackStateRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/TrackStateRDD.scala @@ -179,22 +179,43 @@ private[streaming] class TrackStateRDD[K: ClassTag, V: ClassTag, S: ClassTag, E: private[streaming] object TrackStateRDD { - def createFromPairRDD[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag]( + def createFromPairRDD[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag]( pairRDD: RDD[(K, S)], partitioner: Partitioner, - updateTime: Time): TrackStateRDD[K, V, S, T] = { + updateTime: Time): TrackStateRDD[K, V, S, E] = { val rddOfTrackStateRecords = pairRDD.partitionBy(partitioner).mapPartitions ({ iterator => val stateMap = StateMap.create[K, S](SparkEnv.get.conf) iterator.foreach { case (key, state) => stateMap.put(key, state, updateTime.milliseconds) } - Iterator(TrackStateRDDRecord(stateMap, Seq.empty[T])) + Iterator(TrackStateRDDRecord(stateMap, Seq.empty[E])) }, preservesPartitioning = true) val emptyDataRDD = pairRDD.sparkContext.emptyRDD[(K, V)].partitionBy(partitioner) val noOpFunc = (time: Time, key: K, value: Option[V], state: State[S]) => None - new TrackStateRDD[K, V, S, T](rddOfTrackStateRecords, emptyDataRDD, noOpFunc, updateTime, None) + new TrackStateRDD[K, V, S, E](rddOfTrackStateRecords, emptyDataRDD, noOpFunc, updateTime, None) + } + + def createFromRDD[K: 
ClassTag, V: ClassTag, S: ClassTag, E: ClassTag]( + rdd: RDD[(K, S, Long)], + partitioner: Partitioner, + updateTime: Time): TrackStateRDD[K, V, S, E] = { + + val pairRDD = rdd.map { x => (x._1, (x._2, x._3)) } + val rddOfTrackStateRecords = pairRDD.partitionBy(partitioner).mapPartitions({ iterator => + val stateMap = StateMap.create[K, S](SparkEnv.get.conf) + iterator.foreach { case (key, (state, updateTime)) => + stateMap.put(key, state, updateTime) + } + Iterator(TrackStateRDDRecord(stateMap, Seq.empty[E])) + }, preservesPartitioning = true) + + val emptyDataRDD = pairRDD.sparkContext.emptyRDD[(K, V)].partitionBy(partitioner) + + val noOpFunc = (time: Time, key: K, value: Option[V], state: State[S]) => None + + new TrackStateRDD[K, V, S, E](rddOfTrackStateRecords, emptyDataRDD, noOpFunc, updateTime, None) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala index 89eef2318fdff..1fc320d31b18b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala @@ -478,7 +478,7 @@ class TrackStateByKeySuite extends SparkFunSuite } - test("trackStateByKey - drivery failure recovery") { + test("trackStateByKey - driver failure recovery") { val inputData = Seq( Seq(), From f4fb5578b6cede9bf8faf20d6f495aa0f581b4e5 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 1 Dec 2015 14:28:19 -0800 Subject: [PATCH 3/9] Revering a line --- .../test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index bdab96cd95855..101059e11e406 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -172,7 +172,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester { override def afterFunction() { super.afterFunction() - StreamingContext.getActive().foreach { _.stop() } + if (ssc != null) { ssc.stop() } Utils.deleteRecursively(new File(checkpointDir)) } From d3da04f4d353bbb391b1b696d3a75164c80e38e2 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 1 Dec 2015 19:08:31 -0800 Subject: [PATCH 4/9] Fixed errors --- .../org/apache/spark/util/SerializableJobConf.scala | 2 ++ .../streaming/dstream/PairDStreamFunctions.scala | 3 ++- .../org/apache/spark/streaming/CheckpointSuite.scala | 12 +++++++----- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala b/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala index cadae472b3f85..4d8869028fe51 100644 --- a/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala +++ b/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala @@ -19,6 +19,8 @@ package org.apache.spark.util import java.io.{ObjectInputStream, ObjectOutputStream} +import scala.util.control.NonFatal + import org.apache.hadoop.mapred.JobConf private[spark] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index fb691eed27e32..2762309134eb1 100644 --- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -730,7 +730,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)]) val serializableConf = new SerializableJobConf(conf) val saveFunc = (rdd: RDD[(K, V)], time: Time) => { val file = rddToFileName(prefix, suffix, time) - rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value) + rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, + new JobConf(serializableConf.value)) } self.foreachRDD(saveFunc) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 101059e11e406..e8a4828c27b71 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -59,7 +59,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite => operation: DStream[U] => DStream[V], expectedOutput: Seq[Seq[V]], numBatchesBeforeRestart: Int, - batchDuration: Duration = Seconds(1), + batchDuration: Duration = Milliseconds(500), stopSparkContextAfterTest: Boolean = true ) { require(numBatchesBeforeRestart < expectedOutput.size, @@ -91,7 +91,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite => ssc.checkpoint(checkpointDir) // Do the computation for initial number of batches, create checkpoint file and quit - generateAndAssertOutput[V](ssc, batchDuration, checkpointDir, + generateAndAssertOutput[V](ssc, batchDuration, checkpointDir, numBatchesBeforeRestart, expectedOutput.take(numBatchesBeforeRestart), stopSparkContextAfterTest) // Restart and complete the computation from checkpoint file @@ -101,24 +101,26 @@ trait DStreamCheckpointTester { self: SparkFunSuite => "\n-------------------------------------------\n" ) val restartedSsc = new StreamingContext(checkpointDir) - generateAndAssertOutput[V](restartedSsc, batchDuration, checkpointDir, + generateAndAssertOutput[V](restartedSsc, batchDuration, checkpointDir, nextNumBatches, expectedOutput.takeRight(nextNumExpectedOutputs), stopSparkContextAfterTest) } protected def createContextForCheckpointOperation(batchDuration: Duration): StreamingContext = { val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName) conf.set("spark.streaming.clock", classOf[ManualClock].getName()) - new StreamingContext(SparkContext.getOrCreate(conf), Seconds(1)) + new StreamingContext(SparkContext.getOrCreate(conf), batchDuration) } private def generateAndAssertOutput[V: ClassTag]( ssc: StreamingContext, batchDuration: Duration, checkpointDir: String, + numBatchesToRun: Int, expectedOutput: Seq[Seq[V]], stopSparkContext: Boolean ) { try { + val batchCounter = new BatchCounter(ssc) ssc.start() val numBatches = expectedOutput.size val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] @@ -132,7 +134,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite => eventually(timeout(10 seconds)) { ssc.awaitTerminationOrTimeout(10) - assert(outputStream.output.size === expectedOutput.size) + assert(batchCounter.getNumCompletedBatches === numBatchesToRun) } eventually(timeout(10 seconds)) { From 42b35b7041a95537a378b6fdb132cd59f646dd6e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 1 Dec 2015 19:09:57 -0800 Subject: [PATCH 5/9] Revert unnecessary change --- 
.../main/scala/org/apache/spark/util/SerializableJobConf.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala b/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala index 4d8869028fe51..cadae472b3f85 100644 --- a/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala +++ b/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala @@ -19,8 +19,6 @@ package org.apache.spark.util import java.io.{ObjectInputStream, ObjectOutputStream} -import scala.util.control.NonFatal - import org.apache.hadoop.mapred.JobConf private[spark] From 34a52f9d95c92ff6db6f7bc90e981b9052f59390 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 2 Dec 2015 13:14:59 -0800 Subject: [PATCH 6/9] Minor changes --- .../spark/streaming/CheckpointSuite.scala | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index e8a4828c27b71..d260995d5d970 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -85,6 +85,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite => val inputStream = new TestInputStream(ssc, input, numPartitions = 2) val operatedStream = operation(inputStream) + operatedStream.print() val outputStream = new TestOutputStreamWithPartitions(operatedStream, new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]]) outputStream.register() @@ -138,17 +139,19 @@ trait DStreamCheckpointTester { self: SparkFunSuite => } eventually(timeout(10 seconds)) { - Checkpoint.getCheckpointFiles(checkpointDir).exists { + val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter { _.toString.contains(clock.getTimeMillis.toString) } + // Checkpoint files are written twice for every batch interval. So assert that both + // are written to make sure that both of them have been written. 
+ assert(checkpointFilesOfLatestTime.size === 2) } val output = outputStream.output.map(_.flatten) - assert( - output.zip(expectedOutput).forall { case (o, e) => o.toSet === e.toSet }, - s"Set comparison failed\n" + - s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" + - s"Generated output (${output.size} items): ${output.mkString("\n")}" + val setComparison = output.zip(expectedOutput).forall { case (o, e) => o.toSet === e.toSet } + assert(setComparison, s"set comparison failed\n" + + s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" + + s"Generated output (${output.size} items): ${output.mkString("\n")}" ) } finally { ssc.stop(stopSparkContext = stopSparkContext) @@ -369,7 +372,9 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester { Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), - Seq(("", 2)), Seq() ), + Seq(("", 2)), + Seq() + ), 3 ) } From 96e95abe15e144c43a93dad10d741d079b0a7e10 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 2 Dec 2015 18:38:46 -0800 Subject: [PATCH 7/9] More debugging --- .../scala/org/apache/spark/streaming/Checkpoint.scala | 2 +- streaming/src/test/resources/log4j.properties | 1 + .../org/apache/spark/streaming/CheckpointSuite.scala | 10 +++++++--- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index fd0e8d5d690b6..d0046afdeb447 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -277,7 +277,7 @@ class CheckpointWriter( val bytes = Checkpoint.serialize(checkpoint, conf) executor.execute(new CheckpointWriteHandler( checkpoint.checkpointTime, bytes, clearCheckpointDataLater)) - logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue") + logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue") } catch { case rej: RejectedExecutionException => logError("Could not submit checkpoint task to the thread pool executor", rej) diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties index 75e3b53a093f6..a295f56523254 100644 --- a/streaming/src/test/resources/log4j.properties +++ b/streaming/src/test/resources/log4j.properties @@ -25,4 +25,5 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{ # Ignore messages below warning level from Jetty, because it's a bit verbose log4j.logger.org.spark-project.jetty=WARN +log4j.appender.org.apache.spark.streaming=DEBUG diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index d260995d5d970..cec8755e4807a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -96,11 +96,13 @@ trait DStreamCheckpointTester { self: SparkFunSuite => expectedOutput.take(numBatchesBeforeRestart), stopSparkContextAfterTest) // Restart and complete the computation from checkpoint file - logInfo( + // scalastyle:off println + print( "\n-------------------------------------------\n" + " Restarting stream computation " + "\n-------------------------------------------\n" ) + // scalastyle:on println val restartedSsc = new 
StreamingContext(checkpointDir) generateAndAssertOutput[V](restartedSsc, batchDuration, checkpointDir, nextNumBatches, expectedOutput.takeRight(nextNumExpectedOutputs), stopSparkContextAfterTest) @@ -125,9 +127,11 @@ trait DStreamCheckpointTester { self: SparkFunSuite => ssc.start() val numBatches = expectedOutput.size val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - logDebug("Manual clock before advancing = " + clock.getTimeMillis()) + // scalastyle:off println + logInfo("Manual clock before advancing = " + clock.getTimeMillis()) clock.advance((batchDuration * numBatches).milliseconds) - logDebug("Manual clock after advancing = " + clock.getTimeMillis()) + logInfo("Manual clock after advancing = " + clock.getTimeMillis()) + // scalastyle:on println val outputStream = ssc.graph.getOutputStreams().filter { dstream => dstream.isInstanceOf[TestOutputStreamWithPartitions[V]] From 53846f56eaa5f423cae44c7e287971325703b12d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 3 Dec 2015 01:25:14 -0800 Subject: [PATCH 8/9] Possibly fixed flakiness --- .../streaming/scheduler/JobGenerator.scala | 3 +- .../spark/streaming/CheckpointSuite.scala | 67 ++++++++++--------- .../spark/streaming/TestSuiteBase.scala | 6 ++ 3 files changed, 45 insertions(+), 31 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 2de035d166e7b..32efe9210cde2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -220,7 +220,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { logInfo("Batches pending processing (" + pendingTimes.size + " batches): " + pendingTimes.mkString(", ")) // Reschedule jobs for these times - val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering) + val timesToReschedule = (pendingTimes ++ downTimes).filter { _ != restartTime } + .distinct.sorted(Time.ordering) logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " + timesToReschedule.mkString(", ")) timesToReschedule.foreach { time => diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index cec8755e4807a..cd28d3cf408d5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -68,13 +68,9 @@ trait DStreamCheckpointTester { self: SparkFunSuite => require(StreamingContext.getActive().isEmpty, "Cannot run test with already active streaming context") - // Current code assumes that: - // number of inputs = number of outputs = number of batches to be run + // Current code assumes that number of batches to be run = number of inputs val totalNumBatches = input.size - val nextNumBatches = totalNumBatches - numBatchesBeforeRestart - val initialNumExpectedOutputs = numBatchesBeforeRestart - val nextNumExpectedOutputs = expectedOutput.size - initialNumExpectedOutputs + 1 - // because the last batch will be processed again + val batchDurationMillis = batchDuration.milliseconds // Setup the stream computation val checkpointDir = Utils.createTempDir(this.getClass.getSimpleName()).toString @@ -92,20 +88,20 @@ trait DStreamCheckpointTester { self: SparkFunSuite => ssc.checkpoint(checkpointDir) // Do the 
computation for initial number of batches, create checkpoint file and quit - generateAndAssertOutput[V](ssc, batchDuration, checkpointDir, numBatchesBeforeRestart, - expectedOutput.take(numBatchesBeforeRestart), stopSparkContextAfterTest) - + val beforeRestartOutput = generateOutput[V](ssc, + Time(batchDurationMillis * numBatchesBeforeRestart), checkpointDir, stopSparkContextAfterTest) + assertOutput(beforeRestartOutput, expectedOutput, beforeRestart = true) // Restart and complete the computation from checkpoint file - // scalastyle:off println - print( + logInfo( "\n-------------------------------------------\n" + " Restarting stream computation " + "\n-------------------------------------------\n" ) - // scalastyle:on println + val restartedSsc = new StreamingContext(checkpointDir) - generateAndAssertOutput[V](restartedSsc, batchDuration, checkpointDir, nextNumBatches, - expectedOutput.takeRight(nextNumExpectedOutputs), stopSparkContextAfterTest) + val afterRestartOutput = generateOutput[V](restartedSsc, + Time(batchDurationMillis * totalNumBatches), checkpointDir, stopSparkContextAfterTest) + assertOutput(afterRestartOutput, expectedOutput, beforeRestart = false) } protected def createContextForCheckpointOperation(batchDuration: Duration): StreamingContext = { @@ -114,24 +110,22 @@ trait DStreamCheckpointTester { self: SparkFunSuite => new StreamingContext(SparkContext.getOrCreate(conf), batchDuration) } - private def generateAndAssertOutput[V: ClassTag]( + private def generateOutput[V: ClassTag]( ssc: StreamingContext, - batchDuration: Duration, + targetBatchTime: Time, checkpointDir: String, - numBatchesToRun: Int, - expectedOutput: Seq[Seq[V]], stopSparkContext: Boolean - ) { + ): Seq[Seq[V]] = { try { + val batchDuration = ssc.graph.batchDuration val batchCounter = new BatchCounter(ssc) ssc.start() - val numBatches = expectedOutput.size val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - // scalastyle:off println + val currentTime = clock.getTimeMillis() + logInfo("Manual clock before advancing = " + clock.getTimeMillis()) - clock.advance((batchDuration * numBatches).milliseconds) + clock.setTime(targetBatchTime.milliseconds) logInfo("Manual clock after advancing = " + clock.getTimeMillis()) - // scalastyle:on println val outputStream = ssc.graph.getOutputStreams().filter { dstream => dstream.isInstanceOf[TestOutputStreamWithPartitions[V]] @@ -139,7 +133,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite => eventually(timeout(10 seconds)) { ssc.awaitTerminationOrTimeout(10) - assert(batchCounter.getNumCompletedBatches === numBatchesToRun) + assert(batchCounter.getLastCompletedBatchTime === targetBatchTime) } eventually(timeout(10 seconds)) { @@ -150,17 +144,30 @@ trait DStreamCheckpointTester { self: SparkFunSuite => // are written to make sure that both of them have been written. 
assert(checkpointFilesOfLatestTime.size === 2) } + outputStream.output.map(_.flatten) - val output = outputStream.output.map(_.flatten) - val setComparison = output.zip(expectedOutput).forall { case (o, e) => o.toSet === e.toSet } - assert(setComparison, s"set comparison failed\n" + - s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" + - s"Generated output (${output.size} items): ${output.mkString("\n")}" - ) } finally { ssc.stop(stopSparkContext = stopSparkContext) } } + + private def assertOutput[V: ClassTag]( + output: Seq[Seq[V]], + expectedOutput: Seq[Seq[V]], + beforeRestart: Boolean): Unit = { + val expectedPartialOutput = if (beforeRestart) { + expectedOutput.take(output.size) + } else { + expectedOutput.takeRight(output.size) + } + val setComparison = output.zip(expectedPartialOutput).forall { + case (o, e) => o.toSet === e.toSet + } + assert(setComparison, s"set comparison failed\n" + + s"Expected output items:\n${expectedPartialOutput.mkString("\n")}\n" + + s"Generated output items: ${output.mkString("\n")}" + ) + } } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index a45c92d9c7bc8..be0f4636a6cb8 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -142,6 +142,7 @@ class BatchCounter(ssc: StreamingContext) { // All access to this state should be guarded by `BatchCounter.this.synchronized` private var numCompletedBatches = 0 private var numStartedBatches = 0 + private var lastCompletedBatchTime: Time = null private val listener = new StreamingListener { override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = @@ -152,6 +153,7 @@ class BatchCounter(ssc: StreamingContext) { override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = BatchCounter.this.synchronized { numCompletedBatches += 1 + lastCompletedBatchTime = batchCompleted.batchInfo.batchTime BatchCounter.this.notifyAll() } } @@ -165,6 +167,10 @@ class BatchCounter(ssc: StreamingContext) { numStartedBatches } + def getLastCompletedBatchTime: Time = this.synchronized { + lastCompletedBatchTime + } + /** * Wait until `expectedNumCompletedBatches` batches are completed, or timeout. Return true if * `expectedNumCompletedBatches` batches are completed. Otherwise, return false to indicate it's From fd6b83e617c357949bfa44e135c0ba63e1502b29 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 7 Dec 2015 00:38:01 -0800 Subject: [PATCH 9/9] revert log4j.prop --- streaming/src/test/resources/log4j.properties | 1 - 1 file changed, 1 deletion(-) diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties index a295f56523254..75e3b53a093f6 100644 --- a/streaming/src/test/resources/log4j.properties +++ b/streaming/src/test/resources/log4j.properties @@ -25,5 +25,4 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{ # Ignore messages below warning level from Jetty, because it's a bit verbose log4j.logger.org.spark-project.jetty=WARN -log4j.appender.org.apache.spark.streaming=DEBUG
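Illustrative sketch (not part of the patches above): the core idea in PATCH 1/9 is that a state RDD recovered from a checkpoint no longer carries its partitioner, so it must be put back onto the intended partitioner before it is used to build the next state RDD. The minimal, self-contained Scala sketch below shows only that repartitioning pattern, using nothing beyond the public RDD API; the object and value names (RepartitionSketch, recovered) are made up for the example and do not appear in the patch.

// Standalone sketch of the repartitioning pattern from PATCH 1/9.
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("repartition-sketch"))
    val partitioner = new HashPartitioner(4)

    // Stand-in for a state RDD recovered from checkpoint: no partitioner is attached.
    val recovered: RDD[String] = sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 4)

    val repartitioned: RDD[String] =
      if (recovered.partitioner != Some(partitioner)) {
        // Key each record by its current partition id, shuffle by the target
        // partitioner, then drop the keys while preserving the partitioning.
        recovered
          .mapPartitions { iter =>
            iter.map { x => (TaskContext.get().partitionId(), x) }
          }
          .partitionBy(partitioner)
          .mapPartitions(iter => iter.map(_._2), preservesPartitioning = true)
      } else {
        recovered
      }

    assert(repartitioned.partitioner == Some(partitioner))
    sc.stop()
  }
}

In the final form of the series (PATCH 2/9 onward), the same goal is reached without this generic shuffle: the recovered records are flattened out of the state map and passed to TrackStateRDD.createFromRDD, which rebuilds a correctly partitioned state RDD while keeping each key's original update time.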