
Capturing and logging stdout/stderr on workers #2033

Open
jakirkham opened this issue Jun 7, 2018 · 15 comments · May be fixed by #4502

Comments

@jakirkham
Member

For various reasons, it is nice to be able to use stdout and stderr on workers and see the results. This can be useful for simple debugging, for third-party code that already prints to one (or both) of these streams, etc. However, it currently appears that stdout and stderr are ignored in these settings.

A very useful feature would be to capture stdout and stderr on the worker and redirect them to a logger. One simple option would be to use the loggers distributed.worker.stdout and distributed.worker.stderr with info level messages. These loggers seem to be unused now and redirecting to them seems to work fine. This would at least give users one reasonable way to get this information.

@mrocklin
Member

mrocklin commented Jun 7, 2018

I agree that getting stdout and stderr is a common desire. It is often handled by the supporting job scheduler (like SGE/SLURM/Kubernetes/Yarn) but I can understand that a dask-centric solution might also be valuable.

Currently the workers maintain loggers that they can publish on the dask dashboard info pages; however, these only collect lines recorded by the logging module. Perhaps this could be hijacked, though.

If more short-term or immediate support is desired, then this is also the kind of thing that PubSub could probably support in short order.
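
For illustration, a rough sketch of how the PubSub route might look; the topic name "worker-stdout" and the run_and_publish wrapper are made up for this example, not existing distributed APIs, and redirect_stdout is process-wide, so output from concurrently running tasks could interleave:

import io
from contextlib import redirect_stdout

from distributed import Client, Pub, Sub


def run_and_publish(func, *args, **kwargs):
    # Hypothetical wrapper: run ``func``, capture its stdout on the worker,
    # and publish the captured text on a PubSub topic.
    out = io.StringIO()
    with redirect_stdout(out):
        result = func(*args, **kwargs)
    Pub("worker-stdout").put(out.getvalue())
    return result


client = Client()
sub = Sub("worker-stdout")  # subscribe before any task publishes; unsubscribed messages may be dropped
future = client.submit(run_and_publish, lambda: print("hello from a worker"))
future.result()
print(sub.get(timeout=5))   # -> "hello from a worker\n"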

@jakirkham
Member Author

Thus far that hasn't been my experience. @TomAugspurger and I ran into this last week in a Kubernetes-backed Distributed cluster. I have also had this issue with dask-drmaa and with LocalCluster. With dask-drmaa, stdout/stderr are hooked into distributed's logging system; I am not sure whether the logging configuration is blocking this output from getting through, or whether there is something I need to do at the configuration level. That said, keeping things structured this way is probably fine. I would just like stdout/stderr to be included in these logs somehow.

Yeah, hijacking stdout/stderr from functions run with Distributed SGTM. I have come up with a decorator that may serve as a good initial prototype. It works for now, but I wouldn't be surprised if there are better ways to get this functionality into distributed.

import functools
import io
import logging

try:
    from contextlib import ExitStack, redirect_stdout, redirect_stderr
except ImportError:
    from contextlib2 import ExitStack, redirect_stdout, redirect_stderr


def func_log_stdoe(func):
    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        with ExitStack() as stack:
            out, err = io.StringIO(), io.StringIO()

            # Capture everything the wrapped function writes to stdout/stderr.
            stack.enter_context(redirect_stdout(out))
            stack.enter_context(redirect_stderr(err))

            try:
                return func(*args, **kwargs)
            finally:
                logging.getLogger("distributed.worker.stdout").info(out.getvalue())
                logging.getLogger("distributed.worker.stderr").info(err.getvalue())

    return wrapped
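
A hypothetical usage sketch for the decorator above (the task function and the local cluster are placeholders):

from distributed import Client


def task(x):
    print("processing", x)  # captured and re-emitted via the worker loggers
    return x + 1


client = Client()  # or Client("<scheduler-address>")
future = client.submit(func_log_stdoe(task), 41)
assert future.result() == 42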

How do you see this working with PubSub?

@jakirkham
Member Author

Did a little digging in the code. FWICT it appears the run method of _WorkItem is where functions are run. Would it make sense to capture stdout and stderr at this level? Any thoughts on how we might want to do this?

@rainwoodman
Contributor

A random walk-by comment: collecting the logs of a large number of workers into a single destination does not feel easy to make scalable.

If the purpose is to monitor the progress of the workers at a granularity smaller than a unit of work, then maybe this can be done in a more controlled way -- e.g. when a worker reports its health to the scheduler, it could also report its current progress within the current unit of work (e.g. as an arbitrary string)?

@mrocklin
Member

mrocklin commented Nov 3, 2019

Dask already batches small messages sent between scheduler and workers, so in practice what you are suggesting will likely happen anyway. We've had to build up infrastructure to efficiently pass around many small messages. We now get to rely on that infrastructure.

@rainwoodman
Contributor

Do you mean reporting the progress with something like this from the worker client?

https://distributed.dask.org/en/latest/api.html#distributed.Client.set_metadata
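
For illustration, a rough sketch of that idea; the "progress" metadata key and the long_task function are conventions invented for this example:

from distributed import Client, get_client


def long_task(key):
    client = get_client()  # worker client, available inside a running task
    for step in range(10):
        # Record free-form progress under a nested metadata key.
        client.set_metadata(["progress", key], f"step {step + 1}/10")
        ...  # do a chunk of work
    return key


client = Client()
future = client.submit(long_task, "my-task")
# From the monitoring side, poll the metadata store at any time.
print(client.get_metadata(["progress", "my-task"], default="not started"))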

@mrocklin
Member

mrocklin commented Nov 3, 2019 via email

@anna-geller

Is this still an open issue?

@fjetter
Member

fjetter commented Jun 1, 2022

In #5217 (the ticket is a bit cryptic; see the example code below) we built functionality to support "remote printing" that may be useful for some people here. It appears we never updated the docs appropriately; at least, I couldn't find it.

from distributed import Client
from distributed.worker import print, warn  # worker-aware replacements for the builtins


def func():
    print("foo")


c = Client("<ip_to_remote_scheduler>")
c.submit(func)

If you run a function on a remote worker and use the distributed print function, it will print the message on the worker's stdout, forward it to the client, and print it there as well. warn works the same way as the stdlib warnings.warn, and you can build your own client handlers using Client.subscribe_topic in combination with Worker.log_event (see the sketch below).

This obviously only scales to a certain level, as was already pointed out about collecting logging information in a central place. At the very least, the "simplistic debugging attempt" use case should be covered by this. For proper logging, I would not recommend using it.
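
For reference, a rough sketch of the Client.subscribe_topic / Worker.log_event combination mentioned above; the topic name "my-prints" and the handler are illustrative, and the handler assumes the documented (timestamp, msg) event layout:

from distributed import Client, get_worker


def handler(event):
    # Each event arrives as a (timestamp, msg) pair.
    timestamp, msg = event
    print("worker said:", msg)


def task():
    # Emit a structured event from inside a task running on a worker.
    get_worker().log_event("my-prints", {"text": "hello from the worker"})
    return 1


client = Client()
client.subscribe_topic("my-prints", handler)
client.submit(task).result()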

@ntonasa

ntonasa commented Sep 15, 2022

Any news on this one? It affects other projects as well, like Prefect
PrefectHQ/prefect#5850

@maxbane
Contributor

maxbane commented Oct 15, 2022

@ntonasa See my gist here for a proof-of-concept of how to use the subscribe mechanism that was added in #5217 to configure the forwarding of arbitrary logging statements by tasks running on workers to the client session.

@epizut

epizut commented Oct 17, 2022

> @ntonasa See my gist here for a proof-of-concept of how to use the subscribe mechanism that was added in #5217 to configure the forwarding of arbitrary logging statements by tasks running on workers to the client session.

I am using a similar approach (handle/log_event/subscribe_topic) on a private repo and would love to see your gist merged.
I can help test any PR if needed.

@maxbane
Contributor

maxbane commented Oct 17, 2022

Cool, thanks @epizut. I haven't heard back from the dask maintainers on whether they'd be open to such a PR, but I'm happy to quickly put one together... most of it is in that gist; I'll just need to whip together some unit tests and docs. I can ping you when there's something to test.

My approach will basically be to take the forward_logging() function from that gist and convert it to an instance method on Client objects, perhaps with some additional edge case handling (like what happens if you call it twice).
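
Roughly, the general pattern is to install a logging.Handler on each worker that re-emits records as worker events and to subscribe to that topic on the client. A sketch of the idea (not the gist or the eventual PR; the topic name "forwarded-logs" and the class names are invented here):

import logging

from distributed import Client, get_worker
from distributed.diagnostics.plugin import WorkerPlugin


class ForwardingHandler(logging.Handler):
    # Illustrative handler: turn each log record into a worker event.
    def emit(self, record):
        try:
            get_worker().log_event("forwarded-logs", self.format(record))
        except ValueError:
            pass  # not running on a worker; skip


class ForwardLogs(WorkerPlugin):
    # Illustrative plugin that attaches the handler on every worker at startup.
    def setup(self, worker):
        logging.getLogger().addHandler(ForwardingHandler())


client = Client()
client.register_worker_plugin(ForwardLogs())
client.subscribe_topic(
    "forwarded-logs",
    lambda event: print("from worker:", event[1]),  # event is (timestamp, msg)
)


def task():
    logging.getLogger(__name__).warning("something happened on the worker")


client.submit(task).result()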

@jakirkham
Member Author

FWIW the original idea behind this issue was to have workers capture print calls and log them in their own specific worker log. Not necessarily aggregate them. Users can already inspect worker logs through the dashboard. Idk if that is still of interest to others, but this might impose less of a burden and be more scalable.

@maxbane
Contributor

maxbane commented Nov 9, 2022

@epizut I have a PR up at #7276 -- feel free to test it if you'd like. See the docstring of Client.forward_logging() for examples.

@jakirkham Thanks for clarifying. It's a little confusing because there are a few ideas flying around:

  1. Capture print calls and forward them to the client
  2. Capture logging calls and forward them to the client
  3. Capture print calls and log them locally in the worker without forwarding to the client

My PR really addresses (2), while distributed.print() addresses (1) in a selective way (i.e., user-chosen print calls, not all of stdout). @mrocklin's idea in #7202 would address (1) for all worker stdout (non-selective) and would also be the foundation for an implementation of (3), which would presumably use a similar Tee-like object in a WorkerPlugin (a rough sketch follows below).
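
For completeness, a rough sketch of what (3) might look like; the Tee class, the plugin, and the logger names are illustrative, not an existing API:

import logging
import sys

from distributed.diagnostics.plugin import WorkerPlugin


class Tee:
    # Illustrative tee: pass writes through to the real stream and also
    # record complete lines on a logger.
    def __init__(self, stream, logger):
        self._stream = stream
        self._logger = logger
        self._buffer = ""
        self._in_log = False

    def write(self, data):
        self._stream.write(data)
        if self._in_log:
            return len(data)  # a log handler wrote back to this stream; don't re-log
        self._buffer += data
        while "\n" in self._buffer:
            line, self._buffer = self._buffer.split("\n", 1)
            self._in_log = True
            try:
                self._logger.info(line)
            finally:
                self._in_log = False
        return len(data)

    def flush(self):
        self._stream.flush()

    def __getattr__(self, name):
        # Delegate anything else (encoding, isatty, ...) to the real stream.
        return getattr(self._stream, name)


class CaptureStdStreams(WorkerPlugin):
    # Illustrative plugin: wrap the worker's stdout/stderr once at startup,
    # so anything tasks print also lands in that worker's own log.
    def setup(self, worker):
        sys.stdout = Tee(sys.stdout, logging.getLogger("distributed.worker.stdout"))
        sys.stderr = Tee(sys.stderr, logging.getLogger("distributed.worker.stderr"))

A client would install it with client.register_worker_plugin(CaptureStdStreams()), after which the worker logs visible through the dashboard would include anything tasks print.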
