Control worker processes via UI (scheduler → worker communication) #1993
Conversation
Wow! This is amazing!! Thanks for doing this. It wasn't as hard to implement as I thought it would be. I guess making tasks remotely killable would be an easy next step.
I have a lot of inline feedback, please take a look. Also, tests will be required. :)
luigi/worker.py
Outdated
logger.info("Worker %s function %s is not availale as message callback" % tpl) | ||
else: | ||
logger.info("Worker %s successfully dispatched message to function %s" % tpl) | ||
func(*args) |
I'm wondering if it would be more stable to have it return kwargs instead of args.
luigi/worker.py
Outdated
@@ -1063,3 +1075,23 @@ def run(self):
            self._handle_next_task()

        return self.run_succeeded

    def _handle_message(self, message):
Instead of calling it message, can it be called rpc_message throughout the code? The code in here always interprets the message as a remote procedure call to be invoked.
luigi/scheduler.py
Outdated
@@ -1121,7 +1130,8 @@ def get_work(self, host=None, assistant=False, current_tasks=None, worker=None,
     @rpc_method(attempts=1)
     def ping(self, **kwargs):
         worker_id = kwargs['worker']
-        self._update_worker(worker_id)
+        worker = self._update_worker(worker_id)
+        return {"messages": worker.fetch_messages()}
I just thought about this now (it's not so much related to this patch), but when this is transferred to the worker, is it returning a Python structure or an actual JSON message? I suppose JSON would be ideal so that, theoretically, one can communicate with the server using other languages.
Looks like the @rpc_method decorator uses RemoteScheduler._request, which json-dumps the return value, so this should be fine.
If cross-language support is desired, the JSON-RPC 2.0 spec would be awesome: https://github.com/riga/jsonrpyc (sorry for the self-promotion ;))
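To make the cross-language point concrete, here is a minimal sketch of the JSON round trip, assuming the ping response stays a plain JSON-serializable dict; the payload shape below is only an illustration, not Luigi's exact wire format:

```python
import json

# Hypothetical ping payload: a plain dict that json.dumps can serialize,
# so non-Python clients could parse it as well.
ping_response = {
    "messages": [
        {"name": "set_worker_processes", "kwargs": {"n": 10}},
    ]
}

wire = json.dumps(ping_response)   # what would travel back to the worker
decoded = json.loads(wire)         # what any JSON-capable client would see
for message in decoded["messages"]:
    print(message["name"], message["kwargs"])
```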
luigi/worker.py
Outdated
logger.debug("Worker %s got message %s" % (self._id, message)) | ||
|
||
# the message is a tuple where the first element is an string that defines | ||
# the function to call and the remaining elements are its arguments |
I'm wondering if it would be more stable to have it return kwargs instead of args, i.e. the second element message[1] would be a dict. Or even better, why not make the whole thing a dict? Something like: { "name": string, "kwargs": kwargs }
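A minimal sketch of the suggested shape and the keyword-based dispatch (the names here are illustrative, not the final API):

```python
def set_worker_processes(n):
    """Stand-in for the real worker callback."""
    print("setting worker processes to", n)

# Dict-based message: the callable's name plus keyword arguments.
message = {"name": "set_worker_processes", "kwargs": {"n": 10}}

# Keyword dispatch stays backwards compatible if optional parameters are
# added later, which a positional-args tuple would not be.
callbacks = {"set_worker_processes": set_worker_processes}
callbacks[message["name"]](**message["kwargs"])
```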
Sure, sounds good.
luigi/worker.py
Outdated
func = getattr(self, name, None)
tpl = (self._id, name)
if not callable(func):
    logger.info("Worker %s has no function %s" % tpl)
This should be logger.error, right?
luigi/worker.py
Outdated
    worker_processes += n
else:
    worker_processes = n
worker_processes = max(0, worker_processes)
I thought we should have at least 1, not 0. At least add a comment explaining why 0 and not 1.
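To make the two options explicit (illustrative only; whether 0 should be allowed, e.g. to effectively pause a worker, is exactly the open question here and not confirmed behavior):

```python
requested = -3  # hypothetical value after applying a message

# Option A: clamp to 0, i.e. allow a worker with no task processes at all
print(max(0, requested))  # 0

# Option B: always keep at least one process running
print(max(1, requested))  # 1
```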
luigi/worker.py
Outdated
self.worker_processes = worker_processes

# tell the scheduler
self._scheduler.add_worker(self._id, {"workers": worker_processes})
Cool that you didn't forget this ^^
luigi/worker.py
Outdated
    func(*args)

@message_callback
def set_worker_processes(self, n, diff):
Can't you make it into two different calls? One for set_worker_processes_increment and one for set_worker_processes, both taking only one argument.
Or, on second thought, I think it's better to not even have the diff version. I see these problems:
- It's not needed; humans can trivially make this calculation anyway.
- If the UI is unresponsive, humans are likely to set it twice, getting an undesired effect.
- Also, this is a new feature, so let's start simple? :)
Although, I'm also fine with including it. I don't have any strong opinions, so I'll let you decide. :)
I removed the diff flag ;)
luigi/worker.py
Outdated
    self._message_callback(message)


def message_callback(fn):
Thanks for thinking about security from day one. I really like this decorator. :)
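For readers, a self-contained sketch of the whitelisting idea behind such a decorator. This is not the exact PR code, and it also assumes the dict-based message format and the diff-less set_worker_processes signature discussed above:

```python
import logging

logger = logging.getLogger(__name__)


def message_callback(fn):
    """Mark a worker method as invokable via scheduler messages (sketch)."""
    fn.is_message_callback = True
    return fn


class MiniWorker:
    """Illustrative worker-side dispatch, not the actual luigi.worker.Worker."""

    def __init__(self, worker_id):
        self._id = worker_id
        self.worker_processes = 1

    @message_callback
    def set_worker_processes(self, n):
        # single absolute value; the "diff" variant was dropped in review
        self.worker_processes = max(0, n)

    def _handle_message(self, message):
        func = getattr(self, message["name"], None)
        # only methods explicitly whitelisted by the decorator may be invoked,
        # so arbitrary attributes cannot be called through the scheduler
        if not callable(func) or not getattr(func, "is_message_callback", False):
            logger.error("Worker %s has no message callback %s", self._id, message["name"])
            return
        func(**message.get("kwargs", {}))


worker = MiniWorker("worker-1")
worker._handle_message({"name": "set_worker_processes", "kwargs": {"n": 10}})
print(worker.worker_processes)  # 10
```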
luigi/worker.py
Outdated
except:  # httplib.BadStatusLine:
    logger.warning('Failed pinging scheduler')

# handle messages
if response is not None:
Let's be pythonic and just write if response: here. :)
I'll try to add tests later today.
This looks good. Only tests are needed. :)
@Tarrasch I hope I added enough tests for a start ;)
Conflicts: luigi/static/visualiser/js/visualiserApp.js
@riga, I tried to fix the merge conflicts for you. Let's see if the tests pass. It would be nice if you could quickly review my last merge commit. I believe it was just about matching opening and closing brackets for javascript functions...
Note, the visualiser tests passed locally for me.
Should be alright now =)
Merged. I'll check this out and see whether it works for me too. But after you refresh the page, you'll see that the worker count is still the old value (probably because the worker hasn't pinged yet). That doesn't look so good. Can you add a visual hint like "This worker has 13 unread messages" or something like that?
…potify#1993) This patch adds the possibility for the scheduler to communicate with the workers. The workers fetch messages when they ping the scheduler. This patch also includes a usage of this right away: you can now control the number of worker processes via the scheduler web interface.
Description
This PR adds a message queue for remote workers that is fetched via the scheduler ping and processed by the actual worker. As a usage example, this PR also adds a feature to the UI that allows users to control the number of processes of a worker.
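As an illustration of that mechanism, here is a minimal, self-contained sketch of a per-worker message queue drained on ping; the names are made up for the example and the real implementation lives in luigi/scheduler.py and luigi/worker.py:

```python
from collections import defaultdict


class MiniScheduler:
    """Illustrative sketch of the scheduler-side message queue."""

    def __init__(self):
        self._messages = defaultdict(list)

    def send_message_to_worker(self, worker_id, name, **kwargs):
        # e.g. triggered from the UI: ask a worker to change its process count
        self._messages[worker_id].append({"name": name, "kwargs": kwargs})

    def ping(self, worker_id):
        # piggyback queued messages on the keep-alive ping:
        # hand over everything queued so far and clear the queue
        messages, self._messages[worker_id] = self._messages[worker_id], []
        return {"messages": messages}


scheduler = MiniScheduler()
scheduler.send_message_to_worker("worker-1", "set_worker_processes", n=10)
print(scheduler.ping("worker-1"))  # one queued message
print(scheduler.ping("worker-1"))  # queue is empty again
```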
Motivation and Context
During our daily work, the scheduling of tasks can take quite some time (even though we enabled parallel scheduling), as we often have thousands of required tasks whose output (several GB or TB) is distributed over a number of remote resources. Sometimes it takes ~30 min to 1 h just to build the dependency tree. After starting such a task with e.g. --workers 1, it can happen that we would like to add worker processes to that task (don't ask me why, high-energy physics infrastructure is funny), as if we had called it with e.g. --workers 10. To avoid the overhead of creating the dependency tree again, I added the feature to control the number of worker processes via the UI.

Screenshot:

For this feature to work (commit 2), I had to introduce scheduler → worker communication that is essentially piggybacked on the ping polling. Compared to using get_work (which imho might not be the best way to exchange data that is not related to, well, new work/tasks to run), the keepalive polling seems like a good place for that (commit 1). Also, this bidirectional-ish communication can be used to add further interactive features to the UI.
I tested the features with these tasks with a single worker process via
and added more processes using the UI.
If this is a desired feature, I can also contribute actual tests.