Send messages from scheduler to tasks #2426
Really cool!
In addition to the code comments, please consider:
- Docs would be really great!
- I think adding the button unconditionally in the UI isn't a great idea. Most people will play around with it and see that nothing happens. Any way we can make the worker register that it's open to taking messages? Perhaps in the future it could even be a description of what kind of messages it accepts.
Keep up the awesome work!
luigi/scheduler.py
Outdated
        return

    kwargs = dict(task_id=task, message=message)
    self._state.get_worker(worker).add_rpc_message('dispatch_scheduler_message', **kwargs)
Maybe cleaner to inline the kwargs?
Yes, will be in the next commits.
luigi/worker.py
Outdated
@@ -1148,3 +1155,14 @@ def set_worker_processes(self, n):

        # tell the scheduler
        self._scheduler.add_worker(self._id, {'workers': self.worker_processes})

    @rpc_message_callback
    def dispatch_scheduler_message(self, task_id, message):
Add `, **kwargs):` so that we can add more things in the future (without breaking old workers). One next step could be that a message has an ID so the worker can report back that it got the message, perhaps even "reply" etc.
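To illustrate the forward-compatibility point, here is a minimal sketch. The two callbacks and the `rpc_kwargs` payload are hypothetical stand-ins, not luigi's actual worker code; they only show why a trailing `**kwargs` lets an older worker tolerate fields added by a newer scheduler.

```python
def strict_callback(task_id, content):
    """Hypothetical callback with a fixed signature."""
    return (task_id, content)


def tolerant_callback(task_id, content, **kwargs):
    """Same callback, but extra keyword arguments are accepted and ignored."""
    return (task_id, content)


# A newer scheduler adds a field that older workers do not know about.
rpc_kwargs = {"task_id": "MyTask_1", "content": "ping", "message_id": "m-1"}

try:
    strict_callback(**rpc_kwargs)
    strict_ok = True
except TypeError:
    # The fixed signature rejects the unknown keyword argument.
    strict_ok = False

# The tolerant variant keeps working and simply drops the extra field.
tolerant_result = tolerant_callback(**rpc_kwargs)
```

The trade-off is that silently ignored arguments can hide typos, so this pattern is probably best limited to RPC entry points that must stay compatible across versions.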
Implemented a draft of this reply/response mechanism.
I implemented the two changes you requested, and now the action button is only shown when the worker is allowed to receive messages. In addition, I added some simple response functionality (the screenshots in the PR description show the new behavior). In the next iteration, one could also add descriptions of what messages a task accepts and maybe even which values are accepted, with some nice dropdown buttons in the scheduler. However, this requires a well-defined interface at the task level, which might be something for the next PR. If you're fine with the idea, I'll append additional tests and docs.
So far so good really. Thanks for doing the message_id thing!
Yes, please focus on fixing/documenting existing functionalities of this PR rather than adding yet another one. I'm happy to review new features too once this PR is in. :)
luigi/scheduler.py
Outdated
@@ -932,6 +936,31 @@ def disable_worker(self, worker):
    def set_worker_processes(self, worker, n):
        self._state.get_worker(worker).add_rpc_message('set_worker_processes', n=n)

    @rpc_method()
    def send_scheduler_message(self, worker, task, message):
The parameter name `message` should maybe be renamed to `content` or `payload` throughout to make it easier to follow. I at least got confused that `message` became `content` in the scheduler.
✔︎
luigi/scheduler.py
Outdated
        self._state.get_worker(worker).add_rpc_message('dispatch_scheduler_message', task_id=task,
                                                       message_id=message_id, message=message)

        return {"messageId": message_id}
Yuck, I just realized that we mix camelCase and snake_case in the key names. But it seems snake_case is used more. Maybe use that?
✔︎
luigi/worker.py
Outdated
@@ -392,6 +420,10 @@ class worker(Config):
                                        description='If true, use multiprocessing also when '
                                        'running with 1 worker')

    receive_messages = BoolParameter(default=True,
Maybe `can_receive_messages`?
Also, I'm thinking of setting the default to `False`, so the UI button doesn't appear by default.
luigi/worker.py
Outdated
        assert self._config.wait_interval >= _WAIT_INTERVAL_EPS, "[worker] wait_interval must be positive"
        assert self._config.wait_jitter >= 0.0, "[worker] wait_jitter must be equal or greater than zero"
Why the need to move this code?
(feel free to if you have a reason, I'm mostly curious)
This was required as `_generate_worker_info()` needs to access `_config`.
luigi/worker.py
Outdated
@@ -540,7 +572,8 @@ def _generate_worker_info(self):
         # Generate as much info as possible about the worker
         # Some of these calls might not be available on all OS's
         args = [('salt', '%09d' % random.randrange(0, 999999999)),
-                ('workers', self.worker_processes)]
+                ('workers', self.worker_processes),
+                ('receive_messages', self._config.receive_messages)]
Hmmm... wait, why is this a property of the worker...
Thinking about it I'm sure it makes sense. But I really would like to see some user docs.
Mh, but you have a point. Another alternative would be to have something like a "can_receive_messages()" hook (defaulting to false) defined on a task. Actually, this makes more sense, as the worker should just dispatch and let tasks decide what to do with messages. Are you fine with that change?
Well. I think it would make sense to have messages for both tasks and workers.
Worker messages could be "shut down asap", "shut down task x", "decrease num workers" (we already have that, don't we?). Task messages, I imagine, are mostly task-specific.
I'm not sure how to implement it. Anyway, I'm fine with any changes, I'm sure if you think it becomes better so will I. :)
Tasks can configure on their own whether incoming scheduler messages are accepted, using the `accepted_messages` property. In a future PR, this can be extended to accept only certain messages.
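The task-level opt-in described in this commit can be sketched roughly as follows. This is an illustration only: `Task`, `TrainingTask`, and `dispatch` are hypothetical stand-ins rather than luigi's classes, and a plain `queue.Queue` is assumed in place of the worker's real inbox.

```python
import queue


class Task:
    # Hypothetical default: tasks ignore scheduler messages unless they opt in.
    accepted_messages = False


class TrainingTask(Task):
    # This task opts in to all incoming messages.
    accepted_messages = True


def dispatch(task, content, inbox):
    """Deliver content to the task's inbox only if the task opted in."""
    if not task.accepted_messages:
        return False
    inbox.put(content)
    return True


plain_inbox = queue.Queue()
training_inbox = queue.Queue()

plain_delivered = dispatch(Task(), "hello", plain_inbox)
training_delivered = dispatch(TrainingTask(), "hello", training_inbox)
```

With this split, the worker only forwards messages, and each task decides for itself whether it wants to receive them.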
Starts to look good! :)
(sorry for the slow review)
doc/luigi_patterns.rst
Outdated
    ...

    # configure the task to accept all incoming messages
    accepted_messages = True
"accepts_messages" sounds better no?
✔︎
luigi/task.py
Outdated
    @property
    def accepted_messages(self):
        """
        Configures which scheduler messages can be received and returns them. When falsy, this tasks
Maybe "For configuring which scheduler messages can be received."?
✔︎
Are you happy with this now? Is it ready to merge? Is it reasonably tested?
Yep, I'm happy with the PR. Docs and tests should be fine.
Thanks!! This is so cool!
Description
This PR enables the scheduler to send messages to particular tasks. More precisely, messages are sent to workers via the RPC message system, which dispatches them to the specified task. Running tasks can access a `multiprocessing.Queue` object (named `scheduler_messages`, set via the `StatusReporter`) that stores incoming messages. Users can implement custom behavior to react to certain messages (see code example below).
The new config `scheduler.send_messages` controls whether a scheduler is capable of sending messages in the first place. By defining `accepted_messages`, a task can declare whether it accepts messages, independent of the scheduler the worker is talking to. The message button is only visible in the UI when the scheduler is able to send messages and the task can receive them (see screenshots).
There is also a simple "respond" mechanism so tasks can immediately respond to incoming messages; the response is then shown in the scheduler.
"Send message" action button:
Message prompt:
Awaiting response:
Response arrived:
When `Await response` is not checked, the prompt closes upon `Send`. Otherwise, the prompt remains open and a response container is shown. This triggers simple polling to fetch and display the actual response.
Motivation and Context
There are many use cases where messaging could be useful. One particular example is machine learning. Our training tasks publish their current results to the scheduler. Although they contain predefined termination criteria, it would be nice to manually trigger graceful training termination if the training stagnates.
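A minimal sketch of such a graceful termination, under stated assumptions: `Message` and `train` are hypothetical, the "epoch" is a placeholder counter, and a `queue.Queue` plays the role of the task's `scheduler_messages` queue. It is not the code from the PR.

```python
import queue


class Message:
    """Hypothetical stand-in for a scheduler message."""

    def __init__(self, content):
        self.content = content
        self.response = None

    def respond(self, response):
        self.response = response


def train(inbox, max_epochs=10):
    """Run up to max_epochs, stopping early when a 'terminate' message arrives."""
    completed = 0
    for _ in range(max_epochs):
        # Check for one pending message before starting the next epoch.
        if not inbox.empty():
            msg = inbox.get()
            if msg.content == "terminate":
                msg.respond("stopping after epoch %d" % completed)
                break
            msg.respond("unknown message")
        completed += 1  # placeholder for one epoch of real training
    return completed


inbox = queue.Queue()
status = Message("status")
stop = Message("terminate")
inbox.put(status)
inbox.put(stop)

epochs_run = train(inbox, max_epochs=5)
```

Because the loop only checks the queue between epochs, the current epoch always finishes before the task stops, which is what makes the termination graceful.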
Have you tested this? If so, how?
Yep, I added unit tests and docs.