From 2d35dfacd54d747e6a4167d46234d4b3ce87529b Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Fri, 25 May 2018 14:52:36 +0200
Subject: [PATCH 1/2] [SPARK-23991][DSTREAMS] Fix data loss when WAL write
 fails in allocateBlocksToBatch

---
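Illustration for reviewers (not part of the patch): the bug is an ordering
problem. dequeueAll drains the received-block queues before the WAL write is
attempted, so a failed write loses the drained blocks; cloning first and
clearing only after a successful write keeps them available for a retry. A
minimal standalone sketch, with a hypothetical walWrite helper standing in
for writeToLog:

    import scala.collection.mutable

    object WalOrderingSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical stand-in for writeToLog; simulates a failed WAL write.
        def walWrite(blocks: Iterable[Int]): Boolean = false

        // Old behaviour: the queue is drained before the write outcome is known.
        val oldQueue = mutable.Queue(1, 2, 3)
        val drained = oldQueue.dequeueAll(_ => true)
        if (walWrite(drained)) { /* allocate the blocks to the batch */ }
        assert(oldQueue.isEmpty)  // the write failed, yet the blocks are gone

        // Fixed behaviour: snapshot first, clear only after a successful write.
        val newQueue = mutable.Queue(1, 2, 3)
        val snapshot = newQueue.clone()
        if (walWrite(snapshot)) {
          newQueue.clear()  // reached only when the WAL write succeeded
        }
        assert(newQueue.nonEmpty)  // blocks survive and can be re-allocated later
      }
    }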
 .../scheduler/ReceivedBlockTracker.scala      |  5 +-
 .../streaming/ReceivedBlockTrackerSuite.scala | 54 ++++++++++++++++++-
 2 files changed, 56 insertions(+), 3 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index dacff69d55dd2..46e6902c9afd6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -112,10 +112,13 @@ private[streaming] class ReceivedBlockTracker(
   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
       val streamIdToBlocks = streamIds.map { streamId =>
-        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
+        (streamId, getReceivedBlockQueue(streamId).clone())
       }.toMap
       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
+        streamIds.foreach {
+          getReceivedBlockQueue(_).clear()
+        }
         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
         lastAllocatedBatchTime = batchTime
       } else {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 4fa236bd39663..6e62d1e0a52a7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -26,10 +26,12 @@ import scala.language.{implicitConversions, postfixOps}
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{doThrow, reset, spy}
 import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.concurrent.Eventually._
 
-import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
@@ -115,6 +117,50 @@ class ReceivedBlockTrackerSuite
     tracker2.stop()
   }
 
+  test("block allocation to batch should not lose blocks from received queue") {
+    val tracker1 = createTracker(createSpyTracker = true)
+    tracker1.isWriteAheadLogEnabled should be (true)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+
+    // Add blocks
+    val blockInfos = generateBlockInfos()
+    blockInfos.map(tracker1.addBlock)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+
+    // Try to allocate the blocks to a batch and verify that it fails.
+    // The blocks should stay in the received queue when the WAL write fails.
+    doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
+      .when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
+    try {
+      tracker1.allocateBlocksToBatch(1)
+      assert(false)
+    } catch {
+      case _: RuntimeException =>
+        // Nothing to do here
+    }
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+    tracker1.getBlocksOfBatch(1) shouldEqual Map.empty
+    tracker1.getBlocksOfBatchAndStream(1, streamId) shouldEqual Seq.empty
+
+    // Allocate the blocks to a batch and verify that all of them have been allocated
+    reset(tracker1)
+    tracker1.allocateBlocksToBatch(2)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+    tracker1.hasUnallocatedReceivedBlocks should be (false)
+    tracker1.getBlocksOfBatch(2) shouldEqual Map(streamId -> blockInfos)
+    tracker1.getBlocksOfBatchAndStream(2, streamId) shouldEqual blockInfos
+
+    tracker1.stop()
+
+    // Recover from WAL to verify correctness
+    val tracker2 = createTracker(recoverFromWriteAheadLog = true)
+    tracker2.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+    tracker2.hasUnallocatedReceivedBlocks should be (false)
+    tracker2.getBlocksOfBatch(2) shouldEqual Map(streamId -> blockInfos)
+    tracker2.getBlocksOfBatchAndStream(2, streamId) shouldEqual blockInfos
+    tracker2.stop()
+  }
+
   test("recovery and cleanup with write ahead logs") {
     val manualClock = new ManualClock
     // Set the time increment level to twice the rotation interval so that every increment creates
@@ -308,12 +354,16 @@ class ReceivedBlockTrackerSuite
    * want to control time by manually incrementing it to test log clean.
    */
   def createTracker(
+      createSpyTracker: Boolean = false,
       setCheckpointDir: Boolean = true,
       recoverFromWriteAheadLog: Boolean = false,
       clock: Clock = new SystemClock): ReceivedBlockTracker = {
     val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
-    val tracker = new ReceivedBlockTracker(
+    var tracker = new ReceivedBlockTracker(
       conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption)
+    if (createSpyTracker) {
+      tracker = spy(tracker)
+    }
     allReceivedBlockTrackers += tracker
     tracker
   }

From 7233e8033e97b0903194122e6f5304653fb00e15 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Mon, 28 May 2018 09:26:42 +0200
Subject: [PATCH 2/2] Review fix.
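
Review note (illustration, not part of the patch): intercept both fails the
test when no exception is thrown and returns the caught exception, replacing
the manual try/assert(false)/catch dance. A minimal ScalaTest sketch of the
equivalence (plain FunSuite assumed):

    import org.scalatest.FunSuite

    class InterceptSketch extends FunSuite {

      def boom(): Unit = throw new RuntimeException("Not able to write BatchAllocationEvent")

      test("manual pattern") {
        try {
          boom()
          assert(false)  // reached only if boom() did not throw
        } catch {
          case _: RuntimeException => // expected; message not checked
        }
      }

      test("intercept pattern") {
        // Fails the test if no RuntimeException is thrown and returns
        // the exception so its message can be asserted as well.
        val e = intercept[RuntimeException] { boom() }
        assert(e.getMessage === "Not able to write BatchAllocationEvent")
      }
    }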
---
 .../streaming/scheduler/ReceivedBlockTracker.scala  |  4 +---
 .../spark/streaming/ReceivedBlockTrackerSuite.scala | 13 +++----------
 2 files changed, 4 insertions(+), 13 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 46e6902c9afd6..cf4324578ea87 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -116,9 +116,7 @@ private[streaming] class ReceivedBlockTracker(
       }.toMap
       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
-        streamIds.foreach {
-          getReceivedBlockQueue(_).clear()
-        }
+        streamIds.foreach(getReceivedBlockQueue(_).clear())
         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
         lastAllocatedBatchTime = batchTime
       } else {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 6e62d1e0a52a7..fd7e00b1de25f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -118,7 +118,7 @@ class ReceivedBlockTrackerSuite
   }
 
   test("block allocation to batch should not lose blocks from received queue") {
-    val tracker1 = createTracker(createSpyTracker = true)
+    val tracker1 = spy(createTracker())
     tracker1.isWriteAheadLogEnabled should be (true)
     tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
 
@@ -131,13 +131,10 @@ class ReceivedBlockTrackerSuite
     // The blocks should stay in the received queue when the WAL write fails.
     doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
       .when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
-    try {
+    val errMsg = intercept[RuntimeException] {
       tracker1.allocateBlocksToBatch(1)
-      assert(false)
-    } catch {
-      case _: RuntimeException =>
-        // Nothing to do here
     }
+    assert(errMsg.getMessage === "Not able to write BatchAllocationEvent")
     tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
     tracker1.getBlocksOfBatch(1) shouldEqual Map.empty
     tracker1.getBlocksOfBatchAndStream(1, streamId) shouldEqual Seq.empty
@@ -354,16 +351,12 @@ class ReceivedBlockTrackerSuite
    * want to control time by manually incrementing it to test log clean.
    */
   def createTracker(
-      createSpyTracker: Boolean = false,
       setCheckpointDir: Boolean = true,
       recoverFromWriteAheadLog: Boolean = false,
       clock: Clock = new SystemClock): ReceivedBlockTracker = {
     val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
     var tracker = new ReceivedBlockTracker(
       conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption)
-    if (createSpyTracker) {
-      tracker = spy(tracker)
-    }
     allReceivedBlockTrackers += tracker
     tracker
   }
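
Postscript (illustration, not part of either patch): the suite stubs only
writeToLog on an otherwise real tracker via a Mockito spy, so the real
allocation logic still runs against the failing write. A minimal,
self-contained sketch of that spy/doThrow pattern; the Writer class and its
methods below are hypothetical stand-ins for ReceivedBlockTracker:

    import org.mockito.Matchers.any
    import org.mockito.Mockito.{doThrow, spy}

    // Hypothetical stand-in for the tracker's WAL write path.
    class Writer {
      def writeToLog(event: String): Boolean = true
      def allocate(event: String): Unit = {
        if (writeToLog(event)) { /* commit the allocation */ }
      }
    }

    object SpyPatternSketch {
      def main(args: Array[String]): Unit = {
        val writer = spy(new Writer)
        // Stub only writeToLog; allocate still executes the real code path.
        doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
          .when(writer).writeToLog(any(classOf[String]))

        try writer.allocate("event")
        catch { case e: RuntimeException => println(s"caught: ${e.getMessage}") }
      }
    }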