From 8e9b439ce93d75a9e162a2e4289b04f3e1ad7e7e Mon Sep 17 00:00:00 2001 From: Arash Rouhani Date: Thu, 24 Sep 2015 19:58:13 +0200 Subject: [PATCH] Add semantic exit codes This commits adds a long requested feature, to be able to tell what the luigi client did (or didn't) based on its exit code. This closes spotify/luigi#687. --- doc/configuration.rst | 41 +++++++++++++++ luigi/cmdline.py | 5 +- luigi/interface.py | 21 ++++++-- luigi/retcodes.py | 80 ++++++++++++++++++++++++++++++ test/retcodes_test.py | 113 ++++++++++++++++++++++++++++++++++++++++++ test/server_test.py | 3 +- 6 files changed, 255 insertions(+), 8 deletions(-) create mode 100644 luigi/retcodes.py create mode 100644 test/retcodes_test.py diff --git a/doc/configuration.rst b/doc/configuration.rst index 4d3e3d1c41..af204aef20 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -411,6 +411,47 @@ hive resources and 1 mysql resource: Note that it was not necessary to specify the 1 for mysql here, but it is good practice to do so when you have a fixed set of resources. +.. _retcode-config: + +[retcode] +---------- + +Configure return codes for the luigi binary. In the case of multiple return +codes that could apply, for example a failing task and missing data, the +*numerically greatest* return code is returned. + +We recommend that you copy this set of exit codes to your ``luigi.cfg`` file: + +:: + + [retcode] + # The following return codes are the recommended exit codes for luigi + # They are in increasing level of severity (for most applications) + already_running: 10 + missing_data: 20 + task_failed: 30 + unhandled_exception: 40 + +unhandled_exception + For exceptions during scheduling (if you raise from the ``complete()`` or + ``requires()`` methods for instance) or for internal luigi errors. Defaults + to 4, since this type of error probably will not recover over time. +missing_data + For when an :py:class:`~luigi.task.ExternalTask` is not complete, and this + caused the worker to give up. As an alternative to fiddling with this, see + the [worker] keep_alive option. +task_failed + For signaling that there were last known to have failed. Typically because + some exception have been raised. +already_running + This can happen in two different cases. Either the local lock file was taken + at the time the invocation starts up. Or, the central scheduler have reported + that some tasks could not have been run, because other workers are already + running the tasks. + +If you customize return codes, prefer to set them in range 128 to 255 to avoid +conflicts. Return codes in range 0 to 127 are reserved for possible future use +by Luigi contributors. [scalding] ---------- diff --git a/luigi/cmdline.py b/luigi/cmdline.py index f0bf8ff5cc..fcb2cf1d96 100644 --- a/luigi/cmdline.py +++ b/luigi/cmdline.py @@ -3,14 +3,15 @@ import logging import sys +import luigi.interface import luigi.server import luigi.process import luigi.configuration -import luigi.interface +from luigi.retcodes import run_with_retcodes def luigi_run(argv=sys.argv[1:]): - luigi.interface.run(argv) + run_with_retcodes(argv) def luigid(argv=sys.argv[1:]): diff --git a/luigi/interface.py b/luigi/interface.py index 2be21ffa9d..14bc47bccf 100644 --- a/luigi/interface.py +++ b/luigi/interface.py @@ -167,7 +167,7 @@ def _schedule_and_run(tasks, worker_scheduler_factory=None, override_defaults=No kill_signal = signal.SIGUSR1 if env_params.take_lock else None if (not env_params.no_lock and not(lock.acquire_for(env_params.lock_pid_dir, env_params.lock_size, kill_signal))): - sys.exit(1) + raise PidLockAlreadyTakenExit() if env_params.local_scheduler: sch = worker_scheduler_factory.create_local_scheduler() @@ -193,11 +193,22 @@ def _schedule_and_run(tasks, worker_scheduler_factory=None, override_defaults=No success &= w.run() w.stop() logger.info(execution_summary.summary(w)) - return success + return dict(success=success, worker=w) -def run(cmdline_args=None, main_task_cls=None, - worker_scheduler_factory=None, use_dynamic_argparse=None, local_scheduler=False): +class PidLockAlreadyTakenExit(SystemExit): + """ + The exception thrown by :py:func:`luigi.run`, when the lock file is inaccessible + """ + pass + + +def run(*args, **kwargs): + return _run(*args, **kwargs)['success'] + + +def _run(cmdline_args=None, main_task_cls=None, + worker_scheduler_factory=None, use_dynamic_argparse=None, local_scheduler=False): """ Please dont use. Instead use `luigi` binary. @@ -249,4 +260,4 @@ def build(tasks, worker_scheduler_factory=None, **env_params): if "no_lock" not in env_params: env_params["no_lock"] = True - return _schedule_and_run(tasks, worker_scheduler_factory, override_defaults=env_params) + return _schedule_and_run(tasks, worker_scheduler_factory, override_defaults=env_params)['success'] diff --git a/luigi/retcodes.py b/luigi/retcodes.py new file mode 100644 index 0000000000..b5694cf4e9 --- /dev/null +++ b/luigi/retcodes.py @@ -0,0 +1,80 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2015-2015 Spotify AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Module containing the logic for exit codes for the luigi binary. It's useful +when you in a programmatic way need to know if luigi actually finished the +given task, and if not why. +""" + +import luigi +import sys +import logging +from luigi import IntParameter + + +class retcode(luigi.Config): + """ + See the :ref:`return codes configuration section `. + """ + unhandled_exception = IntParameter(default=4, + description='For scheduling errors or internal luigi errors.', + ) + missing_data = IntParameter(default=0, + description="For when there are incomplete ExternalTask dependencies.", + ) + task_failed = IntParameter(default=0, + description='''For when a task's run() method fails.''', + ) + already_running = IntParameter(default=0, + description='For both local --lock and luigid "lock"', + ) + + +def run_with_retcodes(argv): + """ + Run luigi with command line parsing, but raise ``SystemExit`` with the configured exit code. + + Note: Usually you use the luigi binary directly and don't call this function yourself. + + :param argv: Should (conceptually) be ``sys.argv[1:]`` + """ + logger = logging.getLogger('luigi-interface') + with luigi.cmdline_parser.CmdlineParser.global_instance(argv): + retcodes = retcode() + + worker = None + try: + worker = luigi.interface._run(argv)['worker'] + except luigi.interface.PidLockAlreadyTakenExit: + sys.exit(retcodes.already_running) + except Exception: + logger.exception("Uncaught exception in luigi") + sys.exit(retcodes.unhandled_exception) + + task_sets = luigi.execution_summary._summary_dict(worker) + non_empty_categories = {k: v for k, v in task_sets.items() if v}.keys() + + def has(status): + assert status in luigi.execution_summary._ORDERED_STATUSES + return status in non_empty_categories + + codes_and_conds = ( + (retcodes.missing_data, has('still_pending_ext')), + (retcodes.task_failed, has('failed')), + (retcodes.already_running, has('run_by_other_worker')), + ) + sys.exit(max(code * (1 if cond else 0) for code, cond in codes_and_conds)) diff --git a/test/retcodes_test.py b/test/retcodes_test.py new file mode 100644 index 0000000000..75ec0b1b52 --- /dev/null +++ b/test/retcodes_test.py @@ -0,0 +1,113 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2015-2015 Spotify AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from helpers import LuigiTestCase, with_config +import mock +import luigi +import luigi.scheduler +from luigi.cmdline import luigi_run + + +class RetcodesTest(LuigiTestCase): + + def run_and_expect(self, joined_params, retcode, extra_args=['--local-scheduler', '--no-lock']): + with self.assertRaises(SystemExit) as cm: + luigi_run((joined_params.split(' ') + extra_args)) + self.assertEqual(cm.exception.code, retcode) + + def run_with_config(self, retcode_config, *args, **kwargs): + with_config(dict(retcode=retcode_config))(self.run_and_expect)(*args, **kwargs) + + def test_task_failed(self): + class FailingTask(luigi.Task): + def run(self): + raise ValueError() + + self.run_and_expect('FailingTask', 0) # Test default value to be 0 + self.run_and_expect('FailingTask --retcode-task-failed 5', 5) + self.run_with_config(dict(task_failed='3'), 'FailingTask', 3) + + def test_missing_data(self): + class MissingDataTask(luigi.ExternalTask): + def complete(self): + return False + + self.run_and_expect('MissingDataTask', 0) # Test default value to be 0 + self.run_and_expect('MissingDataTask --retcode-missing-data 5', 5) + self.run_with_config(dict(missing_data='3'), 'MissingDataTask', 3) + + def test_already_running(self): + class AlreadyRunningTask(luigi.Task): + def run(self): + pass + + old_func = luigi.scheduler.CentralPlannerScheduler.get_work + + def new_func(*args, **kwargs): + old_func(*args, **kwargs) + res = old_func(*args, **kwargs) + res['running_tasks'][0]['worker'] = "not me :)" # Otherwise it will be filtered + return res + + with mock.patch('luigi.scheduler.CentralPlannerScheduler.get_work', new_func): + self.run_and_expect('AlreadyRunningTask', 0) # Test default value to be 0 + self.run_and_expect('AlreadyRunningTask --retcode-already-running 5', 5) + self.run_with_config(dict(already_running='3'), 'AlreadyRunningTask', 3) + + def test_when_locked(self): + def new_func(*args, **kwargs): + return False + + with mock.patch('luigi.lock.acquire_for', new_func): + self.run_and_expect('Task', 0, extra_args=['--local-scheduler']) + self.run_and_expect('Task --retcode-already-running 5', 5, extra_args=['--local-scheduler']) + self.run_with_config(dict(already_running='3'), 'Task', 3, extra_args=['--local-scheduler']) + + def test_unhandled_exception(self): + self.run_and_expect('UnknownTask', 4) + self.run_and_expect('UnknownTask --retcode-unhandled-exception 2', 2) + + class TaskWithRequiredParam(luigi.Task): + param = luigi.Parameter() + + self.run_and_expect('TaskWithRequiredParam --param hello', 0) + self.run_and_expect('TaskWithRequiredParam', 4) + + def test_when_mixed_errors(self): + + class FailingTask(luigi.Task): + def run(self): + raise ValueError() + + class MissingDataTask(luigi.ExternalTask): + def complete(self): + return False + + class RequiringTask(luigi.Task): + def requires(self): + yield FailingTask() + yield MissingDataTask() + + self.run_and_expect('RequiringTask --retcode-task-failed 4 --retcode-missing-data 5', 5) + self.run_and_expect('RequiringTask --retcode-task-failed 7 --retcode-missing-data 6', 7) + + def test_keyboard_interrupts(self): + + class KeyboardInterruptTask(luigi.Task): + def run(self): + raise KeyboardInterrupt() + + self.assertRaises(KeyboardInterrupt, luigi_run, ['KeyboardInterruptTask', '--local-scheduler', '--no-lock']) diff --git a/test/server_test.py b/test/server_test.py index 2cdfd0ec72..46c9d1fdf2 100644 --- a/test/server_test.py +++ b/test/server_test.py @@ -200,7 +200,8 @@ def test_with_cmdline(self): """ Test to run against the server as a normal luigi invocation does """ - luigi.cmdline.luigi_run(['Task', '--scheduler-port', str(self.server_client.port), '--no-lock']) + params = ['Task', '--scheduler-port', str(self.server_client.port), '--no-lock'] + self.assertTrue(luigi.interface.run(params)) class INETProcessServerTest(_INETServerTest):