[SPARK-18905][STREAMING] Fix the issue of removing a failed jobset from JobScheduler.jobSets #16542
Conversation
Test build #71172 has finished for PR 16542 at commit
jobSet.totalDelay / 1000.0, jobSet.time.toString,
jobSet.processingDelay / 1000.0
))
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
Could you also post this event for a failed jobSet? Otherwise, the web UI cannot show it.
sure
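For context on why posting the event matters even for failed batches: the Streaming web UI and any registered StreamingListener only learn about a batch through StreamingListenerBatchCompleted. Below is a minimal sketch of a listener that surfaces failed output operations, assuming the scheduler posts the event for failed JobSets as requested above; the listener class name is made up for illustration.

```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Logs every completed batch, including ones whose jobs failed, once the
// scheduler posts StreamingListenerBatchCompleted for failed JobSets too.
class BatchOutcomeListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val failed = info.outputOperationInfos.values.count(_.failureReason.isDefined)
    println(s"Batch ${info.batchTime}: ${info.numRecords} records, $failed failed output operation(s)")
  }
}

// Registration (ssc is an existing StreamingContext):
// ssc.addStreamingListener(new BatchOutcomeListener)
```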
Test build #71288 has finished for PR 16542 at commit
LGTM. Thanks! Merging to master and 2.1.
…om JobScheduler.jobSets

## What changes were proposed in this pull request?

The current implementation of Spark Streaming considers a batch completed regardless of the results of its jobs (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203).

Consider the following case: a micro-batch contains two jobs that read from two different Kafka topics. One of the jobs fails due to a problem in the user-defined logic, after the other has finished successfully.

1. The main thread of the Spark Streaming application executes the line mentioned above,
2. another thread (the checkpoint writer) writes a checkpoint file immediately after that line is executed, and
3. due to the current error-handling mechanism in Spark Streaming, the StreamingContext is then closed (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214).

When the user recovers from the checkpoint file, the JobSet containing the failed job has already been removed (treated as completed) before the checkpoint was constructed, so the data being processed by the failed job is never reprocessed.

This PR fixes the issue by removing a JobSet from JobScheduler.jobSets only when all jobs in that JobSet have finished successfully.

## How was this patch tested?

Existing tests.

Author: CodingCat <[email protected]>
Author: Nan Zhu <[email protected]>

Closes #16542 from CodingCat/SPARK-18905.

(cherry picked from commit f8db894)
Signed-off-by: Shixiong Zhu <[email protected]>
Thanks
…et from JobScheduler.jobSets apache#16542
What changes were proposed in this pull request?
The current implementation of Spark Streaming considers a batch completed regardless of the results of its jobs (streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala, line 203 in 1169db4).
Consider the following case: a micro-batch contains two jobs that read from two different Kafka topics. One of the jobs fails due to a problem in the user-defined logic, after the other has finished successfully.
1. The main thread of the Spark Streaming application executes the line mentioned above,
2. another thread (the checkpoint writer) writes a checkpoint file immediately after that line is executed, and
3. due to the current error-handling mechanism in Spark Streaming, the StreamingContext is then closed (streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala, line 214 in 1169db4).
When the user recovers from the checkpoint file, the JobSet containing the failed job has already been removed (treated as completed) before the checkpoint was constructed, so the data being processed by the failed job is never reprocessed.
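To make the two-jobs-per-batch setup concrete: each output operation on a DStream becomes one job in every batch. Here is a minimal, self-contained sketch of such an application; the input directories, checkpoint path, and object name are hypothetical, and file streams stand in for the Kafka topics in the description.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TwoJobBatch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("two-job-batch")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/tmp/two-job-batch-checkpoint") // hypothetical checkpoint directory

    // Two independent inputs stand in for the two Kafka topics;
    // drop text files into these (hypothetical) directories to drive batches.
    val topicA = ssc.textFileStream("/tmp/topicA")
    val topicB = ssc.textFileStream("/tmp/topicB")

    // Each output operation becomes one job per batch, so every batch contains two jobs.
    // Job 1: completes normally.
    topicA.foreachRDD { rdd => rdd.count() }
    // Job 2: fails inside user-defined logic after job 1 has already finished.
    topicB.foreachRDD { rdd =>
      if (!rdd.isEmpty()) throw new RuntimeException("simulated failure in user logic")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```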
This PR fixes the issue by removing a JobSet from JobScheduler.jobSets only when all jobs in that JobSet have finished successfully.
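To illustrate the invariant the fix establishes, here is a deliberately simplified, self-contained model of the bookkeeping (toy classes, not the actual JobScheduler/JobSet code): a batch's JobSet is dropped from the jobSets map only once every job in it has succeeded, so a checkpoint written after a failure still covers that batch.

```scala
import scala.collection.concurrent.TrieMap

// Toy stand-ins for Spark's internal Job/JobSet, used only to show the removal rule.
final case class ToyJob(id: Int, succeeded: Boolean)

final class ToyJobSet(val time: Long, totalJobs: Int) {
  private var finished = 0
  private var anyFailed = false

  def handleJobCompletion(job: ToyJob): Unit = {
    finished += 1
    if (!job.succeeded) anyFailed = true
  }
  def hasCompleted: Boolean = finished == totalJobs
  def hasFailed: Boolean = anyFailed
}

final class ToyScheduler {
  // Mirrors the role of JobScheduler.jobSets: batch time -> pending JobSet.
  val jobSets = new TrieMap[Long, ToyJobSet]()

  def onJobCompleted(job: ToyJob, jobSet: ToyJobSet): Unit = {
    jobSet.handleJobCompletion(job)
    // Old behaviour: remove as soon as all jobs have finished, even if one failed.
    // New behaviour (this PR's rule): keep the JobSet unless every job succeeded,
    // so a checkpoint taken after the failure still includes this batch.
    if (jobSet.hasCompleted && !jobSet.hasFailed) {
      jobSets.remove(jobSet.time)
    }
  }
}

object ToySchedulerDemo extends App {
  val scheduler = new ToyScheduler
  val batch = new ToyJobSet(time = 1000L, totalJobs = 2)
  scheduler.jobSets.put(batch.time, batch)

  scheduler.onJobCompleted(ToyJob(0, succeeded = true), batch)
  scheduler.onJobCompleted(ToyJob(1, succeeded = false), batch)

  // The batch stays in jobSets because one of its jobs failed.
  println(scheduler.jobSets.contains(1000L)) // true
}
```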
How was this patch tested?
Existing tests.