
v1: Dataflow graph #35

Merged · 42 commits · Aug 22, 2024

Commits (changes from all commits)
1eb34cc
test: added tests to confirm information flow for simple file read/co…
Shofiya2003 Jun 27, 2024
8f9fe2f
added pydot to to flake.nix
Shofiya2003 Jun 27, 2024
d74c256
Merge remote-tracking branch 'origin/main' into tests
Shofiya2003 Jun 27, 2024
05c3338
fix: resolving merge conflicts
Shofiya2003 Jun 27, 2024
1f907fe
tests:added tests to verify the graph for diff command and bash in ba…
Shofiya2003 Jun 29, 2024
0ea01c3
test:added tests for command not found and empty command
Shofiya2003 Jul 2, 2024
53fe64d
test:added test for bash in bash pipe command
Shofiya2003 Jul 3, 2024
7789d89
Merge branch 'main' of https://github.com/Shofiya2003/prov-tracer
Shofiya2003 Jul 8, 2024
bce6357
feat: addes support to create a dataflow graph
Shofiya2003 Jul 10, 2024
17b3e12
Merge branch 'main' of https://github.com/charmoniumQ/PROBE into data…
Shofiya2003 Jul 16, 2024
3f68977
fix: create an edge from parent process to child process
Shofiya2003 Jul 22, 2024
dbc7cb5
Merge branch 'main' of https://github.com/charmoniumQ/PROBE into data…
Shofiya2003 Jul 23, 2024
6d061d0
feat: v1 of dataflow graph completed
Shofiya2003 Jul 23, 2024
d18f94e
console warning when file with O_RDWR is encountered
Shofiya2003 Jul 23, 2024
dccf10b
fix: fixed the errors that occurred in pre commit command
Shofiya2003 Jul 23, 2024
b9c45f2
refactor: assigned better name to traversal function
Shofiya2003 Jul 23, 2024
ab18598
removed redundant file
Shofiya2003 Jul 23, 2024
56fc186
removed redundant file
Shofiya2003 Jul 23, 2024
4e463cc
refactor: removed redundant code and comments
Shofiya2003 Jul 23, 2024
944cbe3
replaced the pthread_id in process node with the kernel thread of the…
Shofiya2003 Jul 25, 2024
33d5004
fix: fixed pre-commit errors
Shofiya2003 Jul 25, 2024
9caa6c2
removed access mode from the filenode
Shofiya2003 Jul 25, 2024
9fc1746
changed FileNode and ProcessNode from class to tuple
Shofiya2003 Jul 26, 2024
77b16d6
feat: changed the logic to produce dataflow graph to create an edge f…
Shofiya2003 Jul 27, 2024
9c967e3
feat: ensure the shared files in pthreads and the parent process
Shofiya2003 Jul 27, 2024
794e487
removed redundant comments
Shofiya2003 Jul 27, 2024
07a05e8
inserted command executed in the process node
Shofiya2003 Jul 27, 2024
a588708
fix: removed mention of dev/tty from the command
Shofiya2003 Jul 27, 2024
9d273e9
removed file key from FileNode class
Shofiya2003 Jul 27, 2024
72f4c99
feat: added lables to the edges
Shofiya2003 Jul 30, 2024
f0bc00a
feat: represented the file using InodeOnDevice class
Shofiya2003 Jul 30, 2024
802430c
changed the declaration of dictionary to insert value when key does n…
Shofiya2003 Jul 31, 2024
3db6ca0
Merge branch 'main' of https://github.com/charmoniumQ/PROBE into data…
Shofiya2003 Aug 7, 2024
071804d
fix: fixed import paths due to structural change in the project and m…
Shofiya2003 Aug 8, 2024
c0185f4
fixed pre-commit checks and errors
Shofiya2003 Aug 8, 2024
e567990
fix: removed __ini__.py file from probe_src folder
Shofiya2003 Aug 8, 2024
31f58f2
refactor: used access mode flags from os package instead of integers …
Shofiya2003 Aug 8, 2024
d055f0a
refactor: created a function for property label in FileNode dataclass
Shofiya2003 Aug 8, 2024
06cd393
added comment to explain the use of shared_file set and ensured share…
Shofiya2003 Aug 8, 2024
62b46bc
changed the datatype of cmd property of ProcessNode dataclass
Shofiya2003 Aug 8, 2024
4977e94
edited tasks.md to include task of capturing commands and arguments
Shofiya2003 Aug 17, 2024
f6ad837
removed try..except block and removed init files from namespace package
Shofiya2003 Aug 21, 2024
199 changes: 176 additions & 23 deletions probe_src/python/probe_py/manual/analysis.py
@@ -1,9 +1,15 @@
import typing
from typing import Dict, Tuple
import networkx as nx # type: ignore
from probe_py.generated.ops import Op, CloneOp, ExecOp, WaitOp, OpenOp, CloseOp, InitProcessOp, InitExecEpochOp, InitThreadOp, StatOp
-from probe_py.generated.parser import ProvLog
+from probe_py.generated import parser
from enum import IntEnum

import rich
import sys
from dataclasses import dataclass
import pathlib
import os
import collections

# TODO: implement this in probe_py.generated.ops
class TaskType(IntEnum):
@@ -17,10 +23,45 @@ class EdgeLabels(IntEnum):
PROGRAM_ORDER = 1
FORK_JOIN = 2
EXEC = 3


@dataclass(frozen=True)
class ProcessNode:
pid: int
cmd: tuple[str,...]

@dataclass(frozen=True)
class InodeOnDevice:
device_major: int
device_minor: int
inode: int

def __eq__(self, other: object) -> bool:
if not isinstance(other, InodeOnDevice):
return NotImplemented
return (self.device_major == other.device_major and
self.device_minor == other.device_minor and
self.inode == other.inode)
def __hash__(self) -> int:
return hash((self.device_major, self.device_minor, self.inode))

@dataclass(frozen=True)
class FileNode:
inodeOnDevice: InodeOnDevice
version: int
file: str

@property
def label(self) -> str:
return f"{self.file} v{self.version}"

# type alias for a node
Node = Tuple[int, int, int, int]

# type for the edges
EdgeType = Tuple[Node, Node]
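
A minimal sketch of these aliases with hypothetical values (not taken from a real probe_log): a node addresses one op by (pid, exec_epoch_no, tid, op_index), and an edge connects two such nodes.

# Hypothetical: ops 0 and 1 on the main thread of pid 1234, exec epoch 0.
node_a: Node = (1234, 0, 1234, 0)
node_b: Node = (1234, 0, 1234, 1)
edge: EdgeType = (node_a, node_b)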

def validate_provlog(
-provlog: ProvLog,
+provlog: parser.ProvLog,
) -> list[str]:
ret = list[str]()
waited_processes = set[tuple[TaskType, int]]()
@@ -88,35 +129,29 @@ def validate_provlog(
# TODO: Rename "digraph" to "hb_graph" in the entire project.
# Digraph (aka "directed graph") is too vague a term; the proper name is "happens-before graph".
# Later on, we will have a function that transforms an hb graph to file graph (both of which are digraphs)
-def provlog_to_digraph(process_tree_prov_log: ProvLog) -> nx.DiGraph:
+def provlog_to_digraph(process_tree_prov_log: parser.ProvLog) -> nx.DiGraph:
# [pid, exec_epoch_no, tid, op_index]
Node: typing.TypeAlias = tuple[int, int, int, int]
program_order_edges = list[tuple[Node, Node]]()
fork_join_edges = list[tuple[Node, Node]]()
exec_edges = list[tuple[Node, Node]]()
nodes = list[Node]()
proc_to_ops = dict[tuple[int, int, int], list[Node]]()
last_exec_epoch = dict[int, int]()
for pid, process in process_tree_prov_log.processes.items():

for exec_epoch_no, exec_epoch in process.exec_epochs.items():
# to find the last executing epoch of the process
last_exec_epoch[pid] = max(last_exec_epoch.get(pid, 0), exec_epoch_no)

# Reduce each thread to the ops we actually care about
for tid, thread in exec_epoch.threads.items():
context = (pid, exec_epoch_no, tid)
ops = list[Node]()

# Filter just the ops we are interested in
op_index = 0
for op_index, op in enumerate(thread.ops):
ops.append((*context, op_index))

# Add just those ops to the graph
nodes.extend(ops)
program_order_edges.extend(zip(ops[:-1], ops[1:]))
# Store these so we can hook up forks/joins between threads
proc_to_ops[context] = ops

@@ -135,6 +170,7 @@ def get_first_pthread(pid: int, exid: int, target_pthread_id: int) -> list[Node]
for op_index, op in enumerate(thread.ops):
if op.pthread_id == target_pthread_id:
ret.append((pid, exid, tid, op_index))
break
return ret

def get_last_pthread(pid: int, exid: int, target_pthread_id: int) -> list[Node]:
@@ -145,6 +181,7 @@ def get_last_pthread(pid: int, exid: int, target_pthread_id: int) -> list[Node]:
for op_index, op in list(enumerate(thread.ops))[::-1]:
if op.pthread_id == target_pthread_id:
ret.append((pid, exid, tid, op_index))
break
return ret

# Hook up forks/joins
@@ -199,12 +236,128 @@ def add_edges(edges:list[tuple[Node, Node]], label:EdgeLabels) -> None:
add_edges(fork_join_edges, EdgeLabels.FORK_JOIN)
return process_graph
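
A minimal usage sketch for the function above, assuming a probe_log recorded with `PROBE record` exists in the working directory (parse_probe_log comes from probe_py.generated.parser, as imported in cli.py below):

import pathlib
from probe_py.generated.parser import parse_probe_log
from probe_py.manual import analysis

prov_log = parse_probe_log(pathlib.Path("probe_log"))  # hypothetical recording
hb_graph = analysis.provlog_to_digraph(prov_log)
for warning in analysis.validate_provlog(prov_log):
    print(warning)
# Nodes are (pid, exec_epoch_no, tid, op_index) tuples; edge labels are EdgeLabels values.
print(hb_graph.number_of_nodes(), "nodes,", hb_graph.number_of_edges(), "edges")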

def traverse_hb_for_dfgraph(process_tree_prov_log: parser.ProvLog, starting_node: Node, traversed: set[int], dataflow_graph: nx.DiGraph, file_version_map: Dict[InodeOnDevice, int], shared_files: set[InodeOnDevice], cmd_map: Dict[int, list[str]]) -> None:
starting_pid = starting_node[0]

starting_op = prov_log_get_node(process_tree_prov_log, starting_node[0], starting_node[1], starting_node[2], starting_node[3])
process_graph = provlog_to_digraph(process_tree_prov_log)

edges = list_edges_from_start_node(process_graph, starting_node)
name_map = collections.defaultdict[InodeOnDevice, list[pathlib.Path]](list)

target_nodes = collections.defaultdict[int, list[Node]](list)
console = rich.console.Console(file=sys.stderr)

for edge in edges:
pid, exec_epoch_no, tid, op_index = edge[0]

# skip if this process or thread was already visited when a WaitOp occurred
if pid in traversed or tid in traversed:
continue

op = prov_log_get_node(process_tree_prov_log, pid, exec_epoch_no, tid, op_index).data
next_op = prov_log_get_node(process_tree_prov_log, edge[1][0], edge[1][1], edge[1][2], edge[1][3]).data
# When we move to a new process that is independent (not a child of the current process), reset shared_files.
if edge[0][0] != edge[1][0] and not isinstance(op, CloneOp) and not isinstance(next_op, WaitOp) and edge[1][1] == 0 and edge[1][3] == 0:
shared_files = set()
if isinstance(op, OpenOp):
access_mode = op.flags & os.O_ACCMODE
processNode = ProcessNode(pid=pid, cmd=tuple(cmd_map[pid]))
dataflow_graph.add_node(processNode, label=processNode.cmd)
file = InodeOnDevice(op.path.device_major, op.path.device_minor, op.path.inode)
path_str = op.path.path.decode("utf-8")
if access_mode == os.O_RDONLY:
curr_version = file_version_map[file]
fileNode = FileNode(file, curr_version, path_str)
dataflow_graph.add_node(fileNode, label = fileNode.label)
path = pathlib.Path(op.path.path.decode("utf-8"))
if path not in name_map[file]:
name_map[file].append(path)
dataflow_graph.add_edge(fileNode, processNode)
elif access_mode == os.O_WRONLY:
curr_version = file_version_map[file]
if file in shared_files:
fileNode2 = FileNode(file, curr_version, path_str)
dataflow_graph.add_node(fileNode2, label = fileNode2.label)
else:
file_version_map[file] = curr_version + 1
fileNode2 = FileNode(file, curr_version+1, path_str)
dataflow_graph.add_node(fileNode2, label = fileNode2.label)
if starting_pid == pid:
# shared_files: shared_files helps us keep track of the files shared between parent and child processes. This ensures that when the children write to the file, the version of the file is not incremented multiple times
shared_files.add(file)
path = pathlib.Path(op.path.path.decode("utf-8"))
if path not in name_map[file]:
name_map[file].append(path)
dataflow_graph.add_edge(processNode, fileNode2)
elif access_mode == os.O_RDWR:
console.print(f"Found file {path_str} with access mode O_RDWR", style="red")
else:
raise Exception("unknown access mode")
elif isinstance(op, CloneOp):
if op.task_type == TaskType.TASK_PID:
if edge[0][0] != edge[1][0]:
target_nodes[op.task_id].append(edge[1])
continue
elif op.task_type == TaskType.TASK_PTHREAD:
if edge[0][2] != edge[1][2]:
target_nodes[op.task_id].append(edge[1])
continue
if op.task_type != TaskType.TASK_PTHREAD and op.task_type != TaskType.TASK_ISO_C_THREAD:

processNode1 = ProcessNode(pid = pid, cmd=tuple(cmd_map[pid]))
processNode2 = ProcessNode(pid = op.task_id, cmd=tuple(cmd_map[op.task_id]))
dataflow_graph.add_node(processNode1, label = processNode1.cmd)
dataflow_graph.add_node(processNode2, label = processNode2.cmd)
dataflow_graph.add_edge(processNode1, processNode2)
target_nodes[op.task_id] = list()
elif isinstance(op, WaitOp) and op.options == 0:
for node in target_nodes[op.task_id]:
traverse_hb_for_dfgraph(process_tree_prov_log, node, traversed, dataflow_graph, file_version_map, shared_files, cmd_map)
traversed.add(node[2])
# return back to the WaitOp of the parent process
if isinstance(next_op, WaitOp):
if next_op.task_id == starting_pid or next_op.task_id == starting_op.pthread_id:
return
return
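
The access-mode branches above hinge on masking the OpenOp flags with os.O_ACCMODE. A self-contained sketch of that decoding, independent of PROBE's Op types:

import os

def classify_access(flags: int) -> str:
    # O_ACCMODE masks out everything except the read/write mode bits.
    mode = flags & os.O_ACCMODE
    if mode == os.O_RDONLY:
        return "read"        # drawn as a file -> process edge
    if mode == os.O_WRONLY:
        return "write"       # drawn as a process -> file edge
    if mode == os.O_RDWR:
        return "read+write"  # currently only warned about, not drawn
    raise ValueError(f"unknown access mode: {flags:#o}")

assert classify_access(os.O_WRONLY | os.O_CREAT | os.O_TRUNC) == "write"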

def list_edges_from_start_node(graph: nx.DiGraph, start_node: Node) -> list[EdgeType]:
all_edges = list(graph.edges())
start_index = next(i for i, edge in enumerate(all_edges) if edge[0] == start_node)
ordered_edges = all_edges[start_index:] + all_edges[:start_index]
return ordered_edges
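
A small illustration of the rotation performed above (nx.DiGraph preserves the insertion order of edges); the node values here are hypothetical stand-ins for Node tuples:

g = nx.DiGraph()
g.add_edges_from([("a", "b"), ("b", "c"), ("c", "d")])
# Rotate so iteration starts at the first edge whose source is "b":
assert list_edges_from_start_node(g, "b") == [("b", "c"), ("c", "d"), ("a", "b")]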

def provlog_to_dataflow_graph(process_tree_prov_log: parser.ProvLog) -> str:
dataflow_graph = nx.DiGraph()
file_version_map = collections.defaultdict[InodeOnDevice, int](lambda: 0)
process_graph = provlog_to_digraph(process_tree_prov_log)
root_node = [n for n in process_graph.nodes() if process_graph.out_degree(n) > 0 and process_graph.in_degree(n) == 0][0]
traversed: set[int] = set()
cmd: list[str] = []
cmd_map = collections.defaultdict[int, list[str]](list)
for edge in list(nx.edges(process_graph))[::-1]:
pid, exec_epoch_no, tid, op_index = edge[0]
op = prov_log_get_node(process_tree_prov_log, pid, exec_epoch_no, tid, op_index).data
if isinstance(op, OpenOp):
file = op.path.path.decode("utf-8")
if file not in cmd and file != "/dev/tty":
cmd.insert(0, file)
elif isinstance(op, InitExecEpochOp):
cmd.insert(0, op.program_name.decode("utf-8"))
if pid == tid and exec_epoch_no == 0:
cmd_map[tid] = cmd
cmd = []
shared_files: set[InodeOnDevice] = set()
traverse_hb_for_dfgraph(process_tree_prov_log, root_node, traversed, dataflow_graph, file_version_map, shared_files, cmd_map)
pydot_graph = nx.drawing.nx_pydot.to_pydot(dataflow_graph)
dot_string = pydot_graph.to_string()
return dot_string

def prov_log_get_node(prov_log: parser.ProvLog, pid: int, exec_epoch: int, tid: int, op_no: int) -> Op:
return prov_log.processes[pid].exec_epochs[exec_epoch].threads[tid].ops[op_no]


-def validate_hb_closes(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
+def validate_hb_closes(provlog: parser.ProvLog, process_graph: nx.DiGraph) -> list[str]:
# Note that this test doesn't work if a process "intentionally" leaves a fd open for its child.
# E.g., bash-in-pipe
provlog_reverse = process_graph.reverse()
@@ -224,7 +377,7 @@ def validate_hb_closes(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]
return ret


-def validate_hb_waits(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
+def validate_hb_waits(provlog: parser.ProvLog, process_graph: nx.DiGraph) -> list[str]:
provlog_reverse = process_graph.reverse()
ret = list[str]()
for node in process_graph.nodes:
@@ -239,7 +392,7 @@ def validate_hb_waits(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
ret.append(f"Wait of {op.data.task_id} in {node} is not preceeded by corresponding clone")
return ret

-def validate_hb_clones(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
+def validate_hb_clones(provlog: parser.ProvLog, process_graph: nx.DiGraph) -> list[str]:
ret = list[str]()
for node in process_graph.nodes:
op = prov_log_get_node(provlog, *node)
@@ -268,7 +421,7 @@ def validate_hb_clones(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]
return ret


-def validate_hb_degree(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
+def validate_hb_degree(provlog: parser.ProvLog, process_graph: nx.DiGraph) -> list[str]:
ret = list[str]()
found_entry = False
found_exit = False
@@ -290,7 +443,7 @@ def validate_hb_degree(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]
return ret


-def validate_hb_acyclic(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
+def validate_hb_acyclic(provlog: parser.ProvLog, process_graph: nx.DiGraph) -> list[str]:
try:
cycle = nx.find_cycle(process_graph)
except nx.NetworkXNoCycle:
@@ -299,7 +452,7 @@ def validate_hb_acyclic(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str
return [f"Cycle detected: {cycle}"]


-def validate_hb_execs(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
+def validate_hb_execs(provlog: parser.ProvLog, process_graph: nx.DiGraph) -> list[str]:
ret = list[str]()
for node0 in process_graph.nodes():
pid0, eid0, tid0, op0 = node0
@@ -317,7 +470,7 @@ def validate_hb_execs(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
return ret


-def validate_hb_graph(processes: ProvLog, hb_graph: nx.DiGraph) -> list[str]:
+def validate_hb_graph(processes: parser.ProvLog, hb_graph: nx.DiGraph) -> list[str]:
ret = list[str]()
# ret.extend(validate_hb_closes(processes, hb_graph))
ret.extend(validate_hb_waits(processes, hb_graph))
@@ -338,8 +491,7 @@ def relax_node(graph: nx.DiGraph, node: typing.Any) -> list[tuple[typing.Any, ty
graph.remove_node(node)
return ret


-def digraph_to_pydot_string(prov_log: ProvLog, process_graph: nx.DiGraph) -> str:
+def digraph_to_pydot_string(prov_log: parser.ProvLog, process_graph: nx.DiGraph) -> str:

label_color_map = {
EdgeLabels.EXEC: 'yellow',
@@ -376,7 +528,8 @@ def digraph_to_pydot_string(prov_log: ProvLog, process_graph: nx.DiGraph) -> str
return dot_string


-def construct_process_graph(process_tree_prov_log: ProvLog) -> str:
+def construct_process_graph(process_tree_prov_log: parser.ProvLog) -> str:
"""
Construct a happens-before graph from process_tree_prov_log

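
The dataflow construction above gives every file a monotonically increasing version: reads consume the current version, writes produce a new one unless the file is shared between a parent and its children. A reduced sketch of that bookkeeping under the same defaultdict scheme as file_version_map (toy string keys standing in for InodeOnDevice):

import collections

file_version_map = collections.defaultdict(lambda: 0)  # toy keys, not InodeOnDevice

def on_read(path: str) -> int:
    # A reader sees the current version (file -> process edge).
    return file_version_map[path]

def on_write(path: str, shared: bool) -> int:
    # A writer bumps the version (process -> file edge), unless the open
    # file is shared with child processes, which must not double-bump it.
    if not shared:
        file_version_map[path] += 1
    return file_version_map[path]

assert on_read("a.txt") == 0
assert on_write("a.txt", shared=False) == 1
assert on_read("a.txt") == 1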
29 changes: 26 additions & 3 deletions probe_src/python/probe_py/manual/cli.py
@@ -10,13 +10,13 @@
import shutil
import rich
from probe_py.generated.parser import parse_probe_log
-from . import analysis
-from . import util
+from probe_py.manual import analysis
+from probe_py.manual import util

rich.traceback.install(show_locals=False)


-project_root = pathlib.Path(__file__).resolve().parent.parent
+project_root = pathlib.Path(__file__).resolve().parent.parent.parent.parent

A = typing_extensions.Annotated

@@ -124,6 +129,29 @@ def process_graph(
console.print(warning, style="red")
print(analysis.digraph_to_pydot_string(prov_log, process_graph))

@app.command()
def dataflow_graph(
input: pathlib.Path = pathlib.Path("probe_log"),
) -> None:
"""
Write a dataflow graph from PROBE_LOG in DOT/graphviz format.
"""
if not input.exists():
typer.secho(f"INPUT {input} does not exist\nUse `PROBE record --output {input} CMD...` to rectify", fg=typer.colors.RED)
raise typer.Abort()
probe_log_tar_obj = tarfile.open(input, "r")
prov_log = parse_probe_log(input)
probe_log_tar_obj.close()
console = rich.console.Console(file=sys.stderr)
process_graph = analysis.provlog_to_digraph(prov_log)
for warning in analysis.validate_provlog(prov_log):
console.print(warning, style="red")
rich.traceback.install(show_locals=False) # Figure out why we need this
for warning in analysis.validate_hb_graph(prov_log, process_graph):
console.print(warning, style="red")
print(analysis.provlog_to_dataflow_graph(prov_log))


@app.command()
def dump(
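
Assuming the Typer app here is exposed as the PROBE command-line entry point (as the `PROBE record --output ...` hint in this file suggests, with Typer's default underscore-to-hyphen conversion), the new subcommand would be invoked along the lines of `PROBE dataflow-graph probe_log`, and the printed DOT text piped to Graphviz (e.g. `dot -Tpng`) for rendering.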
3 changes: 2 additions & 1 deletion probe_src/tasks.md
@@ -75,10 +75,11 @@
- [ ] Write interesting performance tests, using `benchmark/workloads.py` as inspiration.
- [ ] Output conversions
- [ ] From the NetworkX digraph, export (Shofiya is working on this):
-- [ ] A dataflow graph, showing only files, processes, and the flow of information between them. The following rules define when there is an edge:
+- [x] A dataflow graph, showing only files, processes, and the flow of information between them. The following rules define when there is an edge:
1. Data flows from a file to a process if on any thread there is an OpenOp with the flags set to `O_RDWR` or `O_RDONLY`.
2. Data flows from a process to a process if one process CloneOp's the other.
3. Data flows from a process to a file if on any thread there is an OpenOp with the flags set to `O_RDWR` or `O_WRONLY`.
- [ ] Capture the command of the process in libprobe. Currently, the `cmd` property of the `ProcessNode` in the dataflow graph is assigned a value using a map that associates each `process_id` with the corresponding command. This map is generated by a stop-gap function that constructs a command string using the `program-name` from `InitExecEpochOp` and the `file` from `OpenOp`. However, this function often fails to generate the correct command for certain operations. For instance, it struggles with cases such as `cat a.txt; cat a.txt` and the execution of compiled C programs like `./compiled_c_file`.
- [ ] [Process Run Crate](https://www.researchobject.org/workflow-run-crate/profiles/process_run_crate/) (Saleha is working on this)
- [ ] [Common Workflow Language](https://www.commonwl.org/)
- [ ] Write a test that runs the resulting CWL.
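
The three edge rules in the checklist above reduce to a small decision helper. A hedged sketch (hypothetical helper, not part of this PR; rule 2, the process -> process edge on CloneOp, fires where the clone itself is handled):

import os

def openop_dataflow_edges(flags: int, process: str, file: str):
    """Yield the dataflow edges implied by one open, per rules 1 and 3."""
    mode = flags & os.O_ACCMODE
    if mode in (os.O_RDONLY, os.O_RDWR):   # rule 1: file -> process
        yield (file, process)
    if mode in (os.O_WRONLY, os.O_RDWR):   # rule 3: process -> file
        yield (process, file)

assert list(openop_dataflow_edges(os.O_RDWR, "proc", "f")) == [("f", "proc"), ("proc", "f")]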