Skip to content

Commit

Permalink
Adding end-to-end-tests (#24)
Browse files Browse the repository at this point in the history
* test: added tests to confirm information flow for simple file read/comparision commands and a bash in bash command using the provlog

* added pydot to to flake.nix

* fix: resolving merge conflicts

* tests:added tests to verify the graph for diff command and bash in bash command

* test:added tests for command not found and empty command

* test:added test for bash in bash pipe command

* merged with main and resolved merge conflicts

* fix: changed the tests to support the new format of CloseOp and WaitOp

* Wrote test for simple pthread c program that creates new pthreads
- Included TaskType in parse_probe_log file to use TaskType data structure from c file in python test file.
- Made fixes to analysis.py that generates process graph to accomodate changes to the CloneOp and WaitOp format

* refactor: extracted the code to execute command for test in a common function
feat: used record function directly to execute the command instead of typer tests

* refactor: removed type annotation and cleaned the code

* feat: created a common function to check waitOp for every CloneOp

* refactor: created a common function to match OpenOp and CloseOp

* refactor: extracted a common function to test process graph for processes that clone threads and touch files

* test: added test for pthreads
Created an edge in process graph to mark the end of a thread and access the last node in process graph
  • Loading branch information
Shofiya2003 authored Jul 15, 2024
1 parent 4882ee3 commit c685077
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 8 deletions.
2 changes: 1 addition & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,4 @@
};
}
);
}
}
Empty file added probe_src/probe_py/__init__.py
Empty file.
15 changes: 9 additions & 6 deletions probe_src/probe_py/analysis.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import typing
import networkx as nx
from .parse_probe_log import ProvLog, Op, CloneOp, ExecOp, WaitOp, CLONE_THREAD
from .parse_probe_log import ProvLog, Op, CloneOp, ExecOp, WaitOp, OpenOp, CloseOp ,CLONE_THREAD
from enum import IntEnum

class EdgeLabels(IntEnum):
Expand Down Expand Up @@ -29,7 +29,7 @@ def provlog_to_digraph(process_tree_prov_log: ProvLog) -> nx.DiGraph:
# Filter just the ops we are interested in
op_index = 0
for op in thread.ops:
if isinstance(op.data, CloneOp | ExecOp | WaitOp):
if isinstance(op.data, CloneOp | ExecOp | WaitOp | OpenOp | CloseOp):
ops.append((*context, op_index))
op_index+=1

Expand All @@ -38,6 +38,9 @@ def provlog_to_digraph(process_tree_prov_log: ProvLog) -> nx.DiGraph:

# Store these so we can hook up forks/joins between threads
proc_to_ops[context] = ops
if len(ops) != 0:
# to mark the end of the thread, edge from last op to (pid, -1, tid, -1)
program_order_edges.append((proc_to_ops[(pid, exec_epoch_no, tid)][-1], (pid, -1, tid, -1)))

def first(pid: int, exid: int, tid: int) -> Node:
if not proc_to_ops.get((pid, exid, tid)):
Expand Down Expand Up @@ -66,16 +69,16 @@ def last(pid: int, exid: int, tid: int) -> Node:
if isinstance(op, CloneOp):
if op.flags & CLONE_THREAD:
# Spawning a thread links to the current PID and exec epoch
target = (pid, exid, op.child_thread_id)
target = (pid, exid, op.task_id)
else:
# New process always links to exec epoch 0 and main thread
# THe TID of the main thread is the same as the PID
target = (op.child_process_id, 0, op.child_process_id)
target = (op.task_id, 0, op.task_id)
exec_edges.append((node, first(*target)))
elif isinstance(op, WaitOp) and op.ferrno == 0 and op.ret > 0:
elif isinstance(op, WaitOp) and op.ferrno == 0 and op.task_id > 0:
# Always wait for main thread of the last exec epoch
if op.ferrno == 0:
target = (op.ret, last_exec_epoch.get(op.ret, 0), op.ret)
target = (op.task_id, last_exec_epoch.get(op.task_id, 0), op.task_id)
fork_join_edges.append((last(*target), node))
elif isinstance(op, ExecOp):
# Exec brings same pid, incremented exid, and main thread
Expand Down
4 changes: 3 additions & 1 deletion probe_src/probe_py/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,6 @@ def dump(
print(op.data)
print()

app()
if __name__ == "__main__":
app()

3 changes: 3 additions & 0 deletions probe_src/probe_py/parse_probe_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@
CloneOp: typing.TypeAlias = py_types[("struct", "CloneOp")]
ExecOp: typing.TypeAlias = py_types[("struct", "ExecOp")]
WaitOp: typing.TypeAlias = py_types[("struct", "WaitOp")]
OpenOp: typing.TypeAlias = py_types[("struct", "OpenOp")]
CloseOp: typing.TypeAlias = py_types[("struct", "CloseOp")]
OpCode: enum.EnumType = py_types[("enum", "OpCode")]
TaskType: enum.EnumType = py_types[("enum", "TaskType")]


@dataclasses.dataclass
Expand Down
238 changes: 238 additions & 0 deletions probe_src/probe_py/test_probe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
import tarfile
from .cli import record
from . import parse_probe_log
from . import analysis
import pathlib
import networkx as nx
import subprocess
import typer
import pytest


def test_diff_cmd():
command = [
'diff', '../flake.nix', '../flake.lock'
]
process_tree_prov_log = execute_command(command, 1)
process_graph = analysis.provlog_to_digraph(process_tree_prov_log)
paths = ['../flake.nix','../flake.lock']
dfs_edges = list(nx.dfs_edges(process_graph))
match_open_and_close_fd(dfs_edges, process_tree_prov_log, paths)


def test_bash_in_bash():
command = ["bash", "-c", "head ../flake.nix ; head ../flake.lock"]
process_tree_prov_log = execute_command(command)
process_graph = analysis.provlog_to_digraph(process_tree_prov_log)
paths = ['../flake.nix', '../flake.lock']
process_file_map = {}
dfs_edges = list(nx.dfs_edges(process_graph))
parent_process_id = dfs_edges[0][0][0]
process_file_map[paths[len(paths)-1]] = parent_process_id
check_for_clone_and_open(dfs_edges, process_tree_prov_log, len(paths)-1, process_file_map, paths)

def test_bash_in_bash_pipe():
command = ["bash", "-c", "head ../flake.nix | tail"]
process_tree_prov_log = execute_command(command)
process_graph = analysis.provlog_to_digraph(process_tree_prov_log)
paths = ['../flake.nix','stdout']
dfs_edges = list(nx.dfs_edges(process_graph))
check_for_clone_and_open(dfs_edges, process_tree_prov_log, len(paths), {}, paths)


def test_pthreads():
process = subprocess.Popen(["gcc", "tests/c/createFile.c", "-o", "test"])
process.communicate()
process_tree_prov_log = execute_command(["./test"])
process_graph = analysis.provlog_to_digraph(process_tree_prov_log)
dfs_edges = list(nx.dfs_edges(process_graph))
total_pthreads = 3
paths = ['/tmp/0.txt', '/tmp/1.txt', '/tmp/2.txt']
check_pthread_graph(dfs_edges, process_tree_prov_log, total_pthreads, paths)

def execute_command(command, return_code=0):
input = pathlib.Path("probe_log")
with pytest.raises(typer.Exit) as excinfo:
record(command, False, False, False,input)
assert excinfo.value.exit_code == return_code
# result = subprocess.run(['./PROBE', 'record'] + command, capture_output=True, text=True, check=True)
assert input.exists()
probe_log_tar_obj = tarfile.open(input, "r")
process_tree_prov_log = parse_probe_log.parse_probe_log_tar(probe_log_tar_obj)
probe_log_tar_obj.close()
return process_tree_prov_log


def check_for_clone_and_open(dfs_edges, process_tree_prov_log, number_of_child_process, process_file_map, paths):
# to ensure files which are opened are closed
file_descriptors = []
# to ensure WaitOp ret is same as the child process pid
check_wait = []
# to ensure the child process has ExecOp, OpenOp and CloseOp
check_child_processes = []
# to ensure child process touch the right file

parent_process_id = dfs_edges[0][0][0]
reserved_file_descriptors = [0, 1, 2]
current_child_process = 0

for edge in dfs_edges:
curr_pid, curr_epoch_idx, curr_tid, curr_op_idx = edge[0]

curr_node_op = get_op_from_provlog(process_tree_prov_log, curr_pid, curr_epoch_idx, curr_tid, curr_op_idx)
if curr_node_op!=None:
curr_node_op = curr_node_op.data
if(isinstance(curr_node_op,parse_probe_log.CloneOp)):
next_op = get_op_from_provlog(process_tree_prov_log, edge[1][0], edge[1][1], edge[1][2], edge[1][3])
if next_op!=None:
next_op = next_op.data
if isinstance(next_op,parse_probe_log.ExecOp):
assert edge[1][0] == curr_node_op.task_id
check_child_processes.append(curr_node_op.task_id)
continue
if isinstance(next_op,parse_probe_log.CloseOp) and edge[0][0]!=edge[1][0]:
assert edge[1][0] == curr_node_op.task_id
check_child_processes.append(curr_node_op.task_id)
continue
if edge[1][3] == -1:
continue
current_child_process+=1
check_wait.append(curr_node_op.task_id)
if len(paths)!=0:
process_file_map[paths[current_child_process-1]] = curr_node_op.task_id
elif(isinstance(curr_node_op,parse_probe_log.WaitOp)):
ret_pid = curr_node_op.task_id
wait_option = curr_node_op.options
if wait_option == 0:
assert ret_pid in check_wait
check_wait.remove(ret_pid)
if(isinstance(curr_node_op,parse_probe_log.OpenOp)):
file_descriptors.append(curr_node_op.fd)
path = curr_node_op.path.path
if path in paths:
if len(process_file_map.keys())!=0:
# ensure the right cloned process has OpenOp for the path
assert curr_pid == process_file_map[path]
if curr_pid!=parent_process_id:
assert curr_pid in check_child_processes
check_child_processes.remove(curr_pid)
elif(isinstance(curr_node_op,parse_probe_log.CloseOp)):
fd = curr_node_op.low_fd
if fd in reserved_file_descriptors:
continue
if curr_node_op.ferrno != 0:
continue
if fd in file_descriptors:
file_descriptors.remove(fd)
elif(isinstance(curr_node_op,parse_probe_log.ExecOp)):
# check if stdout is read in right child process
if(edge[1][3]==-1):
continue
next_init_op = get_op_from_provlog(process_tree_prov_log,curr_pid,1,curr_pid,0)
if next_init_op!=None:
next_init_op = next_init_op.data
if next_init_op.program_name == 'tail':
assert process_file_map['stdout'] == curr_pid
check_child_processes.remove(curr_pid)

# check number of cloneOps
assert current_child_process == number_of_child_process
# check if every cloneOp has a WaitOp
assert len(check_wait) == 0
assert len(process_file_map.items()) == len(paths)
assert len(check_child_processes) == 0
assert len(file_descriptors) == 0


def match_open_and_close_fd(dfs_edges, process_tree_prov_log, paths):
reserved_file_descriptors = [0, 1, 2]
file_descriptors = []
for edge in dfs_edges:
curr_pid, curr_epoch_idx, curr_tid, curr_op_idx = edge[0]
curr_node_op = get_op_from_provlog(process_tree_prov_log, curr_pid, curr_epoch_idx, curr_tid, curr_op_idx)
if curr_node_op!=None:
curr_node_op = curr_node_op.data
if(isinstance(curr_node_op,parse_probe_log.OpenOp)):
file_descriptors.append(curr_node_op.fd)
path = curr_node_op.path.path
if path in paths:
paths.remove(path)
elif(isinstance(curr_node_op,parse_probe_log.CloseOp)):
fd = curr_node_op.low_fd
if fd in reserved_file_descriptors:
continue
if curr_node_op.ferrno != 0:
continue
if fd in file_descriptors:
file_descriptors.remove(fd)

assert len(file_descriptors) == 0
assert len(paths) == 0

def check_pthread_graph(dfs_edges, process_tree_prov_log, total_pthreads, paths):
check_wait = []
process_file_map = {}
current_child_process = 0
file_descriptors = []
reserved_file_descriptors = [1, 2, 3]
edge = dfs_edges[0]
parent_pthread_id = get_op_from_provlog(process_tree_prov_log, edge[0][0], edge[0][1], edge[0][2], edge[0][3]).pthread_id

for edge in dfs_edges:
curr_pid, curr_epoch_idx, curr_tid, curr_op_idx = edge[0]
curr_node_op = get_op_from_provlog(process_tree_prov_log, curr_pid, curr_epoch_idx, curr_tid, curr_op_idx)
print(curr_node_op.data)
if(isinstance(curr_node_op.data,parse_probe_log.CloneOp)):
next_op = get_op_from_provlog(process_tree_prov_log, edge[1][0], edge[1][1], edge[1][2], edge[1][3])
if edge[1][2] != curr_tid:
assert curr_node_op.data.task_id == next_op.pthread_id
continue
check_wait.append(curr_node_op.data.task_id)
if len(paths)!=0:
process_file_map[paths[current_child_process]] = curr_node_op.data.task_id
current_child_process+=1
elif(isinstance(curr_node_op.data,parse_probe_log.WaitOp)):
ret_pid = curr_node_op.data.task_id
wait_option = curr_node_op.data.options
if wait_option == 0:
assert ret_pid in check_wait
check_wait.remove(ret_pid)
elif(isinstance(curr_node_op.data,parse_probe_log.OpenOp)):
file_descriptors.append(curr_node_op.data.fd)
path = curr_node_op.data.path.path
# print(curr_node_op.data)
# print(edge)
# next_op = get_op_from_provlog(process_tree_prov_log, edge[1][0], edge[1][1], edge[1][2], edge[1][3])
# print(next_op.data)
# print(file_descriptors)
# print(">>>>>>>>>>>>>>>>>>>>>")
if path in paths:
if len(process_file_map.keys())!=0 and parent_pthread_id!=curr_node_op.pthread_id:
# ensure the right cloned process has OpenOp for the path
assert process_file_map[path] == curr_node_op.pthread_id
elif(isinstance(curr_node_op.data, parse_probe_log.CloseOp)):
fd = curr_node_op.data.low_fd
print(curr_node_op.data)
if fd in reserved_file_descriptors:
continue
if curr_node_op.data.ferrno != 0:
continue
assert fd in file_descriptors
if fd in file_descriptors:
file_descriptors.remove(fd)

print(file_descriptors)
print("after close")

# check number of cloneOps
assert current_child_process == total_pthreads
# check if every cloneOp has a WaitOp
assert len(check_wait) == 0
# for every file there is a pthread
assert len(process_file_map.items()) == len(paths)
assert len(file_descriptors) == 0

def get_op_from_provlog(process_tree_prov_log, pid, exec_epoch_id, tid,op_idx):
if op_idx == -1 or exec_epoch_id == -1:
return None
return process_tree_prov_log.processes[pid].exec_epochs[exec_epoch_id].threads[tid].ops[op_idx]

0 comments on commit c685077

Please sign in to comment.