Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assistants: Don't affect longevity of tasks #1772

Merged
merged 1 commit into from
Jul 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 7 additions & 20 deletions luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,11 +444,12 @@ def fail_dead_worker_task(self, task, config, assistants):

def update_status(self, task, config):
# Mark tasks with no remaining active stakeholders for deletion
if not task.stakeholders:
if task.remove is None:
logger.debug("Task %r has stakeholders %r but none remain connected -> might remove "
"task in %s seconds", task.id, task.stakeholders, config.remove_delay)
task.remove = time.time() + config.remove_delay
if (not task.stakeholders) and (task.remove is None) and (task.status != RUNNING):
# We don't check for the RUNNING case, because that is already handled
# by the fail_dead_worker_task function.
logger.debug("Task %r has no stakeholders anymore -> might remove "
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry. The changed log message is not related to this patch. Just see it as a bonus update of legacy code.

"task in %s seconds", task.id, config.remove_delay)
task.remove = time.time() + config.remove_delay

# Re-enable task after the disable time expires
if task.status == DISABLED and task.scheduler_disable_time is not None:
Expand Down Expand Up @@ -506,15 +507,6 @@ def disable_workers(self, workers):
for worker in workers:
self.get_worker(worker).disabled = True

def get_necessary_tasks(self):
necessary_tasks = set()
for task in self.get_active_tasks():
if task.status not in (DONE, DISABLED, UNKNOWN) or \
task.scheduler_disable_time is not None:
necessary_tasks.update(task.deps)
necessary_tasks.add(task.id)
return necessary_tasks


class CentralPlannerScheduler(Scheduler):
"""
Expand Down Expand Up @@ -573,15 +565,10 @@ def _prune_tasks(self):
assistant_ids = set(w.id for w in self._state.get_assistants())
remove_tasks = []

if assistant_ids:
necessary_tasks = self._state.get_necessary_tasks()
else:
necessary_tasks = ()

for task in self._state.get_active_tasks():
self._state.fail_dead_worker_task(task, self._config, assistant_ids)
self._state.update_status(task, self._config)
if self._state.may_prune(task) and task.id not in necessary_tasks:
if self._state.may_prune(task):
logger.info("Removing task %r", task.id)
remove_tasks.append(task.id)

Expand Down
83 changes: 62 additions & 21 deletions test/central_planner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import luigi.notifications
from luigi.scheduler import DISABLED, DONE, FAILED, PENDING, \
UNKNOWN, CentralPlannerScheduler
UNKNOWN, RUNNING, CentralPlannerScheduler

luigi.notifications.DEBUG = True
WORKER = 'myworker'
Expand Down Expand Up @@ -343,19 +343,28 @@ def test_re_enable_failed_task_assistant(self):
self.sch.ping(worker='X') # worker still alive
self.assertEqual('PENDING', self.sch.task_list('', '')['A']['status'])

def test_fail_job_from_dead_worker_with_live_assistant(self):
def test_assistant_doesnt_keep_alive_task(self):
self.setTime(0)
self.sch.add_task(worker='X', task_id='A')
self.assertEqual('A', self.sch.get_work(worker='X')['task_id'])
self.sch.add_worker('Y', [('assistant', True)])
self.sch.add_worker('Y', {'assistant': True})

self.setTime(600)
remove_delay = self.get_scheduler_config()['remove_delay'] + 1.0
self.setTime(remove_delay)
self.sch.ping(worker='Y')
self.sch.prune()
self.assertEqual(['A'], list(self.sch.task_list(status='FAILED', upstream_status='').keys()))
self.assertEqual(['A'], list(self.sch.task_list(status='', upstream_status='').keys()))

self.assertEqual(['A'], list(self.sch.task_list('FAILED', '').keys()))
self.setTime(2*remove_delay)
self.sch.ping(worker='Y')
self.sch.prune()
self.assertEqual([], list(self.sch.task_list(status='', upstream_status='').keys()))

def test_assistant_request_runnable_task(self):
"""
Test that an assistant gets a task despite it havent registered for it
"""
self.setTime(0)
self.sch.add_task(worker='X', task_id='A', runnable=True)
self.setTime(600)
Expand All @@ -367,27 +376,31 @@ def test_assistant_request_external_task(self):
self.sch.add_task(worker='X', task_id='A', runnable=False)
self.assertIsNone(self.sch.get_work(worker='Y', assistant=True)['task_id'])

def test_prune_done_tasks(self, expected=None):
def _test_prune_done_tasks(self, expected=None):
self.setTime(0)
self.sch.add_task(worker=WORKER, task_id='A', status=DONE)
self.sch.add_task(worker=WORKER, task_id='B', deps=['A'], status=DONE)
self.sch.add_task(worker=WORKER, task_id='C', deps=['B'])

self.setTime(600)
self.sch.ping(worker='ASSISTANT')
self.sch.ping(worker='MAYBE_ASSITANT')
self.sch.prune()
self.setTime(2000)
self.sch.ping(worker='ASSISTANT')
self.sch.ping(worker='MAYBE_ASSITANT')
self.sch.prune()

self.assertEqual(set(expected or ()), set(self.sch.task_list('', '').keys()))
self.assertEqual(set(expected), set(self.sch.task_list('', '').keys()))

def test_prune_done_tasks_not_assistant(self, expected=None):
# Here, MAYBE_ASSISTANT isnt an assistant
self._test_prune_done_tasks(expected=[])

def test_keep_tasks_for_assistant(self):
self.sch.get_work(worker='ASSISTANT', assistant=True) # tell the scheduler this is an assistant
self.test_prune_done_tasks(['B', 'C'])
self.sch.get_work(worker='MAYBE_ASSITANT', assistant=True) # tell the scheduler this is an assistant
self._test_prune_done_tasks([])

def test_keep_scheduler_disabled_tasks_for_assistant(self):
self.sch.get_work(worker='ASSISTANT', assistant=True) # tell the scheduler this is an assistant
self.sch.get_work(worker='MAYBE_ASSITANT', assistant=True) # tell the scheduler this is an assistant

# create a scheduler disabled task and a worker disabled task
for i in range(10):
Expand All @@ -396,12 +409,12 @@ def test_keep_scheduler_disabled_tasks_for_assistant(self):

# scheduler prunes the worker disabled task
self.assertEqual(set(['D', 'E']), set(self.sch.task_list(DISABLED, '')))
self.test_prune_done_tasks(['B', 'C', 'D'])
self._test_prune_done_tasks([])

def test_keep_failed_tasks_for_assistant(self):
self.sch.get_work(worker='ASSISTANT', assistant=True) # tell the scheduler this is an assistant
self.sch.add_task(worker=WORKER, task_id='D', status=FAILED, deps='A')
self.test_prune_done_tasks(['A', 'B', 'C', 'D'])
self.sch.get_work(worker='MAYBE_ASSITANT', assistant=True) # tell the scheduler this is an assistant
self.sch.add_task(worker=WORKER, task_id='D', status=FAILED, deps=['A'])
self._test_prune_done_tasks([])

def test_scheduler_resources_none_allow_one(self):
self.sch.add_task(worker='X', task_id='A', resources={'R1': 1})
Expand Down Expand Up @@ -1089,9 +1102,10 @@ def test_get_work_speed(self):

def test_assistants_dont_nurture_finished_statuses(self):
"""
Assistants should not affect longevity of DONE tasks
Test how assistants affect longevity of tasks

Also check for statuses DISABLED and UNKNOWN.
Assistants should not affect longevity expect for the tasks that it is
running, par the one it's actually running.
"""
self.sch = CentralPlannerScheduler(retry_delay=100000000000) # Never pendify failed tasks
self.setTime(1)
Expand All @@ -1114,8 +1128,8 @@ def test_assistants_dont_nurture_finished_statuses(self):
self.setTime(200000)
self.sch.ping(worker='assistant')
self.sch.prune()
nurtured_statuses = ['PENDING', 'FAILED', 'RUNNING']
not_nurtured_statuses = ['DONE', 'UNKNOWN', 'DISABLED']
nurtured_statuses = [RUNNING]
not_nurtured_statuses = [DONE, UNKNOWN, DISABLED, PENDING, FAILED]

for status in nurtured_statuses:
print(status)
Expand All @@ -1125,7 +1139,7 @@ def test_assistants_dont_nurture_finished_statuses(self):
print(status)
self.assertEqual(set([]), set(self.sch.task_list(status, '')))

self.assertEqual(3, len(self.sch.task_list(None, ''))) # None == All statuses
self.assertEqual(1, len(self.sch.task_list(None, ''))) # None == All statuses

def test_no_crash_on_only_disable_hard_timeout(self):
"""
Expand All @@ -1148,3 +1162,30 @@ def test_no_crash_on_only_disable_hard_timeout(self):
self.setTime(10)
self.sch.prune()
self.assertEqual(self.sch.get_work(worker=WORKER)['task_id'], 'A')

def test_assistant_running_task_dont_disappear(self):
"""
Tasks run by an assistant shouldn't be pruned
"""
self.setTime(1)
self.sch.add_worker(WORKER, [])
self.sch.ping(worker=WORKER)

self.setTime(2)
self.sch.add_task(worker=WORKER, task_id='A')
self.assertEqual(self.sch.get_work(worker=WORKER)['task_id'], 'A')
self.sch.add_task(worker=WORKER, task_id='B')
self.sch.add_worker('assistant', [('assistant', True)])
self.sch.ping(worker='assistant')
self.assertEqual(self.sch.get_work(worker='assistant', assistant=True)['task_id'], 'B')

self.setTime(100000)
# Here, lets say WORKER disconnects (doesnt ping)
self.sch.ping(worker='assistant')
self.sch.prune()

self.setTime(200000)
self.sch.ping(worker='assistant')
self.sch.prune()
self.assertEqual({'B'}, set(self.sch.task_list(RUNNING, '')))
self.assertEqual({'B'}, set(self.sch.task_list('', '')))