[SPARK-11932][STREAMING] Partition previous TrackStateRDD if partitioner not present #9988
Closed
Changes from 9 of 10 commits:

0c5fe55  Partition previous state RDD if partitioner not present (tdas)
a1d3f75  Refactored to address PR comments (tdas)
f4fb557  Revering a line (tdas)
d3da04f  Fixed errors (tdas)
42b35b7  Revert unnecessary change (tdas)
34a52f9  Minor changes (tdas)
96e95ab  More debugging (tdas)
53846f5  Possibly fixed flakiness (tdas)
3136f27  Merge remote-tracking branch 'apache-github/master' into SPARK-11932 (tdas)
fd6b83e  revert log4j.prop (tdas)
File: log4j.properties (streaming test resources)

@@ -25,4 +25,5 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{

 # Ignore messages below warning level from Jetty, because it's a bit verbose
 log4j.logger.org.spark-project.jetty=WARN
+log4j.appender.org.apache.spark.streaming=DEBUG

Review comment: nit: should revert this
File: CheckpointSuite.scala

@@ -33,17 +33,149 @@ import org.mockito.Mockito.mock
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._

-import org.apache.spark.TestUtils
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, TestUtils}
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils}

+/**
+ * A trait that can be mixed in to get methods for testing DStream operations under
+ * DStream checkpointing. Note that implementations of this trait have to implement
+ * the `setupCheckpointOperation` method.
+ */
+trait DStreamCheckpointTester { self: SparkFunSuite =>

Review comment: This is refactoring where I extract out the …
+
+  /**
+   * Tests a streaming operation under checkpointing, by restarting the operation
+   * from the checkpoint file and verifying whether the final output is correct.
+   * The output is assumed to have come from a reliable queue which can replay
+   * data as required.
+   *
+   * NOTE: This takes into consideration that the last batch processed before
+   * master failure will be re-processed after restart/recovery.
+   */
+  protected def testCheckpointedOperation[U: ClassTag, V: ClassTag](
+      input: Seq[Seq[U]],
+      operation: DStream[U] => DStream[V],
+      expectedOutput: Seq[Seq[V]],
+      numBatchesBeforeRestart: Int,
+      batchDuration: Duration = Milliseconds(500),
+      stopSparkContextAfterTest: Boolean = true
+    ) {
+    require(numBatchesBeforeRestart < expectedOutput.size,
+      "Number of batches before context restart must be less than the number of expected " +
+        "outputs (i.e. the total number of batches to run)")
+    require(StreamingContext.getActive().isEmpty,
+      "Cannot run test with already active streaming context")
+
+    // Current code assumes that number of batches to be run = number of inputs
+    val totalNumBatches = input.size
+    val batchDurationMillis = batchDuration.milliseconds
+
+    // Set up the stream computation
+    val checkpointDir = Utils.createTempDir(this.getClass.getSimpleName()).toString
+    logDebug(s"Using checkpoint directory $checkpointDir")
+    val ssc = createContextForCheckpointOperation(batchDuration)
+    require(ssc.conf.get("spark.streaming.clock") === classOf[ManualClock].getName,
+      "Cannot run test without manual clock in the conf")
+
+    val inputStream = new TestInputStream(ssc, input, numPartitions = 2)
+    val operatedStream = operation(inputStream)
+    operatedStream.print()
+    val outputStream = new TestOutputStreamWithPartitions(operatedStream,
+      new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
+    outputStream.register()
+    ssc.checkpoint(checkpointDir)
+
+    // Do the computation for the initial number of batches, create the checkpoint file and quit
+    val beforeRestartOutput = generateOutput[V](ssc,
+      Time(batchDurationMillis * numBatchesBeforeRestart), checkpointDir, stopSparkContextAfterTest)
+    assertOutput(beforeRestartOutput, expectedOutput, beforeRestart = true)
+
+    // Restart and complete the computation from the checkpoint file
+    logInfo(
+      "\n-------------------------------------------\n" +
+        " Restarting stream computation " +
+        "\n-------------------------------------------\n"
+    )
+
+    val restartedSsc = new StreamingContext(checkpointDir)
+    val afterRestartOutput = generateOutput[V](restartedSsc,
+      Time(batchDurationMillis * totalNumBatches), checkpointDir, stopSparkContextAfterTest)
+    assertOutput(afterRestartOutput, expectedOutput, beforeRestart = false)
+  }
+
+  protected def createContextForCheckpointOperation(batchDuration: Duration): StreamingContext = {
+    val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
+    conf.set("spark.streaming.clock", classOf[ManualClock].getName())
+    new StreamingContext(SparkContext.getOrCreate(conf), batchDuration)
+  }
+
+  private def generateOutput[V: ClassTag](
+      ssc: StreamingContext,
+      targetBatchTime: Time,
+      checkpointDir: String,
+      stopSparkContext: Boolean
+    ): Seq[Seq[V]] = {
+    try {
+      val batchDuration = ssc.graph.batchDuration
+      val batchCounter = new BatchCounter(ssc)
+      ssc.start()
+      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+      val currentTime = clock.getTimeMillis()
+
+      logInfo("Manual clock before advancing = " + clock.getTimeMillis())
+      clock.setTime(targetBatchTime.milliseconds)
+      logInfo("Manual clock after advancing = " + clock.getTimeMillis())
+
+      val outputStream = ssc.graph.getOutputStreams().filter { dstream =>
+        dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
+      }.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+
+      eventually(timeout(10 seconds)) {
+        ssc.awaitTerminationOrTimeout(10)
+        assert(batchCounter.getLastCompletedBatchTime === targetBatchTime)
+      }
+
+      eventually(timeout(10 seconds)) {
+        val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter {
+          _.toString.contains(clock.getTimeMillis.toString)
+        }
+        // Checkpoint files are written twice for every batch interval, so assert
+        // that both of them have been written.
+        assert(checkpointFilesOfLatestTime.size === 2)
+      }
+      outputStream.output.map(_.flatten)
+    } finally {
+      ssc.stop(stopSparkContext = stopSparkContext)
+    }
+  }
+
+  private def assertOutput[V: ClassTag](
+      output: Seq[Seq[V]],
+      expectedOutput: Seq[Seq[V]],
+      beforeRestart: Boolean): Unit = {
+    val expectedPartialOutput = if (beforeRestart) {
+      expectedOutput.take(output.size)
+    } else {
+      expectedOutput.takeRight(output.size)
+    }
+    val setComparison = output.zip(expectedPartialOutput).forall {
+      case (o, e) => o.toSet === e.toSet
+    }
+    assert(setComparison, s"set comparison failed\n" +
+      s"Expected output items:\n${expectedPartialOutput.mkString("\n")}\n" +
+      s"Generated output items: ${output.mkString("\n")}"
+    )
+  }
+}
+
 /**
  * This test suite tests the checkpointing functionality of DStreams -
  * the checkpointing of a DStream's RDDs as well as the checkpointing of
  * the whole DStream graph.
  */
-class CheckpointSuite extends TestSuiteBase {
+class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester {

   var ssc: StreamingContext = null

@@ -56,7 +188,7 @@ class CheckpointSuite extends TestSuiteBase {

   override def afterFunction() {
     super.afterFunction()
-    if (ssc != null) ssc.stop()
+    if (ssc != null) { ssc.stop() }
     Utils.deleteRecursively(new File(checkpointDir))
   }
@@ -251,7 +383,9 @@ class CheckpointSuite extends TestSuiteBase {
       Seq(("", 2)),
       Seq(),
       Seq(("a", 2), ("b", 1)),
-      Seq(("", 2)), Seq() ),
+      Seq(("", 2)),
+      Seq()
+    ),
     3
   )
 }
@@ -634,53 +768,6 @@ class CheckpointSuite extends TestSuiteBase {
     checkpointWriter.stop()
   }

-  /**
-   * Tests a streaming operation under checkpointing, by restarting the operation
-   * from the checkpoint file and verifying whether the final output is correct.
-   * The output is assumed to have come from a reliable queue which can replay
-   * data as required.
-   *
-   * NOTE: This takes into consideration that the last batch processed before
-   * master failure will be re-processed after restart/recovery.
-   */
-  def testCheckpointedOperation[U: ClassTag, V: ClassTag](
-      input: Seq[Seq[U]],
-      operation: DStream[U] => DStream[V],
-      expectedOutput: Seq[Seq[V]],
-      initialNumBatches: Int
-    ) {
-
-    // Current code assumes that:
-    // number of inputs = number of outputs = number of batches to be run
-    val totalNumBatches = input.size
-    val nextNumBatches = totalNumBatches - initialNumBatches
-    val initialNumExpectedOutputs = initialNumBatches
-    val nextNumExpectedOutputs = expectedOutput.size - initialNumExpectedOutputs + 1
-    // because the last batch will be processed again
-
-    // Do the computation for initial number of batches, create checkpoint file and quit
-    ssc = setupStreams[U, V](input, operation)
-    ssc.start()
-    val output = advanceTimeWithRealDelay[V](ssc, initialNumBatches)
-    ssc.stop()
-    verifyOutput[V](output, expectedOutput.take(initialNumBatches), true)
-    Thread.sleep(1000)
-
-    // Restart and complete the computation from checkpoint file
-    logInfo(
-      "\n-------------------------------------------\n" +
-      " Restarting stream computation " +
-      "\n-------------------------------------------\n"
-    )
-    ssc = new StreamingContext(checkpointDir)
-    ssc.start()
-    val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
-    // the first element will be re-processed data of the last batch before restart
-    verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true)
-    ssc.stop()
-    ssc = null
-  }
-
   /**
    * Advances the manual clock on the streaming scheduler by given number of batches.
    * It also waits for the expected amount of time for each batch.
Review comment: note to self: revert this