diff --git a/probe_src/python/probe_py/manual/cli.py b/probe_src/python/probe_py/manual/cli.py index 8268ffd4..85e23be2 100644 --- a/probe_src/python/probe_py/manual/cli.py +++ b/probe_src/python/probe_py/manual/cli.py @@ -1,3 +1,6 @@ +import typing +import dataclasses +import json from typing_extensions import Annotated import pathlib import typer @@ -413,6 +416,53 @@ def nextflow( script = g.generate_workflow(dataflow_graph) output.write_text(script) + +@export_app.command() +def ops_jsonl( + probe_log: Annotated[ + pathlib.Path, + typer.Argument(help="output file written by `probe record -o $file`."), + ] = pathlib.Path("probe_log"), +) -> None: + """ + Export each op to a JSON line. + + The format is subject to change as PROBE evolves. Use with caution! + """ + + def filter_nested_dict( + dct: typing.Mapping[typing.Any, typing.Any], + ) -> typing.Mapping[typing.Any, typing.Any]: + """Converts the bytes in a nested dict to a string""" + return { + key: ( + # If dict, Recurse self + filter_nested_dict(val) if isinstance(val, dict) else + # If bytes, decode to string + val.decode(errors="surrogateescape") if isinstance(val, bytes) else + # Else, do nothing + val + ) + for key, val in dct.items() + } + stdout_console = rich.console.Console() + prov_log = parse_probe_log(probe_log) + for pid, process in prov_log.processes.items(): + for exec_epoch_no, exec_epoch in process.exec_epochs.items(): + for tid, thread in exec_epoch.threads.items(): + for i, op in enumerate(thread.ops): + stdout_console.print_json(json.dumps({ + "pid": pid, + "tid": tid, + "exec_epoch_no": exec_epoch_no, + "i": i, + "op": filter_nested_dict( + dataclasses.asdict(op), + ), + "op_data_type": type(op.data).__name__, + })) + + # Example: scp Desktop/sample_example.txt root@136.183.142.28:/home/remote_dir @app.command( context_settings=dict(