[SPARK-11932][STREAMING] Partition previous TrackStateRDD if partitioner not present #9988
Conversation
* DStream checkpointing. Note that implementations of this trait have to implement
* the `setupCheckpointOperation`
*/
trait DStreamCheckpointTester { self: SparkFunSuite =>
This is a refactoring where I extract out `testCheckpointedOperation` so that it can be used in other unit tests.
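For context, a rough sketch of the self-typed trait pattern used here, assuming plain ScalaTest in place of Spark's internal SparkFunSuite; the helper's name matches the PR but its signature here is invented for illustration:

import org.scalatest.funsuite.AnyFunSuite

// The self type restricts where the trait can be mixed in, so the helper
// can call the suite's test(...) registration without extending the suite.
trait CheckpointTester { self: AnyFunSuite =>
  // Hypothetical simplified helper; the real testCheckpointedOperation
  // drives a DStream operation through checkpointing and recovery.
  protected def testCheckpointedOperation(name: String)(body: => Unit): Unit = {
    test(name + " (with checkpoint recovery)")(body)
  }
}

class ExampleSuite extends AnyFunSuite with CheckpointTester {
  testCheckpointedOperation("map over restart") {
    assert(Seq(1, 2, 3).map(_ * 2) == Seq(2, 4, 6))
  }
}

Mixing the trait into each suite this way avoids duplicating the checkpoint-and-restart plumbing across test files.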
@zsxwing Please take a look.
Test build #46730 has finished for PR 9988 at commit
case None =>
  TrackStateRDD.createFromPairRDD[K, V, S, E](
    spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
    partitioner, validTime
nit: `validTime` should be on a new line.
Done.
Test build #2133 has finished for PR 9988 at commit
Test build #46930 has finished for PR 9988 at commit
retest this please
@@ -56,7 +172,7 @@ class CheckpointSuite extends TestSuiteBase {

   override def afterFunction() {
     super.afterFunction()
-    if (ssc != null) ssc.stop()
+    StreamingContext.getActive().foreach { _.stop() }
If the SparkContext is created in the StreamingContext's constructor but `StreamingContext.stop` is never called, this line cannot stop that SparkContext.
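One way to cover that case, sketched here rather than taken from the final patch, is to stop through the held reference first with `stopSparkContext = true`, then fall back to `getActive()`:

override def afterFunction() {
  super.afterFunction()
  // stop(stopSparkContext = true) also shuts down a SparkContext that the
  // StreamingContext's constructor created, which getActive() alone would
  // miss if ssc.stop() was never called.
  if (ssc != null) {
    ssc.stop(stopSparkContext = true)
  }
  // Additionally stop whatever context is still registered as active.
  StreamingContext.getActive().foreach { _.stop() }
}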
test this please
Test build #46992 has finished for PR 9988 at commit
Test build #47005 has finished for PR 9988 at commit
@@ -730,7 +730,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
     val serializableConf = new SerializableJobConf(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
+      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
This is the same change as the one made in #10088.
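The continuation cut off in the diff above presumably wraps the serialized conf in a fresh JobConf per batch, along the lines of this sketch based on the #10088 description, not the verbatim patch:

val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
  val file = rddToFileName(prefix, suffix, time)
  // Give saveAsHadoopFile its own copy of the conf to mutate, so the
  // JobConf that checkpointing serializes is never touched concurrently.
  rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
    new JobConf(serializableConf.value))
}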
Test build #47033 has finished for PR 9988 at commit
Test build #2148 has finished for PR 9988 at commit
…HadoopFiles

The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently in multiple places:
* The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched.
* The JobConf is serialized as part of the DStream checkpoints.

These concurrent accesses (updating in one thread while another thread serializes it) can lead to a ConcurrentModificationException in the underlying Java HashMap used in the internal Hadoop Configuration object.

The solution is to create a new JobConf in every batch, which is updated by `RDD.saveAsHadoopFile()`, while the checkpointing serializes the original JobConf.

Tests to be added in #9988 will fail reliably without this patch. Keeping this patch really small to make sure that it can be added to previous branches.

Author: Tathagata Das <[email protected]>

Closes #10088 from tdas/SPARK-12087.

(cherry picked from commit 8a75a30)
Signed-off-by: Shixiong Zhu <[email protected]>
retest this please
Test build #47037 has finished for PR 9988 at commit
Test build #47085 has finished for PR 9988 at commit
Test build #2153 has finished for PR 9988 at commit
Test build #2152 has finished for PR 9988 at commit
Test build #2155 has finished for PR 9988 at commit
Test build #2154 has finished for PR 9988 at commit
Test build #47109 has finished for PR 9988 at commit
Test build #2159 has finished for PR 9988 at commit
Test build #47133 has finished for PR 9988 at commit
Test build #2158 has finished for PR 9988 at commit
Test build #2160 has finished for PR 9988 at commit
@@ -277,7 +277,7 @@ class CheckpointWriter(
     val bytes = Checkpoint.serialize(checkpoint, conf)
     executor.execute(new CheckpointWriteHandler(
       checkpoint.checkpointTime, bytes, clearCheckpointDataLater))
+    logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
note to self: revert this
@zsxwing Can you take a look at this once again?
Test build #47194 has finished for PR 9988 at commit
Test build #2167 has finished for PR 9988 at commit
Test build #2168 has finished for PR 9988 at commit
@@ -25,4 +25,5 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{

 # Ignore messages below warning level from Jetty, because it's a bit verbose
 log4j.logger.org.spark-project.jetty=WARN
+log4j.appender.org.apache.spark.streaming=DEBUG
nit: should revert this
LGTM except a nit
Test build #47262 has finished for PR 9988 at commit
Thanks @zsxwing. Merging this to master and 1.6.
…ner not present

The reason is that TrackStateRDDs generated by trackStateByKey expect the previous batch's TrackStateRDDs to have a partitioner. However, when recovering from DStream checkpoints, the RDDs recovered from RDD checkpoints do not have a partitioner attached to them. This is because RDD checkpoints do not preserve the partitioner (SPARK-12004).

While #9983 solves SPARK-12004 by preserving the partitioner through RDD checkpoints, there is a non-zero chance that the saving and recovery fail. To be resilient, this PR repartitions the previous state RDD if the partitioner is not detected.

Author: Tathagata Das <[email protected]>

Closes #9988 from tdas/SPARK-11932.

(cherry picked from commit 5d80d8c)
Signed-off-by: Tathagata Das <[email protected]>
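A minimal sketch of that guard follows; the helper name is illustrative and TrackStateRDD's real construction differs:

import scala.reflect.ClassTag
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Reuse the previous state RDD only if it already carries the expected
// partitioner; otherwise (e.g. after recovery from an RDD checkpoint,
// which drops the partitioner per SPARK-12004) repartition it first.
def ensurePartitioned[K: ClassTag, S: ClassTag](
    prevStateRDD: RDD[(K, S)],
    expected: Partitioner): RDD[(K, S)] = {
  prevStateRDD.partitioner match {
    case Some(p) if p == expected => prevStateRDD
    case _ => prevStateRDD.partitionBy(expected)
  }
}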