
Issue 2644: Tasks can be run several times under certain conditions #2645

Merged (14 commits) on Aug 23, 2019

Conversation

@GoodDok (Contributor) commented Feb 4, 2019

Description

Added an extra check in worker.py that runs after all of the task's dependencies have been validated.
It prevents the situation where the task's real status has already changed to DONE,
but the worker still reports PENDING to the scheduler.
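Roughly, the added check sits right after the dependency loop in the worker's _add generator. A minimal sketch of its shape (only the condition appears verbatim in the diff further down this page; the final assignment is an assumption about the rest of the change):

# Sketch of the worker-side re-check; `status = DONE` is an assumption,
# only the `if` condition is taken from the diff shown later in this review.
deps = [d.task_id for d in deps]
# if the dependency check took too long, another worker may have completed the task
if status == PENDING and task.complete():
    status = DONE  # report the freshly observed status instead of the stale PENDING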

Motivation and Context

There are situations in which a worker holds a stale view of a task's state, and this leads to duplicate runs.
[Diagram: the worker/scheduler exchange that produces a duplicate run.]

For the specific example see the corresponding issue: #2644

Have you tested this? If so, how?

A test is provided in the PR; it fails with the current implementation and passes with this change.

@Tarrasch (Contributor) left a comment:

Code looks pretty solid and thank you very much for writing the test case! Would you mind just looking at my comments?

Thanks for linking to your issue. But can you also (in this PR) clarify what those "certain conditions" are? Who was previously affected? How often do you think things were rerun?

class ConcurrentTest(unittest.TestCase):

    def setUp(self):
        self.p = tempfile.mkdtemp()
@Tarrasch (Contributor):

Maybe add suffix='-luigi-testcase' for traceability?

@@ -2135,3 +2136,61 @@ def run(self):
        self.assertEqual(0, self.sch._state.get_task(s1.task_id).failures.num_failures())
        self.assertEqual(self.per_task_retry_count, self.sch._state.get_task(e2.task_id).failures.num_failures())
        self.assertEqual(self.default_retry_count, self.sch._state.get_task(e1.task_id).failures.num_failures())


class SleepyLocalTarget(luigi.LocalTarget):
@Tarrasch (Contributor):

Can we reduce the complexity of the test case you're building? I'm not sure I fully understand what you're solving, and it might be easier to follow if the test case were simpler (if possible).

    time_to_sleep = 0.5

    def exists(self):
        time.sleep(self.time_to_sleep)
@Tarrasch (Contributor):

I'm a bit worried about this. We reserve the right to check complete() multiple times in luigi, and if a "refactoring" changes this (checks more often, or for example caches and checks less often), then this test case might start to break.

@GoodDok (Contributor, Author):

The test checks that the Parent task won't be run twice, so it shouldn't fail even if we add more or fewer complete() checks in the future.

            kwargs={"tasks": [Parent(output_dir=self.p)], "local_scheduler": True})
        process.start()
        time.sleep(SleepyLocalTarget.time_to_sleep * 2 + Parent.time_to_sleep + Child.time_to_sleep)
        self.assertTrue(luigi.build([Parent(output_dir=self.p)], local_scheduler=True))
@Tarrasch (Contributor):

I have little idea what this assertTrue shows. Did this fail before? Did it fail because nothing was executed, or what?

There is a (very) new thing called LuigiRunResult which you can use here to make it clearer what the test case is checking (it also adds an enum that could improve clarity further)
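For instance, a sketch of that approach (assuming the detailed_summary flag, which makes luigi.build return a LuigiRunResult):

from luigi.execution_summary import LuigiStatusCode

result = luigi.build([Parent(output_dir=self.p)],
                     local_scheduler=True, detailed_summary=True)
# result.status is a LuigiStatusCode enum, so the expectation is explicit
self.assertEqual(LuigiStatusCode.SUCCESS, result.status)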

@GoodDok (Contributor, Author):

Let me clarify: there's a SleepyLocalTarget that does not allow the task to run twice; it fails with an exception. So if the Parent task is run twice, luigi.build will return False. This test fails without the worker.py changes in this PR, since luigi tries to run the Parent task twice.
Do you think this can be expressed more clearly using LuigiRunResult? If yes, I'll give it a try.
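The idea, as a minimal self-contained sketch (not the PR's exact code; the open() override and the exception are assumptions used here to illustrate the "fail on a second run" mechanism):

import os
import time

import luigi


class SleepyLocalTarget(luigi.LocalTarget):
    # exists() is deliberately slow, to widen the window in which another
    # worker can complete the task while this one is still checking
    time_to_sleep = 0.5

    def exists(self):
        time.sleep(self.time_to_sleep)
        return super(SleepyLocalTarget, self).exists()

    def open(self, mode='r'):
        # writing the output a second time means the producing task was double-run
        if 'w' in mode and os.path.exists(self.path):
            raise RuntimeError('target written twice: the task was run twice')
        return super(SleepyLocalTarget, self).open(mode)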

    def tearDown(self):
        shutil.rmtree(self.p)

    def test_sending_same_task_twice_does_not_lead_to_double_run(self):
@Tarrasch (Contributor):

Thanks for the descriptive test name. Luigi could use more of these.

luigi/worker.py (outdated)
@@ -842,7 +842,14 @@ def _add(self, task, is_complete):
                task.trigger_event(Event.DEPENDENCY_DISCOVERED, task, d)
                yield d  # return additional tasks to add

            deps = [d.task_id for d in deps]
            # if the dependency check took too long, the parent task might have been completed by another worker
            if status == PENDING and task.complete():
@Tarrasch (Contributor):

Why is status == PENDING needed? The comment doesn't cover that, right?

@GoodDok (Contributor, Author):

Well, I wanted to filter out the DISABLED status so that task.complete() is called only when it's really needed. WDYT?

@GoodDok (Contributor, Author) commented Feb 5, 2019

@Tarrasch thank you very much for starting to review this PR! 👍
I attached a picture describing the problem I'm facing: if worker A finishes the task right before worker B sends the task status, we get duplicate runs. Please take a look and let me know if it's clear.
I have a working case described in the issue (#2644); if you need any other details, please let me know. I suspect these people faced the same problem: https://groups.google.com/forum/#!topic/luigi-user/YCEQIEz0jLo
After discovering this issue, I think it has somewhat deeper roots. This PR will resolve most of the problem cases, but the race condition will still exist, even with only one task:

  • worker A runs some task
  • worker B starts the same task and checks that its output does not exist
  • worker A finishes the task right after that
  • workers A and B send their status to the scheduler; worker A happens to be first: status DONE
  • then worker B: status PENDING
  • worker B asks for more work and runs the task a second time

I think that to completely resolve this issue, something like a "timestamp of the status check" would have to be sent to the scheduler along with the status itself, and the scheduler should decide what to do if the status timestamp is older than the one it has in its state. Just my thoughts.
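Purely as an illustration of that idea (nothing like this exists in luigi; the class, argument, and field names below are hypothetical):

import time

PENDING, DONE = 'PENDING', 'DONE'


class HypotheticalScheduler(object):
    """Hypothetical sketch only; not luigi code."""

    def __init__(self):
        self._tasks = {}  # task_id -> (status, updated_timestamp)

    def add_task(self, task_id, status, status_checked_at):
        current = self._tasks.get(task_id)
        # If the worker observed the status before the scheduler last updated
        # this task, the report is stale: ignore it instead of downgrading
        # DONE back to PENDING.
        if current is not None and status_checked_at < current[1]:
            return
        self._tasks[task_id] = (status, time.time())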

@GoodDok force-pushed the issue-2644-double-runs branch from ed772b2 to e66d89e on February 5, 2019 at 14:44
@GoodDok force-pushed the issue-2644-double-runs branch 2 times, most recently from 147ec4c to 7064154, on March 7, 2019 at 16:07
@GoodDok changed the title from "Issue 2644: Tasks can be run several times under certain conditions" to "Issue 2644: Tasks can be run several times under certain conditions (WIP)" on Mar 7, 2019
@GoodDok (Contributor, Author) commented Mar 7, 2019

@Tarrasch, can you please take a look at the changes I submitted? This is just a draft of an idea for how this issue can be resolved with the help of a "distrust period".
At the moment, I don't see any way to re-run the task's status check without rewriting a big part of the basic logic governing the interaction between workers and the scheduler. So I chose to keep a task that became DONE very recently in the same state.
E.g., let's take a look at the code that sends the task metadata to the scheduler:

luigi/luigi/worker.py

Lines 561 to 583 in 3701c68

    def _add_task(self, *args, **kwargs):
        """
        Call ``self._scheduler.add_task``, but store the values too so we can
        implement :py:func:`luigi.execution_summary.summary`.
        """
        task_id = kwargs['task_id']
        status = kwargs['status']
        runnable = kwargs['runnable']
        task = self._scheduled_tasks.get(task_id)
        if task:
            self._add_task_history.append((task, status, runnable))
            kwargs['owners'] = task._owner_list()

        if task_id in self._batch_running_tasks:
            for batch_task in self._batch_running_tasks.pop(task_id):
                self._add_task_history.append((batch_task, status, True))

        if task and kwargs.get('params'):
            kwargs['param_visibilities'] = task._get_param_visibilities()

        self._scheduler.add_task(*args, **kwargs)

        logger.info('Informed scheduler that task %s has status %s', task_id, status)

If I'm not wrong, it does not assume anything can go wrong during self._scheduler.add_task; it even writes the record to the task history before add_task itself is called. The interaction at this point goes only from the worker to the scheduler, not vice versa. So we cannot ask the worker here to "wait, then retry sending the same task with the updated status", not without significant code changes.

On the other hand, once we send the task in the PENDING state, the scheduler treats it as one that can be handed out to a worker, unless we change the task's status on the fly. Maybe we could re-run the check when a worker asks for work, but then we would have to detect which tasks need this "status recheck" and which don't.

Actually, I'm not a big fan of the chosen approach, but my main goal was to reduce the impact of these changes.

Please let me know if you have any thoughts regarding this!

@Tarrasch (Contributor) left a comment:

"So I chose to keep a task that became DONE very recently in the same state."

This is exactly what I think is the way to do it.

"So we cannot ask the worker here to 'wait, then retry sending the same task with the updated status', not without significant code changes."

Exactly. Luigi is (proudly) a simple system: it doesn't claim to schedule optimally, but it does claim to behave consistently and to "eventually get tasks done" if you periodically re-invoke it.

In other words, the draft implementation is exactly what I was looking for; only these few lines need to change, in the scheduler. I don't think we need changes in the worker. So, good job. I wrote some thoughts on your draft implementation.

@@ -150,6 +150,8 @@ class scheduler(Config):

    metrics_collector = parameter.EnumParameter(enum=MetricsCollectors, default=MetricsCollectors.default)

    rerun_task_blocking_period = parameter.IntParameter(default=10)
@Tarrasch (Contributor):

Let's come up with a better name for this. I don't think we can find a name people will fully understand from a few words, but it should still make sense.

  • Maybe stable_done_cooldown_secs?
  • Whichever name we choose, we probably want _secs in it. There's a TimeDeltaParameter, but I think we should avoid that fancy stuff in the scheduler. I dream that one day the scheduler will even be written in another language.

Also add docs to this parameter.
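For reference, the declaration in the scheduler config class would then look roughly like this (the name matches the later commits in this PR; the description text is an assumption):

    stable_done_cooldown_secs = parameter.IntParameter(
        default=10,
        description='Sets a cooldown window (in seconds) during which a task that '
                    'has just become DONE will not be handed out for another run.',
    )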

@@ -840,6 +842,10 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
        if task is None or (task.status != RUNNING and not worker.enabled):
            return

        # if the task was completed only recently, do not run the same task again
        if status == PENDING and task.status == DONE and (time.time() - task.updated) < self._config.rerun_task_blocking_period:
            status = DONE
@Tarrasch (Contributor):

What happens if you just return here instead?

@GoodDok (Contributor, Author):

As far as I understand, the same thing happens: the scheduler refuses to give the task to the worker. Changed it to a simple "return".
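Put together with the rename suggested above, the scheduler-side guard would then look roughly like this (a sketch: the condition comes from the diff, while the early return and the parameter name follow the discussion in this thread):

        # Ignore claims that the task is PENDING if it very recently was marked as DONE.
        if status == PENDING and task.status == DONE and \
                (time.time() - task.updated) < self._config.stable_done_cooldown_secs:
            return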

@@ -840,6 +842,10 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
        if task is None or (task.status != RUNNING and not worker.enabled):
            return

        # if the task was completed only recently, do not run the same task again
        if status == PENDING and task.status == DONE and (time.time() - task.updated) < self._config.rerun_task_blocking_period:
@Tarrasch (Contributor):

Yes, pretty much what I expected. Great!

@@ -840,6 +842,10 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
        if task is None or (task.status != RUNNING and not worker.enabled):
            return

        # if the task was completed only recently, do not run the same task again
@Tarrasch (Contributor):

Describe what you actually do, the implementation, not why you do it. :)

        # Ignore claims that the task is PENDING if it very recently was marked as DONE.

Maybe?

@GoodDok (Contributor, Author):

Thanks, changed to the proposed version.

@@ -46,6 +46,7 @@ def get_scheduler_config(self):
            'disable_window': 10,
            'retry_count': 3,
            'disable_hard_timeout': 60 * 60,
            'rerun_task_blocking_period': 0
@Tarrasch (Contributor):

Good start. This is fine, as other tests didn't assume this.

@Tarrasch (Contributor):

Also of course you should add a few other unit tests with this flag set :)

Konstantin Gudkov and others added 3 commits March 11, 2019 14:35
- changed `track_url_in_stderr` to `stream_for_searching_tracking_url` according to the latest changes in external_program.py
- added tests for spark url tracking in cluster and client modes
@GoodDok force-pushed the issue-2644-double-runs branch from 028d083 to 292bf9a on April 16, 2019 at 14:50
@GoodDok force-pushed the issue-2644-double-runs branch 4 times, most recently from b9ccc70 to 689ec4e, on April 16, 2019 at 16:11
@GoodDok force-pushed the issue-2644-double-runs branch 3 times, most recently from 2b30f00 to 9926a46, on April 23, 2019 at 19:13
@GoodDok force-pushed the issue-2644-double-runs branch 4 times, most recently from 075066a to 8ab20a9, on April 24, 2019 at 14:17
@GoodDok (Contributor, Author) commented Apr 24, 2019

Guys, please take a look at the fresh version of the PR and let me know if anything else is needed here (maybe some other use cases should be covered).
BTW, the docs for the parameter have been tested and look as expected.

@GoodDok changed the title from "Issue 2644: Tasks can be run several times under certain conditions (WIP)" to "Issue 2644: Tasks can be run several times under certain conditions" on Apr 24, 2019
@GoodDok (Contributor, Author) commented Jul 2, 2019

@Tarrasch @honnix @dlstadther psst, guys, you probably forgot about this one.

@Tarrasch (Contributor) left a comment:

Thanks. Sorry for the delays! Can you just rebase? This is an important bug fix indeed!

        shutil.rmtree(self.p)

    def run_task(self):
        return luigi.build([FailingOnDoubleRunTask(output_dir=self.p)],
@Tarrasch (Contributor):

Can you implement this with LuigiRunResult? The test will become more readable.

@GoodDok (Contributor, Author):

Thank you, done

        time.sleep(FailingOnDoubleRunTask.time_to_run_secs + FailingOnDoubleRunTask.time_to_check_secs)
        # second run of the same task is started
        luigi_run_result = self.run_task()
        self.assertEqual(luigi_run_result, not failure_expected)
@Tarrasch (Contributor):

This line in particular is very hard to read.

@GoodDok (Contributor, Author):
The reason will be displayed to describe this comment to others. Learn more.

OMG, of course :)
Thank you, fixed.

Sync with the main master
@GoodDok force-pushed the issue-2644-double-runs branch 3 times, most recently from 9845e09 to 8bf768d, on July 15, 2019 at 21:55
@GoodDok (Contributor, Author) commented Jul 15, 2019

@Tarrasch thank you for the thorough review again!
I've rebased and followed your recommendations to improve the readability of the tests.
Please let me know if any other changes are needed here.

@eloyfelix commented:
Many thanks @GoodDok et al. for this work; I'm afraid I'm also having this issue :P Hope it gets merged soon!

@GoodDok (Contributor, Author) commented Aug 14, 2019

Hi there, @Tarrasch @honnix @dlstadther,
Just wanted to let you know that I'm still here once you're ready to review my final changes.

It seems I'm not the only one who has faced this issue.

@dlstadther (Collaborator) commented:

It looks like @Tarrasch already approved the changes. I'll give him a little while to re-confirm; otherwise I'll merge.

@Tarrasch (Contributor) commented:

My approval still stands. Sorry for being away for so long and not merging your rebase. I or any other maintainer can merge this once you rebase. Sorry!

@GoodDok force-pushed the issue-2644-double-runs branch from 377c3cf to 8bf768d on August 23, 2019 at 10:09
- introduced parameter `stable_done_cooldown_secs` with description
- added an acceptance test
@GoodDok force-pushed the issue-2644-double-runs branch from 8bf768d to d0dd8f7 on August 23, 2019 at 10:11
@GoodDok (Contributor, Author) commented Aug 23, 2019

@dlstadther @Tarrasch guys, I've rebased the branch, please feel free to merge :)

@danielfoguelman commented:
I just came here to say thank you, @GoodDok and @Tarrasch. I ran into this bug and I'm testing the fix at the moment.

My observed behavior: a worker finishes executing a task and informs the scheduler it is done; another worker then informs the scheduler that the same task is pending (after the DONE report); the task then gets executed again by the first worker (the one that reported DONE). The problem wouldn't be big, since the task is idempotent, but the container died, so it was never picked up again.

After reading through the initial issue and the pull request, I think this could be the same problem. @GoodDok @Tarrasch do you think they may be related?
The Luigi version is 2.8.12.
