From 160475a054a59a0e6d763bcff5b26b27ce2a2e52 Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Sat, 25 Sep 2021 21:58:37 -0500 Subject: [PATCH 1/7] Display print statements async in real time. --- afar/_core.py | 143 +++++++++++++++++++++++++++++++++------------- afar/_printing.py | 63 ++++++-------------- requirements.txt | 2 +- setup.py | 3 +- 4 files changed, 123 insertions(+), 88 deletions(-) diff --git a/afar/_core.py b/afar/_core.py index 78fe8dc..ef8683a 100644 --- a/afar/_core.py +++ b/afar/_core.py @@ -1,13 +1,16 @@ import dis +import sys from functools import partial from inspect import currentframe, findsource +from uuid import uuid4 from weakref import WeakKeyDictionary, WeakSet from dask import distributed +from dask.distributed import get_worker from ._abra import cadabra -from ._printing import PrintRecorder, print_outputs, print_outputs_async -from ._reprs import repr_afar +from ._printing import PrintRecorder +from ._reprs import display_repr, repr_afar from ._utils import is_kernel, supports_async_output from ._where import find_where @@ -59,6 +62,8 @@ def get_body(lines): class Run: _gather_data = False + # Used to update outputs asynchronously + _outputs = {} def __init__(self, *names, client=None, data=None): self.names = names @@ -261,11 +266,29 @@ def _run( data.update(scattered) for key in to_scatter: del self._magic_func._scoped.outer_scope[key] + + if capture_print and "afar-print" not in client._event_handlers: + client.subscribe_topic("afar-print", self._handle_print) + async_print = capture_print and supports_async_output() + if capture_print: + unique_key = uuid4().hex + self._setup_print(unique_key, async_print) + else: + unique_key = None + # Scatter magic_func to avoid "Large object" UserWarning - magic_func = client.scatter(self._magic_func) + magic_func = client.scatter(self._magic_func, hash=False) weak_futures.add(magic_func) + remote_dict = client.submit( - run_afar, magic_func, names, futures, capture_print, pure=False, **submit_kwargs + run_afar, + magic_func, + names, + futures, + capture_print, + unique_key, + pure=False, + **submit_kwargs, ) weak_futures.add(remote_dict) magic_func.release() # Let go ASAP @@ -284,11 +307,13 @@ def _run( return_future = None else: repr_future = None + if capture_print: - stdout_future = client.submit(get_afar, remote_dict, "_afar_stdout_") - weak_futures.add(stdout_future) - stderr_future = client.submit(get_afar, remote_dict, "_afar_stderr_") - weak_futures.add(stderr_future) + obj = repr_future if display_expr else remote_dict + obj.add_done_callback( + partial(self._finalize_print, self._outputs[unique_key], display_expr) + ) + if self._gather_data: futures_to_name = { client.submit(get_afar, remote_dict, name, **submit_kwargs): name @@ -304,21 +329,6 @@ def _run( weak_futures.add(future) data[name] = future remote_dict.release() # Let go ASAP - - if capture_print and supports_async_output(): - # Display in `out` cell when data is ready: non-blocking - from IPython.display import display - from ipywidgets import Output - - out = Output() - display(out) - out.append_stdout("\N{SPARKLES} Running afar... \N{SPARKLES}") - stdout_future.add_done_callback( - partial(print_outputs_async, out, stderr_future, repr_future) - ) - elif capture_print: - # blocks! - print_outputs(stdout_future, stderr_future, repr_future) elif where == "locally": # Run locally. This is handy for testing and debugging. results = self._magic_func() @@ -352,6 +362,55 @@ def cancel(self, *, client=None, force=False): ) weak_futures.clear() + def _setup_print(self, key, async_print): + if async_print: + from IPython.display import display + from ipywidgets import Output + + out = Output() + display(out) + out.append_stdout("\N{SPARKLES} Running afar... \N{SPARKLES}") + else: + out = None + self._outputs[key] = [out, False] # False means has not been updated + + @classmethod + def _handle_print(cls, event): + # XXX: can we assume all messages from a single task arrive in FIFO order? + _, msg = event + key, stream_name, string = msg + out, is_updated = cls._outputs[key] + if out is not None: + if not is_updated: + # Clear the "Running afar..." message + out.outputs = type(out.outputs)() + cls._outputs[key][1] = True # is updated + if stream_name == "stdout": + out.append_stdout(string) + elif stream_name == "stderr": + out.append_stderr(string) + elif stream_name == "stdout": + print(string, end="") + elif stream_name == "stderr": + print(string, end="", file=sys.stderr) + if stream_name == "finish": + del cls._outputs[key] + + def _finalize_print(self, info, display_expr, future): + # Can we move this to `_handle_print`? + # _handle_print for this key may get called *after* _finalize_print + out, is_updated = info + if out is not None and not is_updated: + # Clear the "Running afar..." message to indicate it's finished + # out.clear_output() # Not thread-safe! + # See: https://github.com/jupyter-widgets/ipywidgets/issues/3260 + out.outputs = type(out.outputs)() # current workaround + info[1] = True # is updated + if display_expr: + repr_val = future.result() + if repr_val is not None: + display_repr(repr_val, out=out) + class Get(Run): """Unlike ``run``, ``get`` automatically gathers the data locally""" @@ -359,25 +418,31 @@ class Get(Run): _gather_data = True -def run_afar(magic_func, names, futures, capture_print): - if capture_print: - rec = PrintRecorder() - if "print" in magic_func._scoped.builtin_names and "print" not in futures: - sfunc = magic_func._scoped.bind(futures, print=rec) +def run_afar(magic_func, names, futures, capture_print, unique_key): + try: + if capture_print: + rec = PrintRecorder(unique_key) + if "print" in magic_func._scoped.builtin_names and "print" not in futures: + sfunc = magic_func._scoped.bind(futures, print=rec) + else: + sfunc = magic_func._scoped.bind(futures) + with rec: + results = sfunc() else: sfunc = magic_func._scoped.bind(futures) - with rec: results = sfunc() - else: - sfunc = magic_func._scoped.bind(futures) - results = sfunc() - - rv = {key: results[key] for key in names} - if magic_func._display_expr: - rv["_afar_return_value_"] = results.return_value - if capture_print: - rv["_afar_stdout_"] = rec.stdout.getvalue() - rv["_afar_stderr_"] = rec.stderr.getvalue() + + rv = {key: results[key] for key in names} + if magic_func._display_expr: + rv["_afar_return_value_"] = results.return_value + finally: + if capture_print: + try: + worker = get_worker() + except ValueError: + pass + else: + worker.log_event("afar-print", (unique_key, "finish", None)) return rv diff --git a/afar/_printing.py b/afar/_printing.py index b8083e8..1bd2137 100644 --- a/afar/_printing.py +++ b/afar/_printing.py @@ -3,7 +3,7 @@ from io import StringIO from threading import Lock, local -from ._reprs import display_repr +from dask.distributed import get_worker # Here's the plan: we'll capture all print statements to stdout and stderr @@ -21,9 +21,8 @@ class PrintRecorder: local_print = LocalPrint() print_lock = Lock() - def __init__(self): - self.stdout = StringIO() - self.stderr = StringIO() + def __init__(self, key): + self.key = key def __enter__(self): with self.print_lock: @@ -44,48 +43,18 @@ def __exit__(self, exc_type, exc_value, exc_traceback): def __call__(self, *args, file=None, **kwargs): if file is None or file is sys.stdout: - file = self.stdout + file = StringIO() + stream_name = "stdout" elif file is sys.stderr: - file = self.stderr + file = StringIO() + stream_name = "stderr" + else: + stream_name = None LocalPrint.printer(*args, **kwargs, file=file) - - -def print_outputs(stdout_future, stderr_future, repr_future): - """Print results to the user""" - stdout_val = stdout_future.result() - stdout_future.release() - if stdout_val: - print(stdout_val, end="") - stderr_val = stderr_future.result() - stderr_future.release() - if stderr_val: - print(stderr_val, end="", file=sys.stderr) - if repr_future is not None: - repr_val = repr_future.result() - repr_future.release() - if repr_val is not None: - display_repr(repr_val) - - -def print_outputs_async(out, stderr_future, repr_future, stdout_future): - """Display output streams and final expression to the user. - - This is used as a callback to `stdout_future`. - """ - try: - stdout_val = stdout_future.result() - # out.clear_output() # Not thread-safe! - # See: https://github.com/jupyter-widgets/ipywidgets/issues/3260 - out.outputs = type(out.outputs)() # current workaround - if stdout_val: - out.append_stdout(stdout_val) - stderr_val = stderr_future.result() - if stderr_val: - out.append_stderr(stderr_val) - if repr_future is not None: - repr_val = repr_future.result() - if repr_val is not None: - display_repr(repr_val, out=out) - except Exception as exc: - print(exc, file=sys.stderr) - raise + if stream_name is not None: + try: + worker = get_worker() + except ValueError: + pass + else: + worker.log_event("afar-print", (self.key, stream_name, file.getvalue())) diff --git a/requirements.txt b/requirements.txt index 17d4993..bee4088 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ innerscope -distributed +distributed >=2021.9.1 diff --git a/setup.py b/setup.py index 820fff9..b557df5 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,6 @@ +from setuptools import find_packages, setup + import versioneer -from setuptools import setup, find_packages install_requires = open("requirements.txt").read().strip().split("\n") with open("README.md") as f: From 0234f915463bd9be884efa9cbbdcc4a2cf240aae Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Sun, 26 Sep 2021 10:29:33 -0500 Subject: [PATCH 2/7] Use event stream instead of callback for repr --- afar/_abra.py | 4 +- afar/_core.py | 172 ++++++++++------------------------------------- afar/_inspect.py | 83 +++++++++++++++++++++++ afar/_utils.py | 7 +- pyproject.toml | 1 + 5 files changed, 128 insertions(+), 139 deletions(-) create mode 100644 afar/_inspect.py diff --git a/afar/_abra.py b/afar/_abra.py index 50eb25a..eed4faa 100644 --- a/afar/_abra.py +++ b/afar/_abra.py @@ -5,7 +5,7 @@ from innerscope import scoped_function from ._reprs import get_repr_methods -from ._utils import code_replace, is_kernel +from ._utils import code_replace, is_ipython def endswith_expr(func): @@ -85,7 +85,7 @@ def cadabra(context_body, where, names, data, global_ns, local_ns): # Create a new function from the code block of the context. # For now, we require that the source code is available. source = "def _afar_magic_():\n" + "".join(context_body) - func, display_expr = create_func(source, global_ns, is_kernel()) + func, display_expr = create_func(source, global_ns, is_ipython()) # If no variable names were given, only get the last assignment if not names: diff --git a/afar/_core.py b/afar/_core.py index ef8683a..409d7a8 100644 --- a/afar/_core.py +++ b/afar/_core.py @@ -1,7 +1,6 @@ import dis import sys -from functools import partial -from inspect import currentframe, findsource +from inspect import currentframe from uuid import uuid4 from weakref import WeakKeyDictionary, WeakSet @@ -9,57 +8,13 @@ from dask.distributed import get_worker from ._abra import cadabra +from ._inspect import get_body, get_body_start, get_lines from ._printing import PrintRecorder from ._reprs import display_repr, repr_afar -from ._utils import is_kernel, supports_async_output +from ._utils import supports_async_output from ._where import find_where -def get_body_start(lines, with_start): - line = lines[with_start] - stripped = line.lstrip() - body = line[: len(line) - len(stripped)] + " pass\n" - body *= 2 - with_lines = [stripped] - try: - code = compile(stripped, "", "exec") - except Exception: - pass - else: - raise RuntimeError( - "Failed to analyze the context! When using afar, " - "please put the context body on a new line." - ) - for i, line in enumerate(lines[with_start:]): - if i > 0: - with_lines.append(line) - if ":" in line: - source = "".join(with_lines) + body - try: - code = compile(source, "", "exec") - except Exception: - pass - else: - num_with = code.co_code.count(dis.opmap["SETUP_WITH"]) - body_start = with_start + i + 1 - return num_with, body_start - raise RuntimeError("Failed to analyze the context!") - - -def get_body(lines): - head = "def f():\n with x:\n " - tail = " pass\n pass\n" - while lines: - source = head + " ".join(lines) + tail - try: - compile(source, "", "exec") - except Exception: - lines.pop() - else: - return lines - raise RuntimeError("Failed to analyze the context body!") - - class Run: _gather_data = False # Used to update outputs asynchronously @@ -99,36 +54,8 @@ def __enter__(self): if self.data: raise RuntimeError("uh oh!") self.data = {} - try: - lines, offset = findsource(self._frame) - except OSError: - # Try to fine the source if we are in %%time or %%timeit magic - if self._frame.f_code.co_filename in {"", ""} and is_kernel(): - from IPython import get_ipython - - ip = get_ipython() - if ip is None: - raise - cell = ip.history_manager._i00 # The current cell! - lines = cell.splitlines(keepends=True) - # strip the magic - for i, line in enumerate(lines): - if line.strip().startswith("%%time"): - lines = lines[i + 1 :] - break - else: - raise - # strip blank lines - for i, line in enumerate(lines): - if line.strip(): - if i: - lines = lines[i:] - lines[-1] += "\n" - break - else: - raise - else: - raise + + lines = get_lines(self._frame) while not lines[with_lineno].lstrip().startswith("with"): with_lineno -= 1 @@ -241,10 +168,10 @@ def _run( else: weak_futures = self._client_to_futures[client] + # Should we always capture print now that it's handled async? has_print = "print" in self._magic_func._scoped.builtin_names capture_print = ( - self._gather_data # we're blocking anyway to gather data - or display_expr # we need to display an expression (sync or async) + display_expr # we need to display an expression (sync or async) or has_print # print is in the context body or supports_async_output() # no need to block, so why not? ) @@ -292,27 +219,6 @@ def _run( ) weak_futures.add(remote_dict) magic_func.release() # Let go ASAP - if display_expr: - return_future = client.submit(get_afar, remote_dict, "_afar_return_value_") - repr_future = client.submit( - repr_afar, - return_future, - self._magic_func._repr_methods, - ) - weak_futures.add(repr_future) - if return_expr: - weak_futures.add(return_future) - else: - return_future.release() # Let go ASAP - return_future = None - else: - repr_future = None - - if capture_print: - obj = repr_future if display_expr else remote_dict - obj.add_done_callback( - partial(self._finalize_print, self._outputs[unique_key], display_expr) - ) if self._gather_data: futures_to_name = { @@ -378,38 +284,27 @@ def _setup_print(self, key, async_print): def _handle_print(cls, event): # XXX: can we assume all messages from a single task arrive in FIFO order? _, msg = event - key, stream_name, string = msg + key, action, payload = msg out, is_updated = cls._outputs[key] if out is not None: if not is_updated: # Clear the "Running afar..." message out.outputs = type(out.outputs)() cls._outputs[key][1] = True # is updated - if stream_name == "stdout": - out.append_stdout(string) - elif stream_name == "stderr": - out.append_stderr(string) - elif stream_name == "stdout": - print(string, end="") - elif stream_name == "stderr": - print(string, end="", file=sys.stderr) - if stream_name == "finish": + # ipywidgets.Output is pretty slow if there are lots of messages + if action == "stdout": + out.append_stdout(payload) + elif action == "stderr": + out.append_stderr(payload) + elif action == "stdout": + print(payload, end="") + elif action == "stderr": + print(payload, end="", file=sys.stderr) + if action == "display_expr": + display_repr(payload, out=out) + del cls._outputs[key] + elif action == "finish": del cls._outputs[key] - - def _finalize_print(self, info, display_expr, future): - # Can we move this to `_handle_print`? - # _handle_print for this key may get called *after* _finalize_print - out, is_updated = info - if out is not None and not is_updated: - # Clear the "Running afar..." message to indicate it's finished - # out.clear_output() # Not thread-safe! - # See: https://github.com/jupyter-widgets/ipywidgets/issues/3260 - out.outputs = type(out.outputs)() # current workaround - info[1] = True # is updated - if display_expr: - repr_val = future.result() - if repr_val is not None: - display_repr(repr_val, out=out) class Get(Run): @@ -419,8 +314,14 @@ class Get(Run): def run_afar(magic_func, names, futures, capture_print, unique_key): + if capture_print: + try: + worker = get_worker() + send_finish = True + except ValueError: + worker = None try: - if capture_print: + if capture_print and worker is not None: rec = PrintRecorder(unique_key) if "print" in magic_func._scoped.builtin_names and "print" not in futures: sfunc = magic_func._scoped.bind(futures, print=rec) @@ -433,16 +334,15 @@ def run_afar(magic_func, names, futures, capture_print, unique_key): results = sfunc() rv = {key: results[key] for key in names} - if magic_func._display_expr: - rv["_afar_return_value_"] = results.return_value + + if magic_func._display_expr and worker is not None: + pretty_repr = repr_afar(results.return_value, magic_func._repr_methods) + if pretty_repr is not None: + worker.log_event("afar-print", (unique_key, "display_expr", pretty_repr)) + send_finish = False finally: - if capture_print: - try: - worker = get_worker() - except ValueError: - pass - else: - worker.log_event("afar-print", (unique_key, "finish", None)) + if capture_print and worker is not None and send_finish: + worker.log_event("afar-print", (unique_key, "finish", None)) return rv diff --git a/afar/_inspect.py b/afar/_inspect.py new file mode 100644 index 0000000..b9713b6 --- /dev/null +++ b/afar/_inspect.py @@ -0,0 +1,83 @@ +import dis +from inspect import findsource + +from ._utils import is_ipython + + +def get_lines(frame): + try: + lines, offset = findsource(frame) + except OSError: + # Try to fine the source if we are in %%time or %%timeit magic + if frame.f_code.co_filename in {"", ""} and is_ipython(): + from IPython import get_ipython + + ip = get_ipython() + if ip is None: + raise + cell = ip.history_manager._i00 # The current cell! + lines = cell.splitlines(keepends=True) + # strip the magic + for i, line in enumerate(lines): + if line.strip().startswith("%%time"): + lines = lines[i + 1 :] + break + else: + raise + # strip blank lines + for i, line in enumerate(lines): + if line.strip(): + if i: + lines = lines[i:] + lines[-1] += "\n" + break + else: + raise + else: + raise + return lines + + +def get_body_start(lines, with_start): + line = lines[with_start] + stripped = line.lstrip() + body = line[: len(line) - len(stripped)] + " pass\n" + body *= 2 + with_lines = [stripped] + try: + code = compile(stripped, "", "exec") + except Exception: + pass + else: + raise RuntimeError( + "Failed to analyze the context! When using afar, " + "please put the context body on a new line." + ) + for i, line in enumerate(lines[with_start:]): + if i > 0: + with_lines.append(line) + if ":" in line: + source = "".join(with_lines) + body + try: + code = compile(source, "", "exec") + except Exception: + pass + else: + num_with = code.co_code.count(dis.opmap["SETUP_WITH"]) + body_start = with_start + i + 1 + return num_with, body_start + raise RuntimeError("Failed to analyze the context!") + + +def get_body(lines): + head = "def f():\n with x:\n " + tail = " pass\n pass\n" + while lines: + source = head + " ".join(lines) + tail + try: + compile(source, "", "exec") + except Exception: + lines.pop() + else: + return lines + raise RuntimeError("Failed to analyze the context body!") diff --git a/afar/_utils.py b/afar/_utils.py index c957814..c3aec0e 100644 --- a/afar/_utils.py +++ b/afar/_utils.py @@ -1,3 +1,4 @@ +import builtins import sys from types import CodeType @@ -5,13 +6,17 @@ def is_terminal(): - if "IPython" not in sys.modules: # IPython hasn't been imported + if not is_ipython(): return False from IPython import get_ipython return type(get_ipython()).__name__ == "TerminalInteractiveShell" +def is_ipython(): + return hasattr(builtins, "__IPYTHON__") and "IPython" in sys.modules + + def supports_async_output(): if is_kernel() and not is_terminal(): try: diff --git a/pyproject.toml b/pyproject.toml index aa4949a..225abc7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,2 +1,3 @@ [tool.black] line-length = 100 +extend-exclude = "test_notebook.ipynb" From 5955e07193194847e0347c402d94abded4b0fd92 Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Sun, 26 Sep 2021 10:43:28 -0500 Subject: [PATCH 3/7] Always capture print, and be more paranoid about retried tasks. --- afar/_core.py | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/afar/_core.py b/afar/_core.py index 409d7a8..73fc841 100644 --- a/afar/_core.py +++ b/afar/_core.py @@ -168,14 +168,6 @@ def _run( else: weak_futures = self._client_to_futures[client] - # Should we always capture print now that it's handled async? - has_print = "print" in self._magic_func._scoped.builtin_names - capture_print = ( - display_expr # we need to display an expression (sync or async) - or has_print # print is in the context body - or supports_async_output() # no need to block, so why not? - ) - to_scatter = data.keys() & self._magic_func._scoped.outer_scope.keys() if to_scatter: # Scatter value in `data` that we need in this calculation. @@ -194,6 +186,7 @@ def _run( for key in to_scatter: del self._magic_func._scoped.outer_scope[key] + capture_print = True if capture_print and "afar-print" not in client._event_handlers: client.subscribe_topic("afar-print", self._handle_print) async_print = capture_print and supports_async_output() @@ -285,17 +278,25 @@ def _handle_print(cls, event): # XXX: can we assume all messages from a single task arrive in FIFO order? _, msg = event key, action, payload = msg + if key not in cls._outputs: + return out, is_updated = cls._outputs[key] if out is not None: - if not is_updated: - # Clear the "Running afar..." message - out.outputs = type(out.outputs)() - cls._outputs[key][1] = True # is updated - # ipywidgets.Output is pretty slow if there are lots of messages - if action == "stdout": - out.append_stdout(payload) - elif action == "stderr": - out.append_stderr(payload) + if action == "begin": + if is_updated: + out.outputs = type(out.outputs)() + out.append_stdout("\N{SPARKLES} Running afar... (restarted) \N{SPARKLES}") + cls._outputs[key][1] = False # is not updated + else: + if not is_updated: + # Clear the "Running afar..." message + out.outputs = type(out.outputs)() + cls._outputs[key][1] = True # is updated + # ipywidgets.Output is pretty slow if there are lots of messages + if action == "stdout": + out.append_stdout(payload) + elif action == "stderr": + out.append_stderr(payload) elif action == "stdout": print(payload, end="") elif action == "stderr": @@ -322,6 +323,7 @@ def run_afar(magic_func, names, futures, capture_print, unique_key): worker = None try: if capture_print and worker is not None: + worker.log_event("afar-print", (unique_key, "begin", None)) rec = PrintRecorder(unique_key) if "print" in magic_func._scoped.builtin_names and "print" not in futures: sfunc = magic_func._scoped.bind(futures, print=rec) From da212453b4e8104ba63404a585028d4fede66bd8 Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Sun, 26 Sep 2021 10:46:41 -0500 Subject: [PATCH 4/7] comment --- afar/_core.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/afar/_core.py b/afar/_core.py index 73fc841..0ed2c81 100644 --- a/afar/_core.py +++ b/afar/_core.py @@ -338,6 +338,8 @@ def run_afar(magic_func, names, futures, capture_print, unique_key): rv = {key: results[key] for key in names} if magic_func._display_expr and worker is not None: + # Hopefully computing the repr is fast. If it is slow, perhaps it would be + # better to add the return value to rv and call repr_afar as a separate task. pretty_repr = repr_afar(results.return_value, magic_func._repr_methods) if pretty_repr is not None: worker.log_event("afar-print", (unique_key, "display_expr", pretty_repr)) From bcfc1e7f7871ca079b87fa4b500d4f1f90df0cb3 Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Sun, 26 Sep 2021 11:33:27 -0500 Subject: [PATCH 5/7] Use unique topic name to better support multitenancy --- afar/_core.py | 19 ++++++++++++------- afar/_printing.py | 5 +++-- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/afar/_core.py b/afar/_core.py index 0ed2c81..fb83222 100644 --- a/afar/_core.py +++ b/afar/_core.py @@ -19,6 +19,7 @@ class Run: _gather_data = False # Used to update outputs asynchronously _outputs = {} + _channel = "afar-" + uuid4().hex def __init__(self, *names, client=None, data=None): self.names = names @@ -187,8 +188,9 @@ def _run( del self._magic_func._scoped.outer_scope[key] capture_print = True - if capture_print and "afar-print" not in client._event_handlers: - client.subscribe_topic("afar-print", self._handle_print) + if capture_print and self._channel not in client._event_handlers: + client.subscribe_topic(self._channel, self._handle_print) + # When would be a good time to unsubscribe? async_print = capture_print and supports_async_output() if capture_print: unique_key = uuid4().hex @@ -206,6 +208,7 @@ def _run( names, futures, capture_print, + self._channel, unique_key, pure=False, **submit_kwargs, @@ -314,7 +317,7 @@ class Get(Run): _gather_data = True -def run_afar(magic_func, names, futures, capture_print, unique_key): +def run_afar(magic_func, names, futures, capture_print, channel, unique_key): if capture_print: try: worker = get_worker() @@ -323,8 +326,8 @@ def run_afar(magic_func, names, futures, capture_print, unique_key): worker = None try: if capture_print and worker is not None: - worker.log_event("afar-print", (unique_key, "begin", None)) - rec = PrintRecorder(unique_key) + worker.log_event(channel, (unique_key, "begin", None)) + rec = PrintRecorder(channel, unique_key) if "print" in magic_func._scoped.builtin_names and "print" not in futures: sfunc = magic_func._scoped.bind(futures, print=rec) else: @@ -340,13 +343,15 @@ def run_afar(magic_func, names, futures, capture_print, unique_key): if magic_func._display_expr and worker is not None: # Hopefully computing the repr is fast. If it is slow, perhaps it would be # better to add the return value to rv and call repr_afar as a separate task. + # Also, pretty_repr must be msgpack serializable if done via events. + # Hence, custom _ipython_display_ probably won't work. pretty_repr = repr_afar(results.return_value, magic_func._repr_methods) if pretty_repr is not None: - worker.log_event("afar-print", (unique_key, "display_expr", pretty_repr)) + worker.log_event(channel, (unique_key, "display_expr", pretty_repr)) send_finish = False finally: if capture_print and worker is not None and send_finish: - worker.log_event("afar-print", (unique_key, "finish", None)) + worker.log_event(channel, (unique_key, "finish", None)) return rv diff --git a/afar/_printing.py b/afar/_printing.py index 1bd2137..836bffc 100644 --- a/afar/_printing.py +++ b/afar/_printing.py @@ -21,7 +21,8 @@ class PrintRecorder: local_print = LocalPrint() print_lock = Lock() - def __init__(self, key): + def __init__(self, channel, key): + self.channel = channel self.key = key def __enter__(self): @@ -57,4 +58,4 @@ def __call__(self, *args, file=None, **kwargs): except ValueError: pass else: - worker.log_event("afar-print", (self.key, stream_name, file.getvalue())) + worker.log_event(self.channel, (self.key, stream_name, file.getvalue())) From c58b17b401bdb480302d5996a32e4cb28d915420 Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Mon, 27 Sep 2021 09:35:42 -0500 Subject: [PATCH 6/7] Print locally on the worker too --- .github/workflows/test_conda.yml | 3 +-- .github/workflows/test_pip.yml | 2 +- afar/__init__.py | 15 +++++++++++++++ afar/_abra.py | 5 +++++ afar/_core.py | 17 +++++++++++++++-- afar/_inspect.py | 1 + afar/_magic.py | 1 + afar/_printing.py | 4 ++++ afar/_reprs.py | 1 + 9 files changed, 44 insertions(+), 5 deletions(-) diff --git a/.github/workflows/test_conda.yml b/.github/workflows/test_conda.yml index e0a5907..99c5988 100644 --- a/.github/workflows/test_conda.yml +++ b/.github/workflows/test_conda.yml @@ -31,8 +31,7 @@ jobs: activate-environment: afar - name: Install dependencies run: | - conda install -y -c conda-forge distributed pytest - pip install innerscope + conda install -y -c conda-forge distributed pytest innerscope pip install -e . - name: PyTest run: | diff --git a/.github/workflows/test_pip.yml b/.github/workflows/test_pip.yml index 63a70e7..4b51141 100644 --- a/.github/workflows/test_pip.yml +++ b/.github/workflows/test_pip.yml @@ -39,7 +39,7 @@ jobs: run: | pip install black flake8 flake8 . - black afar *.py --check --diff + black . --check --diff - name: Coverage env: GITHUB_TOKEN: ${{ secrets.github_token }} diff --git a/afar/__init__.py b/afar/__init__.py index c85f0e6..a5fd7cf 100644 --- a/afar/__init__.py +++ b/afar/__init__.py @@ -1,3 +1,18 @@ +"""afar runs code within a context manager or IPython magic on a Dask cluster. + +>>> with afar.run, remotely: +... import dask_cudf +... df = dask_cudf.read_parquet("s3://...") +... result = df.sum().compute() + +or to use an IPython magic: + +>>> %load_ext afar +>>> %afar z = x + y + +Read the documentation at https://github.com/eriknw/afar +""" + from ._core import get, run # noqa from ._version import get_versions from ._where import later, locally, remotely # noqa diff --git a/afar/_abra.py b/afar/_abra.py index eed4faa..fca6b63 100644 --- a/afar/_abra.py +++ b/afar/_abra.py @@ -1,3 +1,8 @@ +"""Perform a magic trick: given lines of code, create a function to run remotely. + +This callable object is able to provide the values of the requested argument +names and return the final expression so it can be displayed. +""" import dis from types import FunctionType diff --git a/afar/_core.py b/afar/_core.py index fb83222..e552de7 100644 --- a/afar/_core.py +++ b/afar/_core.py @@ -1,5 +1,7 @@ +"""Define the user-facing `run` object; this is where it all comes together.""" import dis import sys +import traceback from inspect import currentframe from uuid import uuid4 from weakref import WeakKeyDictionary, WeakSet @@ -344,10 +346,21 @@ def run_afar(magic_func, names, futures, capture_print, channel, unique_key): # Hopefully computing the repr is fast. If it is slow, perhaps it would be # better to add the return value to rv and call repr_afar as a separate task. # Also, pretty_repr must be msgpack serializable if done via events. - # Hence, custom _ipython_display_ probably won't work. + # Hence, custom _ipython_display_ probably won't work, and we resort to + # trying to use a basic repr (if that fails, we show the first exception). pretty_repr = repr_afar(results.return_value, magic_func._repr_methods) if pretty_repr is not None: - worker.log_event(channel, (unique_key, "display_expr", pretty_repr)) + try: + worker.log_event(channel, (unique_key, "display_expr", pretty_repr)) + except Exception: + exc_info = sys.exc_info() + tb = traceback.format_exception(*exc_info) + try: + basic_repr = (repr(results.return_value), "__repr__", False) + worker.log_event(channel, (unique_key, "display_expr", basic_repr)) + except Exception: + exc_repr = (tb, pretty_repr[1], True) + worker.log_event(channel, (unique_key, "display_expr", exc_repr)) send_finish = False finally: if capture_print and worker is not None and send_finish: diff --git a/afar/_inspect.py b/afar/_inspect.py index b9713b6..dcdc77a 100644 --- a/afar/_inspect.py +++ b/afar/_inspect.py @@ -1,3 +1,4 @@ +"""Utilities to get the lines of the context body.""" import dis from inspect import findsource diff --git a/afar/_magic.py b/afar/_magic.py index 889a183..194edfa 100644 --- a/afar/_magic.py +++ b/afar/_magic.py @@ -1,3 +1,4 @@ +"""Define the IPython magic for using afar""" from textwrap import indent from dask.distributed import Client diff --git a/afar/_printing.py b/afar/_printing.py index 836bffc..3cf42d4 100644 --- a/afar/_printing.py +++ b/afar/_printing.py @@ -1,3 +1,4 @@ +"""Classes used to capture print statements within a Dask task.""" import builtins import sys from io import StringIO @@ -59,3 +60,6 @@ def __call__(self, *args, file=None, **kwargs): pass else: worker.log_event(self.channel, (self.key, stream_name, file.getvalue())) + # Print locally too + stream = sys.stdout if stream_name == "stdout" else sys.stderr + LocalPrint.printer(file.getvalue(), end="", file=stream) diff --git a/afar/_reprs.py b/afar/_reprs.py index 19ad0f9..a3e8c4a 100644 --- a/afar/_reprs.py +++ b/afar/_reprs.py @@ -1,3 +1,4 @@ +"""Utilities to calculate the (pretty) repr of objects remotely and display locally.""" import sys import traceback From 403e9e55086806eac72cc7724fa817e60f7b504e Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Mon, 27 Sep 2021 12:34:20 -0500 Subject: [PATCH 7/7] Don't support `_ipython_display_` rich repr. Too complicated atm! --- afar/_core.py | 18 +++--------------- afar/_reprs.py | 6 +++++- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/afar/_core.py b/afar/_core.py index e552de7..130a952 100644 --- a/afar/_core.py +++ b/afar/_core.py @@ -1,7 +1,6 @@ """Define the user-facing `run` object; this is where it all comes together.""" import dis import sys -import traceback from inspect import currentframe from uuid import uuid4 from weakref import WeakKeyDictionary, WeakSet @@ -345,22 +344,11 @@ def run_afar(magic_func, names, futures, capture_print, channel, unique_key): if magic_func._display_expr and worker is not None: # Hopefully computing the repr is fast. If it is slow, perhaps it would be # better to add the return value to rv and call repr_afar as a separate task. - # Also, pretty_repr must be msgpack serializable if done via events. - # Hence, custom _ipython_display_ probably won't work, and we resort to - # trying to use a basic repr (if that fails, we show the first exception). + # Also, pretty_repr must be msgpack serializable if done via events. Hence, + # custom _ipython_display_ doesn't work, and we resort to using a basic repr. pretty_repr = repr_afar(results.return_value, magic_func._repr_methods) if pretty_repr is not None: - try: - worker.log_event(channel, (unique_key, "display_expr", pretty_repr)) - except Exception: - exc_info = sys.exc_info() - tb = traceback.format_exception(*exc_info) - try: - basic_repr = (repr(results.return_value), "__repr__", False) - worker.log_event(channel, (unique_key, "display_expr", basic_repr)) - except Exception: - exc_repr = (tb, pretty_repr[1], True) - worker.log_event(channel, (unique_key, "display_expr", exc_repr)) + worker.log_event(channel, (unique_key, "display_expr", pretty_repr)) send_finish = False finally: if capture_print and worker is not None and send_finish: diff --git a/afar/_reprs.py b/afar/_reprs.py index a3e8c4a..f3ea60d 100644 --- a/afar/_reprs.py +++ b/afar/_reprs.py @@ -14,7 +14,9 @@ def __init__(self): self._attrs = [] def __getattr__(self, attr): - if "canary" not in attr: + if "canary" not in attr and attr != "_ipython_display_": + # _ipython_display_ requires sending the object back to the client. + # Let's not bother with this hassle for now. self._attrs.append(attr) raise AttributeError(attr) @@ -44,6 +46,7 @@ def repr_afar(val, repr_methods): continue if method_name == "_ipython_display_": # Custom display! Send the object to the client + # We don't allow _ipython_display_ at the moment return val, method_name, False try: rv = method() @@ -100,6 +103,7 @@ def display_repr(results, out=None): from IPython.display import display if method_name == "_ipython_display_": + # We don't allow _ipython_display_ at the moment if out is None: display(val) else: