Issue 2644: Tasks can be run several times under certain conditions #2645
Conversation
Code looks pretty solid and thank you very much for writing the test case! Would you mind just looking at my comments?
Thanks for linking to your issue. But can you in addition (in this PR) clarify when "certain conditions" is? Who was previously affected? How often did things rerun you think?
test/worker_test.py
Outdated
```python
class ConcurrentTest(unittest.TestCase):

    def setUp(self):
        self.p = tempfile.mkdtemp()
```
Maybe add `suffix='-luigi-testcase'` for traceability?
test/worker_test.py
Outdated
```python
@@ -2135,3 +2136,61 @@ def run(self):
        self.assertEqual(0, self.sch._state.get_task(s1.task_id).failures.num_failures())
        self.assertEqual(self.per_task_retry_count, self.sch._state.get_task(e2.task_id).failures.num_failures())
        self.assertEqual(self.default_retry_count, self.sch._state.get_task(e1.task_id).failures.num_failures())


class SleepyLocalTarget(luigi.LocalTarget):
```
Can we reduce the complexity of the test case you're building? I'm not sure I fully understand what you're solving, and that might become easier if the test case were simpler (if possible).
test/worker_test.py
Outdated
```python
    time_to_sleep = 0.5

    def exists(self):
        time.sleep(self.time_to_sleep)
```
I'm a bit worried about this. We reserve the right to check `complete()` multiple times in luigi, and if a "refactoring" changes this (checks more often, or for example caches and checks less often), then this test case might start to break.
The test checks that the `Parent` task won't be run twice, so it shouldn't fail even if we add more or fewer `complete()` checks in the future.
test/worker_test.py
Outdated
```python
            kwargs={"tasks": [Parent(output_dir=self.p)], "local_scheduler": True})
        process.start()
        time.sleep(SleepyLocalTarget.time_to_sleep * 2 + Parent.time_to_sleep + Child.time_to_sleep)
        self.assertTrue(luigi.build([Parent(output_dir=self.p)], local_scheduler=True))
```
I have little idea what this assertTrue shows. Did this fail before? Did it fail because nothing was executed, or what?
There is a (very) new thing called LuigiRunResult which you can use here to make it clearer what the test case is checking (it also adds an enum that could improve clarity further).
Let me clarify: there's a `SleepyLocalTarget` that does not allow the task to run twice: it fails with an exception. So if the `Parent` task is run twice, `luigi.build` will return False. This test fails without the changes to `worker.py` in this PR, since luigi tries to run the `Parent` task twice.
Do you think this can be expressed more clearly using `LuigiRunResult`? If yes, I'll give it a try.
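As a rough illustration of the idea (names and delays here are made up for this sketch, not the PR's actual test code), a target of this kind makes a duplicate run observable by being slow in `exists()` and failing loudly on a second write:

```python
import os
import tempfile
import time


class FailOnRerunTarget:
    """Toy stand-in for the SleepyLocalTarget idea: exists() is
    deliberately slow (widening the race window between workers), and
    producing the output twice raises instead of silently overwriting."""

    def __init__(self, path, exists_delay=0.05):
        self.path = path
        self.exists_delay = exists_delay

    def exists(self):
        time.sleep(self.exists_delay)  # simulate a slow completeness check
        return os.path.exists(self.path)

    def write(self):
        if os.path.exists(self.path):
            raise RuntimeError("output produced twice - duplicate task run")
        with open(self.path, "w") as f:
            f.write("done")
```

With such a target, a duplicate run surfaces as a failed build rather than a silent overwrite, which is what makes `luigi.build` return False in the test.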
test/worker_test.py
Outdated
```python
    def tearDown(self):
        shutil.rmtree(self.p)

    def test_sending_same_task_twice_does_not_lead_to_double_run(self):
```
Thanks for the descriptive test name. Luigi could use more of these.
luigi/worker.py
Outdated
```python
@@ -842,7 +842,14 @@ def _add(self, task, is_complete):
                task.trigger_event(Event.DEPENDENCY_DISCOVERED, task, d)
                yield d  # return additional tasks to add

            deps = [d.task_id for d in deps]

        # if dependencies check took too long, the parent task might have been completed by other worker
        if status == PENDING and task.complete():
```
Why is `status == PENDING` needed? The comment doesn't cover that, right?
Well, I wanted to filter out the `DISABLED` status so that `task.complete()` is called only when it's really needed. WDYT?
@Tarrasch thank you very much for starting to review this PR! 👍
I think that to completely resolve this issue there must be something like a "timestamp of status check" sent to the scheduler along with the status itself, and the scheduler should decide what to do if the status timestamp is older than the one it has in its state. Just my thoughts.
@Tarrasch, can you please take a look at the changes I submitted? This is just a draft of an idea of how this issue can be resolved with the help of a distrust period. Lines 561 to 583 in 3701c68
If I'm not wrong, it does not assume anything wrong may happen during `self._scheduler.add_task`; it even writes the record to the task history before `add_task` itself is called. The interaction happens only from a worker to the scheduler at this point, not vice versa. So we cannot ask the worker here to "wait, then retry sending the same task with the updated status again" without significant code changes.
On the other hand, once we send the task in the PENDING state, the scheduler treats it as one that can be given to a worker, unless we change the task's status on the fly. Maybe we could re-run the check when a worker asks for work, but then we'd have to detect which of the tasks need this "status recheck" and which don't. Actually, I'm not a big fan of the approach selected, but the main goal for me was to reduce the impact of these changes. Please let me know if you have any thoughts regarding this!
> So I chose to keep the task that was DONE very recently in the same state.

This is exactly what I think is the way to do it.

> So we cannot ask the worker here "wait, then retry sending the same task with the updated status again" here without significant code changes.

Exactly. Luigi is (proudly) a simple system: it doesn't claim to schedule optimally, but it does claim to behave consistently and to "eventually get tasks done" if you periodically re-invoke it.

In other words, the draft implementation is exactly what I was looking for; only these few lines need to change in the scheduler. I don't think we need changes in the worker. So good job. I wrote some thoughts on your draft implementation.
luigi/scheduler.py
Outdated
```python
@@ -150,6 +150,8 @@ class scheduler(Config):

    metrics_collector = parameter.EnumParameter(enum=MetricsCollectors, default=MetricsCollectors.default)

    rerun_task_blocking_period = parameter.IntParameter(default=10)
```
Let's come up with a better name for this? I don't think we can find a name people will understand in just a few words, but it should still make sense.

- Maybe `stable_done_cooldown_secs`?
- Whichever we choose, we probably want `_secs` in the name. There's a TimeDeltaParameter, but I think we should avoid that fancy stuff in the scheduler. I dream that one day the scheduler is written in another language even.

Also add docs to this parameter.
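For reference, `stable_done_cooldown_secs` is the name eventually adopted later in this PR. As a scheduler option it would be configured in a luigi config file roughly like this (the value shown is illustrative, matching the default from the diff above):

```ini
[scheduler]
# Ignore a worker's PENDING report for a task that the scheduler
# marked DONE within the last N seconds.
stable_done_cooldown_secs = 10
```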
luigi/scheduler.py
Outdated
```python
@@ -840,6 +842,10 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
        if task is None or (task.status != RUNNING and not worker.enabled):
            return

        # if the task was completed only recently, do not run the same task again
        if status == PENDING and task.status == DONE and (time.time() - task.updated) < self._config.rerun_task_blocking_period:
            status = DONE
```
What happens if you just return here instead?
As far as I understand, the same thing happens: the scheduler refuses to give the task to the worker. Changed to a simple `return`.
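The resulting cooldown guard can be sketched as a standalone function (simplified Python rather than luigi's actual scheduler code; statuses are plain strings here, and the cooldown constant is illustrative):

```python
import time

PENDING, DONE = "PENDING", "DONE"
STABLE_DONE_COOLDOWN_SECS = 10  # illustrative cooldown window


def accept_pending_report(reported_status, stored_status, updated_ts, now=None):
    """Return False when a PENDING report should be rejected because the
    task was marked DONE within the cooldown window; the scheduler then
    simply returns instead of re-queueing the task."""
    now = time.time() if now is None else now
    recently_done = (stored_status == DONE
                     and (now - updated_ts) < STABLE_DONE_COOLDOWN_SECS)
    return not (reported_status == PENDING and recently_done)
```

The effect is the same as forcing the status back to DONE: a stale PENDING report cannot un-finish a task that another worker just completed.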
luigi/scheduler.py
Outdated
```python
@@ -840,6 +842,10 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
        if task is None or (task.status != RUNNING and not worker.enabled):
            return

        # if the task was completed only recently, do not run the same task again
        if status == PENDING and task.status == DONE and (time.time() - task.updated) < self._config.rerun_task_blocking_period:
```
Yes, pretty much what I expected. Great!
luigi/scheduler.py
Outdated
```python
@@ -840,6 +842,10 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
        if task is None or (task.status != RUNNING and not worker.enabled):
            return

        # if the task was completed only recently, do not run the same task again
```
Describe what you actually do, the implementation, not why you do it. :)
# Ignore claims that the task is PENDING if it very recently was marked as DONE.
Maybe?
Thanks, changed to the proposed version
test/scheduler_api_test.py
Outdated
```python
@@ -46,6 +46,7 @@ def get_scheduler_config(self):
            'disable_window': 10,
            'retry_count': 3,
            'disable_hard_timeout': 60 * 60,
            'rerun_task_blocking_period': 0
```
Good start. This is fine as other tests didn't assume this.
Also of course you should add a few other unit tests with this flag set :)
- changed `track_url_in_stderr` to `stream_for_searching_tracking_url` according to the latest changes in external_program.py
- added tests for spark url tracking in cluster and client modes
Guys, please take a look at the fresh version of the PR; let me know if anything else is needed here (maybe some other use cases should be covered).
@Tarrasch @honnix @dlstadther psst, guys, you probably forgot about this one.
Thanks. Sorry for the delays! Can you just rebase? This is an important bug fix indeed!
```python
        shutil.rmtree(self.p)

    def run_task(self):
        return luigi.build([FailingOnDoubleRunTask(output_dir=self.p)],
```
Can you implement this with LuigiRunResult? Test will become more readable.
Thank you, done
test/scheduler_test.py
Outdated
```python
        time.sleep(FailingOnDoubleRunTask.time_to_run_secs + FailingOnDoubleRunTask.time_to_check_secs)
        # second run of the same task is started
        luigi_run_result = self.run_task()
        self.assertEqual(luigi_run_result, not failure_expected)
```
This line in particular is very hard to read.
OMG, of course :) Thank you, fixed.
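To sketch why the fix reads better (`StatusCode` below is a stand-in enum for this illustration, not luigi's real `LuigiStatusCode` class): instead of comparing a run result against `not failure_expected`, the test can assert against an explicitly named expected status:

```python
from enum import Enum


class StatusCode(Enum):
    # stand-in values; the real status enum has more members
    SUCCESS = "success"
    FAILED = "failed"


def expected_status(failure_expected):
    """Map the test's intent to an explicit status, so the assertion
    names the expected outcome instead of using a boolean twist."""
    return StatusCode.FAILED if failure_expected else StatusCode.SUCCESS
```

An assertion like `self.assertEqual(expected_status(failure_expected), result_status)` then states directly which outcome each parametrization expects.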
@Tarrasch thank you for the thorough review again!
Many thanks @GoodDok et al for this work. I'm afraid I'm also having this issue :P Hope it gets merged soon!
Hi there, @Tarrasch @honnix @dlstadther. It seems I'm not the only one who has faced this issue.
It looks like @Tarrasch already approved the changes. I'll give him a little time to re-confirm; otherwise I'll merge.
My approval still stands. Sorry for being away for so long and not merging your rebase. I or any other maintainer may merge this once you rebase. Sorry!
- introduced parameter `stable_done_cooldown_secs` with description
- added an acceptance test
@dlstadther @Tarrasch guys, I've rebased the branch, please feel free to merge :)
I just came here to say thank you @GoodDok and @Tarrasch. I found this bug, and I'm testing the fix at the moment.

My observed behavior: a worker finishes executing a task and informs the scheduler it is done; another worker then informs the scheduler that the same task is pending (after the DONE report); then the task gets executed again by the first worker (the one that reported DONE). The problem wouldn't be big, as the task is idempotent, but the container died, so the task was never picked up again.

After reading through the initial issue and the pull request, I think this could be the same problem. @GoodDok @Tarrasch do you think they may be related?
Description
Wrote an additional check in worker.py after all dependencies of the task have been validated. This should prevent the situation where the real status of the task has already changed to DONE, but the worker still sends PENDING to the scheduler.
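The described check can be sketched as follows (a simplified standalone version, not the exact worker.py code; `task` here is any object with a `complete()` method, and statuses are plain strings):

```python
PENDING, DONE = "PENDING", "DONE"


def status_to_report(task, status):
    """After dependency discovery (which may take a while), re-check
    completion: another worker may have finished this task meanwhile,
    in which case DONE is reported instead of the stale PENDING."""
    if status == PENDING and task.complete():
        return DONE
    return status
```

Reporting DONE in that case keeps the scheduler from handing the already-finished task back out to a worker.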
Motivation and Context
There may be a situation where a worker has stale state for a task, and this leads to duplicate runs.
For the specific example see the corresponding issue: #2644
Have you tested this? If so, how?
There is a test provided in the PR; it fails with the current implementation, and the PR resolves it.