diff --git a/nbclient/client.py b/nbclient/client.py index b2a1d07..9ad710b 100644 --- a/nbclient/client.py +++ b/nbclient/client.py @@ -4,6 +4,7 @@ import base64 import collections import datetime +import re import signal import typing as t from contextlib import asynccontextmanager, contextmanager @@ -28,6 +29,9 @@ from .output_widget import OutputWidget from .util import ensure_async, run_hook, run_sync +_RGX_CARRIAGERETURN = re.compile(r".*\r(?=[^\n])") +_RGX_BACKSPACE = re.compile(r"[^\n]\b") + def timestamp(msg: t.Optional[t.Dict] = None) -> str: """Get the timestamp for a message.""" @@ -426,6 +430,14 @@ def _kernel_manager_class_default(self) -> t.Type[KernelManager]: ) ) + coalesce_streams = Bool( + help=dedent( + """ + Merge all stream outputs with shared names into single streams. + """ + ) + ) + def __init__(self, nb: NotebookNode, km: t.Optional[KernelManager] = None, **kw: t.Any) -> None: """Initializes the execution manager. @@ -1006,6 +1018,44 @@ async def async_execute_cell( self.on_cell_executed, cell=cell, cell_index=cell_index, execute_reply=exec_reply ) await self._check_raise_for_error(cell, cell_index, exec_reply) + + if self.coalesce_streams and cell.outputs: + new_outputs = [] + streams: dict[str, NotebookNode] = {} + for output in cell.outputs: + if output["output_type"] == "stream": + if output["name"] in streams: + streams[output["name"]]["text"] += output["text"] + else: + new_outputs.append(output) + streams[output["name"]] = output + else: + new_outputs.append(output) + + # process \r and \b characters + for output in streams.values(): + old = output["text"] + while len(output["text"]) < len(old): + old = output["text"] + # Cancel out anything-but-newline followed by backspace + output["text"] = _RGX_BACKSPACE.sub("", output["text"]) + # Replace all carriage returns not followed by newline + output["text"] = _RGX_CARRIAGERETURN.sub("", output["text"]) + + # We also want to ensure stdout and stderr are always in the same consecutive order, + # because they are asynchronous, so order isn't guaranteed. + for i, output in enumerate(new_outputs): + if output["output_type"] == "stream" and output["name"] == "stderr": + if ( + len(new_outputs) >= i + 2 + and new_outputs[i + 1]["output_type"] == "stream" + and new_outputs[i + 1]["name"] == "stdout" + ): + stdout = new_outputs.pop(i + 1) + new_outputs.insert(i, stdout) + + cell.outputs = new_outputs + self.nb['cells'][cell_index] = cell return cell diff --git a/nbclient/tests/test_client.py b/nbclient/tests/test_client.py index 0c75698..946cc33 100644 --- a/nbclient/tests/test_client.py +++ b/nbclient/tests/test_client.py @@ -1830,3 +1830,34 @@ def test_error_async_cell_hooks(self, executor, cell_mock, message_mock): hooks["on_notebook_start"].assert_not_called() hooks["on_notebook_complete"].assert_not_called() hooks["on_notebook_error"].assert_not_called() + + @prepare_cell_mocks( + { + 'msg_type': 'stream', + 'header': {'msg_type': 'stream'}, + 'content': {'name': 'stdout', 'text': 'foo1'}, + }, + { + 'msg_type': 'stream', + 'header': {'msg_type': 'stream'}, + 'content': {'name': 'stderr', 'text': 'bar1'}, + }, + { + 'msg_type': 'stream', + 'header': {'msg_type': 'stream'}, + 'content': {'name': 'stdout', 'text': 'foo2'}, + }, + { + 'msg_type': 'stream', + 'header': {'msg_type': 'stream'}, + 'content': {'name': 'stderr', 'text': 'bar2'}, + }, + ) + def test_coalesce_streams(self, executor, cell_mock, message_mock): + executor.coalesce_streams = True + executor.execute_cell(cell_mock, 0) + + assert cell_mock.outputs == [ + {'output_type': 'stream', 'name': 'stdout', 'text': 'foo1foo2'}, + {'output_type': 'stream', 'name': 'stderr', 'text': 'bar1bar2'}, + ]