Task Level Configuration and upstream-status-when-all
scheduler config
#1782
Closed
Changes from 14 commits
31 commits
7993af6 #1073 is handled for some scheduler configuration. Luigi now support… (javrasya)
b8631e3 Optimization is added. `test_get_work_speed` test was failing because… (javrasya)
308c072 An example is added for task level configuration (javrasya)
63f218c Unnecessary code formatting is rollbacked (javrasya)
fa0a774 Renamed MyTask->MyTaskHasConfig because of TaskClassAmbigiousException (javrasya)
7659ba9 Renamed Unambigious task names (javrasya)
d5bfdf3 Unnecessary code formatting is rollback (javrasya)
0235b2a Python3 compatiable prints (javrasya)
ad5492a #1073 is handled for some scheduler configuration. Luigi now support… (javrasya)
5886373 Optimization is added. `test_get_work_speed` test was failing because… (javrasya)
817256a An example is added for task level configuration (javrasya)
d1c04b6 Unnecessary code formatting is rollbacked (javrasya)
c673d4e Renamed MyTask->MyTaskHasConfig because of TaskClassAmbigiousException (javrasya)
a2f2694 Renamed Unambigious task names (javrasya)
b07b402 Unnecessary code formatting is rollback (javrasya)
996c629 Python3 compatiable prints (javrasya)
44415be Revising the PR by considering comments on the PR (javrasya)
48996b5 disable_num_failures -> retry_count in RetryPolicy namedtuple, exampl… (javrasya)
4b49bf3 upstream-status-when-all config is removed. default behavior of luigi… (javrasya)
10fc315 _get_retry_policy -> _generate_retry_policy in Scheduler. disable-num… (javrasya)
d2f7506 tests are fixed after luigi default behavior which is about seting pa… (javrasya)
951abce Merge remote-tracking branch 'origin/comment-update' (javrasya)
ddf788c old example is removed (javrasya)
63eaf7f merge fix (javrasya)
3a262f6 test_renamegs_dont_move_on_fs -> test_rename_dont_move_on_fs (javrasya)
2112c28 configuration about per task retry policy doc is fixed. (javrasya)
c678689 retry-count -> retry_count in doc. name in config_path is `disable-nu… (javrasya)
1d19519 Merge remote-tracking branch 'upstream/master' (javrasya)
f22ab11 doc fix in luigi.Task.retry_count (javrasya)
ad5060a python3x iteritems fix (javrasya)
c669e92 Merge remote-tracking branch 'upstream/master' (javrasya)
@@ -269,7 +269,7 @@ retry_external_tasks
 This means that if external dependencies are satisfied after a workflow has
 started, any tasks dependent on that resource will be eligible for running.
 Note: Every time the task remains incomplete, it will count as FAILED, so
-normal retry logic applies (see: `disable-num-failures` and `retry-delay`).
+normal retry logic applies (see: `retry-count` and `retry-delay`).
 This setting works best with `worker-keep-alive: true`.
 If false, external tasks will only be evaluated when Luigi is first invoked.
 In this case, Luigi will not check whether external dependencies are
@@ -550,10 +550,10 @@ disable-hard-timeout
 **again** after this amount of time. E.g. if this was set to 600
 (i.e. 10 minutes), and the task first failed at 10:00am, the task would
 be disabled if it failed again any time after 10:10am. Note: This setting
-does not consider the values of the `disable-num-failures` or
+does not consider the values of the `retry-count` or
 `disable-window-seconds` settings.

-disable-num-failures
+retry-count
 Number of times a task can fail within disable-window-seconds before
 the scheduler will automatically disable it. If not set, the scheduler
 will not automatically disable jobs.
@@ -563,7 +563,7 @@ disable-persist-seconds
 Defaults to 86400 (1 day).

 disable-window-seconds
-Number of seconds during which disable-num-failures failures must
+Number of seconds during which retry-count failures must
 occur in order for an automatic disable by the scheduler. The
 scheduler forgets about disables that have occurred longer ago than
 this amount of time. Defaults to 3600 (1 hour).
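
Read together, the `retry-count` and `disable-window-seconds` descriptions in the hunks above amount to counting failures inside a sliding time window. The following is a minimal sketch of that idea only. The `FailureWindow` class and its method names are hypothetical and are not taken from Luigi's scheduler, and the `>=` comparison is an assumption about exactly where the limit trips.

```python
from collections import deque


class FailureWindow:
    """Hypothetical helper: counts failures that fall inside a sliding window."""

    def __init__(self, retry_count, window_seconds):
        # retry_count=None models "not set": the task is never auto-disabled.
        self.retry_count = retry_count
        self.window_seconds = window_seconds
        self._failures = deque()  # failure timestamps, oldest first

    def add_failure(self, now):
        self._failures.append(now)

    def num_failures(self, now):
        # Forget failures that happened longer ago than the window.
        cutoff = now - self.window_seconds
        while self._failures and self._failures[0] < cutoff:
            self._failures.popleft()
        return len(self._failures)

    def should_disable(self, now):
        if self.retry_count is None:
            return False
        # Assumption: reaching the limit is enough to disable the task.
        return self.num_failures(now) >= self.retry_count


window = FailureWindow(retry_count=2, window_seconds=3600)
window.add_failure(0)
first = window.should_disable(0)      # one failure, limit not reached
window.add_failure(10)
second = window.should_disable(10)    # two failures inside the window
later = window.should_disable(4000)   # both failures have aged out
```

With a window of 3600 seconds, the two failures at `t=0` and `t=10` no longer count at `t=4000`, which is the "forgets" behaviour the `disable-window-seconds` text describes.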
@@ -602,19 +602,6 @@ worker-disconnect-delay
 scheduler before removing it and marking all of its running tasks as
 failed. Defaults to 60.

-upstream-status-when-all
-Definition of how to set a task status as its upstream task statuses.
-If this is set as ``True``, it means the task status set as min severity status among
-upstream tasks. Otherwise it is set as max. For example; If this is set
-as ``True``, the task status will be ``DISABLED`` or ``FAILED`` when all of its
-upstream task statuses are ``DISABLED`` or ```FAILED```. Otherwise any of its
-upstream task status will be set as the task status. This is helpfull
-configuration while luigi tasks have a feature to define configuration
-at task level and those have different ``disable-num-failures`` values.
-Worker won't be shut down until last ``DISABLED`` task. Otherwise, First
-``DISABLED`` task will make the task ``DISABLED`` and cause the worker is
-shutdown. Defaults to ``False``
-

 [spark]
 -------
@@ -749,52 +736,42 @@ user
 Configuration at Task Level
 ---------------------------

-Luigi also supports to define some configurations at task level like scheduling configurations. It is defined in ``config`` variable in the task.
+Luigi also supports to define some configurations at task level like scheduling configurations. It is as their names as a property variable in the task.

 .. code-block:: python

     class GenerateWordsFromHdfs(luigi.Task):

-        config = {
-            'disable-num-failures':5
-        }
+        disable_num_failures = 5

         ...

-    class GenerateWordsFromRDB(luigi.Task):
+    class GenerateWordsFromRDBM(luigi.Task):

-        config = {
-            'disable-num-failures':3
-        }
+        disable_num_failures = 3

         ...

     class CountLetters(luigi.Task):

-        config = {
-            'upstream-status-when-all':True
-        }
-
         def requires(self):
-            return [GenerateWordsFromHdfs(),GenerateWordsFromRDB()]
+            return [GenerateWordsFromHdfs(),GenerateWordsFromRDBM()]

         ...

 Supported Configurations
 ************************
 The configrations below are also definable in luigi config file. Check :ref:`scheduler-config`

-+--------------------------+-----------+
-| Config                   | Section   |
-+==========================+===========+
-| disable-num-failures     | scheduler |
-+--------------------------+-----------+
-| disable-hard-timeout     | scheduler |
-+--------------------------+-----------+
-| disable-window-seconds   | scheduler |
-+--------------------------+-----------+
-| upstream-status-when-all | scheduler |
-+--------------------------+-----------+
++------------------------+-----------+
+| Config                 | Section   |
++========================+===========+
+| retry-count            | scheduler |
++------------------------+-----------+
+| disable-hard-timeout   | scheduler |
++------------------------+-----------+
+| disable-window-seconds | scheduler |
++------------------------+-----------+

Review comment (on the new `disable-hard-timeout` table row): The correct convention in config is to always use underscores. There are places where that's not the case; those just haven't been updated yet. :)
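
The task-level override documented in this hunk can be pictured as a lookup that prefers a class attribute and falls back to the scheduler-wide value. The sketch below assumes that reading. `SCHEDULER_DEFAULTS`, `resolve_setting`, and the plain stand-in classes are hypothetical; the block imports nothing from Luigi and does not reproduce the PR's scheduler code.

```python
# Scheduler-wide values, as they might be read from the [scheduler] section.
# Only the 3600 second window default comes from the docs in this diff;
# the other entries model "not set" and are assumptions.
SCHEDULER_DEFAULTS = {
    "disable_num_failures": None,
    "disable_hard_timeout": None,
    "disable_window_seconds": 3600,
}


def resolve_setting(task, name, defaults=SCHEDULER_DEFAULTS):
    """Return the task's own value for `name`, else the scheduler default."""
    value = getattr(task, name, None)
    return defaults[name] if value is None else value


# Stand-ins for luigi.Task subclasses (hypothetical, no Luigi dependency).
class GenerateWordsFromHdfs:
    disable_num_failures = 5


class GenerateWordsFromRDBM:
    disable_num_failures = 3


class CountLetters:
    pass  # no override: falls back to the scheduler-wide value


hdfs_limit = resolve_setting(GenerateWordsFromHdfs(), "disable_num_failures")
rdbm_limit = resolve_setting(GenerateWordsFromRDBM(), "disable_num_failures")
default_limit = resolve_setting(CountLetters(), "disable_num_failures")
default_window = resolve_setting(CountLetters(), "disable_window_seconds")
```

Keeping the attribute names identical to the config keys (with underscores, as the review comment asks) is what lets a single `getattr` lookup serve every supported setting.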
@@ -0,0 +1,123 @@
+# -*- coding: utf-8 -*-
+
+"""
+You can run this example like this:
+
+    .. code:: console
+
+        $ luigi --module examples.task_level_retry_policy examples.TaskLevelRetryPolicy --worker-keep-alive \
+        --local-scheduler --scheduler-retry-delay 5 --logging-conf-file test/testconfig/logging.cfg
+
+        ...
+        ... lots of spammy output
+        ...
+        DEBUG: ErrorTask1__99914b932b task num failures is 1 and limit is 5
+        DEBUG: ErrorTask2__99914b932b task num failures is 1 and limit is 2
+        DEBUG: ErrorTask2__99914b932b task num failures limit(2) is exceeded
+        DEBUG: ErrorTask1__99914b932b task num failures is 2 and limit is 5
+        DEBUG: ErrorTask2__99914b932b task num failures is 2 and limit is 2
+        DEBUG: ErrorTask1__99914b932b task num failures is 3 and limit is 5
+        DEBUG: ErrorTask1__99914b932b task num failures is 4 and limit is 5
+        DEBUG: ErrorTask1__99914b932b task num failures is 5 and limit is 5
+        DEBUG: ErrorTask1__99914b932b task num failures limit(5) is exceeded
+        INFO:
+        ===== Luigi Execution Summary =====
+
+        Scheduled 5 tasks of which:
+        * 2 ran successfully:
+            - 1 SuccessSubTask1()
+            - 1 SuccessTask1()
+        * 2 failed:
+            - 1 ErrorTask1()
+            - 1 ErrorTask2()
+        * 1 were left pending, among these:
+            * 1 had failed dependencies:
+                - 1 examples.TaskLevelRetryPolicy()
+
+        This progress looks :( because there were failed tasks
+
+        ===== Luigi Execution Summary =====
+
+
+As it seems, While ``ErrorTask1`` is retried 5 times (Exception: Test Exception. Retry Index 5 for ErrorTask1),
+``ErrorTask2`` is retried 2 times (Exception: Test Exception. Retry Index 2 for ErrorTask2). Luigi keeps retrying
+while keep-alive mode is active.
+"""
+
+import luigi
+
+
+class TaskLevelRetryPolicy(luigi.WrapperTask):
+    """
+    Wrapper class for some error and success tasks. Worker won't be shutdown unless there is
+    pending tasks or failed tasks which will be retried. While keep-alive is active, workers
+    are not shutdown while there is/are some pending task(s).
+
+    """
+
+    task_namespace = 'examples'
+
+    def requires(self):
+        return [ErrorTask1(), ErrorTask2(), SuccessTask1()]
+
+    def output(self):
+        return luigi.LocalTarget(path='/tmp/_docs-%s.ldj' % self.task_id)
+
+
+class ErrorTask1(luigi.Task):
+    """
+    This error class raises error to retry the task. retry-count for this task is 5. It can be seen on
+    """
+
+    retry = 0
+
+    disable_num_failures = 5
+
+    def run(self):
+        self.retry += 1
+        raise Exception('Test Exception. Retry Index %s for %s' % (self.retry, self.task_family))
+
+    def output(self):
+        return luigi.LocalTarget(path='/tmp/_docs-%s.ldj' % self.task_id)
+
+
+class ErrorTask2(luigi.Task):
+    """
+    This error class raises error to retry the task. retry-count for this task is 2
+    """
+
+    retry = 0
+
+    disable_num_failures = 2
+
+    def run(self):
+        self.retry += 1
+        raise Exception('Test Exception. Retry Index %s for %s' % (self.retry, self.task_family))
+
+    def output(self):
+        return luigi.LocalTarget(path='/tmp/_docs-%s.ldj' % self.task_id)
+
+
+class SuccessTask1(luigi.Task):
+    def requires(self):
+        return [SuccessSubTask1()]
+
+    def run(self):
+        with self.output().open('w') as output:
+            output.write('SUCCESS Test Task 4\n')
+
+    def output(self):
+        return luigi.LocalTarget(path='/tmp/_docs-%s.ldj' % self.task_id)
+
+
+class SuccessSubTask1(luigi.Task):
+    """
+    This success task sleeps for a while and then it is completed successfully.
+    """
+
+    def run(self):
+        with self.output().open('w') as output:
+            output.write('SUCCESS Test Task 4.1\n')
+
+    def output(self):
+        return luigi.LocalTarget(path='/tmp/_docs-%s.ldj' % self.task_id)
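
A note on the `retry = 0` counter used by `ErrorTask1` and `ErrorTask2`: `self.retry += 1` reads the class attribute and then stores the result on the instance, so the count only grows if the same task object is run again. The sketch below shows that plain Python behaviour with a hypothetical `FlakyTask` class. Whether Luigi reuses one task instance across retries is not shown in this diff, and is an assumption behind the "Retry Index 5" message in the module docstring.

```python
class FlakyTask:
    """Hypothetical stand-in for ErrorTask1/ErrorTask2 (no Luigi dependency)."""

    retry = 0  # class attribute, shared default for every instance

    def run(self):
        # Reads FlakyTask.retry the first time, then writes an instance
        # attribute that shadows the class attribute from then on.
        self.retry += 1
        return self.retry


same_instance = FlakyTask()
same_instance.run()
second_run = same_instance.run()   # counter keeps growing on one instance

fresh_instance = FlakyTask()
first_run = fresh_instance.run()   # a new instance starts from the class default

class_value = FlakyTask.retry      # the class attribute itself is never modified
```

If each retry created a new task object, every run would report index 1, so the increasing index in the example output depends on instance reuse.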
Review comment: `retry_count`. Use underscores.
Reply: True, it is from the first version of this feature.