
Control worker processes via UI (scheduler → worker communication) #1993

Merged: 11 commits into spotify:master on Feb 3, 2017

Conversation

@riga (Contributor) commented Jan 19, 2017

Description

This PR adds a message queue for remote workers that is fetched via the scheduler ping and processed by the actual worker. As a usage example, this PR also adds a feature to the UI that allows users to control the number of processes of a worker.

Motivation and Context

During our daily work, the scheduling of tasks can take quite some time (even though we enabled parallel scheduling), as we often have thousands of required tasks whose output (several GB or TB) is distributed over a number of remote resources. Sometimes it takes ~30 min - 1 h just to build the dependency tree. After starting such a task with e.g. --workers 1, it can happen that we want to add worker processes to that task (don't ask me why, high-energy physics infrastructure is funny), as if we had called it with e.g. --workers 10. To avoid the overhead of creating the dependency tree again, I added the feature to control the number of worker processes via the UI.

Screenshot:

For this feature to work (commit 2), I had to introduce scheduler → worker communication, which is essentially piggybacked on the ping polling. Besides using get_work (which imho might not be the best place to exchange data that is not related to, well, new work/tasks to run), the keepalive polling seems like a good place for that (commit 1).

Also, this bidirectional-ish communication can be used to add further interactive features to the UI.
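The piggybacking idea can be sketched roughly as follows. This is a minimal illustration, not the actual patch: the class and method names (`KeepAliveSketch`, `ping_once`) are hypothetical, and the only assumption taken from the PR is that the scheduler's ping response carries a `"messages"` list that the worker dispatches.

```python
class KeepAliveSketch:
    """Illustrative sketch: the worker's keep-alive loop pings the
    scheduler and dispatches any queued messages it gets back."""

    def __init__(self, scheduler, worker_id, message_callback):
        self._scheduler = scheduler          # object exposing ping(worker=...)
        self._worker_id = worker_id
        self._message_callback = message_callback

    def ping_once(self):
        try:
            response = self._scheduler.ping(worker=self._worker_id)
        except Exception:
            response = None  # e.g. a failed HTTP request to the scheduler
        if response:
            # the scheduler piggybacks queued messages on the ping response
            for message in response.get("messages", []):
                self._message_callback(message)
        return response
```

Since the ping already happens periodically, the worker picks up messages with at most one keep-alive interval of latency and no extra round trips.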

I tested the features with these tasks with a single worker process via

python tasks.py TaskB

and added more processes using the UI.

If this is a desired feature, I can also contribute actual tests.

@Tarrasch (Contributor) left a comment

Wow! This is amazing!! Thanks for doing this. It wasn't as hard as I thought it would be to implement. I guess making tasks remotely killable would be an easy next step.

I have many in-line feedback points. Please take a look. Also, tests would be required. :)

luigi/worker.py Outdated
logger.info("Worker %s function %s is not available as message callback" % tpl)
else:
logger.info("Worker %s successfully dispatched message to function %s" % tpl)
func(*args)
Contributor:

I'm thinking if it would be more stable to have it return kwargs instead of args

luigi/worker.py Outdated
@@ -1063,3 +1075,23 @@ def run(self):
self._handle_next_task()

return self.run_succeeded

def _handle_message(self, message):
Contributor:

Instead of calling it message, can it be called rpc_message throughout the code? The code in here always interprets the message as a remote procedure call to be invoked.

@@ -1121,7 +1130,8 @@ def get_work(self, host=None, assistant=False, current_tasks=None, worker=None,
@rpc_method(attempts=1)
def ping(self, **kwargs):
worker_id = kwargs['worker']
self._update_worker(worker_id)
worker = self._update_worker(worker_id)
return {"messages": worker.fetch_messages()}
Contributor:

I just thought about it now (not related so much to this patch): when this is transferred to the worker, is it returning a Python structure or an actual JSON message? I suppose JSON would be ideal, so theoretically one could communicate with the server using other languages.

Contributor Author:

Looks like the @rpc_method decorator uses RemoteScheduler._request which json-dumps the return value, so this should be fine.

If cross-language support is desired, JSON-RPC 2.0 specs would be awesome. https://github.com/riga/jsonrpyc (sorry for the self-promotion ;))

luigi/worker.py Outdated
logger.debug("Worker %s got message %s" % (self._id, message))

# the message is a tuple where the first element is a string that defines
# the function to call and the remaining elements are its arguments
Contributor:

I'm thinking it would be more stable to have it return kwargs instead of args, i.e. the second element message[1] is a dict. Or even better, why not make the whole thing a dict? Something like: { "name": string, "kwargs": kwargs }
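The dict-shaped message suggested here could be dispatched like this. This is a hedged sketch, not the code that was merged: `dispatch_rpc_message` and the `is_rpc_message_callback` marker attribute are illustrative names, and the whitelist check mirrors the decorator idea discussed further down.

```python
import logging

logger = logging.getLogger(__name__)


def dispatch_rpc_message(worker, rpc_message):
    """Look up the named callback on the worker and invoke it with kwargs.

    Only functions explicitly marked as message callbacks may be invoked,
    so an arbitrary attribute name in a message cannot call internal code.
    """
    name = rpc_message["name"]
    func = getattr(worker, name, None)
    if not callable(func) or not getattr(func, "is_rpc_message_callback", False):
        logger.error("Worker %s has no message callback %s", worker._id, name)
        return
    func(**rpc_message.get("kwargs", {}))
```

Using keyword arguments makes the wire format self-describing, so adding an optional parameter to a callback later does not break older messages.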

Contributor Author:

Sure, sounds good.

luigi/worker.py Outdated
func = getattr(self, name, None)
tpl = (self._id, name)
if not callable(func):
logger.info("Worker %s has no function %s" % tpl)
Contributor:

This is a logger.error right?

luigi/worker.py Outdated
worker_processes += n
else:
worker_processes = n
worker_processes = max(0, worker_processes)
Contributor:

I thought we should have at least 1 not 0. At least comment why 0 and not 1.

luigi/worker.py Outdated
self.worker_processes = worker_processes

# tell the scheduler
self._scheduler.add_worker(self._id, {"workers": worker_processes})
Contributor:

Cool that you didn't forget this ^^

luigi/worker.py Outdated
func(*args)

@message_callback
def set_worker_processes(self, n, diff):
Contributor:

Can't you make it into two different calls? One for set_worker_processes_increment and one for set_worker_processes, both taking only one argument.

Or, on second thought, I think it's better to not even have the diff version. I see these problems:

  • Not needed; humans can trivially make this calculation anyway.
  • If the UI is unresponsive, humans are likely to set it twice, getting an undesired effect.
  • Also, this is a new feature, let's start simple? :)

Although, I'm also fine with including it. I don't have any strong opinions, I'll let you decide. :)
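With the diff flag dropped, the callback reduces to something like the sketch below. The `WorkerSketch` class is hypothetical scaffolding so the example is self-contained; the two behaviors it illustrates come from the review thread: clamping the count to at least one process, and reporting the new count back via add_worker so the scheduler's view stays in sync.

```python
class WorkerSketch:
    """Hypothetical stripped-down worker holding a process count."""

    def __init__(self, scheduler, worker_id, worker_processes=1):
        self._scheduler = scheduler   # object exposing add_worker(id, info)
        self._id = worker_id
        self.worker_processes = worker_processes

    def set_worker_processes(self, n):
        # clamp to at least one process so the worker keeps making progress
        self.worker_processes = max(1, n)
        # tell the scheduler so the UI reflects the new count
        self._scheduler.add_worker(self._id, {"workers": self.worker_processes})
```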

Contributor Author:

I removed the diff flag ;)

luigi/worker.py Outdated
self._message_callback(message)


def message_callback(fn):
Contributor:

Thanks for thinking about security from day one. I really like this decorator. :)
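The security property of such a decorator is simple: it only sets a marker attribute, and the dispatch side refuses anything not carrying that marker, so a message can never invoke an arbitrary worker method. A minimal sketch of the idea (the helper `is_message_callback` is illustrative, not part of the patch):

```python
def message_callback(fn):
    """Mark fn as invocable via a scheduler message (an explicit whitelist)."""
    fn.is_message_callback = True
    return fn


def is_message_callback(func):
    """True only for callables that opted in via the decorator."""
    return callable(func) and getattr(func, "is_message_callback", False)
```

Because bound methods forward attribute lookups to the underlying function, the dispatch code can check the marker directly on `getattr(worker, name)`.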

luigi/worker.py Outdated
except: # httplib.BadStatusLine:
logger.warning('Failed pinging scheduler')

# handle messages
if response is not None:
Contributor:

Let's be pythonic and just do if response:. :)

@riga (Contributor Author) commented Jan 20, 2017

I'll try to add tests later today.

@Tarrasch (Contributor):
This looks good. Only tests needed. :)

@riga (Contributor Author) commented Jan 25, 2017

@Tarrasch I hope I added enough tests for the beginning ;)

Conflicts:
	luigi/static/visualiser/js/visualiserApp.js
@Tarrasch (Contributor) commented Feb 2, 2017

@riga, I tried to fix the merge conflicts for you. Let's see if the tests pass. It would be nice if you could quickly review my last merge commit. I believe it was just about matching opening and closing brackets for javascript functions...

@Tarrasch (Contributor) commented Feb 2, 2017

Note, the visualiser tests passed locally for me.

@riga (Contributor Author) commented Feb 2, 2017

Should be alright now =)

@Tarrasch Tarrasch merged commit b33aa30 into spotify:master Feb 3, 2017
@Tarrasch (Contributor) commented Feb 3, 2017

Merged. I'll check this out and see whether it works for me too.

But after you refresh the page, you'll see that the worker count is still the old value (probably because the worker hasn't pinged yet). That doesn't look so good. Can you add a visual hint like "This worker has 13 unread messages" or something like that?

kreczko pushed a commit to kreczko/luigi that referenced this pull request Mar 28, 2017
…potify#1993)

This patch adds the possibility for the scheduler to communicate with the workers. The workers fetch messages when they ping the scheduler.

This patch also includes a usage of this right away: you can now control the number of worker processes via the scheduler web interface.