Allow release of resources during task running. #2346
The nonhdfs tests fail, the others are fine. I guess it's because a remote scheduler is used in one of the test cases. Any advice?
I would simply disable the test case for the remote scheduler if it's too hard to get it working, as long as the in-memory scheduler test works. I think there are already a couple of tests like that.
I think you're the best person to form this API, since you are the one using/needing it. Some things to consider:
Nice.
@daveFNbuck, since you created the resources feature, do you mind reviewing this PR?
for resource, amount in six.iteritems(getattr(task, 'resources_running', task.resources)):
resources_running = getattr(task, "resources_running", task.resources)
if resources_running:
    for resource, amount in six.iteritems(resources_running):
Nice code improvement :)
test/task_running_resources_test.py
Outdated
        if self.reduce_foo:
            self.set_running_resources({"foo": 1})

        time.sleep(2)
I suppose it was too hard to make it work without using time.sleep, right?
Yeah, this time also interferes with the worker's wait_interval. I haven't yet come up with a better approach to check that the scheduler really allows the two tasks to run in parallel...
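One possible alternative to a timing-based check is to have the concurrent bodies meet at a barrier: if they are never active at the same time, the barrier times out. The sketch below uses plain threads rather than luigi workers, which is an assumption made only to keep it self-contained; `all_overlap` is a hypothetical helper and not part of luigi or this PR.

```python
import threading


def all_overlap(n_threads, parties, timeout=5.0):
    """Return True if `parties` bodies were active at the same moment.

    Hypothetical helper: each thread waits at a shared barrier, which only
    releases once `parties` threads have arrived before the timeout.
    """
    barrier = threading.Barrier(parties)
    results = []
    lock = threading.Lock()

    def body():
        try:
            # Blocks until `parties` threads have arrived; a timeout breaks
            # the barrier and raises BrokenBarrierError.
            barrier.wait(timeout=timeout)
            ok = True
        except threading.BrokenBarrierError:
            ok = False
        with lock:
            results.append(ok)

    threads = [threading.Thread(target=body) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return all(results)
```

With two threads and a two-party barrier the check passes; with a single thread the barrier times out and the check fails, which is the situation a serialized run would produce.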
The API needs to be designed so that you can't increase any of the resources, as that can push you over the limit. We want to be able to guarantee that you never exceed a resource limit.
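A minimal sketch of what such a decrease-only update could look like, assuming resources are held as a plain dict of names to amounts. The function name `decrease_resources` and its error handling are illustrative assumptions, not the API that this PR or luigi defines.

```python
def decrease_resources(current, decrease):
    """Return a copy of `current` with the amounts in `decrease` subtracted.

    Hypothetical helper: negative amounts and unknown resource names are
    rejected, so a caller can never end up holding more than before.
    """
    updated = dict(current)
    for name, amount in decrease.items():
        if amount < 0:
            raise ValueError("cannot increase resource %r" % name)
        if name not in updated:
            raise ValueError("unknown resource %r" % name)
        # Clamp at zero so a task never holds a negative amount.
        updated[name] = max(updated[name] - amount, 0)
    return updated
```

Because the helper only accepts non-negative decrements for resources the task already holds, the total held by running tasks can only shrink, which keeps the limit guarantee intact.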
I disabled the remote scheduler tests on Travis. Some other tests fail, I'm not entirely sure if my changes cause the problems...
Could you maybe start the tests again? I'm curious whether this was just a glitch. Thanks! =)
Travis build has been restarted
Mh, again the nonhdfs tests fail. In particular:
I don't see how these tests are related to the changes in this PR :/
@riga I think this is a Travis issue. We didn't merge anything with failing tests, but these two tests have been failing for all new PRs.
Hi! Is there any progress on this matter?
One test still fails, but it is not clear to me what is actually causing this ...
Since you are adding new rpc methods, did you consider adding tests here?
https://github.com/spotify/luigi/blob/master/test/scheduler_api_test.py
test/task_running_resources_test.py
Outdated
        luigi.build([task_a, task_b], self.factory, workers=2,
                    scheduler_port=self.get_http_port())

    @skipOnTravis("https://travis-ci.org/spotify/luigi/jobs/338398994")
Was there really no way to get these tests to run on Travis? I'm not sure having them helps at all then. This is a luigi core feature you are adding; such important functionality should be properly tested, even on Travis.
Agreed. Maybe the travis errors I've seen at the beginning were also related to #2356. I'll check again.
I noticed that the same task resource dicts are used in multiple places, esp.:
As this PR introduces dynamic |
I fixed one of the two failing tests in this PR. I wasn't aware that there were hardcoded attributes in What do you think about making this dynamic (depending on a mapping defined on class level)? In addition, I'm about to solve the remaining failure.
Awesome to see progress. I commented on the commits you linked too.
I think I'm close to fixing the issue, just getting a tornado error
This is closely related to what you guys experienced as well ;)
test/task_running_resources_test.py
Outdated
import server_test


luigi.notifications.DEBUG = True
Is this copied from anywhere? Is it necessary? You probably just want to delete this line.
done
test/task_running_resources_test.py
Outdated
            luigi.build([task_c, task_d], self.factory, workers=2,
                        scheduler_port=self.get_http_port())

    def test_local_scheduler(self):
Btw, maybe this test can be outside of server_test.ServerTestBase?
done
test/task_running_resources_test.py
Outdated
        # the total "foo" resource (3) is sufficient to run both tasks in parallel shortly after
        # the first task started, so the entire process should not exceed 4 seconds
        task_c = ResourceTestTask(param="c", reduce_foo=True)
        task_d = ResourceTestTask(param="d")
You shouldn't need to use new task names, right? You might as well call them "a" and "b".
done
test/task_running_resources_test.py
Outdated
        task_d = ResourceTestTask(param="d")

        with self.assert_duration(max_duration=4):
            luigi.build([task_c, task_d], self.factory, workers=2,
I wonder if you get strange behavior because of the simplistic and unintuitive way luigi.build is implemented. I think it just runs those two guys serially (it won't attempt task_d before task_c is complete). But you might just want to test building with a single WrapperTask that depends on both of those tasks.
Yep, you're right, will be simplified in the next commit.
The test that demonstrates how two tasks with concurrent resources can be accelerated via resource reduction within the
Anything else I can/should do? =)

class ConcurrentRunningResourcesTest(unittest.TestCase):

    def get_app(self):
Can you remove this in a follow-up? You don't need it anymore, right?
Yup!
Awesome! Well done!
Where is the documentation on how this is used? I attempted to do the following with the expectation that after 10 seconds the first task would release the resource and allow the second task to run in parallel:
See discussion in #2576, which you created.
Description
At scheduling time, the luigi scheduler needs to be aware of the maximum resource consumption a task might have once it runs. For some tasks, however, it can be beneficial to release or reduce a resource between two steps within their run method (e.g. after some heavy computation). In this case, a different task waiting for that particular resource can already be scheduled.
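The effect described above can be illustrated with a toy model. This is a sketch under stated assumptions, not luigi's scheduler: `ToyScheduler` and its methods are hypothetical, and the numbers (a total of 3 "foo", each task requesting 2) are chosen to resemble the test discussed in this PR.

```python
class ToyScheduler:
    """Hypothetical in-memory model of resource-based scheduling."""

    def __init__(self, limits):
        self.limits = dict(limits)  # resource name -> total available
        self.running = {}           # task id -> resources currently held

    def used(self):
        """Sum the resources held by all running tasks."""
        totals = {}
        for held in self.running.values():
            for name, amount in held.items():
                totals[name] = totals.get(name, 0) + amount
        return totals

    def can_start(self, resources):
        """A task may start only if no limit would be exceeded."""
        used = self.used()
        return all(
            used.get(name, 0) + amount <= self.limits.get(name, 0)
            for name, amount in resources.items()
        )

    def start(self, task_id, resources):
        if not self.can_start(resources):
            return False
        self.running[task_id] = dict(resources)
        return True

    def set_running_resources(self, task_id, resources):
        """Replace what a running task holds, e.g. after its heavy step."""
        self.running[task_id] = dict(resources)


scheduler = ToyScheduler({"foo": 3})
started_a = scheduler.start("a", {"foo": 2})         # fits: 2 <= 3
blocked_b = scheduler.start("b", {"foo": 2})         # would need 4 > 3
scheduler.set_running_resources("a", {"foo": 1})     # "a" releases one unit
started_b = scheduler.start("b", {"foo": 2})         # now 1 + 2 <= 3
```

In this model the second task stays pending only until the first one lowers its claim, which is the behaviour the PR aims for while both tasks are still running.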
I simply added another method to the TaskStatusReporter which is forwarded to the task in TaskProcess._run_get_new_deps. The scheduler also got two new rpc methods set_running_task_resources and get_running_task_resources.
(Maybe the TaskStatusReporter should be renamed since it's doing more than just reporting the status now.)
Within a task, the resources can be updated via set_running_resource() which gives you complete access to the resource dict held by the scheduler for that task. Another approach could be something like a method release_resource() which is used to just reduce resource values. Right now, one can also increase resources which is certainly dangerous. Which way do you prefer?
Motivation and Context
We have some long running tasks which control remote jobs on some computing grids. Although those grids have batch systems / queues, we want to use luigi's resource system to ensure we don't exceed a certain number (few k) of running jobs.
I also added test cases and some docs =)
edit: In the third commit, I propagated the running resources to the visualizer (didn't think about that when I opened the PR).