From 483f50e2ebbf37ea53d3914caaec58fefa7f6ab1 Mon Sep 17 00:00:00 2001 From: Shofiya <86974918+Shofiya2003@users.noreply.github.com> Date: Thu, 22 Aug 2024 22:33:21 +0530 Subject: [PATCH] v1: Dataflow graph (#35) * test: added tests to confirm information flow for simple file read/comparison commands and a bash in bash command using the provlog * added pydot to flake.nix * fix: resolving merge conflicts * tests: added tests to verify the graph for diff command and bash in bash command * test: added tests for command not found and empty command * test: added test for bash in bash pipe command * feat: adds support to create a dataflow graph * fix: create an edge from parent process to child process * feat: v1 of dataflow graph completed * console warning when file with O_RDWR is encountered * fix: fixed the errors that occurred in pre commit command * refactor: assigned better name to traversal function * removed redundant file * removed redundant file * refactor: removed redundant code and comments * replaced the pthread_id in process node with the kernel thread of the process that contains the pthread * fix: fixed pre-commit errors * removed access mode from the filenode * changed FileNode and ProcessNode from class to tuple * feat: changed the logic to produce dataflow graph to create an edge from parent process to file node when the pthread writes to the file * feat: ensure the shared files in pthreads and the parent process * removed redundant comments * inserted command executed in the process node * fix: removed mention of dev/tty from the command * removed file key from FileNode class * feat: added labels to the edges * feat: represented the file using InodeOnDevice class * changed the declaration of dictionary to insert value when key does not exist in the dictionary * fix: fixed import paths due to structural change in the project and made changes to accommodate type changes in op struct * fixed pre-commit checks and errors * fix: removed __init__.py file 
from probe_src folder * refactor: used access mode flags from os package instead of integers to identify accessmode * refactor: created a function for property label in FileNode dataclass * added comment to explain the use of shared_file set and ensured shared_files are empty when a new process is encountered * changed the datatype of cmd property of ProcessNode dataclass * edited tasks.md to include task of capturing commands and arguments * removed try..except block and removed init files from namespace package --- .../python/probe_py/__init__.py | 0 probe_src/python/probe_py/manual/analysis.py | 199 ++++++++++++++++-- probe_src/python/probe_py/manual/cli.py | 29 ++- probe_src/tasks.md | 3 +- 4 files changed, 204 insertions(+), 27 deletions(-) create mode 100644 probe_src/probe_frontend/python/probe_py/__init__.py diff --git a/probe_src/probe_frontend/python/probe_py/__init__.py b/probe_src/probe_frontend/python/probe_py/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/probe_src/python/probe_py/manual/analysis.py b/probe_src/python/probe_py/manual/analysis.py index e3218856..bb24beee 100644 --- a/probe_src/python/probe_py/manual/analysis.py +++ b/probe_src/python/probe_py/manual/analysis.py @@ -1,9 +1,15 @@ import typing +from typing import Dict, Tuple import networkx as nx # type: ignore from probe_py.generated.ops import Op, CloneOp, ExecOp, WaitOp, OpenOp, CloseOp, InitProcessOp, InitExecEpochOp, InitThreadOp, StatOp -from probe_py.generated.parser import ProvLog +from probe_py.generated import parser from enum import IntEnum - +import rich +import sys +from dataclasses import dataclass +import pathlib +import os +import collections # TODO: implement this in probe_py.generated.ops class TaskType(IntEnum): @@ -17,10 +23,45 @@ class EdgeLabels(IntEnum): PROGRAM_ORDER = 1 FORK_JOIN = 2 EXEC = 3 - + +@dataclass(frozen=True) +class ProcessNode: + pid: int + cmd: tuple[str,...] 
+ +@dataclass(frozen=True) +class InodeOnDevice: + device_major: int + device_minor: int + inode: int + + def __eq__(self, other: object) -> bool: + if not isinstance(other, InodeOnDevice): + return NotImplemented + return (self.device_major == other.device_major and + self.device_minor == other.device_minor and + self.inode == other.inode) + def __hash__(self) -> int: + return hash((self.device_major, self.device_minor, self.inode)) + +@dataclass(frozen=True) +class FileNode: + inodeOnDevice: InodeOnDevice + version: int + file: str + + @property + def label(self) -> str: + return f"{self.file} v{self.version}" + +# type alias for a node +Node = Tuple[int, int, int, int] + +# type for the edges +EdgeType = Tuple[Node, Node] def validate_provlog( - provlog: ProvLog, + provlog: parser.ProvLog, ) -> list[str]: ret = list[str]() waited_processes = set[tuple[TaskType, int]]() @@ -88,9 +129,8 @@ def validate_provlog( # TODO: Rename "digraph" to "hb_graph" in the entire project. # Digraph (aka "directed graph") is too vague a term; the proper name is "happens-before graph". 
# Later on, we will have a function that transforms an hb graph to file graph (both of which are digraphs) -def provlog_to_digraph(process_tree_prov_log: ProvLog) -> nx.DiGraph: +def provlog_to_digraph(process_tree_prov_log: parser.ProvLog) -> nx.DiGraph: # [pid, exec_epoch_no, tid, op_index] - Node: typing.TypeAlias = tuple[int, int, int, int] program_order_edges = list[tuple[Node, Node]]() fork_join_edges = list[tuple[Node, Node]]() exec_edges = list[tuple[Node, Node]]() @@ -98,25 +138,20 @@ def provlog_to_digraph(process_tree_prov_log: ProvLog) -> nx.DiGraph: proc_to_ops = dict[tuple[int, int, int], list[Node]]() last_exec_epoch = dict[int, int]() for pid, process in process_tree_prov_log.processes.items(): - for exec_epoch_no, exec_epoch in process.exec_epochs.items(): # to find the last executing epoch of the process last_exec_epoch[pid] = max(last_exec_epoch.get(pid, 0), exec_epoch_no) - # Reduce each thread to the ops we actually care about for tid, thread in exec_epoch.threads.items(): context = (pid, exec_epoch_no, tid) ops = list[Node]() - # Filter just the ops we are interested in op_index = 0 for op_index, op in enumerate(thread.ops): ops.append((*context, op_index)) - # Add just those ops to the graph nodes.extend(ops) - program_order_edges.extend(zip(ops[:-1], ops[1:])) - + program_order_edges.extend(zip(ops[:-1], ops[1:])) # Store these so we can hook up forks/joins between threads proc_to_ops[context] = ops @@ -135,6 +170,7 @@ def get_first_pthread(pid: int, exid: int, target_pthread_id: int) -> list[Node] for op_index, op in enumerate(thread.ops): if op.pthread_id == target_pthread_id: ret.append((pid, exid, tid, op_index)) + break return ret def get_last_pthread(pid: int, exid: int, target_pthread_id: int) -> list[Node]: @@ -145,6 +181,7 @@ def get_last_pthread(pid: int, exid: int, target_pthread_id: int) -> list[Node]: for op_index, op in list(enumerate(thread.ops))[::-1]: if op.pthread_id == target_pthread_id: ret.append((pid, exid, tid, 
op_index)) + break return ret # Hook up forks/joins @@ -199,12 +236,128 @@ def add_edges(edges:list[tuple[Node, Node]], label:EdgeLabels) -> None: add_edges(fork_join_edges, EdgeLabels.FORK_JOIN) return process_graph +def traverse_hb_for_dfgraph(process_tree_prov_log: parser.ProvLog, starting_node: Node, traversed: set[int] , dataflow_graph:nx.DiGraph, file_version_map: Dict[InodeOnDevice, int], shared_files: set[InodeOnDevice], cmd_map: Dict[int, list[str]]) -> None: + starting_pid = starting_node[0] + + starting_op = prov_log_get_node(process_tree_prov_log, starting_node[0], starting_node[1], starting_node[2], starting_node[3]) + process_graph = provlog_to_digraph(process_tree_prov_log) + + edges = list_edges_from_start_node(process_graph, starting_node) + name_map = collections.defaultdict[InodeOnDevice, list[pathlib.Path]](list) -def prov_log_get_node(prov_log: ProvLog, pid: int, exec_epoch: int, tid: int, op_no: int) -> Op: + target_nodes = collections.defaultdict[int, list[Node]](list) + console = rich.console.Console(file=sys.stderr) + + for edge in edges: + pid, exec_epoch_no, tid, op_index = edge[0] + + # check if the process is already visited when waitOp occurred + if pid in traversed or tid in traversed: + continue + + op = prov_log_get_node(process_tree_prov_log, pid, exec_epoch_no, tid, op_index).data + next_op = prov_log_get_node(process_tree_prov_log, edge[1][0], edge[1][1], edge[1][2], edge[1][3]).data + # when we move to a new process which is not a child process but an independent process we empty the shared_files + if edge[0][0]!=edge[1][0] and not isinstance(op, CloneOp) and not isinstance(next_op, WaitOp) and edge[1][1] == 0 and edge[1][3] == 0: + shared_files = set() + if isinstance(op, OpenOp): + access_mode = op.flags & os.O_ACCMODE + processNode = ProcessNode(pid=pid, cmd=tuple(cmd_map[pid])) + dataflow_graph.add_node(processNode, label=processNode.cmd) + file = InodeOnDevice(op.path.device_major, op.path.device_minor, op.path.inode) + 
path_str = op.path.path.decode("utf-8") + if access_mode == os.O_RDONLY: + curr_version = file_version_map[file] + fileNode = FileNode(file, curr_version, path_str) + dataflow_graph.add_node(fileNode, label = fileNode.label) + path = pathlib.Path(op.path.path.decode("utf-8")) + if path not in name_map[file]: + name_map[file].append(path) + dataflow_graph.add_edge(fileNode, processNode) + elif access_mode == os.O_WRONLY: + curr_version = file_version_map[file] + if file in shared_files: + fileNode2 = FileNode(file, curr_version, path_str) + dataflow_graph.add_node(fileNode2, label = fileNode2.label) + else: + file_version_map[file] = curr_version + 1 + fileNode2 = FileNode(file, curr_version+1, path_str) + dataflow_graph.add_node(fileNode2, label = fileNode2.label) + if starting_pid == pid: + # shared_files: shared_files helps us keep track of the files shared between parent and child processes. This ensures that when the children write to the file, the version of the file is not incremented multiple times + shared_files.add(file) + path = pathlib.Path(op.path.path.decode("utf-8")) + if path not in name_map[file]: + name_map[file].append(path) + dataflow_graph.add_edge(processNode, fileNode2) + elif access_mode == 2: + console.print(f"Found file {path_str} with access mode O_RDWR", style="red") + else: + raise Exception("unknown access mode") + elif isinstance(op, CloneOp): + if op.task_type == TaskType.TASK_PID: + if edge[0][0] != edge[1][0]: + target_nodes[op.task_id].append(edge[1]) + continue + elif op.task_type == TaskType.TASK_PTHREAD: + if edge[0][2] != edge[1][2]: + target_nodes[op.task_id].append(edge[1]) + continue + if op.task_type != TaskType.TASK_PTHREAD and op.task_type != TaskType.TASK_ISO_C_THREAD: + + processNode1 = ProcessNode(pid = pid, cmd=tuple(cmd_map[pid])) + processNode2 = ProcessNode(pid = op.task_id, cmd=tuple(cmd_map[op.task_id])) + dataflow_graph.add_node(processNode1, label = processNode1.cmd) + dataflow_graph.add_node(processNode2, 
label = processNode2.cmd) + dataflow_graph.add_edge(processNode1, processNode2) + target_nodes[op.task_id] = list() + elif isinstance(op, WaitOp) and op.options == 0: + for node in target_nodes[op.task_id]: + traverse_hb_for_dfgraph(process_tree_prov_log, node, traversed, dataflow_graph, file_version_map, shared_files, cmd_map) + traversed.add(node[2]) + # return back to the WaitOp of the parent process + if isinstance(next_op, WaitOp): + if next_op.task_id == starting_pid or next_op.task_id == starting_op.pthread_id: + return + return + +def list_edges_from_start_node(graph: nx.DiGraph, start_node: Node) -> list[EdgeType]: + all_edges = list(graph.edges()) + start_index = next(i for i, edge in enumerate(all_edges) if edge[0] == start_node) + ordered_edges = all_edges[start_index:] + all_edges[:start_index] + return ordered_edges + +def provlog_to_dataflow_graph(process_tree_prov_log: parser.ProvLog) -> nx.DiGraph: + dataflow_graph = nx.DiGraph() + file_version_map = collections.defaultdict[InodeOnDevice, int](lambda: 0) + process_graph = provlog_to_digraph(process_tree_prov_log) + root_node = [n for n in process_graph.nodes() if process_graph.out_degree(n) > 0 and process_graph.in_degree(n) == 0][0] + traversed: set[int] = set() + cmd:list[str] = [] + cmd_map = collections.defaultdict[int, list[str]](list) + for edge in list(nx.edges(process_graph))[::-1]: + pid, exec_epoch_no, tid, op_index = edge[0] + op = prov_log_get_node(process_tree_prov_log, pid, exec_epoch_no, tid, op_index).data + if isinstance(op, OpenOp): + file = op.path.path.decode("utf-8") + if file not in cmd and file!="/dev/tty": + cmd.insert(0, file) + elif isinstance(op, InitExecEpochOp): + cmd.insert(0, op.program_name.decode("utf-8")) + if pid == tid and exec_epoch_no == 0: + cmd_map[tid] = cmd + cmd = [] + shared_files:set[InodeOnDevice] = set() + traverse_hb_for_dfgraph(process_tree_prov_log, root_node, traversed, dataflow_graph, file_version_map, shared_files, cmd_map) + pydot_graph = 
nx.drawing.nx_pydot.to_pydot(dataflow_graph) + dot_string = pydot_graph.to_string() + return dot_string + +def prov_log_get_node(prov_log: parser.ProvLog, pid: int, exec_epoch: int, tid: int, op_no: int) -> Op: return prov_log.processes[pid].exec_epochs[exec_epoch].threads[tid].ops[op_no] -def validate_hb_closes(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]: +def validate_hb_closes(provlog: parser.ProvLog, process_graph: nx.DiGraph) -> list[str]: # Note that this test doesn't work if a process "intentionally" leaves a fd open for its child. # E.g., bash-in-pipe provlog_reverse = process_graph.reverse() @@ -224,7 +377,7 @@ def validate_hb_closes(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str] return ret -def validate_hb_waits(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]: +def validate_hb_waits(provlog: parser.ProvLog, process_graph: nx.DiGraph) -> list[str]: provlog_reverse = process_graph.reverse() ret = list[str]() for node in process_graph.nodes: @@ -239,7 +392,7 @@ def validate_hb_waits(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]: ret.append(f"Wait of {op.data.task_id} in {node} is not preceeded by corresponding clone") return ret -def validate_hb_clones(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]: +def validate_hb_clones(provlog: parser.ProvLog, process_graph: nx.DiGraph) -> list[str]: ret = list[str]() for node in process_graph.nodes: op = prov_log_get_node(provlog, *node) @@ -268,7 +421,7 @@ def validate_hb_clones(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str] return ret -def validate_hb_degree(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]: +def validate_hb_degree(provlog: parser.ProvLog, process_graph: nx.DiGraph) -> list[str]: ret = list[str]() found_entry = False found_exit = False @@ -290,7 +443,7 @@ def validate_hb_degree(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str] return ret -def validate_hb_acyclic(provlog: ProvLog, process_graph: nx.DiGraph) -> 
list[str]: +def validate_hb_acyclic(provlog: parser.ProvLog, process_graph: nx.DiGraph) -> list[str]: try: cycle = nx.find_cycle(process_graph) except nx.NetworkXNoCycle: @@ -299,7 +452,7 @@ def validate_hb_acyclic(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str return [f"Cycle detected: {cycle}"] -def validate_hb_execs(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]: +def validate_hb_execs(provlog: parser.ProvLog, process_graph: nx.DiGraph) -> list[str]: ret = list[str]() for node0 in process_graph.nodes(): pid0, eid0, tid0, op0 = node0 @@ -317,7 +470,7 @@ def validate_hb_execs(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]: return ret -def validate_hb_graph(processes: ProvLog, hb_graph: nx.DiGraph) -> list[str]: +def validate_hb_graph(processes: parser.ProvLog, hb_graph: nx.DiGraph) -> list[str]: ret = list[str]() # ret.extend(validate_hb_closes(processes, hb_graph)) ret.extend(validate_hb_waits(processes, hb_graph)) @@ -338,8 +491,7 @@ def relax_node(graph: nx.DiGraph, node: typing.Any) -> list[tuple[typing.Any, ty graph.remove_node(node) return ret - -def digraph_to_pydot_string(prov_log: ProvLog, process_graph: nx.DiGraph) -> str: +def digraph_to_pydot_string(prov_log: parser.ProvLog, process_graph: nx.DiGraph) -> str: label_color_map = { EdgeLabels.EXEC: 'yellow', @@ -376,7 +528,8 @@ def digraph_to_pydot_string(prov_log: ProvLog, process_graph: nx.DiGraph) -> str return dot_string -def construct_process_graph(process_tree_prov_log: ProvLog) -> str: + +def construct_process_graph(process_tree_prov_log: parser.ProvLog) -> str: """ Construct a happens-before graph from process_tree_prov_log diff --git a/probe_src/python/probe_py/manual/cli.py b/probe_src/python/probe_py/manual/cli.py index 7edd14ac..39a7ec80 100644 --- a/probe_src/python/probe_py/manual/cli.py +++ b/probe_src/python/probe_py/manual/cli.py @@ -10,13 +10,13 @@ import shutil import rich from probe_py.generated.parser import parse_probe_log -from . 
import analysis -from . import util +from probe_py.manual import analysis +from probe_py.manual import util rich.traceback.install(show_locals=False) -project_root = pathlib.Path(__file__).resolve().parent.parent +project_root = pathlib.Path(__file__).resolve().parent.parent.parent.parent A = typing_extensions.Annotated @@ -124,6 +124,29 @@ def process_graph( console.print(warning, style="red") print(analysis.digraph_to_pydot_string(prov_log, process_graph)) +@app.command() +def dataflow_graph( + input: pathlib.Path = pathlib.Path("probe_log"), +) -> None: + """ + Write a process graph from PROBE_LOG in DOT/graphviz format. + """ + if not input.exists(): + typer.secho(f"INPUT {input} does not exist\nUse `PROBE record --output {input} CMD...` to rectify", fg=typer.colors.RED) + raise typer.Abort() + probe_log_tar_obj = tarfile.open(input, "r") + prov_log = parse_probe_log(input) + probe_log_tar_obj.close() + console = rich.console.Console(file=sys.stderr) + process_graph = analysis.provlog_to_digraph(prov_log) + for warning in analysis.validate_provlog(prov_log): + console.print(warning, style="red") + rich.traceback.install(show_locals=False) # Figure out why we need this + process_graph = analysis.provlog_to_digraph(prov_log) + for warning in analysis.validate_hb_graph(prov_log, process_graph): + console.print(warning, style="red") + print(analysis.provlog_to_dataflow_graph(prov_log)) + @app.command() def dump( diff --git a/probe_src/tasks.md b/probe_src/tasks.md index c183a211..4d8bd216 100644 --- a/probe_src/tasks.md +++ b/probe_src/tasks.md @@ -75,10 +75,11 @@ - [ ] Write interesting performance tests, using `benchmark/workloads.py` as inspiration. - [ ] Output conversions - [ ] From the NetworkX digraph, export (Shofiya is working on this): - - [ ] A dataflow graph, showing only files, processes, and the flow of information between them. 
The following rules define when there is an edge: + - [x] A dataflow graph, showing only files, processes, and the flow of information between them. The following rules define when there is an edge: 1. Data flows from a file to a process if on any thread there is an OpenOp with the flags set to `O_RDWR` or `O_RDONLY`. 2. Data flows from a process to a process if one process CloneOp's the other. 3. Data flows from a process to a file if on any thread there is a OpenOp with the flags set to `O_RDWR` or `O_WRONLY`. + - [ ] Capture the command of the process in libprobe. Currently, the `cmd` property of the `ProcessNode` in the dataflow graph is assigned a value using a map that associates each `process_id` with the corresponding command. This map is generated by a stop-gap function that constructs a command string using the `program-name` from `InitExecEpochOp` and the `file` from `OpenOp`. However, this function often fails to generate the correct command for certain operations. For instance, it struggles with cases such as `cat a.txt; cat a.txt` and the execution of compiled C programs like `./compiled_c_file`. - [ ] [Process Run Crate](https://www.researchobject.org/workflow-run-crate/profiles/process_run_crate/) (Saleha is working on this) - [ ] [Common Workflow Language](https://www.commonwl.org/) - [ ] Write a test that runs the resulting CWL.