v1: Dataflow graph (#35)
* test: added tests to confirm information flow for simple file read/comparison commands and a bash-in-bash command using the provlog

* added pydot to flake.nix

* fix: resolved merge conflicts

* test: added tests to verify the graph for the diff command and a bash-in-bash command

* test: added tests for command-not-found and empty-command cases

* test: added test for a bash-in-bash pipe command

* feat: added support to create a dataflow graph

* fix: create an edge from parent process to child process

* feat: v1 of dataflow graph completed

* console warning when file with O_RDWR is encountered

* fix: fixed the errors that occurred in the pre-commit command

* refactor: assigned better name to traversal function

* removed redundant file

* removed redundant file

* refactor: removed redundant code and comments

* replaced the pthread_id in the process node with the kernel thread ID of the process that contains the pthread

* fix: fixed pre-commit errors

* removed access mode from the filenode

* changed FileNode and ProcessNode from class to tuple

* feat: changed the logic to produce dataflow graph to create an edge from parent process to file node when the pthread writes to the file

* feat: ensure shared files are tracked across pthreads and the parent process

* removed redundant comments

* inserted the executed command into the process node

* fix: removed mention of /dev/tty from the command

* removed file key from FileNode class

* feat: added labels to the edges

* feat: represented the file using InodeOnDevice class

* changed the dictionary declaration to insert a value when the key does not exist in the dictionary

* fix: fixed import paths due to a structural change in the project and made changes to accommodate type changes in the op struct

* fixed pre-commit checks and errors

* fix: removed __init__.py file from probe_src folder

* refactor: used access mode flags from the os package instead of integers to identify the access mode

* refactor: created a function for the label property in the FileNode dataclass

* added a comment to explain the use of the shared_files set and ensured shared_files is empty when a new process is encountered

* changed the datatype of cmd property of ProcessNode dataclass

* edited tasks.md to include task of capturing commands and arguments

* removed try..except block and removed init files from namespace package
Shofiya2003 authored Aug 22, 2024
1 parent aa659eb commit 483f50e
Showing 4 changed files with 204 additions and 27 deletions.
Empty file.
199 changes: 176 additions & 23 deletions probe_src/python/probe_py/manual/analysis.py
@@ -1,9 +1,15 @@
import typing
from typing import Dict, Tuple
import networkx as nx # type: ignore
from probe_py.generated.ops import Op, CloneOp, ExecOp, WaitOp, OpenOp, CloseOp, InitProcessOp, InitExecEpochOp, InitThreadOp, StatOp
from probe_py.generated.parser import ProvLog
from probe_py.generated import parser
from enum import IntEnum

import rich
import sys
from dataclasses import dataclass
import pathlib
import os
import collections

# TODO: implement this in probe_py.generated.ops
class TaskType(IntEnum):
@@ -17,10 +23,45 @@ class EdgeLabels(IntEnum):
PROGRAM_ORDER = 1
FORK_JOIN = 2
EXEC = 3


@dataclass(frozen=True)
class ProcessNode:
pid: int
cmd: tuple[str, ...]

@dataclass(frozen=True)
class InodeOnDevice:
device_major: int
device_minor: int
inode: int
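# The (device_major, device_minor, inode) triple identifies a file
# independently of its path, so hard links to the same inode compare equal.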

def __eq__(self, other: object) -> bool:
if not isinstance(other, InodeOnDevice):
return NotImplemented
return (self.device_major == other.device_major and
self.device_minor == other.device_minor and
self.inode == other.inode)
def __hash__(self) -> int:
return hash((self.device_major, self.device_minor, self.inode))

@dataclass(frozen=True)
class FileNode:
inodeOnDevice: InodeOnDevice
version: int
file: str

@property
def label(self) -> str:
return f"{self.file} v{self.version}"

# type alias for a node
Node = Tuple[int, int, int, int]
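# e.g., (1234, 0, 1234, 7) is the 8th op recorded by the main thread
# (tid == pid) of process 1234 in its first exec epoch.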

# type for the edges
EdgeType = Tuple[Node, Node]

def validate_provlog(
provlog: ProvLog,
provlog: parser.ProvLog,
) -> list[str]:
ret = list[str]()
waited_processes = set[tuple[TaskType, int]]()
@@ -88,35 +129,29 @@ def validate_provlog(
# TODO: Rename "digraph" to "hb_graph" in the entire project.
# Digraph (aka "directed graph") is too vague a term; the proper name is "happens-before graph".
# Later on, we will have a function that transforms an hb graph to file graph (both of which are digraphs)
- def provlog_to_digraph(process_tree_prov_log: ProvLog) -> nx.DiGraph:
+ def provlog_to_digraph(process_tree_prov_log: parser.ProvLog) -> nx.DiGraph:
# [pid, exec_epoch_no, tid, op_index]
Node: typing.TypeAlias = tuple[int, int, int, int]
program_order_edges = list[tuple[Node, Node]]()
fork_join_edges = list[tuple[Node, Node]]()
exec_edges = list[tuple[Node, Node]]()
nodes = list[Node]()
proc_to_ops = dict[tuple[int, int, int], list[Node]]()
last_exec_epoch = dict[int, int]()
for pid, process in process_tree_prov_log.processes.items():

for exec_epoch_no, exec_epoch in process.exec_epochs.items():
# to find the last executing epoch of the process
last_exec_epoch[pid] = max(last_exec_epoch.get(pid, 0), exec_epoch_no)

# Reduce each thread to the ops we actually care about
for tid, thread in exec_epoch.threads.items():
context = (pid, exec_epoch_no, tid)
ops = list[Node]()

# Filter just the ops we are interested in
op_index = 0
for op_index, op in enumerate(thread.ops):
ops.append((*context, op_index))

# Add just those ops to the graph
nodes.extend(ops)
- program_order_edges.extend(zip(ops[:-1], ops[1:]))
-
+ program_order_edges.extend(zip(ops[:-1], ops[1:]))
# Store these so we can hook up forks/joins between threads
proc_to_ops[context] = ops

@@ -135,6 +170,7 @@ def get_first_pthread(pid: int, exid: int, target_pthread_id: int) -> list[Node]
for op_index, op in enumerate(thread.ops):
if op.pthread_id == target_pthread_id:
ret.append((pid, exid, tid, op_index))
break
return ret

def get_last_pthread(pid: int, exid: int, target_pthread_id: int) -> list[Node]:
@@ -145,6 +181,7 @@ def get_last_pthread(pid: int, exid: int, target_pthread_id: int) -> list[Node]:
for op_index, op in list(enumerate(thread.ops))[::-1]:
if op.pthread_id == target_pthread_id:
ret.append((pid, exid, tid, op_index))
break
return ret

# Hook up forks/joins
@@ -199,12 +236,128 @@ def add_edges(edges:list[tuple[Node, Node]], label:EdgeLabels) -> None:
add_edges(fork_join_edges, EdgeLabels.FORK_JOIN)
return process_graph

def traverse_hb_for_dfgraph(process_tree_prov_log: parser.ProvLog, starting_node: Node, traversed: set[int], dataflow_graph: nx.DiGraph, file_version_map: Dict[InodeOnDevice, int], shared_files: set[InodeOnDevice], cmd_map: Dict[int, list[str]]) -> None:
starting_pid = starting_node[0]

starting_op = prov_log_get_node(process_tree_prov_log, starting_node[0], starting_node[1], starting_node[2], starting_node[3])
process_graph = provlog_to_digraph(process_tree_prov_log)

edges = list_edges_from_start_node(process_graph, starting_node)
name_map = collections.defaultdict[InodeOnDevice, list[pathlib.Path]](list)

- def prov_log_get_node(prov_log: ProvLog, pid: int, exec_epoch: int, tid: int, op_no: int) -> Op:
target_nodes = collections.defaultdict[int, list[Node]](list)
console = rich.console.Console(file=sys.stderr)

for edge in edges:
pid, exec_epoch_no, tid, op_index = edge[0]

# skip processes/threads that were already traversed when a WaitOp was handled
if pid in traversed or tid in traversed:
continue

op = prov_log_get_node(process_tree_prov_log, pid, exec_epoch_no, tid, op_index).data
next_op = prov_log_get_node(process_tree_prov_log, edge[1][0], edge[1][1], edge[1][2], edge[1][3]).data
# When we move to a new, independent process (not a child of the current one), reset shared_files.
if edge[0][0] != edge[1][0] and not isinstance(op, CloneOp) and not isinstance(next_op, WaitOp) and edge[1][1] == 0 and edge[1][3] == 0:
shared_files = set()
if isinstance(op, OpenOp):
access_mode = op.flags & os.O_ACCMODE
processNode = ProcessNode(pid=pid, cmd=tuple(cmd_map[pid]))
dataflow_graph.add_node(processNode, label=processNode.cmd)
file = InodeOnDevice(op.path.device_major, op.path.device_minor, op.path.inode)
path_str = op.path.path.decode("utf-8")
if access_mode == os.O_RDONLY:
curr_version = file_version_map[file]
fileNode = FileNode(file, curr_version, path_str)
dataflow_graph.add_node(fileNode, label = fileNode.label)
path = pathlib.Path(op.path.path.decode("utf-8"))
if path not in name_map[file]:
name_map[file].append(path)
dataflow_graph.add_edge(fileNode, processNode)
elif access_mode == os.O_WRONLY:
curr_version = file_version_map[file]
if file in shared_files:
fileNode2 = FileNode(file, curr_version, path_str)
dataflow_graph.add_node(fileNode2, label = fileNode2.label)
else:
file_version_map[file] = curr_version + 1
fileNode2 = FileNode(file, curr_version+1, path_str)
dataflow_graph.add_node(fileNode2, label = fileNode2.label)
if starting_pid == pid:
# shared_files tracks files shared between parent and child processes, so that writes from the children do not increment the file version multiple times
shared_files.add(file)
path = pathlib.Path(op.path.path.decode("utf-8"))
if path not in name_map[file]:
name_map[file].append(path)
dataflow_graph.add_edge(processNode, fileNode2)
elif access_mode == os.O_RDWR:
console.print(f"Found file {path_str} with access mode O_RDWR", style="red")
else:
raise Exception("unknown access mode")
elif isinstance(op, CloneOp):
if op.task_type == TaskType.TASK_PID:
if edge[0][0] != edge[1][0]:
target_nodes[op.task_id].append(edge[1])
continue
elif op.task_type == TaskType.TASK_PTHREAD:
if edge[0][2] != edge[1][2]:
target_nodes[op.task_id].append(edge[1])
continue
if op.task_type != TaskType.TASK_PTHREAD and op.task_type != TaskType.TASK_ISO_C_THREAD:

processNode1 = ProcessNode(pid = pid, cmd=tuple(cmd_map[pid]))
processNode2 = ProcessNode(pid = op.task_id, cmd=tuple(cmd_map[op.task_id]))
dataflow_graph.add_node(processNode1, label = processNode1.cmd)
dataflow_graph.add_node(processNode2, label = processNode2.cmd)
dataflow_graph.add_edge(processNode1, processNode2)
target_nodes[op.task_id] = list()
elif isinstance(op, WaitOp) and op.options == 0:
for node in target_nodes[op.task_id]:
traverse_hb_for_dfgraph(process_tree_prov_log, node, traversed, dataflow_graph, file_version_map, shared_files, cmd_map)
traversed.add(node[2])
# return to the WaitOp of the parent process
if isinstance(next_op, WaitOp):
if next_op.task_id == starting_pid or next_op.task_id == starting_op.pthread_id:
return
return

def list_edges_from_start_node(graph: nx.DiGraph, start_node: Node) -> list[EdgeType]:
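# Rotate the edge list so traversal begins at the first edge leaving start_node;
# edges that come before it wrap around to the end of the list.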
all_edges = list(graph.edges())
start_index = next(i for i, edge in enumerate(all_edges) if edge[0] == start_node)
ordered_edges = all_edges[start_index:] + all_edges[:start_index]
return ordered_edges

def provlog_to_dataflow_graph(process_tree_prov_log: parser.ProvLog) -> str:
dataflow_graph = nx.DiGraph()
file_version_map = collections.defaultdict[InodeOnDevice, int](lambda: 0)
process_graph = provlog_to_digraph(process_tree_prov_log)
root_node = [n for n in process_graph.nodes() if process_graph.out_degree(n) > 0 and process_graph.in_degree(n) == 0][0]
traversed: set[int] = set()
cmd: list[str] = []
cmd_map = collections.defaultdict[int, list[str]](list)
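# Walk the happens-before edges in reverse, reconstructing a stop-gap "command"
# for each process from its InitExecEpochOp program name and the files it opens
# (see the note in tasks.md below).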
for edge in list(nx.edges(process_graph))[::-1]:
pid, exec_epoch_no, tid, op_index = edge[0]
op = prov_log_get_node(process_tree_prov_log, pid, exec_epoch_no, tid, op_index).data
if isinstance(op, OpenOp):
file = op.path.path.decode("utf-8")
if file not in cmd and file != "/dev/tty":
cmd.insert(0, file)
elif isinstance(op, InitExecEpochOp):
cmd.insert(0, op.program_name.decode("utf-8"))
if pid == tid and exec_epoch_no == 0:
cmd_map[tid] = cmd
cmd = []
shared_files: set[InodeOnDevice] = set()
traverse_hb_for_dfgraph(process_tree_prov_log, root_node, traversed, dataflow_graph, file_version_map, shared_files, cmd_map)
pydot_graph = nx.drawing.nx_pydot.to_pydot(dataflow_graph)
dot_string = pydot_graph.to_string()
return dot_string

+ def prov_log_get_node(prov_log: parser.ProvLog, pid: int, exec_epoch: int, tid: int, op_no: int) -> Op:
return prov_log.processes[pid].exec_epochs[exec_epoch].threads[tid].ops[op_no]
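# e.g., prov_log_get_node(prov_log, pid, 0, pid, 0) is the first op of the
# main thread of process `pid` in its first exec epoch.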


- def validate_hb_closes(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
+ def validate_hb_closes(provlog: parser.ProvLog, process_graph: nx.DiGraph) -> list[str]:
# Note that this test doesn't work if a process "intentionally" leaves a fd open for its child.
# E.g., bash-in-pipe
provlog_reverse = process_graph.reverse()
@@ -224,7 +377,7 @@ def validate_hb_closes(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]
return ret


- def validate_hb_waits(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
+ def validate_hb_waits(provlog: parser.ProvLog, process_graph: nx.DiGraph) -> list[str]:
provlog_reverse = process_graph.reverse()
ret = list[str]()
for node in process_graph.nodes:
@@ -239,7 +392,7 @@ def validate_hb_waits(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
ret.append(f"Wait of {op.data.task_id} in {node} is not preceeded by corresponding clone")
return ret

- def validate_hb_clones(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
+ def validate_hb_clones(provlog: parser.ProvLog, process_graph: nx.DiGraph) -> list[str]:
ret = list[str]()
for node in process_graph.nodes:
op = prov_log_get_node(provlog, *node)
@@ -268,7 +421,7 @@ def validate_hb_clones(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]
return ret


- def validate_hb_degree(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
+ def validate_hb_degree(provlog: parser.ProvLog, process_graph: nx.DiGraph) -> list[str]:
ret = list[str]()
found_entry = False
found_exit = False
@@ -290,7 +443,7 @@ def validate_hb_degree(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]
return ret


- def validate_hb_acyclic(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
+ def validate_hb_acyclic(provlog: parser.ProvLog, process_graph: nx.DiGraph) -> list[str]:
try:
cycle = nx.find_cycle(process_graph)
except nx.NetworkXNoCycle:
@@ -299,7 +452,7 @@ def validate_hb_acyclic(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str
return [f"Cycle detected: {cycle}"]


- def validate_hb_execs(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
+ def validate_hb_execs(provlog: parser.ProvLog, process_graph: nx.DiGraph) -> list[str]:
ret = list[str]()
for node0 in process_graph.nodes():
pid0, eid0, tid0, op0 = node0
@@ -317,7 +470,7 @@ def validate_hb_execs(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
return ret


- def validate_hb_graph(processes: ProvLog, hb_graph: nx.DiGraph) -> list[str]:
+ def validate_hb_graph(processes: parser.ProvLog, hb_graph: nx.DiGraph) -> list[str]:
ret = list[str]()
# ret.extend(validate_hb_closes(processes, hb_graph))
ret.extend(validate_hb_waits(processes, hb_graph))
@@ -338,8 +491,7 @@ def relax_node(graph: nx.DiGraph, node: typing.Any) -> list[tuple[typing.Any, ty
graph.remove_node(node)
return ret


- def digraph_to_pydot_string(prov_log: ProvLog, process_graph: nx.DiGraph) -> str:
+ def digraph_to_pydot_string(prov_log: parser.ProvLog, process_graph: nx.DiGraph) -> str:

label_color_map = {
EdgeLabels.EXEC: 'yellow',
@@ -376,7 +528,8 @@ def digraph_to_pydot_string(prov_log: ProvLog, process_graph: nx.DiGraph) -> str
return dot_string


- def construct_process_graph(process_tree_prov_log: ProvLog) -> str:
+ def construct_process_graph(process_tree_prov_log: parser.ProvLog) -> str:
"""
Construct a happens-before graph from process_tree_prov_log
29 changes: 26 additions & 3 deletions probe_src/python/probe_py/manual/cli.py
@@ -10,13 +10,13 @@
import shutil
import rich
from probe_py.generated.parser import parse_probe_log
- from . import analysis
- from . import util
+ from probe_py.manual import analysis
+ from probe_py.manual import util

rich.traceback.install(show_locals=False)


- project_root = pathlib.Path(__file__).resolve().parent.parent
+ project_root = pathlib.Path(__file__).resolve().parent.parent.parent.parent

A = typing_extensions.Annotated

@@ -124,6 +124,29 @@ def process_graph(
console.print(warning, style="red")
print(analysis.digraph_to_pydot_string(prov_log, process_graph))

@app.command()
def dataflow_graph(
input: pathlib.Path = pathlib.Path("probe_log"),
) -> None:
"""
Write a dataflow graph from PROBE_LOG in DOT/graphviz format.
"""
if not input.exists():
typer.secho(f"INPUT {input} does not exist\nUse `PROBE record --output {input} CMD...` to rectify", fg=typer.colors.RED)
raise typer.Abort()
probe_log_tar_obj = tarfile.open(input, "r")
prov_log = parse_probe_log(input)
probe_log_tar_obj.close()
console = rich.console.Console(file=sys.stderr)
for warning in analysis.validate_provlog(prov_log):
console.print(warning, style="red")
rich.traceback.install(show_locals=False)  # TODO: figure out why we need this
process_graph = analysis.provlog_to_digraph(prov_log)
for warning in analysis.validate_hb_graph(prov_log, process_graph):
console.print(warning, style="red")
print(analysis.provlog_to_dataflow_graph(prov_log))


@app.command()
def dump(
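For reference, a minimal sketch of driving the new dataflow-graph output from Python rather than the CLI, assuming a `probe_log` tarball produced by `PROBE record` (names taken from the diff above):

```python
import pathlib

from probe_py.generated.parser import parse_probe_log
from probe_py.manual import analysis

# Assumes `probe_log` was produced by `PROBE record --output probe_log CMD...`.
prov_log = parse_probe_log(pathlib.Path("probe_log"))

# provlog_to_dataflow_graph returns DOT text ready for Graphviz,
# e.g. pipe it into `dot -Tpng -o dataflow.png`.
dot_string = analysis.provlog_to_dataflow_graph(prov_log)
print(dot_string)
```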
3 changes: 2 additions & 1 deletion probe_src/tasks.md
@@ -75,10 +75,11 @@
- [ ] Write interesting performance tests, using `benchmark/workloads.py` as inspiration.
- [ ] Output conversions
- [ ] From the NetworkX digraph, export (Shofiya is working on this):
- - [ ] A dataflow graph, showing only files, processes, and the flow of information between them. The following rules define when there is an edge:
+ - [x] A dataflow graph, showing only files, processes, and the flow of information between them. The following rules define when there is an edge:
1. Data flows from a file to a process if on any thread there is an OpenOp with the flags set to `O_RDWR` or `O_RDONLY`.
2. Data flows from a process to a process if one process CloneOp's the other.
3. Data flows from a process to a file if on any thread there is a OpenOp with the flags set to `O_RDWR` or `O_WRONLY`.
- [ ] Capture the command of the process in libprobe. Currently, the `cmd` property of the `ProcessNode` in the dataflow graph is assigned a value using a map that associates each `process_id` with the corresponding command. This map is generated by a stop-gap function that constructs a command string using the `program-name` from `InitExecEpochOp` and the `file` from `OpenOp`. However, this function often fails to generate the correct command for certain operations. For instance, it struggles with cases such as `cat a.txt; cat a.txt` and the execution of compiled C programs like `./compiled_c_file`.
- [ ] [Process Run Crate](https://www.researchobject.org/workflow-run-crate/profiles/process_run_crate/) (Saleha is working on this)
- [ ] [Common Workflow Language](https://www.commonwl.org/)
- [ ] Write a test that runs the resulting CWL.
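A minimal sketch of the access-mode dispatch behind rules 1 and 3 above, mirroring the OpenOp handling in analysis.py (the helper name is hypothetical; assumes POSIX `O_*` semantics):

```python
import os

def edge_direction(open_flags: int) -> str:
    # Hypothetical helper for illustration; not part of this commit.
    access_mode = open_flags & os.O_ACCMODE
    if access_mode == os.O_RDONLY:
        return "file -> process"   # rule 1: the process reads the file
    elif access_mode == os.O_WRONLY:
        return "process -> file"   # rule 3: the process writes the file
    elif access_mode == os.O_RDWR:
        return "file <-> process"  # rules 1 and 3; v1 only logs a warning here
    raise ValueError(f"unknown access mode: {access_mode:#o}")
```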
