Skip to content

Commit

Permalink
Revert "Marking as minimum upstream severity instead of max (spotify#…
Browse files Browse the repository at this point in the history
…1789)"

This reverts commit b12be00.
  • Loading branch information
daveFNbuck committed Nov 10, 2016
1 parent 24d01fa commit 42a8e63
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
5 changes: 3 additions & 2 deletions luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1196,8 +1196,9 @@ def _upstream_status(self, task_id, upstream_status_table):
elif upstream_status_table[dep_id] == '' and dep.deps:
# This is the postorder update step when we set the
# status based on the previously calculated child elements
upstream_severities = list(upstream_status_table.get(a_task_id) for a_task_id in dep.deps if a_task_id in upstream_status_table) or ['']
status = min(upstream_severities, key=UPSTREAM_SEVERITY_KEY)
status = max((upstream_status_table.get(a_task_id, '')
for a_task_id in dep.deps),
key=UPSTREAM_SEVERITY_KEY)
upstream_status_table[dep_id] = status
return upstream_status_table[dep_id]

Expand Down
15 changes: 7 additions & 8 deletions test/scheduler_visualisation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ def requires(self):
self.assertEqual(db['status'], 'DONE')

missing_input = remote.task_list('PENDING', 'UPSTREAM_MISSING_INPUT')
self.assertEqual(len(missing_input), 3)
self.assertEqual(len(missing_input), 2)

pa = missing_input.get(A().task_id)
self.assertEqual(pa['status'], 'PENDING')
Expand All @@ -433,15 +433,14 @@ def requires(self):
self.assertEqual(pc['status'], 'PENDING')
self.assertEqual(remote._upstream_status(C().task_id, {}), 'UPSTREAM_MISSING_INPUT')

pe = missing_input.get(E().task_id)
self.assertEqual(pe['status'], 'PENDING')
self.assertEqual(remote._upstream_status(E().task_id, {}), 'UPSTREAM_MISSING_INPUT')

upstream_failed = remote.task_list('PENDING', 'UPSTREAM_FAILED')
self.assertEqual(len(upstream_failed), 1)
self.assertEqual(len(upstream_failed), 2)
pe = upstream_failed.get(E().task_id)
self.assertEqual(pe['status'], 'PENDING')
self.assertEqual(remote._upstream_status(E().task_id, {}), 'UPSTREAM_FAILED')

pd = upstream_failed.get(D().task_id)
self.assertEqual(pd['status'], 'PENDING')
pe = upstream_failed.get(D().task_id)
self.assertEqual(pe['status'], 'PENDING')
self.assertEqual(remote._upstream_status(D().task_id, {}), 'UPSTREAM_FAILED')

pending = dict(missing_input)
Expand Down

0 comments on commit 42a8e63

Please sign in to comment.