Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send messages from scheduler to tasks #2426

Merged
merged 12 commits into from
Jun 16, 2018
5 changes: 5 additions & 0 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,11 @@ pause_enabled
If false, disables pause/unpause operations and hides the pause toggle from
the visualiser.

send_messages
When true, the scheduler is allowed to send messages to running tasks and
the central scheduler provides a simple prompt per task to send messages.
Defaults to true.


[sendgrid]
----------
Expand Down
36 changes: 36 additions & 0 deletions doc/luigi_patterns.rst
Original file line number Diff line number Diff line change
Expand Up @@ -317,3 +317,39 @@ built-in solutions. In the case of you're dealing with a file system
:meth:`~luigi.target.FileSystemTarget.temporary_path`. For other targets, you
should ensure that the way you're writing your final output directory is
atomic.

Sending messages to tasks
~~~~~~~~~~~~~~~~~~~~~~~~~

The central scheduler is able to send messages to particular tasks. When a running task accepts
messages, it can access a `multiprocessing.Queue <https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues>`__
object storing incoming messages. You can implement custom behavior to react and respond to
messages:

.. code-block:: python

class Example(luigi.Task):

# common task setup
...

# configure the task to accept all incoming messages
accepts_messages = True

def run(self):
# this example runs some loop and listens for the
# "terminate" message, and responds to all other messages
for _ in some_loop():
# check incomming messages
if not self.scheduler_messages.empty():
msg = self.scheduler_messages.get()
if msg.content == "terminate":
break
else:
msg.respond("unknown message")

# finalize
...

Messages can be sent right from the scheduler UI which also displays responses (if any). Note that
this feature is only available when the scheduler is configured to send messages (see the :ref:`scheduler-config` config), and the task is configured to accept them.
40 changes: 37 additions & 3 deletions luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import os
import re
import time
import uuid

from luigi import six

Expand Down Expand Up @@ -149,6 +150,8 @@ class scheduler(Config):

pause_enabled = parameter.BoolParameter(default=True)

send_messages = parameter.BoolParameter(default=True)

def _get_retry_policy(self):
return RetryPolicy(self.retry_count, self.disable_hard_timeout, self.disable_window)

Expand Down Expand Up @@ -277,7 +280,8 @@ def __eq__(self, other):

class Task(object):
def __init__(self, task_id, status, deps, resources=None, priority=0, family='', module=None,
params=None, tracking_url=None, status_message=None, progress_percentage=None, retry_policy='notoptional'):
params=None, accepts_messages=False, tracking_url=None, status_message=None,
progress_percentage=None, retry_policy='notoptional'):
self.id = task_id
self.stakeholders = set() # workers ids that are somehow related to this task (i.e. don't prune while any of these workers are still active)
self.workers = OrderedSet() # workers ids that can perform task - task is 'BROKEN' if none of these workers are active
Expand All @@ -299,11 +303,13 @@ def __init__(self, task_id, status, deps, resources=None, priority=0, family='',
self.module = module
self.params = _get_default(params, {})

self.accepts_messages = accepts_messages
self.retry_policy = retry_policy
self.failures = Failures(self.retry_policy.disable_window)
self.tracking_url = tracking_url
self.status_message = status_message
self.progress_percentage = progress_percentage
self.scheduler_message_responses = {}
self.scheduler_disable_time = None
self.runnable = False
self.batchable = False
Expand Down Expand Up @@ -772,7 +778,7 @@ def forgive_failures(self, task_id=None):
@rpc_method()
def add_task(self, task_id=None, status=PENDING, runnable=True,
deps=None, new_deps=None, expl=None, resources=None,
priority=0, family='', module=None, params=None,
priority=0, family='', module=None, params=None, accepts_messages=False,
assistant=False, tracking_url=None, worker=None, batchable=None,
batch_id=None, retry_policy_dict=None, owners=None, **kwargs):
"""
Expand All @@ -797,6 +803,7 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
_default_task = self._make_task(
task_id=task_id, status=PENDING, deps=deps, resources=resources,
priority=priority, family=family, module=module, params=params,
accepts_messages=accepts_messages,
)
else:
_default_task = None
Expand Down Expand Up @@ -932,6 +939,31 @@ def disable_worker(self, worker):
def set_worker_processes(self, worker, n):
self._state.get_worker(worker).add_rpc_message('set_worker_processes', n=n)

@rpc_method()
def send_scheduler_message(self, worker, task, content):
if not self._config.send_messages:
return {"message_id": None}

message_id = str(uuid.uuid4())
self._state.get_worker(worker).add_rpc_message('dispatch_scheduler_message', task_id=task,
message_id=message_id, content=content)

return {"message_id": message_id}

@rpc_method()
def add_scheduler_message_response(self, task_id, message_id, response):
if self._state.has_task(task_id):
task = self._state.get_task(task_id)
task.scheduler_message_responses[message_id] = response

@rpc_method()
def get_scheduler_message_response(self, task_id, message_id):
response = None
if self._state.has_task(task_id):
task = self._state.get_task(task_id)
response = task.scheduler_message_responses.pop(message_id, None)
return {"response": response}

@rpc_method()
def is_pause_enabled(self):
return {'enabled': self._config.pause_enabled}
Expand Down Expand Up @@ -1253,12 +1285,14 @@ def _serialize_task(self, task_id, include_deps=True, deps=None):
'resources_running': getattr(task, "resources_running", None),
'tracking_url': getattr(task, "tracking_url", None),
'status_message': getattr(task, "status_message", None),
'progress_percentage': getattr(task, "progress_percentage", None)
'progress_percentage': getattr(task, "progress_percentage", None),
}
if task.status == DISABLED:
ret['re_enable_able'] = task.scheduler_disable_time is not None
if include_deps:
ret['deps'] = list(task.deps if deps is None else deps)
if self._config.send_messages and task.status == RUNNING:
ret['accepts_messages'] = task.accepts_messages
return ret

@rpc_method()
Expand Down
36 changes: 36 additions & 0 deletions luigi/static/visualiser/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
{{#progressPercentage}}<button class="btn btn-primary btn-xs statusMessage" title="Status message" data-toggle="tooltip" data-task-id="{{taskId}}" data-display-name="{{displayName}}"><i class="fa fa-comment"></i></button>
{{/progressPercentage}}
{{/statusMessage}}
{{#acceptsMessages}}<button class="btn btn-success btn-xs schedulerMessage" title="Send message" data-toggle="tooltip" data-task-id="{{taskId}}" data-display-name="{{displayName}}" data-worker="{{workerIdRunning}}"><i class="fa fa-paper-plane"></i></button>{{/acceptsMessages}}
</div>
</script>
<script type="text/template" name="errorTemplate">
Expand Down Expand Up @@ -80,6 +81,37 @@ <h4 class="modal-title" id="myModalLabel">Status message for {{displayName}}</h4
</div>
</div>
</script>
<script type="text/template" name="schedulerMessageTemplate">
<div class="modal-dialog">
<div class="modal-content">
<div class="modal-header">
<button type="button" class="close" data-dismiss="modal"><span aria-hidden="true">&times;</span><span class="sr-only">Close</span></button>
<h4 class="modal-title" id="myModalLabel">Send message to {{displayName}}</h4>
</div>
<div class="modal-body">
<form>
<div class="form-group">
<label for="schedulerMessageInput">Message:</label>
<input type="text" class="form-control" id="schedulerMessageInput" placeholder="">
</div>
<div class="form-group">
<input type="checkbox" class="form-check-input" id="schedulerMessageAwaitResponse">
<label class="form-check-label" for="schedulerMessageAwaitResponse">Await response</label>
</div>
</form>
<div class="form-group" id="schedulerMessageResponse" style="display: none;">
<hr />
<label>Response:</label>
<pre class="pre-scrollable"><i class="fa fa-spinner fa-pulse" id="schedulerMessageSpinner"></i><div></div></pre>
</div>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-default" data-dismiss="modal">Cancel</button>
<button type="button" id="schedulerMessageButton" data-dismiss="modal" class="btn btn-primary">Send</button>
</div>
</div>
</div>
</script>
<script type="text/template" name="workerTemplate">
<div class="modal fade" id="setWorkersModal" tabindex="-1" role="dialog" aria-labelledby="setWorkersLabel">
<div class="modal-dialog" role="document">
Expand Down Expand Up @@ -198,6 +230,8 @@ <h3 class="box-title">{{name}}</h3>
{{#progressPercentage}}<button class="btn btn-primary btn-xs statusMessage" title="Status message" data-toggle="tooltip" data-task-id="{{taskId}}" data-display-name="{{displayName}}"><i class="fa fa-comment"></i></button>
{{/progressPercentage}}
{{/statusMessage}}
{{#acceptsMessages}}<button class="btn btn-success btn-xs schedulerMessage" title="Send message" data-toggle="tooltip" data-task-id="{{taskId}}" data-display-name="{{displayName}}" data-worker="{{workerIdRunning}}"><i class="fa fa-paper-plane"></i></button>
{{/acceptsMessages}}
</td>
</tr>
{{/tasks}}
Expand Down Expand Up @@ -326,6 +360,8 @@ <h3 class="box-title">{{name}}</h3>
</div>
<div class="modal fade" id="statusMessageModal" tabindex="-1" role="dialog" aria-labelledby="myModalLabel" aria-hidden="true">
</div>
<div class="modal fade" id="schedulerMessageModal" tabindex="-1" role="dialog" aria-labelledby="myModalLabel" aria-hidden="true">
</div>

<div class="wrapper">
<div class="main-header">
Expand Down
16 changes: 16 additions & 0 deletions luigi/static/visualiser/js/luigi.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,22 @@ var LuigiAPI = (function() {
});
};

LuigiAPI.prototype.sendSchedulerMessage = function(workerId, taskId, content, callback) {
var data = {worker: workerId, task: taskId, content: content};
jsonRPC(this.urlRoot + "/send_scheduler_message", data, function(response) {
if (callback) {
callback(response.response.message_id);
}
});
};

LuigiAPI.prototype.getSchedulerMessageResponse = function(taskId, messageId, callback) {
var data = {task_id: taskId, message_id: messageId};
jsonRPC(this.urlRoot + "/get_scheduler_message_response", data, function(response) {
callback(response.response.response);
});
};

LuigiAPI.prototype.isPauseEnabled = function(callback) {
jsonRPC(this.urlRoot + '/is_pause_enabled', {}, function(response) {
callback(response.response.enabled);
Expand Down
66 changes: 65 additions & 1 deletion luigi/static/visualiser/js/visualiserApp.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ function visualiserApp(luigi) {
error: task.status == "FAILED",
re_enable: task.status == "DISABLED" && task.re_enable_able,
statusMessage: task.status_message,
progressPercentage: task.progress_percentage
progressPercentage: task.progress_percentage,
acceptsMessages: task.accepts_messages,
workerIdRunning: task.worker_running,
};
}

Expand Down Expand Up @@ -327,6 +329,58 @@ function visualiserApp(luigi) {
);
}

function showSchedulerMessageModal(data) {
var $modal = $("#schedulerMessageModal");

$modal.empty().append(renderTemplate("schedulerMessageTemplate", data));
var $input = $modal.find("#schedulerMessageInput");
var $send = $modal.find("#schedulerMessageButton");
var $awaitResponse = $modal.find("#schedulerMessageAwaitResponse");
var $responseContainer = $modal.find("#schedulerMessageResponse");
var $responseSpinner = $responseContainer.find("pre > i");
var $responseContent = $responseContainer.find("pre > div");

$input.on("keypress", function($event) {
if (event.keyCode == 13) {
$send.trigger("click");
$event.preventDefault();
}
});

$send.on("click", function($event) {
var content = $input.val();
var awaitResponse = $awaitResponse.prop("checked");
if (content && data.worker) {
if (awaitResponse) {
$responseContainer.show();
$responseSpinner.show();
$responseContent.empty();
luigi.sendSchedulerMessage(data.worker, data.taskId, content, function(messageId) {
var interval = window.setInterval(function() {
luigi.getSchedulerMessageResponse(data.taskId, messageId, function(response) {
if (response != null) {
clearInterval(interval);
$responseSpinner.hide();
$responseContent.html(response);
}
});
}, 1000);
});
$event.stopPropagation();
} else {
$responseContainer.hide();
luigi.sendSchedulerMessage(data.worker, data.taskId, content);
}
}
});

$modal.on("shown.bs.modal", function() {
$input.focus();
});

$modal.modal({});
}

function preProcessGraph(dependencyGraph) {
var extraNodes = [];
var seen = {};
Expand Down Expand Up @@ -1024,6 +1078,11 @@ function visualiserApp(luigi) {
var data = $(this).data();
showStatusMessage(data);
});

$('.worker-table tbody').on('click', 'td .schedulerMessage', function() {
var data = $(this).data();
showSchedulerMessageModal(data);
});
});

luigi.getResourceList(function(resources) {
Expand Down Expand Up @@ -1231,6 +1290,11 @@ function visualiserApp(luigi) {
showStatusMessage(data);
});

$('#taskTable tbody').on('click', 'td.details-control .schedulerMessage', function () {
var data = $(this).data();
showSchedulerMessageModal(data);
});

$('.navbar-nav').on('click', 'a', function () {
var tabName = $(this).data('tab');
updateSidebar(tabName);
Expand Down
10 changes: 9 additions & 1 deletion luigi/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,14 @@ def trigger_event(self, event, *args, **kwargs):
except BaseException:
logger.exception("Error in event callback for %r", event)

@property
def accepts_messages(self):
"""
For configuring which scheduler messages can be received. When falsy, this tasks does not
accept any message. When True, all messages are accepted.
"""
return False

@property
def task_module(self):
''' Returns what Python module to import to get access to this class. '''
Expand Down Expand Up @@ -687,7 +695,7 @@ def _dump(self):
pickle.dumps(self)

"""
unpicklable_properties = tuple(luigi.worker.TaskProcess.forward_reporter_callbacks.values())
unpicklable_properties = tuple(luigi.worker.TaskProcess.forward_reporter_attributes.values())
reserved_properties = {}
for property_name in unpicklable_properties:
if hasattr(self, property_name):
Expand Down
Loading