Skip to content

Commit

Permalink
Add semantic exit codes
Browse files Browse the repository at this point in the history
This commits adds a long requested feature, to be able to tell what the luigi
client did (or didn't) based on its exit code. This closes #687.
  • Loading branch information
Tarrasch committed Oct 8, 2015
1 parent 37777e6 commit 8e9b439
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 8 deletions.
41 changes: 41 additions & 0 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,47 @@ hive resources and 1 mysql resource:
Note that it was not necessary to specify the 1 for mysql here, but it
is good practice to do so when you have a fixed set of resources.

.. _retcode-config:

[retcode]
----------

Configure return codes for the luigi binary. In the case of multiple return
codes that could apply, for example a failing task and missing data, the
*numerically greatest* return code is returned.

We recommend that you copy this set of exit codes to your ``luigi.cfg`` file:

::

[retcode]
# The following return codes are the recommended exit codes for luigi
# They are in increasing level of severity (for most applications)
already_running: 10
missing_data: 20
task_failed: 30
unhandled_exception: 40

unhandled_exception
For exceptions during scheduling (if you raise from the ``complete()`` or
``requires()`` methods for instance) or for internal luigi errors. Defaults
to 4, since this type of error probably will not recover over time.
missing_data
For when an :py:class:`~luigi.task.ExternalTask` is not complete, and this
caused the worker to give up. As an alternative to fiddling with this, see
the [worker] keep_alive option.
task_failed
For signaling that there were last known to have failed. Typically because
some exception have been raised.
already_running
This can happen in two different cases. Either the local lock file was taken
at the time the invocation starts up. Or, the central scheduler have reported
that some tasks could not have been run, because other workers are already
running the tasks.

If you customize return codes, prefer to set them in range 128 to 255 to avoid
conflicts. Return codes in range 0 to 127 are reserved for possible future use
by Luigi contributors.

[scalding]
----------
Expand Down
5 changes: 3 additions & 2 deletions luigi/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
import logging
import sys

import luigi.interface
import luigi.server
import luigi.process
import luigi.configuration
import luigi.interface
from luigi.retcodes import run_with_retcodes


def luigi_run(argv=sys.argv[1:]):
luigi.interface.run(argv)
run_with_retcodes(argv)


def luigid(argv=sys.argv[1:]):
Expand Down
21 changes: 16 additions & 5 deletions luigi/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def _schedule_and_run(tasks, worker_scheduler_factory=None, override_defaults=No
kill_signal = signal.SIGUSR1 if env_params.take_lock else None
if (not env_params.no_lock and
not(lock.acquire_for(env_params.lock_pid_dir, env_params.lock_size, kill_signal))):
sys.exit(1)
raise PidLockAlreadyTakenExit()

if env_params.local_scheduler:
sch = worker_scheduler_factory.create_local_scheduler()
Expand All @@ -193,11 +193,22 @@ def _schedule_and_run(tasks, worker_scheduler_factory=None, override_defaults=No
success &= w.run()
w.stop()
logger.info(execution_summary.summary(w))
return success
return dict(success=success, worker=w)


def run(cmdline_args=None, main_task_cls=None,
worker_scheduler_factory=None, use_dynamic_argparse=None, local_scheduler=False):
class PidLockAlreadyTakenExit(SystemExit):
"""
The exception thrown by :py:func:`luigi.run`, when the lock file is inaccessible
"""
pass


def run(*args, **kwargs):
return _run(*args, **kwargs)['success']


def _run(cmdline_args=None, main_task_cls=None,
worker_scheduler_factory=None, use_dynamic_argparse=None, local_scheduler=False):
"""
Please dont use. Instead use `luigi` binary.
Expand Down Expand Up @@ -249,4 +260,4 @@ def build(tasks, worker_scheduler_factory=None, **env_params):
if "no_lock" not in env_params:
env_params["no_lock"] = True

return _schedule_and_run(tasks, worker_scheduler_factory, override_defaults=env_params)
return _schedule_and_run(tasks, worker_scheduler_factory, override_defaults=env_params)['success']
80 changes: 80 additions & 0 deletions luigi/retcodes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# -*- coding: utf-8 -*-
#
# Copyright 2015-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Module containing the logic for exit codes for the luigi binary. It's useful
when you in a programmatic way need to know if luigi actually finished the
given task, and if not why.
"""

import luigi
import sys
import logging
from luigi import IntParameter


class retcode(luigi.Config):
"""
See the :ref:`return codes configuration section <retcode-config>`.
"""
unhandled_exception = IntParameter(default=4,
description='For scheduling errors or internal luigi errors.',
)
missing_data = IntParameter(default=0,
description="For when there are incomplete ExternalTask dependencies.",
)
task_failed = IntParameter(default=0,
description='''For when a task's run() method fails.''',
)
already_running = IntParameter(default=0,
description='For both local --lock and luigid "lock"',
)


def run_with_retcodes(argv):
"""
Run luigi with command line parsing, but raise ``SystemExit`` with the configured exit code.
Note: Usually you use the luigi binary directly and don't call this function yourself.
:param argv: Should (conceptually) be ``sys.argv[1:]``
"""
logger = logging.getLogger('luigi-interface')
with luigi.cmdline_parser.CmdlineParser.global_instance(argv):
retcodes = retcode()

worker = None
try:
worker = luigi.interface._run(argv)['worker']
except luigi.interface.PidLockAlreadyTakenExit:
sys.exit(retcodes.already_running)
except Exception:
logger.exception("Uncaught exception in luigi")
sys.exit(retcodes.unhandled_exception)

task_sets = luigi.execution_summary._summary_dict(worker)
non_empty_categories = {k: v for k, v in task_sets.items() if v}.keys()

def has(status):
assert status in luigi.execution_summary._ORDERED_STATUSES
return status in non_empty_categories

codes_and_conds = (
(retcodes.missing_data, has('still_pending_ext')),
(retcodes.task_failed, has('failed')),
(retcodes.already_running, has('run_by_other_worker')),
)
sys.exit(max(code * (1 if cond else 0) for code, cond in codes_and_conds))
113 changes: 113 additions & 0 deletions test/retcodes_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
#
# Copyright 2015-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from helpers import LuigiTestCase, with_config
import mock
import luigi
import luigi.scheduler
from luigi.cmdline import luigi_run


class RetcodesTest(LuigiTestCase):

def run_and_expect(self, joined_params, retcode, extra_args=['--local-scheduler', '--no-lock']):
with self.assertRaises(SystemExit) as cm:
luigi_run((joined_params.split(' ') + extra_args))
self.assertEqual(cm.exception.code, retcode)

def run_with_config(self, retcode_config, *args, **kwargs):
with_config(dict(retcode=retcode_config))(self.run_and_expect)(*args, **kwargs)

def test_task_failed(self):
class FailingTask(luigi.Task):
def run(self):
raise ValueError()

self.run_and_expect('FailingTask', 0) # Test default value to be 0
self.run_and_expect('FailingTask --retcode-task-failed 5', 5)
self.run_with_config(dict(task_failed='3'), 'FailingTask', 3)

def test_missing_data(self):
class MissingDataTask(luigi.ExternalTask):
def complete(self):
return False

self.run_and_expect('MissingDataTask', 0) # Test default value to be 0
self.run_and_expect('MissingDataTask --retcode-missing-data 5', 5)
self.run_with_config(dict(missing_data='3'), 'MissingDataTask', 3)

def test_already_running(self):
class AlreadyRunningTask(luigi.Task):
def run(self):
pass

old_func = luigi.scheduler.CentralPlannerScheduler.get_work

def new_func(*args, **kwargs):
old_func(*args, **kwargs)
res = old_func(*args, **kwargs)
res['running_tasks'][0]['worker'] = "not me :)" # Otherwise it will be filtered
return res

with mock.patch('luigi.scheduler.CentralPlannerScheduler.get_work', new_func):
self.run_and_expect('AlreadyRunningTask', 0) # Test default value to be 0
self.run_and_expect('AlreadyRunningTask --retcode-already-running 5', 5)
self.run_with_config(dict(already_running='3'), 'AlreadyRunningTask', 3)

def test_when_locked(self):
def new_func(*args, **kwargs):
return False

with mock.patch('luigi.lock.acquire_for', new_func):
self.run_and_expect('Task', 0, extra_args=['--local-scheduler'])
self.run_and_expect('Task --retcode-already-running 5', 5, extra_args=['--local-scheduler'])
self.run_with_config(dict(already_running='3'), 'Task', 3, extra_args=['--local-scheduler'])

def test_unhandled_exception(self):
self.run_and_expect('UnknownTask', 4)
self.run_and_expect('UnknownTask --retcode-unhandled-exception 2', 2)

class TaskWithRequiredParam(luigi.Task):
param = luigi.Parameter()

self.run_and_expect('TaskWithRequiredParam --param hello', 0)
self.run_and_expect('TaskWithRequiredParam', 4)

def test_when_mixed_errors(self):

class FailingTask(luigi.Task):
def run(self):
raise ValueError()

class MissingDataTask(luigi.ExternalTask):
def complete(self):
return False

class RequiringTask(luigi.Task):
def requires(self):
yield FailingTask()
yield MissingDataTask()

self.run_and_expect('RequiringTask --retcode-task-failed 4 --retcode-missing-data 5', 5)
self.run_and_expect('RequiringTask --retcode-task-failed 7 --retcode-missing-data 6', 7)

def test_keyboard_interrupts(self):

class KeyboardInterruptTask(luigi.Task):
def run(self):
raise KeyboardInterrupt()

self.assertRaises(KeyboardInterrupt, luigi_run, ['KeyboardInterruptTask', '--local-scheduler', '--no-lock'])
3 changes: 2 additions & 1 deletion test/server_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ def test_with_cmdline(self):
"""
Test to run against the server as a normal luigi invocation does
"""
luigi.cmdline.luigi_run(['Task', '--scheduler-port', str(self.server_client.port), '--no-lock'])
params = ['Task', '--scheduler-port', str(self.server_client.port), '--no-lock']
self.assertTrue(luigi.interface.run(params))


class INETProcessServerTest(_INETServerTest):
Expand Down

0 comments on commit 8e9b439

Please sign in to comment.