diff --git a/probe_src/libprobe/arena/include/arena.h b/probe_src/libprobe/arena/include/arena.h
index f0afdb15..078c2249 100644
--- a/probe_src/libprobe/arena/include/arena.h
+++ b/probe_src/libprobe/arena/include/arena.h
@@ -66,28 +66,6 @@ static size_t __arena_align(size_t offset, size_t alignment) {
 #define __ARENA_FNAME_LENGTH 64
 #define __ARENA_FNAME "%ld.dat"
 
-/* by defining this as inline static it should have zero runtime overhead and
- * not produce any symbols outside this compilation unit (file). */
-inline static int msync_and_munmap(void* addr, size_t length) {
-    int ret = msync(addr, length, MS_SYNC);
-    if (ret != 0) {
-#ifdef ARENA_PERROR
-        perror("msync_and_munmap: msync");
-#endif
-        return -1;
-    }
-
-    ret = munmap(addr, length);
-    if (ret != 0) {
-#ifdef ARENA_PERROR
-        perror("msync_and_munmap: munmap");
-#endif
-        return -1;
-    }
-
-    return 0;
-}
-
 /* Instantiate a new mmap-ed file in the arena dir. */
 static int __arena_reinstantiate(struct ArenaDir* arena_dir, size_t capacity) {
     /* Create a new mmap */
@@ -266,10 +244,10 @@ __attribute__((unused)) static int arena_destroy(struct ArenaDir* arena_dir) {
     while (current) {
         for (size_t i = 0; i < current->next_free_slot; ++i) {
             if (current->arena_list[i] != NULL) {
-                int ret = msync_and_munmap(current->arena_list[i]->base_address, current->arena_list[i]->capacity);
+                int ret = munmap(current->arena_list[i]->base_address, current->arena_list[i]->capacity);
                 if (ret != 0) {
 #ifdef ARENA_PERROR
-                    fprintf(stderr, "arena_destroy: msync and munmap failed\n");
+                    perror("arena_destroy: munmap");
 #endif
                     return -1;
                 }
@@ -302,10 +280,10 @@ __attribute__((unused)) static int arena_drop_after_fork(struct ArenaDir* arena_
     while (current) {
         for (size_t i = 0; i < current->next_free_slot; ++i) {
             if (current->arena_list[i] != NULL) {
-                int ret = msync_and_munmap(current->arena_list[i]->base_address, current->arena_list[i]->capacity);
+                int ret = munmap(current->arena_list[i]->base_address, current->arena_list[i]->capacity);
                 if (ret != 0) {
 #ifdef ARENA_PERROR
-                    fprintf(stderr, "arena_drop_after_fork: msync and munmap failed\n");
+                    perror("arena_drop_after_fork: munmap");
 #endif
                     return -1;
                 }
@@ -329,10 +307,10 @@ __attribute__((unused)) static int arena_uninstantiate_all_but_last(struct Arena
     while (current) {
        for (size_t i = 0; i + ((size_t) is_tail) < current->next_free_slot; ++i) {
             if (current->arena_list[i] != NULL) {
-                int ret = msync_and_munmap(current->arena_list[i]->base_address, current->arena_list[i]->capacity);
+                int ret = munmap(current->arena_list[i]->base_address, current->arena_list[i]->capacity);
                 if (ret != 0) {
 #ifdef ARENA_PERROR
-                    fprintf(stderr, "arena_uninstantiate_all_but_last: msync and munmap failed\n");
+                    perror("arena_uninstantiate_all_but_last: munmap");
 #endif
                     return -1;
                 }
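Note on the arena.h change: dropping msync() is safe for data integrity, because munmap(2) does not discard dirty pages of a MAP_SHARED file mapping; the kernel still writes them back on its own schedule. What is lost is only the synchronous-durability point that MS_SYNC provided. A self-contained illustration (not part of this patch) using Python's mmap, where flush() plays the role of msync():

```python
# Illustration only: data written through a shared mapping survives an unmap
# with no explicit flush, which is why dropping msync() before munmap() loses
# a durability guarantee but not the data itself on a normal exit.
import mmap
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), "0.dat")
with open(path, "wb") as f:
    f.truncate(4096)

with open(path, "r+b") as f:
    m = mmap.mmap(f.fileno(), 4096, mmap.MAP_SHARED)
    m[:4] = b"PROB"
    m.close()  # unmap without flush (munmap without msync)

with open(path, "rb") as f:
    assert f.read(4) == b"PROB"  # the write still reached the file
```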
diff --git a/probe_src/libprobe/src/global_state.c b/probe_src/libprobe/src/global_state.c
index fd30b2d6..8337d709 100644
--- a/probe_src/libprobe/src/global_state.c
+++ b/probe_src/libprobe/src/global_state.c
@@ -88,25 +88,42 @@ static int get_exec_epoch_safe() {
     return __exec_epoch;
 }
 
-static int __copy_files = -1;
+static char __copy_files = ' ';
 static const char* copy_files_env_var = PRIVATE_ENV_VAR_PREFIX "COPY_FILES";
 struct InodeTable read_inodes;
 struct InodeTable copied_or_overwritten_inodes;
 static void init_copy_files() {
-    assert(__copy_files == -1);
+    assert(__copy_files == ' ');
     const char* copy_files_str = debug_getenv(copy_files_env_var);
-    if (copy_files_str != NULL && copy_files_str[0] != 0) {
-        __copy_files = 1;
-        inode_table_init(&read_inodes);
-        inode_table_init(&copied_or_overwritten_inodes);
+    if (copy_files_str) {
+        __copy_files = copy_files_str[0];
     } else {
-        __copy_files = 0;
+        __copy_files = '\0';
     }
-    DEBUG("Is copy files? %d", __copy_files);
+    DEBUG("Copy files? %c", __copy_files);
+    switch (__copy_files) {
+        case '\0':
+            break;
+        case 'e': /* eagerly copy files */
+        case 'l': /* lazily copy files */
+            inode_table_init(&read_inodes);
+            inode_table_init(&copied_or_overwritten_inodes);
+            break;
+        default:
+            ERROR("copy_files has invalid value %c", __copy_files);
+            break;
+    }
+}
+static bool should_copy_files_eagerly() {
+    assert(__copy_files == '\0' || __copy_files == 'e' || __copy_files == 'l');
+    return __copy_files == 'e';
+}
+static bool should_copy_files_lazily() {
+    assert(__copy_files == '\0' || __copy_files == 'e' || __copy_files == 'l');
+    return __copy_files == 'l';
 }
 static bool should_copy_files() {
-    assert(__copy_files == 1 || __copy_files == 0);
-    return __copy_files;
+    return should_copy_files_eagerly() || should_copy_files_lazily();
 }
 
 static int mkdir_and_descend(int my_dirfd, const char* name, long child, bool mkdir, bool close) {
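The COPY_FILES variable is now a mode switch rather than a boolean: unset or empty disables copying, 'e' requests eager copies, 'l' lazy copies, and anything else is an error. A sketch of the same dispatch in Python; the name "__PROBE_COPY_FILES" below is a placeholder, since the real variable name is built from PRIVATE_ENV_VAR_PREFIX:

```python
# Sketch of init_copy_files()'s mode dispatch; the env-var name is assumed,
# since PRIVATE_ENV_VAR_PREFIX is defined elsewhere in libprobe.
import os

def copy_files_mode(var: str = "__PROBE_COPY_FILES") -> str:
    value = os.environ.get(var)
    mode = value[0] if value else "\0"
    if mode == "\0":
        return "disabled"   # no copying; inode tables never initialized
    if mode == "e":
        return "eager"      # copy every file on first read or mutation
    if mode == "l":
        return "lazy"       # copy only when recorded content is about to change
    raise ValueError(f"copy_files has invalid value {mode!r}")
```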
diff --git a/probe_src/libprobe/src/prov_buffer.c b/probe_src/libprobe/src/prov_buffer.c
index 1504342f..c3cf5aa4 100644
--- a/probe_src/libprobe/src/prov_buffer.c
+++ b/probe_src/libprobe/src/prov_buffer.c
@@ -23,10 +23,24 @@ bool is_replace_op(struct Op op) {
     return op.op_code == open_op_code && (op.data.open.flags & O_TRUNC || op.data.open.flags & O_CREAT);
 }
 
-int copy(const struct Path* path) {
+int copy_to_store(const struct Path* path) {
     static char dst_path[PATH_MAX];
     path_to_id_string(path, dst_path);
-    return copy_file(path->dirfd_minus_at_fdcwd + AT_FDCWD, path->path, get_inodes_dirfd(), dst_path, path->size);
+    /*
+     * We take precautions to avoid calling copy(f) twice within the same process.
+     * But copy(f) may already have been called by a different process!
+     * That is especially likely for coreutils binaries, which are used in every script.
+     */
+
+    int dst_dirfd = get_inodes_dirfd();
+    int access = unwrapped_faccessat(dst_dirfd, dst_path, F_OK, 0);
+    if (access == 0) {
+        DEBUG("Already exists %s %d", path->path, path->inode);
+        return 0;
+    } else {
+        DEBUG("Copying %s %d", path->path, path->inode);
+        return copy_file(path->dirfd_minus_at_fdcwd + AT_FDCWD, path->path, dst_dirfd, dst_path, path->size);
+    }
 }
 
 /*
@@ -46,26 +60,35 @@ static void prov_log_try(struct Op op) {
 
     const struct Path* path = op_to_path(&op);
     if (should_copy_files() && path->path && path->stat_valid) {
-        if (is_read_op(op)) {
-            DEBUG("Reading %s %d", path->path, path->inode);
-            inode_table_put_if_not_exists(&read_inodes, path);
-        } else if (is_mutate_op(op)) {
-            if (inode_table_put_if_not_exists(&copied_or_overwritten_inodes, path)) {
-                DEBUG("Mutating, but not copying %s %d since it is copied already or overwritten", path->path, path->inode);
-            } else {
-                DEBUG("Mutating, therefore copying %s %d", path->path, path->inode);
-                copy(path);
-            }
-        } else if (is_replace_op(op)) {
-            if (inode_table_contains(&read_inodes, path)) {
+        if (should_copy_files_lazily()) {
+            if (is_read_op(op)) {
+                DEBUG("Reading %s %d", path->path, path->inode);
+                inode_table_put_if_not_exists(&read_inodes, path);
+            } else if (is_mutate_op(op)) {
                 if (inode_table_put_if_not_exists(&copied_or_overwritten_inodes, path)) {
                     DEBUG("Mutating, but not copying %s %d since it is copied already or overwritten", path->path, path->inode);
                 } else {
-                    DEBUG("Replace after read %s %d", path->path, path->inode);
-                    copy(path);
+                    DEBUG("Mutating, therefore copying %s %d", path->path, path->inode);
+                    copy_to_store(path);
+                }
+            } else if (is_replace_op(op)) {
+                if (inode_table_contains(&read_inodes, path)) {
+                    if (inode_table_put_if_not_exists(&copied_or_overwritten_inodes, path)) {
+                        DEBUG("Mutating, but not copying %s %d since it is copied already or overwritten", path->path, path->inode);
+                    } else {
+                        DEBUG("Replace after read %s %d", path->path, path->inode);
+                        copy_to_store(path);
+                    }
+                } else {
+                    DEBUG("Mutating, but not copying %s %d since it was never read", path->path, path->inode);
                 }
+            }
+        } else if (is_read_op(op) || is_mutate_op(op)) {
+            assert(should_copy_files_eagerly());
+            if (inode_table_put_if_not_exists(&copied_or_overwritten_inodes, path)) {
+                DEBUG("Not copying %s %d because already did", path->path, path->inode);
             } else {
-                DEBUG("Mutating, but not copying %s %d since it was never read", path->path, path->inode);
+                copy_to_store(path);
             }
         }
     }
diff --git a/probe_src/libprobe/src/prov_ops.c b/probe_src/libprobe/src/prov_ops.c
index 5023ac12..261b2d85 100644
--- a/probe_src/libprobe/src/prov_ops.c
+++ b/probe_src/libprobe/src/prov_ops.c
@@ -21,7 +21,7 @@ static struct Path create_path_lazy(int dirfd, BORROWED const char* path, int fl
      * Then again, this could happen in the tracee's code too...
      * TODO: Remove this once I debug myself.
      * */
-    assert(path == NULL || path[0] != '\0' || flags & AT_EMPTY_PATH);
+    //assert(path == NULL || (path[0] != '\0' || flags & AT_EMPTY_PATH));
 
     /*
      * if path == NULL, then the target is the dir specified by dirfd.
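prov_log_try now encodes two copy policies: eager mode copies a file the first time it is read or mutated, while lazy mode only remembers reads, copies on mutation, and copies on replacement (O_TRUNC/O_CREAT) only if the old content had been read. A condensed sketch of that decision table, with plain sets standing in for the inode tables:

```python
# Condensed sketch of prov_log_try's copy decision; `read` and `copied` stand
# in for read_inodes and copied_or_overwritten_inodes, and the caller is
# assumed to add the inode to `copied` whenever this returns True.
def should_copy(mode: str, op: str, inode: int,
                read: set[int], copied: set[int]) -> bool:
    if mode == "eager":
        return op in ("read", "mutate") and inode not in copied
    if mode == "lazy":
        if op == "read":
            read.add(inode)               # remember, but don't copy yet
            return False
        if op == "mutate":
            return inode not in copied    # old content is about to change
        if op == "replace":
            # O_TRUNC/O_CREAT: the old bytes only matter if someone read them
            return inode in read and inode not in copied
    return False
```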
diff --git a/probe_src/python/probe_py/manual/cli.py b/probe_src/python/probe_py/manual/cli.py
index 71106e1d..8268ffd4 100644
--- a/probe_src/python/probe_py/manual/cli.py
+++ b/probe_src/python/probe_py/manual/cli.py
@@ -1,15 +1,14 @@
 from typing_extensions import Annotated
-import typing
 import pathlib
-import dataclasses
 import typer
 import shutil
-import json
+import rich
+from probe_py.manual.scp import scp_with_provenance
 import rich.console
 import rich.pretty
 from ..generated.parser import parse_probe_log, parse_probe_log_ctx
 from . import analysis
-from .workflows import MakefileGenerator
+from .workflows import MakefileGenerator, NextflowGenerator
 from .ssh_argparser import parse_ssh_args
 from . import file_closure
 from . import graph_utils
@@ -17,6 +16,13 @@ import os
 import tempfile
 import enum
+from .persistent_provenance_db import Process, ProcessInputs, ProcessThatWrites, get_engine
+from sqlalchemy.orm import Session
+from .analysis import ProcessNode, FileNode
+import shlex
+import datetime
+import random
+import socket
 
 console = rich.console.Console(stderr=True)
@@ -110,6 +116,68 @@ def dataflow_graph(
     dataflow_graph = analysis.provlog_to_dataflow_graph(prov_log)
     graph_utils.serialize_graph(dataflow_graph, output)
 
+def get_host_id() -> int:
+    hostname = socket.gethostname()
+    rng = random.Random(int(datetime.datetime.now().timestamp()) ^ hash(hostname))
+    bits_per_hex_digit = 4
+    hex_digits = 8
+    return rng.getrandbits(bits_per_hex_digit * hex_digits)
+
+@export_app.command()
+def store_dataflow_graph(probe_log: Annotated[
+            pathlib.Path,
+            typer.Argument(help="output file written by `probe record -o $file`."),
+        ] = pathlib.Path("probe_log")) -> None:
+    prov_log = parse_probe_log(probe_log)
+    dataflow_graph = analysis.provlog_to_dataflow_graph(prov_log)
+    engine = get_engine()
+    with Session(engine) as session:
+        for node in dataflow_graph.nodes():
+            if isinstance(node, ProcessNode):
+                print(node)
+                new_process = Process(process_id=int(node.pid), parent_process_id=0, cmd=shlex.join(node.cmd), time=datetime.datetime.now())
+                session.add(new_process)
+
+        for (node1, node2) in dataflow_graph.edges():
+            if isinstance(node1, ProcessNode) and isinstance(node2, ProcessNode):
+                parent_process_id = node1.pid
+                child_process = session.get(Process, node2.pid)
+                if child_process:
+                    child_process.parent_process_id = parent_process_id
+
+            elif isinstance(node1, ProcessNode) and isinstance(node2, FileNode):
+                inode_info = node2.inodeOnDevice
+                host = get_host_id()
+                stat_info = os.stat(node2.file)
+                mtime = stat_info.st_mtime_ns
+                size = stat_info.st_size
+                new_output_inode = ProcessThatWrites(inode=inode_info.inode, process_id=node1.pid, device_major=inode_info.device_major, device_minor=inode_info.device_minor, host=host, path=node2.file, mtime=mtime, size=size)
+                session.add(new_output_inode)
+
+            elif isinstance(node1, FileNode) and isinstance(node2, ProcessNode):
+                inode_info = node1.inodeOnDevice
+                host = get_host_id()
+                stat_info = os.stat(node1.file)
+                mtime = stat_info.st_mtime_ns
+                size = stat_info.st_size
+                new_input_inode = ProcessInputs(inode=inode_info.inode, process_id=node2.pid, device_major=inode_info.device_major, device_minor=inode_info.device_minor, host=host, path=node1.file, mtime=mtime, size=size)
+                session.add(new_input_inode)
+
+        root_process = None
+        for node in dataflow_graph.nodes():
+            if isinstance(node, ProcessNode):
+                pid = node.pid
+                process_record = session.get(Process, pid)
+                if process_record and process_record.parent_process_id == 0:
+                    if root_process is not None:
+                        print(f"Error: Two root processes - {pid} and {root_process}")
+                        session.rollback()
+                        return
+                    else:
+                        root_process = pid
+
+        session.commit()
 
 @export_app.command()
 def debug_text(
@@ -324,45 +392,34 @@ def makefile(
     script = g.generate_makefile(dataflow_graph)
     output.write_text(script)
 
-
 @export_app.command()
-def ops_jsonl(
+def nextflow(
+    output: Annotated[
+        pathlib.Path,
+        typer.Argument(),
+    ] = pathlib.Path("nextflow.nf"),
     probe_log: Annotated[
         pathlib.Path,
         typer.Argument(help="output file written by `probe record -o $file`."),
    ] = pathlib.Path("probe_log"),
 ) -> None:
     """
-    Export each op to a JSON line
+    Export the probe_log to a Nextflow workflow
     """
-    def filter_nested_dict(
-        dct: typing.Mapping[typing.Any, typing.Any],
-    ) -> typing.Mapping[typing.Any, typing.Any]:
-        return {
-            key: (
-                filter_nested_dict(val) if isinstance(val, dict) else
-                val.decode(errors="surrogateescape") if isinstance(val, bytes) else
-                val
-            )
-            for key, val in dct.items()
-        }
-    stdout_console = rich.console.Console()
     prov_log = parse_probe_log(probe_log)
-    for pid, process in prov_log.processes.items():
-        for exec_epoch_no, exec_epoch in process.exec_epochs.items():
-            for tid, thread in exec_epoch.threads.items():
-                for i, op in enumerate(thread.ops):
-                    stdout_console.print_json(json.dumps({
-                        "pid": pid,
-                        "tid": tid,
-                        "exec_epoch_no": exec_epoch_no,
-                        "i": i,
-                        "op": filter_nested_dict(
-                            dataclasses.asdict(op),
-                        ),
-                        "op_data_type": type(op.data).__name__,
-                    }))
+    dataflow_graph = analysis.provlog_to_dataflow_graph(prov_log)
+    g = NextflowGenerator()
+    script = g.generate_workflow(dataflow_graph)
+    output.write_text(script)
 
+# Example: scp Desktop/sample_example.txt root@136.183.142.28:/home/remote_dir
+@app.command(
+    context_settings=dict(
+        ignore_unknown_options=True,
+    ),
+)
+def scp(cmd: list[str]) -> None:
+    scp_with_provenance(cmd)
 
 if __name__ == "__main__":
     app()
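store_dataflow_graph relies on the ORM models from persistent_provenance_db, which this diff does not show. A hypothetical reconstruction of the Process model that session.get(Process, pid) keys on, in SQLAlchemy 2.0 style; only the column names are grounded in the code above, everything else is assumed:

```python
# Hypothetical sketch of one model from persistent_provenance_db.py; the real
# definitions live in that module and may differ.
import datetime
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Process(Base):
    __tablename__ = "process"
    process_id: Mapped[int] = mapped_column(primary_key=True)
    parent_process_id: Mapped[int]  # 0 until a parent edge is found
    cmd: Mapped[str]                # shlex.join of the recorded argv
    time: Mapped[datetime.datetime]
```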
diff --git a/probe_src/python/probe_py/manual/workflows.py b/probe_src/python/probe_py/manual/workflows.py
index 4a93606a..db6f131a 100644
--- a/probe_src/python/probe_py/manual/workflows.py
+++ b/probe_src/python/probe_py/manual/workflows.py
@@ -3,17 +3,23 @@
 import abc
 from typing import List, Set, Optional
 import pathlib
-
+import shutil
+import os
+import tempfile
+import subprocess
+from filecmp import cmp
+import re
 """
 All the cases we should take care of:
-1- One Input, One Output
+1- One Input, One Output [x]
 2- One Input, Multiple Outputs
 3- Multiple Inputs, One Output
 4- Multiple Inputs, Multiple Outputs
-5- Chained Commands: Inline commands that directly modify the input file (e.g., using sed, awk, or similar)
-6- No Input Command: Commands like ls .: Commands that don't take an explicit input file but generate output
-7- Ensure that any environment variables or context-specific settings are captured and translated into the Nextflow environment
-8- File and Directory Structure Assumptions (Scripts that assume a specific directory structure, Commands that change the working directory (cd))
+5- Inline Commands: Commands that modify the input file in place (e.g., using sed, awk, or similar) [x]
+6- Chained Commands: A process node that calls another script [x]
+7- No Input Command: Commands like `ls .` that don't take an explicit input file but generate output [x]
+8- Ensure that any environment variables or context-specific settings are captured and translated into the Nextflow environment
+9- File and Directory Structure Assumptions (scripts that assume a specific directory structure, commands that change the working directory (cd))
 ...
 """
 class WorkflowGenerator(abc.ABC):
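The inline-case handler added below rewrites absolute paths in the recorded command to sandbox-local temp names. A quick check of that substitution; note the regex currently matches only `.txt` paths and rewrites every match to the same temp name:

```python
# Quick check of the path-rewriting regex used by handle_inline_case below.
import re

pattern = r"/(?:[a-zA-Z0-9_\-./]+/)*([a-zA-Z0-9_\-]+\.txt)"
cmd = "sed -i s/foo/bar/ /home/user/data/sample.txt"
print(re.sub(pattern, "temp_sample.txt", cmd))
# -> sed -i s/foo/bar/ temp_sample.txt
```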
@@ -67,6 +73,52 @@ def handle_standard_case(self, process: ProcessNode, inputs: List[FileNode], out
             \"\"\"
     }}"""
 
+
+    def handle_inline_case(self, process: ProcessNode, inputs: List[FileNode], outputs: List[FileNode]) -> str:
+        input_files = " ".join([f'path "{os.path.basename(file.file)}"' for file in inputs])
+        output_files = " ".join(
+            [f'path "{os.path.splitext(os.path.basename(file.file))[0]}_modified{os.path.splitext(file.file)[1]}"' for
+             file in inputs])
+
+        # Build inline commands for each file to perform copy, edit, and rename steps
+        script_commands = []
+        for file in inputs:
+            base_name = os.path.basename(file.file)
+            temp_name = f"temp_{base_name}"
+            final_name = f"{os.path.splitext(base_name)[0]}_modified{os.path.splitext(base_name)[1]}"
+
+            # Replace the original filename in the command with the temp filename
+            modified_cmd = []
+            for cmd in process.cmd:
+                # Substitute all occurrences of the original filename in each command
+                cmd_modified = re.sub(r"/(?:[a-zA-Z0-9_\-./]+/)*([a-zA-Z0-9_\-]+\.txt)", temp_name, cmd)
+                modified_cmd.append(cmd_modified)
+
+            script_commands.extend([
+                f'cp {file.file} {temp_name}',   # Copy to temp file
+                " ".join(modified_cmd),          # Apply inline edit with temp filename
+                f'mv {temp_name} {final_name}'   # Rename temp file to final output
+            ])
+
+        # Join script commands with newline and indentation for the Nextflow process
+        script_block = "\n    ".join(script_commands)
+
+        # Create the Nextflow process block
+        return f"""
+process process_{id(process)} {{
+    input:
+    {input_files}
+
+    output:
+    {output_files}
+
+    script:
+    \"\"\"
+    {script_block}
+    \"\"\"
+}}"""
+
     def handle_dynamic_filenames(self, process: ProcessNode, inputs: List[FileNode], outputs: List[FileNode]) -> str:
         input_files = " ".join([f'path "{file.file}"\n    ' for file in inputs])
         output_files = " ".join([f'path "{file.file}"\n    ' for file in outputs if file.file])
@@ -116,11 +168,55 @@ def handle_custom_shells(self, process: ProcessNode) -> str:
             \"\"\"
     }}"""
 
-
+    def is_inline_editing_command_sandbox(self, command: str, input_files: list[FileNode]) -> bool:
+        """
+        Determine whether a command modifies any of the input files in place, even if the content remains the same.
+        """
+        with tempfile.TemporaryDirectory() as temp_dir:
+            sandbox_files = {}
+
+            # Track original modification times and create sandbox copies
+            original_times = {}
+            sandbox_command = command
+            for input_file in input_files:
+                temp_file = os.path.join(temp_dir, os.path.basename(input_file.file))
+                # copy2 preserves mtime, so an untouched sandbox copy keeps the original timestamp
+                shutil.copy2(input_file.file, temp_file)
+                sandbox_files[input_file.file] = temp_file
+
+                # Save the original modification time
+                original_times[input_file.file] = os.path.getmtime(input_file.file)
+                sandbox_command = sandbox_command.replace(input_file.file, temp_file)
+
+            # Run the command in the sandbox
+            try:
+                subprocess.run(sandbox_command, shell=True, check=True)
+            except subprocess.CalledProcessError:
+                print("Command failed to execute.")
+                return False
+
+            # Check whether any of the files were modified in place
+            for original_file, sandbox_file in sandbox_files.items():
+                # Modification time of the sandboxed copy after command execution
+                sandbox_mod_time = os.path.getmtime(sandbox_file)
+                original_mod_time = original_times[original_file]
+
+                # Compare content and modification times
+                content_modified = not cmp(original_file, sandbox_file, shallow=False)
+                time_modified = sandbox_mod_time != original_mod_time
+
+                # Either a content or an mtime change means an in-place modification
+                if content_modified or time_modified:
+                    return True
+
+            # None of the files were modified
+            return False
 
     def is_standard_case(self, process: ProcessNode, inputs: List[FileNode], outputs: List[FileNode]) -> bool:
         return len(inputs) >= 1 and len(outputs) == 1
 
+    def is_inline_case(self, process: ProcessNode, inputs: List[FileNode], outputs: List[FileNode]) -> bool:
+        return self.is_inline_editing_command_sandbox(' '.join(process.cmd), inputs)
+
     def is_multiple_output_case(self, process: ProcessNode, inputs: List[FileNode], outputs: List[FileNode]) -> bool:
         return len(inputs) >= 1 and len(outputs) >= 1
 
@@ -139,7 +235,7 @@ def create_processes(self) -> None:
             inputs = [n for n in self.graph.predecessors(node) if isinstance(n, FileNode)]
             outputs = [n for n in self.graph.successors(node) if isinstance(n, FileNode)]
 
-            if self.is_standard_case(node, inputs, outputs) :
+            if self.is_standard_case(node, inputs, outputs):
                 process_script = self.handle_standard_case(node, inputs, outputs)
                 self.nextflow_script.append(process_script)
                 self.workflow.append(f"{self.escape_filename_for_nextflow(outputs[0].label)} = process_{id(node)}({', '.join([self.escape_filename_for_nextflow(i.label) for i in inputs])})")
@@ -149,6 +245,10 @@ def create_processes(self) -> None:
                 process_script = self.handle_dynamic_filenames(node, inputs, outputs)
             elif self.is_parallel_execution(node):
                 process_script = self.handle_parallel_execution(node)
+            elif self.is_inline_case(node, inputs, outputs):
+                process_script = self.handle_inline_case(node, inputs, outputs)
+                self.nextflow_script.append(process_script)
+                self.workflow.append(f"process_{id(node)}({', '.join([self.escape_filename_for_nextflow(i.label) for i in inputs])})")
             else:
                 process_script = self.handle_custom_shells(node)
                 self.nextflow_script.append(process_script)
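A usage sketch of the sandbox probe above: it copies each input into a temp dir, reruns the command against the copies, and reports True if the content or mtime changed. FakeFileNode is a hypothetical stand-in exposing the one attribute (.file) the method reads:

```python
# Usage sketch; FakeFileNode stands in for the real FileNode from analysis.
import pathlib
import tempfile
from dataclasses import dataclass

from probe_py.manual.workflows import NextflowGenerator

@dataclass
class FakeFileNode:
    file: str

src = pathlib.Path(tempfile.mkdtemp()) / "notes.txt"
src.write_text("hello\n")
gen = NextflowGenerator()
# The command is rewritten to target the sandbox copy, so src is untouched.
inplace = gen.is_inline_editing_command_sandbox(
    f"sed -i s/hello/bye/ {src}", [FakeFileNode(str(src))]
)
print(inplace)  # True: sed -i rewrites the file in place
```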
+ """ + with tempfile.TemporaryDirectory() as temp_dir: + sandbox_files = {} + + # Track original modification times and create sandbox files + original_times = {} + sandbox_command = command + for input_file in input_files: + temp_file = os.path.join(temp_dir, os.path.basename(input_file.file)) + shutil.copy(input_file.file, temp_file) + sandbox_files[input_file.file] = temp_file + + # Save original modification time + original_times[input_file.file] = os.path.getmtime(input_file.file) + sandbox_command = sandbox_command.replace(input_file.file, temp_file) + + # Run the command in the sandbox + try: + subprocess.run(sandbox_command, shell=True, check=True) + except subprocess.CalledProcessError: + print("Command failed to execute.") + return False + + # Check if any of the files were modified in-place + for original_file, sandbox_file in sandbox_files.items(): + # Get the modified time of the sandboxed file after command execution + sandbox_mod_time = os.path.getmtime(sandbox_file) + original_mod_time = original_times[original_file] + + # Compare content and modification times + content_modified = not cmp(original_file, sandbox_file, shallow=False) + time_modified = sandbox_mod_time != original_mod_time + + # If either the content or modification time has changed, it's an in-place modification + if content_modified or time_modified: + return True + + # Return False if none of the files were modified + return False def is_standard_case(self, process: ProcessNode, inputs: List[FileNode], outputs: List[FileNode]) -> bool: return len(inputs) >= 1 and len(outputs) == 1 + def is_inline_case(self, process: ProcessNode, inputs: List[FileNode], outputs: List[FileNode]) -> bool: + return self.is_inline_editing_command_sandbox(' '.join(process.cmd), inputs) + def is_multiple_output_case(self, process: ProcessNode, inputs: List[FileNode], outputs: List[FileNode]) -> bool: return len(inputs) >= 1 and len(outputs) >= 1 @@ -139,7 +235,7 @@ def create_processes(self) -> None: inputs = [n for n in self.graph.predecessors(node) if isinstance(n, FileNode)] outputs = [n for n in self.graph.successors(node) if isinstance(n, FileNode)] - if self.is_standard_case(node, inputs, outputs) : + if self.is_standard_case(node, inputs, outputs): process_script = self.handle_standard_case(node, inputs, outputs) self.nextflow_script.append(process_script) self.workflow.append(f"{self.escape_filename_for_nextflow(outputs[0].label)} = process_{id(node)}({', '.join([self.escape_filename_for_nextflow(i.label) for i in inputs])})") @@ -149,6 +245,10 @@ def create_processes(self) -> None: process_script = self.handle_dynamic_filenames(node, inputs, outputs) elif self.is_parallel_execution(node): process_script = self.handle_parallel_execution(node) + elif self.is_inline_case(node, inputs, outputs): + process_script = self.handle_inline_case(node, inputs, outputs) + self.nextflow_script.append(process_script) + self.workflow.append(f"process_{id(node)}({', '.join([self.escape_filename_for_nextflow(i.label) for i in inputs])})") else: process_script = self.handle_custom_shells(node) self.nextflow_script.append(process_script) diff --git a/probe_src/python/pyproject.toml b/probe_src/python/pyproject.toml index 5b0d08b5..10af7eaf 100644 --- a/probe_src/python/pyproject.toml +++ b/probe_src/python/pyproject.toml @@ -15,10 +15,10 @@ dynamic = ["version", "description"] dependencies = [ "probe_py.generated", "networkx", - "pygraphviz", "pydot", "rich", "typer", + "xdg-base-dirs" ] [project.urls]