diff --git a/luigi/configuration.py b/luigi/configuration.py
index 92b3a71808..1e12d3a4a5 100644
--- a/luigi/configuration.py
+++ b/luigi/configuration.py
@@ -33,11 +33,12 @@
 import os
 import warnings
 
-from configparser import Interpolation
 try:
     from ConfigParser import ConfigParser, NoOptionError, NoSectionError
+    Interpolation = object
 except ImportError:
     from configparser import ConfigParser, NoOptionError, NoSectionError
+    from configparser import Interpolation
 
 
 class LuigiConfigParser(ConfigParser):
diff --git a/luigi/scheduler.py b/luigi/scheduler.py
index 76785d9b82..2023a4cebd 100644
--- a/luigi/scheduler.py
+++ b/luigi/scheduler.py
@@ -1047,9 +1047,21 @@ def count_pending(self, worker):
         for task in worker.get_tasks(self._state, PENDING, FAILED):
             if self._upstream_status(task.id, upstream_status_table) == UPSTREAM_DISABLED:
                 continue
-            num_pending += 1
-            num_unique_pending += int(len(task.workers) == 1)
-            num_pending_last_scheduled += int(task.workers.peek(last=True) == worker_id)
+            has_failed_dependency = False
+            for dep in task.deps:
+                dep_task = self._state.get_task(dep, default=None)
+                # get_task() is given default=None, so dep_task can be None;
+                # guard against that before reading .status.
+                if dep_task is not None and dep_task.status == UNKNOWN:
+                    # consider this task as not pending since these dependencies have broken
+                    # requires. this means that they won't ever be retried and can't succeed at all
+                    has_failed_dependency = True
+                    break
+
+            if not has_failed_dependency:
+                num_pending += 1
+                num_unique_pending += int(len(task.workers) == 1)
+                num_pending_last_scheduled += int(task.workers.peek(last=True) == worker_id)
 
         return {
             'n_pending_tasks': num_pending,
diff --git a/setup.py b/setup.py
index db4d5909d3..5857bf8a3c 100644
--- a/setup.py
+++ b/setup.py
@@ -54,7 +54,7 @@ def get_static_files(path):
 
 setup(
     name='luigi',
-    version='2.7.5.affirm.1.2.0',
+    version='2.7.5.affirm.1.3.0',
     description='Workflow mgmgt + task scheduling + dependency resolution',
     long_description=long_description,
     author='The Luigi Authors',
diff --git a/test/unknown_state_handling_test.py b/test/unknown_state_handling_test.py
new file mode 100644
index 0000000000..54e7a2f842
--- /dev/null
+++ b/test/unknown_state_handling_test.py
@@ -0,0 +1,88 @@
+from helpers import LuigiTestCase
+
+
+import luigi
+import luigi.worker
+import luigi.execution_summary
+
+
+class DummyRequires(luigi.Task):
+    def run(self):
+        print('just a dummy task')
+
+
+class FailInRun(luigi.Task):
+    def run(self):
+        print('failing in run')
+        raise Exception
+
+
+class FailInRequires(luigi.Task):
+    def requires(self):
+        print('failing')
+        raise Exception
+
+    def run(self):
+        print('running')
+
+
+class FailInDepRequires(luigi.Task):
+    def requires(self):
+        return [FailInRequires()]
+
+    def run(self):
+        print('doing a thing')
+
+
+class FailInDepRun(luigi.Task):
+    def requires(self):
+        return [FailInRun()]
+
+    def run(self):
+        print('doing a thing')
+
+
+class UnknownStateTest(LuigiTestCase):
+    def setUp(self):
+        super(UnknownStateTest, self).setUp()
+        self.scheduler = luigi.scheduler.Scheduler(
+            prune_on_get_work=False,
+            retry_count=1
+        )
+        self.worker = luigi.worker.Worker(
+            scheduler=self.scheduler,
+            keep_alive=True
+        )
+
+    def run_task(self, task):
+        self.worker.add(task)  # schedule
+        self.worker.run()  # run
+
+    def summary_dict(self):
+        return luigi.execution_summary._summary_dict(self.worker)
+
+    def test_fail_in_run(self):
+        self.run_task(FailInRun())
+        summary_dict = self.summary_dict()
+
+        self.assertEqual({FailInRun()}, summary_dict['failed'])
+
+    def test_fail_in_requires(self):
+        self.run_task(FailInRequires())
+        summary_dict = self.summary_dict()
+
+        self.assertEqual({FailInRequires()}, summary_dict['scheduling_error'])
+
+    def test_fail_in_dep_run(self):
+        self.run_task(FailInDepRun())
+        summary_dict = self.summary_dict()
+
+        self.assertEqual({FailInRun()}, summary_dict['failed'])
+        self.assertEqual({FailInDepRun()}, summary_dict['still_pending_not_ext'])
+
+    def test_fail_in_dep_requires(self):
+        self.run_task(FailInDepRequires())
+        summary_dict = self.summary_dict()
+
+        self.assertEqual({FailInRequires()}, summary_dict['scheduling_error'])
+        self.assertEqual({FailInDepRequires()}, summary_dict['still_pending_not_ext'])