-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch #21430
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,10 +26,12 @@ import scala.language.{implicitConversions, postfixOps} | |
import scala.util.Random | ||
|
||
import org.apache.hadoop.conf.Configuration | ||
import org.mockito.Matchers.any | ||
import org.mockito.Mockito.{doThrow, reset, spy} | ||
import org.scalatest.{BeforeAndAfter, Matchers} | ||
import org.scalatest.concurrent.Eventually._ | ||
|
||
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} | ||
import org.apache.spark.{SparkConf, SparkFunSuite} | ||
import org.apache.spark.internal.Logging | ||
import org.apache.spark.storage.StreamBlockId | ||
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult | ||
|
@@ -115,6 +117,50 @@ class ReceivedBlockTrackerSuite | |
tracker2.stop() | ||
} | ||
|
||
test("block allocation to batch should not loose blocks from received queue") { | ||
val tracker1 = createTracker(createSpyTracker = true) | ||
tracker1.isWriteAheadLogEnabled should be (true) | ||
tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty | ||
|
||
// Add blocks | ||
val blockInfos = generateBlockInfos() | ||
blockInfos.map(tracker1.addBlock) | ||
tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos | ||
|
||
// Try to allocate the blocks to a batch and verify that it's failing | ||
// The blocks should stay in the received queue when WAL write failing | ||
doThrow(new RuntimeException("Not able to write BatchAllocationEvent")) | ||
.when(tracker1).writeToLog(any(classOf[BatchAllocationEvent])) | ||
try { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed. |
||
tracker1.allocateBlocksToBatch(1) | ||
assert(false) | ||
} catch { | ||
case _: RuntimeException => | ||
// Nothing to do here | ||
} | ||
tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos | ||
tracker1.getBlocksOfBatch(1) shouldEqual Map.empty | ||
tracker1.getBlocksOfBatchAndStream(1, streamId) shouldEqual Seq.empty | ||
|
||
// Allocate the blocks to a batch and verify that all of them have been allocated | ||
reset(tracker1) | ||
tracker1.allocateBlocksToBatch(2) | ||
tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty | ||
tracker1.hasUnallocatedReceivedBlocks should be (false) | ||
tracker1.getBlocksOfBatch(2) shouldEqual Map(streamId -> blockInfos) | ||
tracker1.getBlocksOfBatchAndStream(2, streamId) shouldEqual blockInfos | ||
|
||
tracker1.stop() | ||
|
||
// Recover from WAL to see the correctness | ||
val tracker2 = createTracker(recoverFromWriteAheadLog = true) | ||
tracker2.getUnallocatedBlocks(streamId) shouldEqual Seq.empty | ||
tracker2.hasUnallocatedReceivedBlocks should be (false) | ||
tracker2.getBlocksOfBatch(2) shouldEqual Map(streamId -> blockInfos) | ||
tracker2.getBlocksOfBatchAndStream(2, streamId) shouldEqual blockInfos | ||
tracker2.stop() | ||
} | ||
|
||
test("recovery and cleanup with write ahead logs") { | ||
val manualClock = new ManualClock | ||
// Set the time increment level to twice the rotation interval so that every increment creates | ||
|
@@ -308,12 +354,16 @@ class ReceivedBlockTrackerSuite | |
* want to control time by manually incrementing it to test log clean. | ||
*/ | ||
def createTracker( | ||
createSpyTracker: Boolean = false, | ||
setCheckpointDir: Boolean = true, | ||
recoverFromWriteAheadLog: Boolean = false, | ||
clock: Clock = new SystemClock): ReceivedBlockTracker = { | ||
val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None | ||
val tracker = new ReceivedBlockTracker( | ||
var tracker = new ReceivedBlockTracker( | ||
conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption) | ||
if (createSpyTracker) { | ||
tracker = spy(tracker) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not do this in the caller? Less code and cleaner. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed. |
||
} | ||
allReceivedBlockTrackers += tracker | ||
tracker | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: use parentheses when using placeholders (
_
), braces otherwise.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed.