Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow release of resources during task running. #2346

Merged
merged 13 commits into from
Apr 8, 2018

Conversation

riga
Copy link
Contributor

@riga riga commented Feb 3, 2018

Description

At scheduling time, the luigi scheduler needs to be aware of the maximum resource consumption a task might have once it runs. For some tasks, however, it can be beneficial to release or reduce a resource between two steps within their run method (e.g. after some heavy computation). In this case, a different task waiting for that particular resource can already be scheduled.

I simply added another method to the TaskStatusReporter which is forwarded to the task in TaskProcess._run_get_new_deps. The scheduler also got two new rpc methods set_running_task_resources and get_running_task_resources.

(Maybe the TaskStatusReporter should be renamed since it's doing more than just reporting the status now.)

Within a task, the resources can be updated via set_running_resource() which gives you complete access to the resource dict held by the scheduler for that task. Another approach could be something like a method release_resource() which is used to just reduce resource values. Right now, one can also increase resources which is certainly dangerous. Which way do you prefer?

Motivation and Context

We have some long running tasks which control remote jobs on some computing grids. Although those grids have batch systems / queues, we want to use luigi's resource system to ensure we don't exceed a certain number (few k) of running jobs.

I also added test cases and some docs =)

edit: In the third commit, I propagated the running resources to the visualizer (didn't think about that when I opened the PR).

@riga
Copy link
Contributor Author

riga commented Feb 4, 2018

The nonhdfs tests fail, the others are fine. I guess it‘s because a remote scheduler is used in one of the test cases. Any advise?

@Tarrasch
Copy link
Contributor

Tarrasch commented Feb 6, 2018

The nonhdfs tests fail, the others are fine. I guess it‘s because a remote scheduler is used in one of the test cases. Any advise?

I would simply disable the test case for the remote scheduler if it's to hard to get it working, as long as the in-memory scheduler test works. I think there's already a couple of tests like that.

Which way do you prefer?

I think you're the best person to form this API, since you are the one using/needing it. Some things to consider:

  • You can still make the luigi Task not do the invalid changes that the code says. For example if the start resource is 5, make changes to >5 no
  • What would make your code easier? To have set_resources(), release_resources() or decrease_resources(). I slightly lean towards the decrease version, so I could just specify the one resource I want to decrease and not respecify the others, like decrease_resources({'my_resource': 3}). But I let you pick. :)

Copy link
Contributor

@Tarrasch Tarrasch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice.

@daveFNbuck, since you created the resources feature. Do you mind reviewing this PR?

for resource, amount in six.iteritems(getattr(task, 'resources_running', task.resources)):
resources_running = getattr(task, "resources_running", task.resources)
if resources_running:
for resource, amount in six.iteritems(resources_running):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice code improvement :)

if self.reduce_foo:
self.set_running_resources({"foo": 1})

time.sleep(2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose it was too hard to make it work without using time.sleep right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this time also interferes with the workers waint_interval. I couldn't come up yet with a better approach to check that the scheduler really allows to run the two tasks in parallel...

@daveFNbuck
Copy link
Contributor

The API needs to be designed so that you can't increase any of the resources, as that can push you over the limit. We want to be able to guarantee that you never exceed a resource limit.

@riga
Copy link
Contributor Author

riga commented Feb 7, 2018

I disabled the remote scheduler tests on travis. Some other tests fail, I'm not entirely sure if my changes cause the problems...

@riga
Copy link
Contributor Author

riga commented Feb 13, 2018

Could you maybe start the tests again? I'm curious whether this was just a glitch. Thanks! =)

@dlstadther
Copy link
Collaborator

Travis build has been restarted

@riga
Copy link
Contributor Author

riga commented Feb 13, 2018

Mh, again the nonhdfs tests fail. In particular:

  • contrib.docker_runner_test.TestDockerTask
  • contrib.spark_test.PySparkTaskTest

I don't see how these tests are related to the changes in this PR :/

@dlstadther
Copy link
Collaborator

@riga I think this is a Travis issue. We didn't merge anything with failing tests, but these two tests have been failing for all new PRs.

@riga
Copy link
Contributor Author

riga commented Mar 2, 2018

Hi! Is there any progress on this matter?

@dlstadther
Copy link
Collaborator

@riga These tests were fixed in #2356

Mind pulling those changes in here? Thanks!

@riga
Copy link
Contributor Author

riga commented Mar 2, 2018

One test still fails, but it is not clear to me what is actually causing this ...

Copy link
Contributor

@Tarrasch Tarrasch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you are adding new rpc methods, did you consider adding tests here?

https://github.com/spotify/luigi/blob/master/test/scheduler_api_test.py

luigi.build([task_a, task_b], self.factory, workers=2,
scheduler_port=self.get_http_port())

@skipOnTravis("https://travis-ci.org/spotify/luigi/jobs/338398994")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was really no way to get these tests to run on Travis? I'm not sure having them helps at all then. This is a luigi core feature you are adding, such important functionalities should be properly tested even on Travis.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Maybe the travis errors I've seen at the beginning were also related to #2356. I'll check again.

@riga
Copy link
Contributor Author

riga commented Mar 3, 2018

I noticed that the same task resource dicts are used in multiple places, esp.:

  • when setting task.resources_running to task.resources before a task starts running
  • when setting the resources_running of batch tasks

As this PR introduces dynamic resources_running, the initial resource dict needs to be copied at those places → 036b813.

@riga
Copy link
Contributor Author

riga commented Mar 16, 2018

I fixed one of the two failing tests in this PR.

I wasn't aware that there were hardcoded attributes in Task.no_unpicklable_properties. Those attributes consist only of temporary task callbacks added in TaskProcess, right before the task's run method is called. As a new callback is added in this PR (decrease_running_resources), the spark task tests failed.

What do you think about making this dynamic (depending on a mapping defined on class level)? In addition, Task.no_unpicklable_properties can use this mapping to obtain the list of unpicklable props at runtime. I implemented this in dc2a74a and c319bf1.

I'm about the solve the remaining failure.

@Tarrasch
Copy link
Contributor

Awesome to see progress. I commented on the commits you linked too.

@riga
Copy link
Contributor Author

riga commented Mar 16, 2018

Sounds good to me. If you're tired on being stuck with #2346, you can submit this as a separate PR.

I think I'm close to fixing the issue, just getting a tornado error

RuntimeError: Cannot share PollIOLoops across processes

This is closely related to what you guys experienced as well ;)
https://github.com/spotify/luigi/blob/master/test/server_test.py#L45-L53

import server_test


luigi.notifications.DEBUG = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this copied from anywhere? Is it necessary? You just want to delete this line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

luigi.build([task_c, task_d], self.factory, workers=2,
scheduler_port=self.get_http_port())

def test_local_scheduler(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, Maybe this test can be outside of server_test.ServerTestBase?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

# the total "foo" resource (3) is sufficient to run both tasks in parallel shortly after
# the first task started, so the entire process should not exceed 4 seconds
task_c = ResourceTestTask(param="c", reduce_foo=True)
task_d = ResourceTestTask(param="d")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You shouldn't need to use new task names right? might as well call them "a" and "b"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

task_d = ResourceTestTask(param="d")

with self.assert_duration(max_duration=4):
luigi.build([task_c, task_d], self.factory, workers=2,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if you get strange behavior because of the simplistic and unintuitive way luigi.build is implemented. I think it just runs those two guys serially (it won't attempt task_d before task_c is complete. But you might just want to test to build with a single WrapperTask that depends on both of those tasks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, you're right, will be simplified in the next commit.

@riga
Copy link
Contributor Author

riga commented Mar 16, 2018

The test that demonstrates how two tasks with concurrent resources can be accelerated via resource reduction within the run() method required a remote scheduler. I adapted some of the server_test._ServerTest features to setup the server, remote scheduler, etc.

@riga
Copy link
Contributor Author

riga commented Mar 26, 2018

Anything else I can/should do? =)


class ConcurrentRunningResourcesTest(unittest.TestCase):

def get_app(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remove this in a follow-up? You don't need it anymore right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup!

@Tarrasch Tarrasch merged commit 65dc89c into spotify:master Apr 8, 2018
@Tarrasch
Copy link
Contributor

Tarrasch commented Apr 8, 2018

Awesome! Well done!

@tiamot
Copy link

tiamot commented Nov 9, 2018

Where is the documentation on how this is used?
I am attempting to reduce resources for long running tasks to allow more tasks to run in parallel.
Based on the current documentation https://luigi.readthedocs.io/en/stable/luigi_patterns.html?highlight=decrease#decreasing-resources-of-running-tasks

I attempted to do the following with the expectation that after 10 seconds the first task would release the resource and allow the second task to run in parallel:

import luigi


class Y(luigi.Task):
    wait = luigi.IntParameter(default=10)
    n = luigi.IntParameter()

    resources = ({'A': 1})

    def output(self):
        return luigi.LocalTarget('out{}.txt'.format(self.n))

    def run(self):
        sleep(self.wait)
        self.decrease_running_resources({'A': 0})
        sleep(self.wait)
        f = self.output().open('w')
        f.close()


class Z(luigi.WrapperTask):
    tasks = [Y(n=x) for x in range(20)]

    def requires(self):
        yield self.tasks

$ luigid
$ luigi --module x Z --workers 20

@Tarrasch
Copy link
Contributor

Tarrasch commented Nov 9, 2018

See discussion in #2576 which you created.

This was referenced Jun 29, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants