Capturing and logging stdout/stderr on workers #2033
I agree that getting stdout and stderr is a common desire. It is often handled by the supporting job scheduler (like SGE/SLURM/Kubernetes/Yarn), but I can understand that a dask-centric solution might also be valuable. Currently the workers maintain loggers that they can publish on the dask dashboard info pages; however, these only collect lines recorded by the logging module. Perhaps this could be hijacked, though. If more short-term or immediate support is desired, then this is also the kind of thing that PubSub could probably implement in short order. |
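A PubSub-style hookup could start from a small file-like shim that buffers writes and hands complete lines to a callback; on a live cluster that callback could be something like `distributed.Pub("worker-stdout").put`. Since Pub/Sub needs a running cluster, the sketch below uses a plain list as the stand-in sink (the class name and topic are illustrative, not dask API):

```python
from contextlib import redirect_stdout


class LineForwarder:
    """File-like object: buffer writes, forward complete lines to a callback."""

    def __init__(self, callback):
        self.callback = callback
        self._buf = ""

    def write(self, text):
        self._buf += text
        # Forward every complete line; keep any partial line buffered.
        while "\n" in self._buf:
            line, self._buf = self._buf.split("\n", 1)
            self.callback(line)
        return len(text)

    def flush(self):
        if self._buf:
            self.callback(self._buf)
            self._buf = ""


# Stand-in for Pub("worker-stdout").put on a live cluster.
captured = []
fwd = LineForwarder(captured.append)
with redirect_stdout(fwd):
    print("hello from a task")
    print("partial", end="")
fwd.flush()
print(captured)  # -> ['hello from a task', 'partial']
```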
Thus far that hasn't been my experience. @TomAugspurger and I ran into this last week in the Kubernetes-backed Distributed cluster. Also have had this issue with […]. Yeah, hijacking the loggers could look like this:

```python
import functools
import io
import logging

try:
    from contextlib import ExitStack, redirect_stdout, redirect_stderr
except ImportError:
    from contextlib2 import ExitStack, redirect_stdout, redirect_stderr


def func_log_stdoe(func):
    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        with ExitStack() as stack:
            out, err = io.StringIO(), io.StringIO()
            stack.enter_context(redirect_stdout(out))
            stack.enter_context(redirect_stderr(err))  # was redirect_stdout(err)
            try:
                return func(*args, **kwargs)
            finally:
                logging.getLogger("distributed.worker.stdout").info(out.getvalue())
                logging.getLogger("distributed.worker.stderr").info(err.getvalue())
    return wrapped
```

How do you see this working with PubSub? |
Did a little digging in the code. FWICT it appears the […] |
A random walk-by comment. Collecting the logs of a large number of workers at a single destination does not feel easy to make scalable. If the purpose is to monitor the progress of the workers with granularity smaller than a unit of work, then maybe this can be done in a more controlled way -- e.g. when a worker reports its health to the master, it could also report its current progress (e.g. as an arbitrary string) inside the current unit of work? |
Dask already batches small messages sent between scheduler and workers, so in practice what you are suggesting will likely happen anyway. We've had to build up infrastructure to efficiently pass around many small messages. We now get to rely on that infrastructure. |
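As a rough illustration of that batching idea (a toy, not distributed's `BatchedSend`, which flushes on a time interval rather than a count):

```python
class TinyBatcher:
    """Coalesce small messages and emit them as one payload per flush."""

    def __init__(self, send, max_batch=3):
        self.send = send          # callable that transmits a list of messages
        self.max_batch = max_batch
        self.pending = []

    def put(self, msg):
        self.pending.append(msg)
        if len(self.pending) >= self.max_batch:
            self.flush()

    def flush(self):
        if self.pending:
            self.send(self.pending)
            self.pending = []


sent = []
b = TinyBatcher(sent.append, max_batch=3)
for i in range(7):
    b.put({"op": "log", "line": i})
b.flush()  # push out the final partial batch
print([len(batch) for batch in sent])  # -> [3, 3, 1]
```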
Do you mean reporting the progress with something like this from the worker client? https://distributed.dask.org/en/latest/api.html#distributed.Client.set_metadata |
For publicly available APIs, I would probably recommend the Pub/Sub system to move logs around. If people wanted to use non-public APIs, I would use `Worker.batched_stream.send`.
|
Is this still an open issue? |
In #5217 (the ticket is a bit cryptic; example pseudocode below) we built functionality to support "remote printing" that may be useful for some people in here. It appears we never updated the docs appropriately; at least I couldn't find it.

```python
from distributed import Client
from distributed.worker import print, warn

def func():
    print("foo")

c = Client("<ip_to_remote_scheduler>")
c.submit(func)
```

If you run a function on a remote worker and are using the distributed `print` function, it will print the message on stdout of your worker and forward it to the client, printing it on the client as well. The same works for `warn`. This obviously only scales to a certain level, as was already pointed out for collecting logging information in a central place. |
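Without a cluster, the forwarding behavior of that `print` can be approximated by writing locally and also pushing the text onto a channel back to the client; below, a queue stands in for the worker-to-client stream (this is a stand-in sketch, not `distributed.worker.print`'s actual implementation):

```python
import builtins
import queue

client_channel = queue.Queue()  # stand-in for the worker -> client stream


def forwarding_print(*args, sep=" ", end="\n"):
    """Print locally and forward the same text to the 'client'."""
    msg = sep.join(map(str, args))
    builtins.print(msg, end=end)   # worker-local stdout
    client_channel.put(msg + end)  # forwarded copy


forwarding_print("foo")
forwarding_print("x", 1)

forwarded = []
while not client_channel.empty():
    forwarded.append(client_channel.get())
print(forwarded)  # -> ['foo\n', 'x 1\n']
```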
Any news on this one? It affects other projects as well, like Prefect |
I am using a similar approach (handle/log_event/subscribe_topic) on a private repo and would love to see your gist merged. |
Cool, thanks @epizut. I haven't heard back from the dask maintainers on whether they'd be open to such a PR, but I'm happy to quickly put one together... most of it is in that gist; I'll just need to whip together some unit tests and docs. Can ping you when there's something to test. My approach will basically be to take the […] |
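For reference, the `log_event`/`subscribe_topic` shape mentioned above can be sketched as a `logging.Handler` that forwards each record as a structured event; `emit_event` below is a stand-in for `Worker.log_event`, and the topic name and payload shape are illustrative:

```python
import logging


class EventForwardingHandler(logging.Handler):
    """Forward each log record as a structured event to a callable."""

    def __init__(self, emit_event, topic="worker-logs"):
        super().__init__()
        self.emit_event = emit_event
        self.topic = topic

    def emit(self, record):
        self.emit_event(self.topic, {
            "level": record.levelname,
            "message": record.getMessage(),
        })


events = []  # stand-in for what Client.subscribe_topic would receive
logger = logging.getLogger("demo.worker")
logger.setLevel(logging.INFO)
logger.addHandler(EventForwardingHandler(lambda topic, ev: events.append((topic, ev))))

logger.info("task %s finished", "inc-123")
print(events)  # -> [('worker-logs', {'level': 'INFO', 'message': 'task inc-123 finished'})]
```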
FWIW the original idea behind this issue was to have workers capture stdout/stderr. |
@epizut I have a PR up at #7276 -- feel free to test it if you'd like. See the docstring of […]. @jakirkham Thanks for clarifying. It's a little confusing because there are a few ideas flying around: […]
My PR really addresses (2), while […] |
For various reasons, it is nice to be able to use `stdout` and `stderr` on workers and see the results. This can be useful in simplistic debugging attempts, when using 3rd party code that already `print`s to one (or both) of these, etc. However, it appears `stdout` and `stderr` are ignored in these settings currently.

A very useful feature would be to capture `stdout` and `stderr` on the worker and redirect them to a logger. One simple option would be to use the loggers `distributed.worker.stdout` and `distributed.worker.stderr` with `info`-level messages. These loggers seem to be unused now, and redirecting to them seems to work fine. This would at least give users one reasonable way to get this information.
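A minimal prototype of that redirection is a file-like shim installed via `contextlib.redirect_stdout` that turns `write()` calls into records on the named logger; the shim and the list-collecting handler below are sketches, not existing distributed code:

```python
import logging
from contextlib import redirect_stdout


class LoggerWriter:
    """File-like shim that turns write() calls into log records."""

    def __init__(self, logger, level=logging.INFO):
        self.logger = logger
        self.level = level

    def write(self, text):
        msg = text.rstrip("\n")
        if msg:  # skip the bare newline that print() emits separately
            self.logger.log(self.level, msg)
        return len(text)

    def flush(self):
        pass  # nothing is buffered


class ListHandler(logging.Handler):
    """Collect rendered messages in a list (stands in for a real sink)."""

    def __init__(self, store):
        super().__init__()
        self.store = store

    def emit(self, record):
        self.store.append(record.getMessage())


records = []
logger = logging.getLogger("distributed.worker.stdout")
logger.setLevel(logging.INFO)
logger.addHandler(ListHandler(records))

with redirect_stdout(LoggerWriter(logger)):
    print("hello from the worker")

print(records)  # -> ['hello from the worker']
```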