-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fixed] Task status messages #1625
Conversation
Todo's from discussion in #1621:
|
def set_status_message(self, message): | ||
""" | ||
Sets the status message of the task to message, i.e., invokes _status_message_callback if it | ||
is a callable. This propagates the message down to the scheduler. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the docs here, can you add a reference the _Task.set_status_message:
section? I think there are a few example in this file you can copy paste.
Cool. This looks totally merge-worthy. Feel free to fix the minor doc-comment. |
Done ;) |
Nice! |
can you resolve the failing tests? https://travis-ci.org/spotify/luigi/jobs/120027335 |
Ok, this is a tough one.
This is not the case with Here is a suggestion for changes in worker.py that would cleanup this mechanism in order to cope for arbitrary callbacks (like update_status_message) as well (and would solve the "problem" that @Tarrasch mentioned here): from inspect import getargspec
class TaskProcess(multiprocessing.Process):
def __init__(self, <current kw/args except tracking_url_callback>, run_callbacks=None):
...
if run_callbacks is None:
run_callbacks = {}
self.run_callbacks = run_callbacks
def _run_get_new_deps(self):
run_spec = getargspec(self.task.run)
if run_spec.keywords:
run_kwargs = self.run_callbacks.copy()
else:
run_kwargs = {key: cb for key, cb in self.run_callbacks.items() if key in run_sepc.args}
task_gen = self.task.run(**run_kwargs)
if not isinstance(task_gen, types.GeneratorType):
return None
... This way, the code in def _create_task_process(self, task):
...
run_callbacks = {
"tracking_url_callback": update_tracking_url,
"status_message_callback": update_status_message
}
return TaskProcess(
task, self._id, self._task_result_queue,
random_seed=bool(self.worker_processes > 1),
worker_timeout=self._config.timeout,
run_callbacks=run_callbacks
) I'm not a fan of using |
I remember running into similar issues before What's the reason we pickle the Tasks in the first place? |
I think parallel scheduling requires that tasks can be pickled. At least it's mentioned in the docs at "parallel-scheduling". Many contrib tasks also make using of it, e.g. in contrib/spark.py. |
Hm right makes sense. Tasks are in theory interchangeable given the same name and parameters so instead of pickling you could just de/serialize it using that mechanism (that's how the assistant works). It would be quite easy to fix in worker.py But you are right that some tasks use pickling (eg Hadoop mapreduce) so it would be mess to avoid pickling everywhere. Wouldn't your suggestion with inspect run into the same issue when contrib tasks use pickle? |
The |
Actually the use of inspect has another advantage. Users might use decorators on Task.run, which are called twice in the current implementation if tracking_url_callback is missing in the signature of run. |
Due to the changes in worker.py, do any of its tests need to be added/updated? |
LGTM. |
luigi.notifications.DEBUG = True | ||
|
||
|
||
class TaskStatusMessageTest(LuigiTestCase): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually we do have a test suite which is run in two modes automatically (in-memory scheduler and rpc to external process scheduler). Can you make sure that the rpc code path is also tested?
We do also have a set of scheduler tests (they are disabled in Travis for flakiness reasons), but can you add a that case there to and make sure it passes locally? |
for i in range(100): | ||
# do some hard work here | ||
if i % 10 == 0: | ||
status_message_callback("Progress: %d / 100" % i) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this still correct?! It thought you pass a dict now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keyword argument unpacking also works for arguments, so
def func(a, b):
print(a, b)
func(**{"b": "bVal", "a": "aVal"})
# -> ("aVal", "bVal")
I appreciate that you introduce inspect to make what about making the dict callable as I suggested in an inline comment? We would keep it to only one positional parameter for run then. Less magic, no inspection of argument-name. |
Ok. How about this solution: class TaskProcess(multiprocessing.Process):
def __init__(self, ..., tracking_url_callback=None, status_message_callback=None):
...
self.tracking_url_callback = tracking_url_callback
self.status_message_callback = status_message_callback
...
def _run_get_new_deps(self):
self.task.set_tracking_url = self.tracking_url_callback
self.task.set_status_message = self.status_message_callback
def deprecated_tracking_url_callback(*args, **kwargs):
warnings.warn("tracking_url_callback in run() args is deprecated, use "
"set_tracking_url instead.", DeprecationWarning)
self.tracking_url_callback(*args, **kwargs)
run_again = False
try:
task_gen = self.task.run(tracking_url_callback=deprecated_tracking_url_callback)
except TypeError as ex:
if 'unexpected keyword argument' not in str(ex):
raise
run_again = True
if run_again:
task_gen = self.task.run()
self.task.set_tracking_url = None
self.task.set_status_message = None
... The callbacks are set right before run() is called, and then reset. This way pickling is not affected, it's backward compatible, and at some point the try-except block can be refactored. I just ran the tests and everything looks good. |
Sure. But maybe lets hear from @daveFNbuck first, perhaps there were other reasons he opted for the extra-arg in Though I'm very positive to this. It feels a bit hasted now to have introduced that extra arg in Also, of course you'll have to change the places in the code-base that uses |
Yep, already done that, mainly in hadoop, hadoop_jar, hive and scalding) =) I'm preparing the commits right now. Update: working on the failed tests ... |
I changed hadoop.JobTask.dump to disregard callbacks, as they're not need in the deserialized job anyway. E.g. the tracking url is not set actively but parsed from the hadoop job stderr. All tests pass now. |
This looks like a good alternative to my solution, and more scalable. I see that the example attached here is of a progress percentage. If this is the main use case, I've been thinking it would be nice to add an optional progress bar that can be automatically displayed in the visualizer. That would be a bit better than having a popup to show a single number. |
Good idea. Maybe I can add an additional PR that implements a progress bar. However, I think there are more use cases than just sending the progress. We use it for important intermediate output and to some extent for a summary of final results. Maybe one can parse the status message with a regexp (e.g. |
return self.has_run | ||
|
||
def run(self): | ||
if self.set_tracking_url is not None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait. Do the user really have to do this check before using the tracking_url? Can't we somehow guarantee that it's always present?
This looks very good. Thank you so much for doing this! Just see my inline comment/worry. Other than that. This is good to merge right? |
Yep, Worker._create_task_process is the only place where |
@riga, ok, is this "fine to merge" now you think? |
Yes =) |
Ok. Let's wait for test results and then I'll merge. :) |
@Tarrasch Looks good, glad I could help. |
@@ -870,6 +871,7 @@ def _serialize_task(self, task_id, include_deps=True, deps=None): | |||
'priority': task.priority, | |||
'resources': task.resources, | |||
'tracking_url': getattr(task, "tracking_url", None), | |||
'status_message': task.status_message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line is causing a stack-trace like this:
Traceback (most recent call last):
File "/zserver/Python-3.4.3/lib/python3.4/site-packages/tornado/web.py", line 1443, in _execute
result = method(*self.path_args, **self.path_kwargs)
File "/zserver/apps/luigi/code-for-luigid/luigi/server.py", line 101, in get
result = getattr(self._scheduler, method)(**arguments)
File "/zserver/apps/luigi/code-for-luigid/luigi/scheduler.py", line 986, in task_list
serialized = self._serialize_task(task.id, False)
File "/zserver/apps/luigi/code-for-luigid/luigi/scheduler.py", line 868, in _serialize_task
'status_message': task.status_message
For anyone using an old pickled state-file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to do anything to fix this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can copy the line above. Make it like
getattr(task, "status_message", None)
instead of task.status_message
. Then a couple of months later we're changing it back to task.status_message
. Do you see how it'll work? Do you mind submitting a PR? (you can use the online editor)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I created PR #1645 for this issue.
Hi @michcio1234 , |
Then I'm probably doing something wrong since I can't see this icon. So it seems that setting a message is not as simple as stated in the documentation. |
This PR adds status messages to tasks which are also visible on the scheduler GUI.
Examples:
Status messages are meant to change during the run method in an opportunistic way. Especially for long-running non-Hadoop tasks, the ability to read those messages directly in the scheduler GUI is quite helpful (at least for us). Internally, message changes are propagated to the scheduler via a
_status_message_callback
which is - in contrast totracking_url_callback
- not passed to the run method, but set by theTaskProcess
.Usage example:
I know that you don't like PR's that affect core code (which is reasonable =) ), but imho this feature is both lightweight and really helpful.