Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task Level Configuration and upstream-status-when-all scheduler config #1782

Closed
wants to merge 31 commits into from

Conversation

javrasya
Copy link
Contributor

@javrasya javrasya commented Jul 22, 2016

Description

Defining some scheduling configuration at task level as it is defined in #1073 . Also have new configuration named as upstream-status-when-all in [scheduler] section.

Motivation and Context

Task Level Configuration

It may be required to have different scheduling configuration for different tasks like retry-count(disable-num-failures). Assume that, a task is retrieving data from Hive which responses in long time and other one is retrieving data from RDBM which responses in short time. I may want Hive task not to retry or has small retry-count not to block all my system again an again. But, retrying RDBM task many times may not be problem for me. Assume that, I have some network issue for a while and I should be able to give its retry-count more than Hive task. So I would need to define those configuration at task level.

This fixes the issue defined in #1073

upstream-status-when-all config

Normally luigi scheduler sets a wrapper task status according to its upstream task severity statuses. For example; if any of its upstream task severity is DISABLED or FAILED, scheduler sets the task status as FAILED. If this configuration is set as True; scheduler won't make the task as UPSTREAM_FAILED or UPSTREAM_DISABLED unless all of its tasks excluding SUCCESS
ones are FAILED or DISABLED.

This is not a problem when there was task level configuration feature. But After this feature, some upstream task may retry even one of other upstream tasks is DISABLED or FAILED.
Please check the example which is added in this PR.

Have you tested this? If so, how?

There is also a unit test for those features. Debugging with PyCharm, running available task with tox (Some of them are not able to run in my local machine. I don't know, maybe tests include hdfs or other outer integration systems) and finally with travis(with my account) on my forked repo. I met some optimization problem. Adding task by scheduler was so slow and I realised it because of a test. Thanks to test_get_work_speed in central_planner_test.py.

What is added extra?

  • Documentation
  • Unit-test
  • An example

javrasya added 8 commits July 22, 2016 20:10
…supports defining some configuration at task level. Those configuration can be spesified at task level are `disable-num-failures`, `disable-hard-timeout`, `disable-window-seconds`, `upstream-status-when-all`. There is also added a config for scheduler named as `upstream-status-when-all` which is definition of how to set a task status as its upstream task statuses. Tests and documentation are also added.

Verified

This commit was signed with the committer’s verified signature. The key has expired.
hiyuki2578 Shota Tsunehiro

Partially verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
We cannot verify signatures from co-authors, and some of the co-authors attributed to this commit require their commits to be signed.
# -*- coding: utf-8 -*-

#
# To make this run, you probably want to edit /etc/luigi/client.cfg and add something like:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like what? :)

@Tarrasch
Copy link
Contributor

Tarrasch commented Jul 25, 2016

Summary of my comments:

  • Excellent documentation
  • Despite it's a pretty short code change I think it could be even shorter and easier to read/maintain etc.
  • I think there's 1 bug related to run()time dependencies feature (new_deps_config)

Looks really promising! :)

@@ -112,5 +112,129 @@ def __init__(self):

worker.prune(TmpCfg())

@with_config({'scheduler': {'disable-num-failures': '44'}})
def test_scheduler_with_task_level_config(self):
cps = luigi.scheduler.CentralPlannerScheduler()
Copy link
Contributor

@Tarrasch Tarrasch Jul 25, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, as I merged #1781 you have to update this line :)

javrasya added 8 commits July 25, 2016 12:53
…supports defining some configuration at task level. Those configuration can be spesified at task level are `disable-num-failures`, `disable-hard-timeout`, `disable-window-seconds`, `upstream-status-when-all`. There is also added a config for scheduler named as `upstream-status-when-all` which is definition of how to set a task status as its upstream task statuses. Tests and documentation are also added.
@javrasya
Copy link
Contributor Author

javrasya commented Jul 25, 2016

@Tarrasch I don't want to bother more by pushing more commit in this PR without being sure. Can you review changes according to your comments . I made sort of major changes.

  • I changed the way of using dict to @Property
  • named-tuples are used to make it more readable and maintainable.
  • config -> retry-policy as term. (task_cofigs -> retry_policy, deps_task_configs -> deps_retry_policies)
  • new_deps_configs is removed.
  • It is added that how to run the added example on command line.

Please keep commenting there and I will correct it and merge it into my master branch to update PR after all.

…e task command line runing tip is updated and more clear and readable now and other small changes
@Tarrasch
Copy link
Contributor

Ok. I think we aren't on the same page when it comes to upstream-status-when-all. Maybe I've misunderstood parts of the PR. Also comment javrasya@44415be#commitcomment-18391330 comes to mind.

Let's render an example.


Task X depends on Y2 and Y5. Y2 has retry-count 2 and Y5 has retry-count 5. Let's say that In our example X will never run because both Y2 and Y5 will not both get DONE.

Now let's say Y5 failed and is now marked as red while Y2 have not run yet and is pending.

upstream-status-when-all: If this config is set to false (default) for task X, then X will be in the set of UPSTREAM_FAILED because Y5 is. But if this variable is set to true, it will not be in UPSTREAM_FAILED, becuase Y2 is still pending.

We note that UPSTREAM_FAILED is not something a task ever is marked as, it's just a view that's only exists in the luigi scheduler. It's possible that a task is UPSTREAM_FAILED, UPSTREAM_DISABLED, both, or none.

Are we on the same page so far?


Actually. I downloaded your branch and tested to run your example. And I do see what you mean. now. It only works when that variable is true. What about this proposal. Because I really don't want to introduce a configuration option that takes a luigi maintainer hours to understand, can you try to change the default behaviour to do as if your new variable was true?

Yes, it's a bit provocative to change this over 3 years old concept in luigi. But I don't really seeing it being used anywhere except for visualization. There's only one place (as of #871) and in that case I believe the author mistakenly assumed the behaviour works as I propose we change it too.

I checked that no scheduler-api tests fail after the change, and I can help you to provide a test case that will fail until we change the behaviour. How does that sound? Less code for both of us and I think we fix a design-flaw in luigi. And yes, you will run into this bug in production as of today, it's just that in your case both child tasks crash after 0 seconds, if one task has a delay you will get into the same problem even if both have the same retry-count.

@Tarrasch
Copy link
Contributor

Also linking in #686, even there the author said "if all dependencies", it seems that upstream-status-when-all should always be True and it's False for some ancient mistake years ago.

@javrasya
Copy link
Contributor Author

javrasya commented Jul 26, 2016

I would go for not including this configuration if we have a chance and changing the default behavior for 'all' instead of 'any'. If you say it, I will change the behavior and remove the added configuration.

@Tarrasch
Copy link
Contributor

Yes. I think the default behaviour should change, today I'll work on creating a test case which only passes with the new behaviour.

javrasya added 2 commits July 27, 2016 09:55
… on setting parent task severity as `UPSTREAM_DISABLED` or `UPSTREAM_FAILED` when any of its upstream task is `DISABLED` or `FAILED` is changed to when all of its upstream task is `DISABLED` or `FAILED`. `disable_failures`->`retry-count` in scheduler class.
…-failures -> retry-count in config file and its documentation is updated.
@javrasya
Copy link
Contributor Author

A few tests fail after I change the current behavior of luigi on that. I will fix them and update the PR soon.

javrasya added 2 commits July 27, 2016 12:48
…rent task severity as its upstream tasks statuses, is changed
Conflicts:
	doc/configuration.rst
	luigi/scheduler.py
	luigi/worker.py
	test/scheduler_test.py
@Tarrasch
Copy link
Contributor

@javrasya, it would be so ideal if that could be made into a separate PR that we merge first. Could you arrange for that? Then we have one diffview where it's clear what that change affects. I remember scheduler_api tests did not start failing, but maybe I ran it incorrectly.

@javrasya
Copy link
Contributor Author

The one which fails is test_task_list_upstream_status. I fixed it and pushed it but I can rollback it if you want.

+------------------------+-----------+
| Config | Section |
+========================+===========+
| retry-count | scheduler |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

retry_count. Use underscores.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True it is from first version of this feature.

disable_failures = parameter.IntParameter(default=999999999,
config_path=dict(section='scheduler', name='disable-num-failures'))
retry_count = parameter.IntParameter(default=999999999,
config_path=dict(section='scheduler', name='retry-count'))
Copy link
Contributor

@Tarrasch Tarrasch Jul 27, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

noooo... s/retry-count/disable_failures this please.

Sorry for the confusion. But config_path should have really been called deprecated_config_path from day one. The confusion is as of our bad naming. Remember, the non-deprecated config path is automatic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok probably, I got it wrongly. I first didn't change it but, after a comment of yours, I changed it. I can rollback

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I am bit of confused about your comment Oh, even more important. Change this line to say name='disable_failures'! Also update the docs to tell people to use retry_count. . Should I rollback it?

Copy link
Contributor

@Tarrasch Tarrasch Jul 27, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code should be name='disable_failures' so we keep backward compatiblity. Docs should encourage usage of the newest name retry_count, so people dont get deprecation warnings.

Just add upon commits. There's no point in "rolling back". In the end I'll squash everything anyway as the history is a bit messy already. :)

Copy link
Contributor Author

@javrasya javrasya Jul 27, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I guess I got it now. You mean, If I describe it as retry_count in .cfg file, even the name is disable-num-failures, both will be able to be used ? So all I need to commit is to change the part as name='disable-num-failures', not the doc and doc can remain as retry_count right?

…m-failures` back again for backward compatibility
for a_task_id in dep.deps),
key=UPSTREAM_SEVERITY_KEY)
upstream_severities = list(upstream_status_table.get(a_task_id) for a_task_id in dep.deps if a_task_id in upstream_status_table) or ['']
status = min(upstream_severities, key=UPSTREAM_SEVERITY_KEY)
Copy link
Contributor Author

@javrasya javrasya Jul 27, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the change of luigi default behavior to detect upstream severity. Parent task is marked as UPSTREAM_DISABLED when any of its upstream task is DISABLED and the worker is shutdown because of it with how luigi works currently. This behavior is changed to wait for all upstream tasks to be DISABLED or FAILED to mark parent task as UPSTREAM_DISABLED or UPSTREAM_FAILED.

@Tarrasch
Copy link
Contributor

Ok so your first next step here is to create a new pr with only the "upstream-status-when-all" change. I'll try to quickly merge it and then you can create a follow up PR with the rest of the changes.

This PR is really messy now. It's better to have many small PRs that get merged so we have the feeling that we're progressing. :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants