Task Level Configuration and upstream-status-when-all scheduler config #1782
Conversation
…supports defining some configuration at task level. The configurations that can be specified at task level are `disable-num-failures`, `disable-hard-timeout`, `disable-window-seconds`, and `upstream-status-when-all`. There is also a new scheduler config named `upstream-status-when-all`, which defines how a task's status is set from its upstream tasks' statuses. Tests and documentation are also added.
… of adding task is so slow.
# -*- coding: utf-8 -*-
#
# To make this run, you probably want to edit /etc/luigi/client.cfg and add something like:
Something like what? :)
Summary of my comments:
Looks really promising! :)
@@ -112,5 +112,129 @@ def __init__(self):

        worker.prune(TmpCfg())

    @with_config({'scheduler': {'disable-num-failures': '44'}})
    def test_scheduler_with_task_level_config(self):
        cps = luigi.scheduler.CentralPlannerScheduler()
Sorry, as I merged #1781 you have to update this line :)
@Tarrasch I don't want to bother you more by pushing more commits to this PR without being sure. Can you review the changes according to your comments? I made some fairly major changes.
Please keep commenting there, and I will correct it and merge it into my master branch to update the PR afterwards.
…e task command line running tip is updated and is clearer and more readable now, and other small changes
Ok. I think we aren't on the same page here. Let's render an example. Task X depends on Y2 and Y5. Y2 has retry-count 2 and Y5 has retry-count 5. Let's say that both of them keep failing. In our example X will never run, because Y2 and Y5 will not both get DONE. Now let's say Y5 failed and is now marked as red, while Y2 has not run yet and is pending.

We note that UPSTREAM_FAILED is not something a task is ever marked as; it's just a view that only exists in the luigi scheduler. It's possible that a task is UPSTREAM_FAILED, UPSTREAM_DISABLED, both, or none. Are we on the same page so far?

Actually, I downloaded your branch and tested running your example, and I do see what you mean now. It only works when that variable is true.

What about this proposal? Because I really don't want to introduce a configuration option that takes a luigi maintainer hours to understand, can you try to change the default behaviour to act as if your new variable was true? Yes, it's a bit provocative to change this over-3-years-old concept in luigi, but I don't really see it being used anywhere except for visualization. There's only one place (as of #871), and in that case I believe the author mistakenly assumed the behaviour works as I propose we change it to. I checked that no scheduler-api tests fail after the change, and I can help you to provide a test case that will fail until we change the behaviour.

How does that sound? Less code for both of us, and I think we fix a design flaw in luigi. And yes, you will run into this bug in production as of today; it's just that in your case both child tasks crash after 0 seconds. If one task has a delay, you will get into the same problem even if both have the same retry-count.
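To make the any-versus-all distinction above concrete, here is a small runnable sketch. This is not the PR's code: the severity tuple is modeled on UPSTREAM_SEVERITY_ORDER in luigi/scheduler.py, and mapping the pending Y2 to UPSTREAM_MISSING_INPUT is an assumption made for illustration.

# Illustrative sketch only, not the PR's code. '' means "no upstream problem".
UPSTREAM_SEVERITY_ORDER = ('', 'UPSTREAM_RUNNING', 'UPSTREAM_MISSING_INPUT',
                           'UPSTREAM_FAILED', 'UPSTREAM_DISABLED')
UPSTREAM_SEVERITY_KEY = UPSTREAM_SEVERITY_ORDER.index

# Y5 has failed; Y2 has not run yet (assumed mapped to UPSTREAM_MISSING_INPUT).
dep_severities = ['UPSTREAM_FAILED', 'UPSTREAM_MISSING_INPUT']

# "any" semantics: the most severe dependency wins, so X is flagged
# as failed-upstream as soon as Y5 fails.
print(max(dep_severities, key=UPSTREAM_SEVERITY_KEY))  # UPSTREAM_FAILED

# "all" semantics: the least severe dependency wins, so X is not
# flagged as failed while Y2 can still run.
print(min(dep_severities, key=UPSTREAM_SEVERITY_KEY))  # UPSTREAM_MISSING_INPUT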
Also linking in #686; even there the author said "if all dependencies". It seems that …
I would go for not including this configuration if we have the chance, and changing the default behavior to 'all' instead of 'any'. If you say so, I will change the behavior and remove the added configuration.
Yes, I think the default behaviour should change; today I'll work on creating a test case which only passes with the new behaviour.
… on setting parent task severity as `UPSTREAM_DISABLED` or `UPSTREAM_FAILED` when any of its upstream tasks is `DISABLED` or `FAILED` is changed to when all of its upstream tasks are `DISABLED` or `FAILED`. `disable_failures`->`retry-count` in scheduler class.
…-failures -> retry-count in the config file, and its documentation is updated.
A few tests fail after I changed the current behavior of luigi on that. I will fix them and update the PR soon.
…rent task severity as its upstream tasks' statuses, is changed
Conflicts:
    doc/configuration.rst
    luigi/scheduler.py
    luigi/worker.py
    test/scheduler_test.py
@javrasya, it would be ideal if that could be made into a separate PR that we merge first. Could you arrange that? Then we have one diff view where it's clear what that change affects. I remember the scheduler_api tests did not start failing, but maybe I ran them incorrectly.
The one which fails is …
+------------------------+-----------+
| Config                 | Section   |
+========================+===========+
| retry-count            | scheduler |
+------------------------+-----------+
retry_count. Use underscores.
True, it is from the first version of this feature.
disable_failures = parameter.IntParameter(default=999999999,
                                          config_path=dict(section='scheduler', name='disable-num-failures'))
retry_count = parameter.IntParameter(default=999999999,
                                     config_path=dict(section='scheduler', name='retry-count'))
noooo... s/retry-count/disable_failures, this please. Sorry for the confusion, but config_path should really have been called deprecated_config_path from day one. The confusion comes from our bad naming. Remember, the non-deprecated config path is automatic.
Ok, I probably got it wrong. I didn't change it at first, but after a comment of yours I changed it. I can roll it back.
OK, I am a bit confused about your comment: "Oh, even more important. Change this line to say name='disable_failures'! Also update the docs to tell people to use retry_count." Should I roll it back?
Code should be name='disable_failures' so we keep backward compatibility. Docs should encourage usage of the newest name retry_count, so people don't get deprecation warnings.

Just add commits on top; there's no point in "rolling back". In the end I'll squash everything anyway, as the history is a bit messy already. :)
Oh, I guess I got it now. You mean, if I describe it as retry_count in the .cfg file, even though the name is disable-num-failures, both will be usable? So all I need to commit is to change that part to name='disable-num-failures', not the doc, and the doc can remain as retry_count, right?
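For readers following this thread, the upshot (assuming config_path works as the usual deprecated-alias mechanism described above, where the non-deprecated path is derived automatically from the parameter name) is that either spelling in client.cfg would be read, with retry_count being the documented one:

# Preferred, documented spelling (resolved automatically from the parameter name):
[scheduler]
retry_count = 5

# Deprecated spelling, still accepted via config_path for backward compatibility:
[scheduler]
disable-num-failures = 5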
…m-failures` back again for backward compatibility
                 for a_task_id in dep.deps),
                key=UPSTREAM_SEVERITY_KEY)
upstream_severities = list(upstream_status_table.get(a_task_id) for a_task_id in dep.deps if a_task_id in upstream_status_table) or ['']
status = min(upstream_severities, key=UPSTREAM_SEVERITY_KEY)
Here is the change of luigi's default behavior for detecting upstream severity. Currently, a parent task is marked as UPSTREAM_DISABLED when any of its upstream tasks is DISABLED, and the worker is shut down because of it. This behavior is changed to wait for all upstream tasks to be DISABLED or FAILED before marking the parent task as UPSTREAM_DISABLED or UPSTREAM_FAILED.
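As a standalone sketch of what the new expression does (the variable shapes are assumed from the diff above): dependencies whose upstream status has not been computed yet are filtered out, and the `or ['']` fallback keeps min() from raising on an empty sequence when nothing is in the table yet.

# Standalone sketch, assuming dep.deps is a set of task ids and
# upstream_status_table maps task ids to severity strings.
UPSTREAM_SEVERITY_ORDER = ('', 'UPSTREAM_RUNNING', 'UPSTREAM_MISSING_INPUT',
                           'UPSTREAM_FAILED', 'UPSTREAM_DISABLED')
UPSTREAM_SEVERITY_KEY = UPSTREAM_SEVERITY_ORDER.index

deps = {'Y2', 'Y5'}
upstream_status_table = {'Y5': 'UPSTREAM_FAILED'}  # Y2 not computed yet

upstream_severities = list(upstream_status_table.get(a_task_id)
                           for a_task_id in deps
                           if a_task_id in upstream_status_table) or ['']
print(min(upstream_severities, key=UPSTREAM_SEVERITY_KEY))  # UPSTREAM_FAILED

# If none of the deps were in the table, the [''] fallback would give the
# parent the neutral severity '' instead of raising a ValueError.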
Ok, so your first next step here is to create a new PR with only the "…" part. This PR is really messy now. It's better to have many small PRs that get merged, so we have the feeling that we're progressing. :)
Description
Defining some scheduling configuration at task level as described in #1073. Also adds a new configuration named `upstream-status-when-all` in the `[scheduler]` section.

Motivation and Context
Task Level Configuration
It may be required to have different scheduling configuration for different tasks like retry-count(
disable-num-failures
). Assume that, a task is retrieving data from Hive which responses in long time and other one is retrieving data from RDBM which responses in short time. I may want Hive task not to retry or has small retry-count not to block all my system again an again. But, retrying RDBM task many times may not be problem for me. Assume that, I have some network issue for a while and I should be able to give its retry-count more than Hive task. So I would need to define those configuration at task level.This fixes the issue defined in #1073
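This is not the PR's exact diff, but a sketch of the kind of per-task override being proposed; the retry_count attribute name here simply mirrors the config key discussed in this PR and may differ from the final API.

import luigi

class HiveExtract(luigi.Task):
    # Slow Hive task: keep retries low so repeated failures
    # don't block the whole pipeline again and again.
    retry_count = 1  # hypothetical task-level override

    def run(self):
        pass  # query Hive here

class RdbmsExtract(luigi.Task):
    # Fast RDBMS task: retry generously to ride out a
    # transient network issue.
    retry_count = 10  # hypothetical task-level override

    def run(self):
        pass  # query the RDBMS here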
upstream-status-when-all config

Normally the luigi scheduler sets a wrapper task's status according to its upstream tasks' severity statuses. For example, if any of its upstream tasks' severity is `DISABLED` or `FAILED`, the scheduler sets the task status as FAILED. If this configuration is set to `True`, the scheduler won't mark the task as `UPSTREAM_FAILED` or `UPSTREAM_DISABLED` unless all of its upstream tasks, excluding `SUCCESS` ones, are `FAILED` or `DISABLED`.

This was not a problem before the task level configuration feature existed. But with this feature, some upstream task may retry even while one of the other upstream tasks is `DISABLED` or `FAILED`. Please check the example which is added in this PR.
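The example added in the PR is not reproduced in this conversation; a minimal reconstruction of the scenario it describes (hypothetical task names, two always-failing upstream tasks with different retry counts under one wrapper) might look like:

import luigi

class FailingTaskA(luigi.Task):
    retry_count = 2  # hypothetical per-task setting, as discussed above

    def run(self):
        raise Exception('always fails')

class FailingTaskB(luigi.Task):
    retry_count = 5

    def run(self):
        raise Exception('always fails')

class Wrapper(luigi.WrapperTask):
    # Under "any" semantics the wrapper is shown as UPSTREAM_DISABLED as soon
    # as FailingTaskA exhausts its 2 retries, even though FailingTaskB still
    # has retries left; under "all" semantics it is only flagged once both
    # upstream tasks are FAILED/DISABLED.
    def requires(self):
        return [FailingTaskA(), FailingTaskB()]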
Have you tested this? If so, how?
There are also unit tests for those features. I tested by debugging with PyCharm, by running the available tests with tox (some of them are not able to run on my local machine; I don't know, maybe those tests involve hdfs or other external integration systems), and finally with travis (under my account) on my forked repo. I ran into an optimization problem: adding a task via the scheduler was very slow, and I realised it because of a test. Thanks to `test_get_work_speed` in `central_planner_test.py`.

What is added extra?