-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-12004] Preserve the RDD partitioner through RDD checkpointing #9983
Conversation
@zsxwing @andrewor14 Can you take a look at this. |
@@ -55,25 +55,7 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v | |||
* This is called immediately after the first action invoked on this RDD has completed. | |||
*/ | |||
protected override def doCheckpoint(): CheckpointRDD[T] = { | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All this code has been moved in the ReliableCheckpointRDD.createCheckpointedRDD
/** | ||
* Write a RDD partition's data to a checkpoint file. | ||
*/ | ||
def writePartitionToCheckpointFile[T: ClassTag]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private
LGTM. Just style and naming nits. |
Test build #46724 has finished for PR 9983 at commit
|
jenkins test this please |
Test build #2132 has finished for PR 9983 at commit
|
Test build #46931 has finished for PR 9983 at commit
|
Test build #46935 has finished for PR 9983 at commit
|
retest this please |
Test build #46969 has finished for PR 9983 at commit
|
m1.6 |
The solution is the save the RDD partitioner in a separate file in the RDD checkpoint directory. That is, `<checkpoint dir>/_partitioner`. In most cases, whether the RDD partitioner was recovered or not, does not affect the correctness, only reduces performance. So this solution makes a best-effort attempt to save and recover the partitioner. If either fails, the checkpointing is not affected. This makes this patch safe and backward compatible. Author: Tathagata Das <[email protected]> Closes #9983 from tdas/SPARK-12004. (cherry picked from commit 60b541e) Signed-off-by: Andrew Or <[email protected]>
…ner not present The reason is that TrackStateRDDs generated by trackStateByKey expect the previous batch's TrackStateRDDs to have a partitioner. However, when recovery from DStream checkpoints, the RDDs recovered from RDD checkpoints do not have a partitioner attached to it. This is because RDD checkpoints do not preserve the partitioner (SPARK-12004). While #9983 solves SPARK-12004 by preserving the partitioner through RDD checkpoints, there may be a non-zero chance that the saving and recovery fails. To be resilient, this PR repartitions the previous state RDD if the partitioner is not detected. Author: Tathagata Das <[email protected]> Closes #9988 from tdas/SPARK-11932.
…ner not present The reason is that TrackStateRDDs generated by trackStateByKey expect the previous batch's TrackStateRDDs to have a partitioner. However, when recovery from DStream checkpoints, the RDDs recovered from RDD checkpoints do not have a partitioner attached to it. This is because RDD checkpoints do not preserve the partitioner (SPARK-12004). While #9983 solves SPARK-12004 by preserving the partitioner through RDD checkpoints, there may be a non-zero chance that the saving and recovery fails. To be resilient, this PR repartitions the previous state RDD if the partitioner is not detected. Author: Tathagata Das <[email protected]> Closes #9988 from tdas/SPARK-11932. (cherry picked from commit 5d80d8c) Signed-off-by: Tathagata Das <[email protected]>
The solution is the save the RDD partitioner in a separate file in the RDD checkpoint directory. That is,
<checkpoint dir>/_partitioner
. In most cases, whether the RDD partitioner was recovered or not, does not affect the correctness, only reduces performance. So this solution makes a best-effort attempt to save and recover the partitioner. If either fails, the checkpointing is not affected. This makes this patch safe and backward compatible.