Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task Level Configuration and upstream-status-when-all scheduler config #1782

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
7993af6
#1073 is handled for some scheduler configuration. Luigi now support…
javrasya Jul 22, 2016
b8631e3
Optimization is added. `test_get_work_speed` test was failing because…
javrasya Jul 22, 2016
308c072
An example is added for task level configuration
javrasya Jul 22, 2016
63f218c
Unnecessary code formatting is rollbacked
javrasya Jul 22, 2016
fa0a774
Renamed MyTask->MyTaskHasConfig because of TaskClassAmbigiousException
javrasya Jul 22, 2016
7659ba9
Renamed Unambigious task names
javrasya Jul 22, 2016
d5bfdf3
Unnecessary code formatting is rollback
javrasya Jul 22, 2016
0235b2a
Python3 compatiable prints
javrasya Jul 22, 2016
ad5492a
#1073 is handled for some scheduler configuration. Luigi now support…
javrasya Jul 22, 2016
5886373
Optimization is added. `test_get_work_speed` test was failing because…
javrasya Jul 22, 2016
817256a
An example is added for task level configuration
javrasya Jul 22, 2016
d1c04b6
Unnecessary code formatting is rollbacked
javrasya Jul 22, 2016
c673d4e
Renamed MyTask->MyTaskHasConfig because of TaskClassAmbigiousException
javrasya Jul 22, 2016
a2f2694
Renamed Unambigious task names
javrasya Jul 22, 2016
b07b402
Unnecessary code formatting is rollback
javrasya Jul 22, 2016
996c629
Python3 compatiable prints
javrasya Jul 22, 2016
44415be
Revising the PR by considering comments on the PR
javrasya Jul 25, 2016
48996b5
disable_num_failures -> retry_count in RetryPolicy namedtuple, exampl…
javrasya Jul 26, 2016
4b49bf3
upstream-status-when-all config is removed. default behavior of luigi…
javrasya Jul 27, 2016
10fc315
_get_retry_policy -> _generate_retry_policy in Scheduler. disable-num…
javrasya Jul 27, 2016
d2f7506
tests are fixed after luigi default behavior which is about seting pa…
javrasya Jul 27, 2016
951abce
Merge remote-tracking branch 'origin/comment-update'
javrasya Jul 27, 2016
ddf788c
old example is removed
javrasya Jul 27, 2016
63eaf7f
merge fix
javrasya Jul 27, 2016
3a262f6
test_renamegs_dont_move_on_fs -> test_rename_dont_move_on_fs
javrasya Jul 27, 2016
2112c28
configuration about per task retry policy doc is fixed.
javrasya Jul 27, 2016
c678689
retry-count -> retry_count in doc. name in config_path is `disable-nu…
javrasya Jul 27, 2016
1d19519
Merge remote-tracking branch 'upstream/master'
javrasya Jul 27, 2016
f22ab11
doc fix in luigi.Task.retry_count
javrasya Jul 27, 2016
ad5060a
python3x iteritems fix
javrasya Jul 27, 2016
c669e92
Merge remote-tracking branch 'upstream/master'
javrasya Jul 28, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 18 additions & 41 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ retry_external_tasks
This means that if external dependencies are satisfied after a workflow has
started, any tasks dependent on that resource will be eligible for running.
Note: Every time the task remains incomplete, it will count as FAILED, so
normal retry logic applies (see: `disable-num-failures` and `retry-delay`).
normal retry logic applies (see: `retry-count` and `retry-delay`).
This setting works best with `worker-keep-alive: true`.
If false, external tasks will only be evaluated when Luigi is first invoked.
In this case, Luigi will not check whether external dependencies are
Expand Down Expand Up @@ -550,10 +550,10 @@ disable-hard-timeout
**again** after this amount of time. E.g. if this was set to 600
(i.e. 10 minutes), and the task first failed at 10:00am, the task would
be disabled if it failed again any time after 10:10am. Note: This setting
does not consider the values of the `disable-num-failures` or
does not consider the values of the `retry-count` or
`disable-window-seconds` settings.

disable-num-failures
retry-count
Number of times a task can fail within disable-window-seconds before
the scheduler will automatically disable it. If not set, the scheduler
will not automatically disable jobs.
Expand All @@ -563,7 +563,7 @@ disable-persist-seconds
Defaults to 86400 (1 day).

disable-window-seconds
Number of seconds during which disable-num-failures failures must
Number of seconds during which retry-count failures must
occur in order for an automatic disable by the scheduler. The
scheduler forgets about disables that have occurred longer ago than
this amount of time. Defaults to 3600 (1 hour).
Expand Down Expand Up @@ -602,19 +602,6 @@ worker-disconnect-delay
scheduler before removing it and marking all of its running tasks as
failed. Defaults to 60.

upstream-status-when-all
Definition of how to set a task status as its upstream task statuses.
If this is set as ``True``, it means the task status set as min severity status among
upstream tasks. Otherwise it is set as max. For example; If this is set
as ``True``, the task status will be ``DISABLED`` or ``FAILED`` when all of its
upstream task statuses are ``DISABLED`` or ```FAILED```. Otherwise any of its
upstream task status will be set as the task status. This is helpfull
configuration while luigi tasks have a feature to define configuration
at task level and those have different ``disable-num-failures`` values.
Worker won't be shut down until last ``DISABLED`` task. Otherwise, First
``DISABLED`` task will make the task ``DISABLED`` and cause the worker is
shutdown. Defaults to ``False``


[spark]
-------
Expand Down Expand Up @@ -749,52 +736,42 @@ user
Configuration at Task Level
---------------------------

Luigi also supports to define some configurations at task level like scheduling configurations. It is defined in ``config`` variable in the task.
Luigi also supports to define some configurations at task level like scheduling configurations. It is as their names as a property variable in the task.

.. code-block:: python

class GenerateWordsFromHdfs(luigi.Task):

config = {
'disable-num-failures':5
}
disable_num_failures = 5

...

class GenerateWordsFromRDB(luigi.Task):
class GenerateWordsFromRDBM(luigi.Task):

config = {
'disable-num-failures':3
}
disable_num_failures = 3

...

class CountLetters(luigi.Task):

config = {
'upstream-status-when-all':True
}

def requires(self):
return [GenerateWordsFromHdfs(),GenerateWordsFromRDB()]
return [GenerateWordsFromHdfs(),GenerateWordsFromRDBM()]

...

Supported Configurations
************************
The configrations below are also definable in luigi config file. Check :ref:`scheduler-config`

+--------------------------+-----------+
| Config | Section |
+==========================+===========+
| disable-num-failures | scheduler |
+--------------------------+-----------+
| disable-hard-timeout | scheduler |
+--------------------------+-----------+
| disable-window-seconds | scheduler |
+--------------------------+-----------+
| upstream-status-when-all | scheduler |
+--------------------------+-----------+
+------------------------+-----------+
| Config | Section |
+========================+===========+
| retry-count | scheduler |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

retry_count. Use underscores.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True it is from first version of this feature.

+------------------------+-----------+
| disable-hard-timeout | scheduler |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The correct in config is to always use underscores. There are places where that's not the case, those cases are just yet not updated. :)

+------------------------+-----------+
| disable-window-seconds | scheduler |
+------------------------+-----------+



123 changes: 123 additions & 0 deletions examples/task_level_retry_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# -*- coding: utf-8 -*-

"""
You can run this example like this:

.. code:: console

$ luigi --module examples.task_level_retry_policy examples.TaskLevelRetryPolicy --worker-keep-alive \
--local-scheduler --scheduler-retry-delay 5 --logging-conf-file test/testconfig/logging.cfg

...
... lots of spammy output
...
DEBUG: ErrorTask1__99914b932b task num failures is 1 and limit is 5
DEBUG: ErrorTask2__99914b932b task num failures is 1 and limit is 2
DEBUG: ErrorTask2__99914b932b task num failures limit(2) is exceeded
DEBUG: ErrorTask1__99914b932b task num failures is 2 and limit is 5
DEBUG: ErrorTask2__99914b932b task num failures is 2 and limit is 2
DEBUG: ErrorTask1__99914b932b task num failures is 3 and limit is 5
DEBUG: ErrorTask1__99914b932b task num failures is 4 and limit is 5
DEBUG: ErrorTask1__99914b932b task num failures is 5 and limit is 5
DEBUG: ErrorTask1__99914b932b task num failures limit(5) is exceeded
INFO:
===== Luigi Execution Summary =====

Scheduled 5 tasks of which:
* 2 ran successfully:
- 1 SuccessSubTask1()
- 1 SuccessTask1()
* 2 failed:
- 1 ErrorTask1()
- 1 ErrorTask2()
* 1 were left pending, among these:
* 1 had failed dependencies:
- 1 examples.TaskLevelRetryPolicy()

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====


As it seems, While ``ErrorTask1`` is retried 5 times (Exception: Test Exception. Retry Index 5 for ErrorTask1),
``ErrorTask2`` is retried 2 times (Exception: Test Exception. Retry Index 2 for ErrorTask2). Luigi keeps retrying
while keep-alive mode is active.
"""

import luigi


class TaskLevelRetryPolicy(luigi.WrapperTask):
"""
Wrapper class for some error and success tasks. Worker won't be shutdown unless there is
pending tasks or failed tasks which will be retried. While keep-alive is active, workers
are not shutdown while there is/are some pending task(s).

"""

task_namespace = 'examples'

def requires(self):
return [ErrorTask1(), ErrorTask2(), SuccessTask1()]

def output(self):
return luigi.LocalTarget(path='/tmp/_docs-%s.ldj' % self.task_id)


class ErrorTask1(luigi.Task):
"""
This error class raises error to retry the task. retry-count for this task is 5. It can be seen on
"""

retry = 0

disable_num_failures = 5

def run(self):
self.retry += 1
raise Exception('Test Exception. Retry Index %s for %s' % (self.retry, self.task_family))

def output(self):
return luigi.LocalTarget(path='/tmp/_docs-%s.ldj' % self.task_id)


class ErrorTask2(luigi.Task):
"""
This error class raises error to retry the task. retry-count for this task is 2
"""

retry = 0

disable_num_failures = 2

def run(self):
self.retry += 1
raise Exception('Test Exception. Retry Index %s for %s' % (self.retry, self.task_family))

def output(self):
return luigi.LocalTarget(path='/tmp/_docs-%s.ldj' % self.task_id)


class SuccessTask1(luigi.Task):
def requires(self):
return [SuccessSubTask1()]

def run(self):
with self.output().open('w') as output:
output.write('SUCCESS Test Task 4\n')

def output(self):
return luigi.LocalTarget(path='/tmp/_docs-%s.ldj' % self.task_id)


class SuccessSubTask1(luigi.Task):
"""
This success task sleeps for a while and then it is completed successfully.
"""

def run(self):
with self.output().open('w') as output:
output.write('SUCCESS Test Task 4.1\n')

def output(self):
return luigi.LocalTarget(path='/tmp/_docs-%s.ldj' % self.task_id)
Loading