Skip to content

Commit

Permalink
Marking as minimum upstream severity instead of max (#1789)
Browse files Browse the repository at this point in the history
Luigi marks a wrapper task as `UPSTREAM_FAILED` or `UPSTREAM_DISABLED`
when ANY of its upstream task is `FAILED` or `DISABLED`. Moreover, when
the wrapper task is marked as `UPSTREAM_DISABLED`, luigi shut the worker
down as it should do. This causes another PENDING_TASK in the wrapper
task not to run because worker is down. This behaviour should change.
Marking as `UPSTREAM_` should be ALL instead of ANY
  • Loading branch information
javrasya authored and Tarrasch committed Jul 28, 2016
1 parent fd18bd6 commit b12be00
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
5 changes: 2 additions & 3 deletions luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,9 +862,8 @@ def _upstream_status(self, task_id, upstream_status_table):
elif upstream_status_table[dep_id] == '' and dep.deps:
# This is the postorder update step when we set the
# status based on the previously calculated child elements
status = max((upstream_status_table.get(a_task_id, '')
for a_task_id in dep.deps),
key=UPSTREAM_SEVERITY_KEY)
upstream_severities = list(upstream_status_table.get(a_task_id) for a_task_id in dep.deps if a_task_id in upstream_status_table) or ['']
status = min(upstream_severities, key=UPSTREAM_SEVERITY_KEY)
upstream_status_table[dep_id] = status
return upstream_status_table[dep_id]

Expand Down
15 changes: 8 additions & 7 deletions test/scheduler_visualisation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ def requires(self):
self.assertEqual(db['status'], 'DONE')

missing_input = remote.task_list('PENDING', 'UPSTREAM_MISSING_INPUT')
self.assertEqual(len(missing_input), 2)
self.assertEqual(len(missing_input), 3)

pa = missing_input.get(A().task_id)
self.assertEqual(pa['status'], 'PENDING')
Expand All @@ -433,14 +433,15 @@ def requires(self):
self.assertEqual(pc['status'], 'PENDING')
self.assertEqual(remote._upstream_status(C().task_id, {}), 'UPSTREAM_MISSING_INPUT')

upstream_failed = remote.task_list('PENDING', 'UPSTREAM_FAILED')
self.assertEqual(len(upstream_failed), 2)
pe = upstream_failed.get(E().task_id)
pe = missing_input.get(E().task_id)
self.assertEqual(pe['status'], 'PENDING')
self.assertEqual(remote._upstream_status(E().task_id, {}), 'UPSTREAM_FAILED')
self.assertEqual(remote._upstream_status(E().task_id, {}), 'UPSTREAM_MISSING_INPUT')

pe = upstream_failed.get(D().task_id)
self.assertEqual(pe['status'], 'PENDING')
upstream_failed = remote.task_list('PENDING', 'UPSTREAM_FAILED')
self.assertEqual(len(upstream_failed), 1)

pd = upstream_failed.get(D().task_id)
self.assertEqual(pd['status'], 'PENDING')
self.assertEqual(remote._upstream_status(D().task_id, {}), 'UPSTREAM_FAILED')

pending = dict(missing_input)
Expand Down

0 comments on commit b12be00

Please sign in to comment.