From 4fc3861fee968ce23021f2bc3cd3b54885ef5d49 Mon Sep 17 00:00:00 2001 From: Arash Rouhani Date: Mon, 18 Jul 2016 16:32:48 +0700 Subject: [PATCH] Assistants: Don't affect longevity of tasks When assistants where introduced, we imagined that one worker would run tasks (the assistant) and one would upload them. In order to ensure that the tasks don't get pruned before tasks get completed, we added a what we now consider an "antifeature" that made tasks not getting pruned while assistants where alive. Real world use have showed that it wasn't a good feature. It have caused bugs like spotify/luigi@736c0f1, but worst, it's a feature that makes luigid change behavior for all tasks if you just submit 1 assistant. I didn't expect that the first time I tested assistants. There's an email thread where it was discussed to remove this feature. In there one can also read about how to get something similar to the old behavior. https://groups.google.com/forum/#!topic/luigi-user/b7Acym0n-g4 --- luigi/scheduler.py | 27 +++--------- test/central_planner_test.py | 83 +++++++++++++++++++++++++++--------- 2 files changed, 69 insertions(+), 41 deletions(-) diff --git a/luigi/scheduler.py b/luigi/scheduler.py index c1ad59911d..902ae1e8d3 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -444,11 +444,12 @@ def fail_dead_worker_task(self, task, config, assistants): def update_status(self, task, config): # Mark tasks with no remaining active stakeholders for deletion - if not task.stakeholders: - if task.remove is None: - logger.debug("Task %r has stakeholders %r but none remain connected -> might remove " - "task in %s seconds", task.id, task.stakeholders, config.remove_delay) - task.remove = time.time() + config.remove_delay + if (not task.stakeholders) and (task.remove is None) and (task.status != RUNNING): + # We don't check for the RUNNING case, because that is already handled + # by the fail_dead_worker_task function. + logger.debug("Task %r has no stakeholders anymore -> might remove " + "task in %s seconds", task.id, config.remove_delay) + task.remove = time.time() + config.remove_delay # Re-enable task after the disable time expires if task.status == DISABLED and task.scheduler_disable_time is not None: @@ -506,15 +507,6 @@ def disable_workers(self, workers): for worker in workers: self.get_worker(worker).disabled = True - def get_necessary_tasks(self): - necessary_tasks = set() - for task in self.get_active_tasks(): - if task.status not in (DONE, DISABLED, UNKNOWN) or \ - task.scheduler_disable_time is not None: - necessary_tasks.update(task.deps) - necessary_tasks.add(task.id) - return necessary_tasks - class CentralPlannerScheduler(Scheduler): """ @@ -573,15 +565,10 @@ def _prune_tasks(self): assistant_ids = set(w.id for w in self._state.get_assistants()) remove_tasks = [] - if assistant_ids: - necessary_tasks = self._state.get_necessary_tasks() - else: - necessary_tasks = () - for task in self._state.get_active_tasks(): self._state.fail_dead_worker_task(task, self._config, assistant_ids) self._state.update_status(task, self._config) - if self._state.may_prune(task) and task.id not in necessary_tasks: + if self._state.may_prune(task): logger.info("Removing task %r", task.id) remove_tasks.append(task.id) diff --git a/test/central_planner_test.py b/test/central_planner_test.py index ece3c3a0eb..00e9377877 100644 --- a/test/central_planner_test.py +++ b/test/central_planner_test.py @@ -22,7 +22,7 @@ import luigi.notifications from luigi.scheduler import DISABLED, DONE, FAILED, PENDING, \ - UNKNOWN, CentralPlannerScheduler + UNKNOWN, RUNNING, CentralPlannerScheduler luigi.notifications.DEBUG = True WORKER = 'myworker' @@ -343,19 +343,28 @@ def test_re_enable_failed_task_assistant(self): self.sch.ping(worker='X') # worker still alive self.assertEqual('PENDING', self.sch.task_list('', '')['A']['status']) - def test_fail_job_from_dead_worker_with_live_assistant(self): + def test_assistant_doesnt_keep_alive_task(self): self.setTime(0) self.sch.add_task(worker='X', task_id='A') self.assertEqual('A', self.sch.get_work(worker='X')['task_id']) - self.sch.add_worker('Y', [('assistant', True)]) + self.sch.add_worker('Y', {'assistant': True}) - self.setTime(600) + remove_delay = self.get_scheduler_config()['remove_delay'] + 1.0 + self.setTime(remove_delay) self.sch.ping(worker='Y') self.sch.prune() + self.assertEqual(['A'], list(self.sch.task_list(status='FAILED', upstream_status='').keys())) + self.assertEqual(['A'], list(self.sch.task_list(status='', upstream_status='').keys())) - self.assertEqual(['A'], list(self.sch.task_list('FAILED', '').keys())) + self.setTime(2*remove_delay) + self.sch.ping(worker='Y') + self.sch.prune() + self.assertEqual([], list(self.sch.task_list(status='', upstream_status='').keys())) def test_assistant_request_runnable_task(self): + """ + Test that an assistant gets a task despite it havent registered for it + """ self.setTime(0) self.sch.add_task(worker='X', task_id='A', runnable=True) self.setTime(600) @@ -367,27 +376,31 @@ def test_assistant_request_external_task(self): self.sch.add_task(worker='X', task_id='A', runnable=False) self.assertIsNone(self.sch.get_work(worker='Y', assistant=True)['task_id']) - def test_prune_done_tasks(self, expected=None): + def _test_prune_done_tasks(self, expected=None): self.setTime(0) self.sch.add_task(worker=WORKER, task_id='A', status=DONE) self.sch.add_task(worker=WORKER, task_id='B', deps=['A'], status=DONE) self.sch.add_task(worker=WORKER, task_id='C', deps=['B']) self.setTime(600) - self.sch.ping(worker='ASSISTANT') + self.sch.ping(worker='MAYBE_ASSITANT') self.sch.prune() self.setTime(2000) - self.sch.ping(worker='ASSISTANT') + self.sch.ping(worker='MAYBE_ASSITANT') self.sch.prune() - self.assertEqual(set(expected or ()), set(self.sch.task_list('', '').keys())) + self.assertEqual(set(expected), set(self.sch.task_list('', '').keys())) + + def test_prune_done_tasks_not_assistant(self, expected=None): + # Here, MAYBE_ASSISTANT isnt an assistant + self._test_prune_done_tasks(expected=[]) def test_keep_tasks_for_assistant(self): - self.sch.get_work(worker='ASSISTANT', assistant=True) # tell the scheduler this is an assistant - self.test_prune_done_tasks(['B', 'C']) + self.sch.get_work(worker='MAYBE_ASSITANT', assistant=True) # tell the scheduler this is an assistant + self._test_prune_done_tasks([]) def test_keep_scheduler_disabled_tasks_for_assistant(self): - self.sch.get_work(worker='ASSISTANT', assistant=True) # tell the scheduler this is an assistant + self.sch.get_work(worker='MAYBE_ASSITANT', assistant=True) # tell the scheduler this is an assistant # create a scheduler disabled task and a worker disabled task for i in range(10): @@ -396,12 +409,12 @@ def test_keep_scheduler_disabled_tasks_for_assistant(self): # scheduler prunes the worker disabled task self.assertEqual(set(['D', 'E']), set(self.sch.task_list(DISABLED, ''))) - self.test_prune_done_tasks(['B', 'C', 'D']) + self._test_prune_done_tasks([]) def test_keep_failed_tasks_for_assistant(self): - self.sch.get_work(worker='ASSISTANT', assistant=True) # tell the scheduler this is an assistant - self.sch.add_task(worker=WORKER, task_id='D', status=FAILED, deps='A') - self.test_prune_done_tasks(['A', 'B', 'C', 'D']) + self.sch.get_work(worker='MAYBE_ASSITANT', assistant=True) # tell the scheduler this is an assistant + self.sch.add_task(worker=WORKER, task_id='D', status=FAILED, deps=['A']) + self._test_prune_done_tasks([]) def test_scheduler_resources_none_allow_one(self): self.sch.add_task(worker='X', task_id='A', resources={'R1': 1}) @@ -1089,9 +1102,10 @@ def test_get_work_speed(self): def test_assistants_dont_nurture_finished_statuses(self): """ - Assistants should not affect longevity of DONE tasks + Test how assistants affect longevity of tasks - Also check for statuses DISABLED and UNKNOWN. + Assistants should not affect longevity expect for the tasks that it is + running, par the one it's actually running. """ self.sch = CentralPlannerScheduler(retry_delay=100000000000) # Never pendify failed tasks self.setTime(1) @@ -1114,8 +1128,8 @@ def test_assistants_dont_nurture_finished_statuses(self): self.setTime(200000) self.sch.ping(worker='assistant') self.sch.prune() - nurtured_statuses = ['PENDING', 'FAILED', 'RUNNING'] - not_nurtured_statuses = ['DONE', 'UNKNOWN', 'DISABLED'] + nurtured_statuses = [RUNNING] + not_nurtured_statuses = [DONE, UNKNOWN, DISABLED, PENDING, FAILED] for status in nurtured_statuses: print(status) @@ -1125,7 +1139,7 @@ def test_assistants_dont_nurture_finished_statuses(self): print(status) self.assertEqual(set([]), set(self.sch.task_list(status, ''))) - self.assertEqual(3, len(self.sch.task_list(None, ''))) # None == All statuses + self.assertEqual(1, len(self.sch.task_list(None, ''))) # None == All statuses def test_no_crash_on_only_disable_hard_timeout(self): """ @@ -1148,3 +1162,30 @@ def test_no_crash_on_only_disable_hard_timeout(self): self.setTime(10) self.sch.prune() self.assertEqual(self.sch.get_work(worker=WORKER)['task_id'], 'A') + + def test_assistant_running_task_dont_disappear(self): + """ + Tasks run by an assistant shouldn't be pruned + """ + self.setTime(1) + self.sch.add_worker(WORKER, []) + self.sch.ping(worker=WORKER) + + self.setTime(2) + self.sch.add_task(worker=WORKER, task_id='A') + self.assertEqual(self.sch.get_work(worker=WORKER)['task_id'], 'A') + self.sch.add_task(worker=WORKER, task_id='B') + self.sch.add_worker('assistant', [('assistant', True)]) + self.sch.ping(worker='assistant') + self.assertEqual(self.sch.get_work(worker='assistant', assistant=True)['task_id'], 'B') + + self.setTime(100000) + # Here, lets say WORKER disconnects (doesnt ping) + self.sch.ping(worker='assistant') + self.sch.prune() + + self.setTime(200000) + self.sch.ping(worker='assistant') + self.sch.prune() + self.assertEqual({'B'}, set(self.sch.task_list(RUNNING, ''))) + self.assertEqual({'B'}, set(self.sch.task_list('', '')))