Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send messages from scheduler to tasks #2426

Merged
merged 12 commits into from
Jun 16, 2018
5 changes: 0 additions & 5 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,6 @@ force_multiprocessing
the number of workers.
Defaults to false.

receive_messages
If true, workers can receive messages sent by the scheduler and dispatch them
to running tasks.
Defaults to true.


[elasticsearch]
---------------
Expand Down
36 changes: 36 additions & 0 deletions doc/luigi_patterns.rst
Original file line number Diff line number Diff line change
Expand Up @@ -317,3 +317,39 @@ built-in solutions. In the case of you're dealing with a file system
:meth:`~luigi.target.FileSystemTarget.temporary_path`. For other targets, you
should ensure that the way you're writing your final output directory is
atomic.

Sending messages to tasks
~~~~~~~~~~~~~~~~~~~~~~~~~

The central scheduler is able to send messages to particular tasks. When a running task accepts
messages, it can access a `multiprocessing.Queue <https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues>`__
object storing incoming messages. You can implement custom behavior to react and respond to
messages:

.. code-block:: python

class Example(luigi.Task):

# common task setup
...

# configure the task to accept all incoming messages
accepted_messages = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"accepts_messages" sounds better no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✔︎


def run(self):
# this example runs some loop and listens for the
# "terminate" message, and responds to all other messages
for _ in some_loop():
# check incoming messages
if not self.scheduler_messages.empty():
msg = self.scheduler_messages.get()
if msg.content == "terminate":
break
else:
msg.respond("unknown message")

# finalize
...

Messages can be sent right from the scheduler UI, which also displays responses (if any). Note that
this feature is only available when the scheduler is configured to send messages (see the :ref:`scheduler-config` config) and the task is configured to accept them.
21 changes: 11 additions & 10 deletions luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ def __eq__(self, other):

class Task(object):
def __init__(self, task_id, status, deps, resources=None, priority=0, family='', module=None,
params=None, tracking_url=None, status_message=None, progress_percentage=None, retry_policy='notoptional'):
params=None, accepted_messages=False, tracking_url=None, status_message=None,
progress_percentage=None, retry_policy='notoptional'):
self.id = task_id
self.stakeholders = set() # workers ids that are somehow related to this task (i.e. don't prune while any of these workers are still active)
self.workers = OrderedSet() # workers ids that can perform task - task is 'BROKEN' if none of these workers are active
Expand All @@ -302,6 +303,7 @@ def __init__(self, task_id, status, deps, resources=None, priority=0, family='',
self.module = module
self.params = _get_default(params, {})

self.accepted_messages = accepted_messages
self.retry_policy = retry_policy
self.failures = Failures(self.retry_policy.disable_window)
self.tracking_url = tracking_url
Expand Down Expand Up @@ -776,7 +778,7 @@ def forgive_failures(self, task_id=None):
@rpc_method()
def add_task(self, task_id=None, status=PENDING, runnable=True,
deps=None, new_deps=None, expl=None, resources=None,
priority=0, family='', module=None, params=None,
priority=0, family='', module=None, params=None, accepted_messages=False,
assistant=False, tracking_url=None, worker=None, batchable=None,
batch_id=None, retry_policy_dict=None, owners=None, **kwargs):
"""
Expand All @@ -801,6 +803,7 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
_default_task = self._make_task(
task_id=task_id, status=PENDING, deps=deps, resources=resources,
priority=priority, family=family, module=module, params=params,
accepted_messages=accepted_messages,
)
else:
_default_task = None
Expand Down Expand Up @@ -937,15 +940,15 @@ def set_worker_processes(self, worker, n):
self._state.get_worker(worker).add_rpc_message('set_worker_processes', n=n)

@rpc_method()
def send_scheduler_message(self, worker, task, message):
def send_scheduler_message(self, worker, task, content):
if not self._config.send_messages:
return {"messageId": None}
return {"message_id": None}

message_id = str(uuid.uuid4())
self._state.get_worker(worker).add_rpc_message('dispatch_scheduler_message', task_id=task,
message_id=message_id, message=message)
message_id=message_id, content=content)

return {"messageId": message_id}
return {"message_id": message_id}

@rpc_method()
def add_scheduler_message_response(self, task_id, message_id, response):
Expand Down Expand Up @@ -1283,15 +1286,13 @@ def _serialize_task(self, task_id, include_deps=True, deps=None):
'tracking_url': getattr(task, "tracking_url", None),
'status_message': getattr(task, "status_message", None),
'progress_percentage': getattr(task, "progress_percentage", None),
'enabled_scheduler_messages': False,
}
if task.status == DISABLED:
ret['re_enable_able'] = task.scheduler_disable_time is not None
if include_deps:
ret['deps'] = list(task.deps if deps is None else deps)
if self._config.send_messages and task.status == RUNNING and task.worker_running:
worker = self._state.get_worker(task.worker_running)
ret['enabled_scheduler_messages'] = worker.info.get('receive_messages', False)
if self._config.send_messages and task.status == RUNNING:
ret['accepted_messages'] = task.accepted_messages
return ret

@rpc_method()
Expand Down
6 changes: 3 additions & 3 deletions luigi/static/visualiser/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
{{#progressPercentage}}<button class="btn btn-primary btn-xs statusMessage" title="Status message" data-toggle="tooltip" data-task-id="{{taskId}}" data-display-name="{{displayName}}"><i class="fa fa-comment"></i></button>
{{/progressPercentage}}
{{/statusMessage}}
{{#enabledSchedulerMessages}}<button class="btn btn-success btn-xs schedulerMessage" title="Send message" data-toggle="tooltip" data-task-id="{{taskId}}" data-display-name="{{displayName}}" data-worker="{{workerIdRunning}}"><i class="fa fa-paper-plane"></i></button>{{/enabledSchedulerMessages}}
{{#acceptedMessages}}<button class="btn btn-success btn-xs schedulerMessage" title="Send message" data-toggle="tooltip" data-task-id="{{taskId}}" data-display-name="{{displayName}}" data-worker="{{workerIdRunning}}"><i class="fa fa-paper-plane"></i></button>{{/acceptedMessages}}
</div>
</script>
<script type="text/template" name="errorTemplate">
Expand Down Expand Up @@ -230,8 +230,8 @@ <h3 class="box-title">{{name}}</h3>
{{#progressPercentage}}<button class="btn btn-primary btn-xs statusMessage" title="Status message" data-toggle="tooltip" data-task-id="{{taskId}}" data-display-name="{{displayName}}"><i class="fa fa-comment"></i></button>
{{/progressPercentage}}
{{/statusMessage}}
{{#enabledSchedulerMessages}}<button class="btn btn-success btn-xs schedulerMessage" title="Send message" data-toggle="tooltip" data-task-id="{{taskId}}" data-display-name="{{displayName}}" data-worker="{{workerIdRunning}}"><i class="fa fa-paper-plane"></i></button>
{{/enabledSchedulerMessages}}
{{#acceptedMessages}}<button class="btn btn-success btn-xs schedulerMessage" title="Send message" data-toggle="tooltip" data-task-id="{{taskId}}" data-display-name="{{displayName}}" data-worker="{{workerIdRunning}}"><i class="fa fa-paper-plane"></i></button>
{{/acceptedMessages}}
</td>
</tr>
{{/tasks}}
Expand Down
6 changes: 3 additions & 3 deletions luigi/static/visualiser/js/luigi.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,11 @@ var LuigiAPI = (function() {
});
};

LuigiAPI.prototype.sendSchedulerMessage = function(workerId, taskId, message, callback) {
var data = {worker: workerId, task: taskId, message: message};
LuigiAPI.prototype.sendSchedulerMessage = function(workerId, taskId, content, callback) {
var data = {worker: workerId, task: taskId, content: content};
jsonRPC(this.urlRoot + "/send_scheduler_message", data, function(response) {
if (callback) {
callback(response.response.messageId);
callback(response.response.message_id);
}
});
};
Expand Down
10 changes: 5 additions & 5 deletions luigi/static/visualiser/js/visualiserApp.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ function visualiserApp(luigi) {
re_enable: task.status == "DISABLED" && task.re_enable_able,
statusMessage: task.status_message,
progressPercentage: task.progress_percentage,
enabledSchedulerMessages: task.enabled_scheduler_messages,
acceptedMessages: task.accepted_messages,
workerIdRunning: task.worker_running,
};
}
Expand Down Expand Up @@ -348,14 +348,14 @@ function visualiserApp(luigi) {
});

$send.on("click", function($event) {
var message = $input.val();
var content = $input.val();
var awaitResponse = $awaitResponse.prop("checked");
if (message && data.worker) {
if (content && data.worker) {
if (awaitResponse) {
$responseContainer.show();
$responseSpinner.show();
$responseContent.empty();
luigi.sendSchedulerMessage(data.worker, data.taskId, message, function(messageId) {
luigi.sendSchedulerMessage(data.worker, data.taskId, content, function(messageId) {
var interval = window.setInterval(function() {
luigi.getSchedulerMessageResponse(data.taskId, messageId, function(response) {
if (response != null) {
Expand All @@ -369,7 +369,7 @@ function visualiserApp(luigi) {
$event.stopPropagation();
} else {
$responseContainer.hide();
luigi.sendSchedulerMessage(data.worker, data.taskId, message);
luigi.sendSchedulerMessage(data.worker, data.taskId, content);
}
}
});
Expand Down
8 changes: 8 additions & 0 deletions luigi/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,14 @@ def trigger_event(self, event, *args, **kwargs):
except BaseException:
logger.exception("Error in event callback for %r", event)

@property
def accepted_messages(self):
"""
Configures which scheduler messages can be received and returns them. When falsy, this task
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe "For configuring which scheduler messages can be received."?

Copy link
Contributor Author

@riga riga Jun 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✔︎

does not accept any message. When True, all messages are accepted.
"""
return False

@property
def task_module(self):
''' Returns what Python module to import to get access to this class. '''
Expand Down
28 changes: 10 additions & 18 deletions luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,11 +420,6 @@ class worker(Config):
description='If true, use multiprocessing also when '
'running with 1 worker')

receive_messages = BoolParameter(default=True,
description='If true, the worker can receive messages from '
'the scheduler and dispatch them to tasks')


class KeepAliveThread(threading.Thread):
"""
Periodically tell the scheduler that the worker still lives.
Expand Down Expand Up @@ -479,17 +474,17 @@ def __init__(self, scheduler=None, worker_id=None, worker_processes=1, assistant
if scheduler is None:
scheduler = Scheduler()

self._config = worker(**kwargs)

assert self._config.wait_interval >= _WAIT_INTERVAL_EPS, "[worker] wait_interval must be positive"
assert self._config.wait_jitter >= 0.0, "[worker] wait_jitter must be equal or greater than zero"

self.worker_processes = int(worker_processes)
self._worker_info = self._generate_worker_info()

if not worker_id:
worker_id = 'Worker(%s)' % ', '.join(['%s=%s' % (k, v) for k, v in self._worker_info])

self._config = worker(**kwargs)

assert self._config.wait_interval >= _WAIT_INTERVAL_EPS, "[worker] wait_interval must be positive"
assert self._config.wait_jitter >= 0.0, "[worker] wait_jitter must be equal or greater than zero"

self._id = worker_id
self._scheduler = scheduler
self._assistant = assistant
Expand Down Expand Up @@ -572,8 +567,7 @@ def _generate_worker_info(self):
# Generate as much info as possible about the worker
# Some of these calls might not be available on all OS's
args = [('salt', '%09d' % random.randrange(0, 999999999)),
('workers', self.worker_processes),
('receive_messages', self._config.receive_messages)]
('workers', self.worker_processes)]
try:
args += [('host', socket.gethostname())]
except BaseException:
Expand Down Expand Up @@ -827,6 +821,7 @@ def _add(self, task, is_complete):
module=task.task_module,
batchable=task.batchable,
retry_policy_dict=_get_retry_policy_dict(task),
accepted_messages=task.accepted_messages,
)

def _validate_dependency(self, dependency):
Expand Down Expand Up @@ -967,7 +962,7 @@ def _run_task(self, task_id):
task_process.run()

def _create_task_process(self, task):
message_queue = multiprocessing.Queue() if self._config.receive_messages else None
message_queue = multiprocessing.Queue() if task.accepted_messages else None
reporter = TaskStatusReporter(self._scheduler, task.task_id, self._id, message_queue)
use_multiprocessing = self._config.force_multiprocessing or bool(self.worker_processes > 1)
return TaskProcess(
Expand Down Expand Up @@ -1184,13 +1179,10 @@ def set_worker_processes(self, n):
self._scheduler.add_worker(self._id, {'workers': self.worker_processes})

@rpc_message_callback
def dispatch_scheduler_message(self, task_id, message_id, message, **kwargs):
if not self._config.receive_messages:
return

def dispatch_scheduler_message(self, task_id, message_id, content, **kwargs):
task_id = str(task_id)
if task_id in self._running_tasks:
task_process = self._running_tasks[task_id]
if task_process.status_reporter.scheduler_messages:
message = SchedulerMessage(self._scheduler, task_id, message_id, message, **kwargs)
message = SchedulerMessage(self._scheduler, task_id, message_id, content, **kwargs)
task_process.status_reporter.scheduler_messages.put(message)
8 changes: 5 additions & 3 deletions test/scheduler_message_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ def fast_worker(scheduler, **kwargs):
class WriteMessageToFile(luigi.Task):

path = luigi.Parameter()

accepted_messages = True

def output(self):
return luigi.LocalTarget(self.path)
Expand All @@ -54,7 +56,7 @@ class SchedulerMessageTest(LuigiTestCase):

def test_receive_messsage(self):
sch = luigi.scheduler.Scheduler(send_messages=True)
with fast_worker(sch, receive_messages=True) as w:
with fast_worker(sch) as w:
with tempfile.NamedTemporaryFile() as tmp:
if os.path.exists(tmp.name):
os.remove(tmp.name)
Expand All @@ -71,7 +73,7 @@ def test_receive_messsage(self):

def test_receive_messages_disabled(self):
sch = luigi.scheduler.Scheduler(send_messages=True)
with fast_worker(sch, receive_messages=False, force_multiprocessing=False) as w:
with fast_worker(sch, force_multiprocessing=False) as w:
class MyTask(RunOnceTask):
def run(self):
self.had_queue = self.scheduler_messages is not None
Expand All @@ -87,7 +89,7 @@ def run(self):

def test_send_messages_disabled(self):
sch = luigi.scheduler.Scheduler(send_messages=False)
with fast_worker(sch, receive_messages=True) as w:
with fast_worker(sch) as w:
with tempfile.NamedTemporaryFile() as tmp:
if os.path.exists(tmp.name):
os.remove(tmp.name)
Expand Down