Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send messages from scheduler to tasks #2426

Merged
merged 12 commits into from
Jun 16, 2018

Conversation

riga
Copy link
Contributor

@riga riga commented May 18, 2018

Description

This PR enables the scheduler to send messages to particular tasks. More precisely, messages are sent to workers via the RPC message system which dispatches them to a specified task. Running tasks
can access a multiprocessing.Queue object (named scheduler_messages, set via the StatusReporter) which stores incoming messages. Users can implement custom behavior to react to certain messages (see code example below).

The new config scheduler.send_messages controls whether a scheduler is capable of sending messages in the first place. By defining accepted_messages a task can define whether it accepts messages, independent of the scheduler the worker is talking to. The message button is only visible in the UI when the scheduler is able to send messages, and the task can receive them (see screenshots).

There is also a simple "respond" mechanism so tasks can immediately respond to incoming messages which is shown in the scheduler.

"Send message" action button:

Action button

Message prompt:

Message prompt

Awaiting response:

Awaiting response

Response arrived:

Response arrived

When Await response is not checked, the prompt closes upon Send. Otherwise, the prompt remains open and a response container is shown. This triggers a simple polling to fetch and display the actual response.

Motivation and Context

There are many use cases when the messaging could be useful. One particular example is machine learning. Our training tasks publish their current results to the scheduler. Although they contain predefined termination criteria, it could be nice to manually trigger graceful training termination if the training stagnates:

class Training(luigi.Task):

    def requires(self):
        ...

    def output(self):
        return luigi.LocalTarget("my_ml_model.pb")

    def run(self):
        # setup the training
        model = ...

        for _ in training_loop():
            # training stuff with internal termination criterion
            model.train(...)

            # check messages
            if not self.scheduler_messages.empty():
                msg = self.scheduler_messages.get()
                if msg.content == "terminate":
                    break
                else:
                    msg.respond("unknown message")

        # save the model
        model.save(self.output().path)

Have you tested this? If so, how?

Yep, I added unit tests and docs.

Copy link
Contributor

@Tarrasch Tarrasch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really cool!

In addition to the code comments, please consider:

  • Docs would be really great!
  • I think adding the button unconditionally in the UI isn't a great idea. Most people will play around with it and see that nothing happens. Anyway we can make the worker register that it's open to taking messages? Perhaps in the future it could be a description of what kind of messages it accepts even.

Keep up the awesome work!

return

kwargs = dict(task_id=task, message=message)
self._state.get_worker(worker).add_rpc_message('dispatch_scheduler_message', **kwargs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe cleaner to to inline the kwargs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, will be in the next commits.

luigi/worker.py Outdated
@@ -1148,3 +1155,14 @@ def set_worker_processes(self, n):

# tell the scheduler
self._scheduler.add_worker(self._id, {'workers': self.worker_processes})

@rpc_message_callback
def dispatch_scheduler_message(self, task_id, message):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add , **kwargs): so that we can add more things in the future (without breaking old workers). One next step could be that a message has an ID so the worker can report back that it got the message, perhaps even "reply" etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented a draft if this reply/response mechanism.

@riga
Copy link
Contributor Author

riga commented May 23, 2018

I implemented the two changes you requested, and now, the action button is only shown when the worker is allowed to receive messages.

In addition I added some simple response functionality (the screenshots in the PR description show the new behavior). In the next iteration, one could also add descriptions of what messages a task accepts and maybe even which values are accepted with some nice dropdown buttons in the scheduler. However, this requires a well-defined interface on task level which might be something for the next PR.

If you're fine with the idea, I'll append additional tests and docs.

Copy link
Contributor

@Tarrasch Tarrasch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far so good really. Thanks for doing the message_id thing!

Yes, please focus on fixing/documenting existing functionalities of this PR rather than adding yet another one. I'm happy to review new features too once this PR is in. :)

@@ -932,6 +936,31 @@ def disable_worker(self, worker):
def set_worker_processes(self, worker, n):
self._state.get_worker(worker).add_rpc_message('set_worker_processes', n=n)

@rpc_method()
def send_scheduler_message(self, worker, task, message):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter name message maybe be should be renamed to content or payload throughout to make it easier to follow. I at least got confused that message became content in the scheduler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✔︎

self._state.get_worker(worker).add_rpc_message('dispatch_scheduler_message', task_id=task,
message_id=message_id, message=message)

return {"messageId": message_id}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yuck, I just realize that we mix camelCase and snake_case in the key-names. But it seems snake_case is more used. Maybe use that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✔︎

luigi/worker.py Outdated
@@ -392,6 +420,10 @@ class worker(Config):
description='If true, use multiprocessing also when '
'running with 1 worker')

receive_messages = BoolParameter(default=True,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe can_receive_messages?

Also I'm thinking of having the default to False, so the UI button doesn't appear by default.

luigi/worker.py Outdated

assert self._config.wait_interval >= _WAIT_INTERVAL_EPS, "[worker] wait_interval must be positive"
assert self._config.wait_jitter >= 0.0, "[worker] wait_jitter must be equal or greater than zero"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the need to move this code?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(feel free to if you have a reason, I'm mostly curious)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was required as _generate_worker_info() needs to access _config.

luigi/worker.py Outdated
@@ -540,7 +572,8 @@ def _generate_worker_info(self):
# Generate as much info as possible about the worker
# Some of these calls might not be available on all OS's
args = [('salt', '%09d' % random.randrange(0, 999999999)),
('workers', self.worker_processes)]
('workers', self.worker_processes),
('receive_messages', self._config.receive_messages)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... wait, why is this a property of the worker...

Thinking about it I'm sure it makes sense. But I really would like to see some user docs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mh, but you have a point. Another alternative would be to have sth like a "can_receive_messages()" hook (defaulting to false) defined on a task. Actually this makes more sense as the worker should just dispatch and let tasks decide what to do with messages. Are you fine with that change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well. I think it would make sense to both have message for tasks and workers.

Worker messages could be "shut down asap", "shut down task x", "decrease num workers" (we already have that, don't we)? Tasks messages I imagine are mostly task-specific.


I'm not sure how to implement it. Anyway, I'm fine with any changes, I'm sure if you think it becomes better so will I. :)

riga added 4 commits June 5, 2018 17:14
Task can configure on their own if incoming scheduler messages are accepted using the `accepted_messages` property. In future PR, this can be extended to accept only certain messages.
Copy link
Contributor

@Tarrasch Tarrasch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Starts to look good! :)

(sorry for the slow review)

...

# configure the task to accept all incoming messages
accepted_messages = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"accepts_messages" sounds better no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✔︎

luigi/task.py Outdated
@property
def accepted_messages(self):
"""
Configures which scheduler messages can be received and returns them. When falsy, this tasks
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe "For configuring which scheduler messages can be received."?

Copy link
Contributor Author

@riga riga Jun 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✔︎

@Tarrasch
Copy link
Contributor

Tarrasch commented Jun 6, 2018

Are you happy with this now? Is it ready to merge? Is it reasonably tested?

@riga
Copy link
Contributor Author

riga commented Jun 7, 2018

Yep, I'm happy with the PR. Docs and tests should be fine.

@Tarrasch Tarrasch merged commit bda236d into spotify:master Jun 16, 2018
@Tarrasch
Copy link
Contributor

Thanks!! This is so cool!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants