Skip to content

Commit

Permalink
Add coalesce_streams
Browse files Browse the repository at this point in the history
Co-authored-by: chrisjsewell <[email protected]>
  • Loading branch information
davidbrochart and chrisjsewell committed Apr 1, 2023
1 parent 0f6a79d commit b4e9f3b
Showing 1 changed file with 50 additions and 0 deletions.
50 changes: 50 additions & 0 deletions nbclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import base64
import collections
import datetime
import re
import signal
import typing as t
from contextlib import asynccontextmanager, contextmanager
Expand All @@ -28,6 +29,9 @@
from .output_widget import OutputWidget
from .util import ensure_async, run_hook, run_sync

_RGX_CARRIAGERETURN = re.compile(r".*\r(?=[^\n])")
_RGX_BACKSPACE = re.compile(r"[^\n]\b")


def timestamp(msg: t.Optional[t.Dict] = None) -> str:
"""Get the timestamp for a message."""
Expand Down Expand Up @@ -426,6 +430,14 @@ def _kernel_manager_class_default(self) -> t.Type[KernelManager]:
)
)

coalesce_streams = Bool(
help=dedent(
"""
Merge all stream outputs with shared names into single streams.
"""
)
)

def __init__(self, nb: NotebookNode, km: t.Optional[KernelManager] = None, **kw: t.Any) -> None:
"""Initializes the execution manager.
Expand Down Expand Up @@ -1006,6 +1018,44 @@ async def async_execute_cell(
self.on_cell_executed, cell=cell, cell_index=cell_index, execute_reply=exec_reply
)
await self._check_raise_for_error(cell, cell_index, exec_reply)

if self.coalesce_streams and cell.outputs:
new_outputs = []
streams: dict[str, NotebookNode] = {}
for output in cell.outputs:
if output["output_type"] == "stream":
if output["name"] in streams:
streams[output["name"]]["text"] += output["text"]
else:
new_outputs.append(output)
streams[output["name"]] = output
else:
new_outputs.append(output)

# process \r and \b characters
for output in streams.values():
old = output["text"]
while len(output["text"]) < len(old):
old = output["text"]
# Cancel out anything-but-newline followed by backspace
output["text"] = _RGX_BACKSPACE.sub("", output["text"])
# Replace all carriage returns not followed by newline
output["text"] = _RGX_CARRIAGERETURN.sub("", output["text"])

# We also want to ensure stdout and stderr are always in the same consecutive order,
# because they are asynchronous, so order isn't guaranteed.
for i, output in enumerate(new_outputs):
if output["output_type"] == "stream" and output["name"] == "stderr":
if (
len(new_outputs) >= i + 2
and new_outputs[i + 1]["output_type"] == "stream"
and new_outputs[i + 1]["name"] == "stdout"
):
stdout = new_outputs.pop(i + 1)
new_outputs.insert(i, stdout)

cell.outputs = new_outputs

self.nb['cells'][cell_index] = cell
return cell

Expand Down

0 comments on commit b4e9f3b

Please sign in to comment.