Skip to content

Commit

Permalink
Control worker processes via UI (scheduler → worker communication) (#…
Browse files Browse the repository at this point in the history
…1993)

This patch adds the possibility for the scheduler to communicate with the workers. The workers fetch messages when they do pings.

This patch also includes one usage of this right away. You can now control the number of worker processes via the scheduler web interface.
  • Loading branch information
riga authored and Tarrasch committed Feb 3, 2017
1 parent f3bc4fd commit b33aa30
Show file tree
Hide file tree
Showing 6 changed files with 367 additions and 5 deletions.
17 changes: 16 additions & 1 deletion luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ def __init__(self, worker_id, last_active=None):
self.tasks = set() # task objects
self.info = {}
self.disabled = False
self.rpc_messages = []

def add_info(self, info):
self.info.update(info)
Expand Down Expand Up @@ -402,6 +403,15 @@ def state(self):
else:
return WORKER_STATE_DISABLED

def add_rpc_message(self, name, **kwargs):
    """
    Queue a message for this worker.

    The message is appended to ``self.rpc_messages`` in the format
    ``{'name': <function_name>, 'kwargs': <function_kwargs>}``.

    :param name: name of the function the message refers to.
    :param kwargs: keyword arguments stored alongside the name.
    """
    message = dict(name=name, kwargs=kwargs)
    self.rpc_messages.append(message)

def fetch_rpc_messages(self):
    """
    Hand out all queued messages and empty the queue.

    :return: a new list holding the messages that were queued, in order.
    """
    pending = list(self.rpc_messages)
    # empty the existing list object in place instead of rebinding the attribute
    self.rpc_messages[:] = []
    return pending

def __str__(self):
return self.id

Expand Down Expand Up @@ -877,6 +887,10 @@ def add_worker(self, worker, info, **kwargs):
def disable_worker(self, worker):
self._state.disable_workers({worker})

@rpc_method()
def set_worker_processes(self, worker, n):
    """
    Queue a ``set_worker_processes`` message carrying ``n`` for a worker.

    :param worker: id passed to ``self._state.get_worker`` to look up the worker.
    :param n: value stored in the message's kwargs under ``'n'``.
    """
    target = self._state.get_worker(worker)
    target.add_rpc_message('set_worker_processes', n=n)

@rpc_method()
def update_resources(self, **resources):
if self._resources is None:
Expand Down Expand Up @@ -1121,7 +1135,8 @@ def get_work(self, host=None, assistant=False, current_tasks=None, worker=None,
@rpc_method(attempts=1)
def ping(self, **kwargs):
    """
    Register a ping from a worker and hand back its queued rpc messages.

    :param kwargs: must contain ``'worker'``, the id of the pinging worker.
    :return: dict ``{"rpc_messages": [...]}`` with the messages fetched from
        the worker object; fetching them empties the worker's queue.
    :raises KeyError: if ``'worker'`` is missing from ``kwargs``.
    """
    worker_id = kwargs['worker']
    # update the worker exactly once and keep the returned worker object
    worker = self._update_worker(worker_id)
    return {"rpc_messages": worker.fetch_rpc_messages()}

def _upstream_status(self, task_id, upstream_status_table):
if task_id in upstream_status_table:
Expand Down
48 changes: 47 additions & 1 deletion luigi/static/visualiser/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,14 @@
width: 20em;
}

#workerList .box-tools > div {
display: inline-block;
}

#workerList .btn-set-workers > span.caret {
margin-left: 4px;
}

</style>

<script type="text/template" name="actionsTemplate">
Expand Down Expand Up @@ -266,6 +274,28 @@ <h4 class="modal-title" id="myModalLabel">Status message for {{displayName}}</h4
</div>
</script>
<script type="text/template" name="workerTemplate">
<div class="modal fade" id="setWorkersModal" tabindex="-1" role="dialog" aria-labelledby="setWorkersLabel">
<div class="modal-dialog" role="document">
<div class="modal-content">
<div class="modal-header">
<button type="button" class="close" data-dismiss="modal" aria-label="Close"><span aria-hidden="true">&times;</span></button>
<h4 class="modal-title" id="setWorkersLabel">Set workers</h4>
</div>
<div class="modal-body">
<form>
<div class="form-group">
<label for="setWorkersInput">New number of workers:</label>
<input type="text" class="form-control" id="setWorkersInput" placeholder="positive number">
</div>
</form>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-default" data-dismiss="modal">Cancel</button>
<button type="button" id="setWorkersButton" data-dismiss="modal" class="btn btn-primary">Set</button>
</div>
</div>
</div>
</div>
<div class="modal fade" id="disableWorkerModal" tabindex="-1" role="dialog" aria-labelledby="disableWorkerLabel">
<div class="modal-dialog" role="document">
<div class="modal-content">
Expand Down Expand Up @@ -297,8 +327,24 @@ <h4 class="modal-title" id="disiableWorkerLabel">Disable worker?</h4>
<h3 class="box-title">{{name}}</h3>
<div class="box-tools pull-right">
{{^is_disabled}}
<div class="btn-group">
<button type="button" class="btn btn-sm btn-default dropdown-toggle btn-set-workers" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">
Workers: <span id="label-n-workers" data-worker="{{name}}">{{workers}}</span> <span class="caret"></span>
</button>
<ul class="dropdown-menu">
<li><a href="#" id="btn-increment-workers" data-worker="{{name}}">
<i class="glyphicon glyphicon-plus"></i> Add 1 worker
</a></li>
<li><a href="#" id="btn-decrement-workers" data-worker="{{name}}">
<i class="glyphicon glyphicon-minus"></i> Remove 1 worker
</a></li>
<li><a href="#" id="btn-set-workers" data-toggle="modal" data-target="#setWorkersModal" data-worker="{{name}}">
<i class="glyphicon glyphicon-pencil"></i> Set workers ...
</a></li>
</ul>
</div>
<div class="button-tooltip" data-toggle="tooltip" title="Disable Worker">
<button type="button" class="btn btn-danger btn-disable-worker" data-toggle="modal" data-target="#disableWorkerModal" data-worker="{{name}}">
<button type="button" class="btn btn-sm btn-danger btn-disable-worker" data-toggle="modal" data-target="#disableWorkerModal" data-worker="{{name}}">
<i class="fa fa-fire-extinguisher"></i>
</button>
</div>
Expand Down
7 changes: 7 additions & 0 deletions luigi/static/visualiser/js/luigi.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,5 +139,12 @@ var LuigiAPI = (function() {
jsonRPC(this.urlRoot + "/disable_worker", {'worker': workerId});
}

/**
 * Sends a "set_worker_processes" request for a worker to the scheduler.
 * @param workerId: the id of the worker
 * @param n: the number of processes to set
 * @param callback: optional function, called without arguments from the
 *                  jsonRPC response handler
 */
LuigiAPI.prototype.setWorkerProcesses = function(workerId, n, callback) {
    var data = {worker: workerId, n: n};
    jsonRPC(this.urlRoot + "/set_worker_processes", data, function() {
        // callers may omit the callback; do not throw in that case
        if (typeof callback === "function") {
            callback();
        }
    });
};

return LuigiAPI;
})();
63 changes: 63 additions & 0 deletions luigi/static/visualiser/js/visualiserApp.js
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,23 @@ function visualiserApp(luigi) {
return htmls.join(', ');
}

/**
 * Updates the number of worker processes of a worker
 * @param worker: the id of the worker
 * @param n: the number of processes to set (values below 1 are raised to 1)
 */
function updateWorkerProcesses(worker, n) {
    var processes = Math.max(1, n);
    var selector = '#label-n-workers[data-worker="' + worker + '"]';
    var $label = $('#workerList').find(selector);

    // show a spinner in the label until the callback fires; purely visual feedback
    $label.html('<i class="fa fa-spinner fa-spin" aria-hidden="true"></i>');

    luigi.setWorkerProcesses(worker, processes, function() {
        $label.text(processes);
    });
}

function changeState(key, value) {
var fragmentQuery = URI.parseQuery(location.hash.replace('#', ''));
if (value) {
Expand Down Expand Up @@ -1028,6 +1045,52 @@ function visualiserApp(luigi) {
triggerButton.remove();
});

// "Add 1 worker": read the count shown in the worker's label and request one more
$('#workerList').on('click', '#btn-increment-workers', function($event) {
    var worker = $(this).data("worker");
    var $label = $('#workerList').find('#label-n-workers[data-worker="' + worker + '"]');
    // always parse as base 10
    var n = parseInt($label.text(), 10);
    // skip the update when the label currently holds no number
    if (!isNaN(n)) {
        updateWorkerProcesses(worker, n + 1);
    }
    // the trigger is an <a href="#">; keep the browser from following it
    $event.preventDefault();
});

// "Remove 1 worker": read the count shown in the worker's label and request one less
$('#workerList').on('click', '#btn-decrement-workers', function($event) {
    var worker = $(this).data("worker");
    var $label = $('#workerList').find('#label-n-workers[data-worker="' + worker + '"]');
    // always parse as base 10
    var n = parseInt($label.text(), 10);
    // skip the update when the label currently holds no number
    if (!isNaN(n)) {
        updateWorkerProcesses(worker, n - 1);
    }
    // the trigger is an <a href="#">; keep the browser from following it
    $event.preventDefault();
});

// Prepare the "Set workers" dialog each time it is about to be shown
$('#workerList').on('show.bs.modal', '#setWorkersModal', function($event) {
    // remember on the "Set" button which worker the dialog was opened for
    $('#setWorkersButton').data('worker', $($event.relatedTarget).data('worker'));
    var $input = $(this).find('#setWorkersInput').on('keypress', function($keyEvent) {
        // use the event object passed to this handler, not the non-standard
        // global `event`, which is not available in every browser
        if ($keyEvent.which == 13) {
            $('#workerList').find('#setWorkersButton').trigger('click');
            // keep Enter from implicitly submitting the surrounding form
            $keyEvent.preventDefault();
        }
        $keyEvent.stopPropagation();
    });
    // NOTE(review): the 600 ms delay presumably waits for the modal's fade-in
    // before focusing the input - confirm
    setTimeout(function() {
        $input.focus();
    }, 600);
});

// Reset the "Set workers" dialog after it was hidden
$('#workerList').on('hidden.bs.modal', '#setWorkersModal', function() {
    var $input = $(this).find('#setWorkersInput');
    // drop the keypress handlers bound to the input, then clear the typed value
    $input.off('keypress');
    $input.val('');
});

// "Set" button of the dialog: apply the number typed into the input field
$('#workerList').on('click', '#setWorkersButton', function($event) {
    // the worker id is read from this button's jQuery data
    var worker = $(this).data('worker');
    // user input: always parse as base 10 so e.g. "0x10" is not read as hex
    var n = parseInt($("#setWorkersInput").val(), 10);
    // ignore input that does not start with a number
    if (!isNaN(n)) {
        updateWorkerProcesses(worker, n);
    }
    $event.preventDefault();
});

$('.js-nav-link').click(function(e) {
// User followed tab from navigation link. Copy state from fields to hash.
e.preventDefault();
Expand Down
47 changes: 44 additions & 3 deletions luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,12 +374,13 @@ class KeepAliveThread(threading.Thread):
Periodically tell the scheduler that the worker still lives.
"""

def __init__(self, scheduler, worker_id, ping_interval, rpc_message_callback):
    """
    Set up the keep-alive thread; it is not started here.

    :param scheduler: stored as ``self._scheduler``.
    :param worker_id: stored as ``self._worker_id``.
    :param ping_interval: stored as ``self._ping_interval``.
        NOTE(review): presumably the time between pings - confirm in ``run``.
    :param rpc_message_callback: callable stored as ``self._rpc_message_callback``.
    """
    super(KeepAliveThread, self).__init__()
    # event set by stop()
    self._should_stop = threading.Event()
    self._scheduler = scheduler
    self._worker_id = worker_id
    self._ping_interval = ping_interval
    self._rpc_message_callback = rpc_message_callback

def stop(self):
self._should_stop.set()
Expand All @@ -391,11 +392,22 @@ def run(self):
logger.info("Worker %s was stopped. Shutting down Keep-Alive thread" % self._worker_id)
break
with fork_lock:
response = None
try:
self._scheduler.ping(worker=self._worker_id)
response = self._scheduler.ping(worker=self._worker_id)
except: # httplib.BadStatusLine:
logger.warning('Failed pinging scheduler')

# handle rpc messages
if response:
for message in response["rpc_messages"]:
self._rpc_message_callback(message)


def rpc_message_callback(fn):
    """
    Decorator that flags ``fn`` as available as an rpc message callback.

    Sets ``fn.is_rpc_message_callback = True`` and returns the same function
    object unchanged otherwise.
    """
    setattr(fn, "is_rpc_message_callback", True)
    return fn


class Worker(object):
"""
Expand Down Expand Up @@ -483,7 +495,9 @@ def __enter__(self):
"""
Start the KeepAliveThread.
"""
self._keep_alive_thread = KeepAliveThread(self._scheduler, self._id, self._config.ping_interval)
self._keep_alive_thread = KeepAliveThread(self._scheduler, self._id,
self._config.ping_interval,
self._handle_rpc_message)
self._keep_alive_thread.daemon = True
self._keep_alive_thread.start()
return self
Expand Down Expand Up @@ -1076,3 +1090,30 @@ def run(self):
self._handle_next_task()

return self.run_succeeded

def _handle_rpc_message(self, message):
    """
    Dispatch an rpc message to the method of this worker that it names.

    The method is only called when it exists, is callable and carries a true
    ``is_rpc_message_callback`` attribute; otherwise an error is logged and
    nothing is called.

    :param message: dict ``{'name': <function_name>, 'kwargs': <function_kwargs>}``.
    :raises KeyError: if ``'name'`` or ``'kwargs'`` is missing from the message.
    """
    logger.info("Worker %s got message %s" % (self._id, message))

    name, kwargs = message['name'], message['kwargs']
    log_args = (self._id, name)

    # look the handler up by name on this worker
    handler = getattr(self, name, None)

    if not callable(handler):
        logger.error("Worker %s has no function '%s'" % log_args)
        return

    # only explicitly flagged methods may be triggered by a message
    if not getattr(handler, "is_rpc_message_callback", False):
        logger.error("Worker %s function '%s' is not available as rpc message callback" % log_args)
        return

    logger.info("Worker %s successfully dispatched rpc message to function '%s'" % log_args)
    handler(**kwargs)

@rpc_message_callback
def set_worker_processes(self, n):
    """
    Change the number of worker processes and report it to the scheduler.

    :param n: requested number of processes; at least 1 is always kept.
    """
    # never go below a single process
    processes = max(1, n)
    self.worker_processes = processes

    # tell the scheduler about the value that is now in effect
    info = {'workers': self.worker_processes}
    self._scheduler.add_worker(self._id, info)
Loading

0 comments on commit b33aa30

Please sign in to comment.