Commit: Implement eager copy
charmoniumQ committed Jan 18, 2025
1 parent 82ea80e commit ef63ddc
Showing 7 changed files with 276 additions and 100 deletions.
34 changes: 6 additions & 28 deletions probe_src/libprobe/arena/include/arena.h
@@ -66,28 +66,6 @@ static size_t __arena_align(size_t offset, size_t alignment) {
#define __ARENA_FNAME_LENGTH 64
#define __ARENA_FNAME "%ld.dat"

-/* by defining this as inline static it should have zero runtime overhead and
- * not produce any symbols outside this compilation unit (file). */
-inline static int msync_and_munmap(void* addr, size_t length) {
-    int ret = msync(addr, length, MS_SYNC);
-    if (ret != 0) {
-#ifdef ARENA_PERROR
-        perror("msync_and_munmap: msync");
-#endif
-        return -1;
-    }
-
-    ret = munmap(addr, length);
-    if (ret != 0) {
-#ifdef ARENA_PERROR
-        perror("msync_and_munmap: munmap");
-#endif
-        return -1;
-    }
-
-    return 0;
-}

/* Instantiate a new mmap-ed file in the arena dir. */
static int __arena_reinstantiate(struct ArenaDir* arena_dir, size_t capacity) {
/* Create a new mmap */
@@ -266,10 +244,10 @@ __attribute__((unused)) static int arena_destroy(struct ArenaDir* arena_dir) {
    while (current) {
        for (size_t i = 0; i < current->next_free_slot; ++i) {
            if (current->arena_list[i] != NULL) {
-               int ret = msync_and_munmap(current->arena_list[i]->base_address, current->arena_list[i]->capacity);
+               int ret = munmap(current->arena_list[i]->base_address, current->arena_list[i]->capacity);
                if (ret != 0) {
#ifdef ARENA_PERROR
-                   fprintf(stderr, "arena_destroy: msync and munmap failed\n");
+                   perror("arena_destroy: munmap");
#endif
                    return -1;
                }
@@ -302,10 +280,10 @@ __attribute__((unused)) static int arena_drop_after_fork(struct ArenaDir* arena_
    while (current) {
        for (size_t i = 0; i < current->next_free_slot; ++i) {
            if (current->arena_list[i] != NULL) {
-               int ret = msync_and_munmap(current->arena_list[i]->base_address, current->arena_list[i]->capacity);
+               int ret = munmap(current->arena_list[i]->base_address, current->arena_list[i]->capacity);
                if (ret != 0) {
#ifdef ARENA_PERROR
-                   fprintf(stderr, "arena_drop_after_fork: msync and munmap failed\n");
+                   perror("arena_drop_after_fork: munmap");
#endif
                    return -1;
                }
@@ -329,10 +307,10 @@ __attribute__((unused)) static int arena_uninstantiate_all_but_last(struct Arena
    while (current) {
        for (size_t i = 0; i + ((size_t) is_tail) < current->next_free_slot; ++i) {
            if (current->arena_list[i] != NULL) {
-               int ret = msync_and_munmap(current->arena_list[i]->base_address, current->arena_list[i]->capacity);
+               int ret = munmap(current->arena_list[i]->base_address, current->arena_list[i]->capacity);
                if (ret != 0) {
#ifdef ARENA_PERROR
-                   fprintf(stderr, "arena_uninstantiate_all_but_last: msync and munmap failed\n");
+                   perror("arena_uninstantiate_all_but_last: munmap");
#endif
                    return -1;
                }
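The dropped helper paired msync(MS_SYNC) with munmap. Presumably the msync could go because, for a MAP_SHARED file mapping, the kernel writes dirty pages back to the backing file even without an explicit msync; MS_SYNC only forces that writeback to happen synchronously. A minimal standalone sketch of the simplified cleanup, assuming the arenas are MAP_SHARED mappings of files (the name unmap_arena is illustrative, not from the source):

#include <stddef.h>
#include <stdio.h>
#include <sys/mman.h>

/* Unmap a shared file mapping without an explicit msync; the kernel still
 * writes any dirty pages back to the backing file on its own schedule. */
static int unmap_arena(void* addr, size_t length) {
    if (munmap(addr, length) != 0) {
        perror("unmap_arena: munmap");
        return -1;
    }
    return 0;
}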
37 changes: 27 additions & 10 deletions probe_src/libprobe/src/global_state.c
@@ -88,25 +88,42 @@ static int get_exec_epoch_safe() {
return __exec_epoch;
}

-static int __copy_files = -1;
+static char __copy_files = ' ';
static const char* copy_files_env_var = PRIVATE_ENV_VAR_PREFIX "COPY_FILES";
struct InodeTable read_inodes;
struct InodeTable copied_or_overwritten_inodes;
static void init_copy_files() {
-    assert(__copy_files == -1);
+    assert(__copy_files == ' ');
    const char* copy_files_str = debug_getenv(copy_files_env_var);
-    if (copy_files_str != NULL && copy_files_str[0] != 0) {
-        __copy_files = 1;
-        inode_table_init(&read_inodes);
-        inode_table_init(&copied_or_overwritten_inodes);
+    if (copy_files_str) {
+        __copy_files = copy_files_str[0];
    } else {
-        __copy_files = 0;
+        __copy_files = '\0';
    }
-    DEBUG("Is copy files? %d", __copy_files);
+    DEBUG("Copy files? %c", __copy_files);
+    switch (__copy_files) {
+        case '\0':
+            break;
+        case 'e': /* eagerly copy files */
+        case 'l': /* lazily copy files */
+            inode_table_init(&read_inodes);
+            inode_table_init(&copied_or_overwritten_inodes);
+            break;
+        default:
+            ERROR("copy_files has invalid value %c", __copy_files);
+            break;
+    }
}
+static bool should_copy_files_eagerly() {
+    assert(__copy_files == '\0' || __copy_files == 'e' || __copy_files == 'l');
+    return __copy_files == 'e';
+}
+static bool should_copy_files_lazily() {
+    assert(__copy_files == '\0' || __copy_files == 'e' || __copy_files == 'l');
+    return __copy_files == 'l';
+}
static bool should_copy_files() {
-    assert(__copy_files == 1 || __copy_files == 0);
-    return __copy_files;
+    return should_copy_files_eagerly() || should_copy_files_lazily();
}

static int mkdir_and_descend(int my_dirfd, const char* name, long child, bool mkdir, bool close) {
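To illustrate the new three-way switch, here is a self-contained sketch of the same mode selection. The value of PRIVATE_ENV_VAR_PREFIX and the use of plain getenv are assumptions for illustration only; the real code uses debug_getenv and a project-defined prefix:

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

#define PRIVATE_ENV_VAR_PREFIX "__PROBE_"  /* assumed value, for illustration only */

/* ' ' = not yet initialized, '\0' = copying disabled, 'e' = eager, 'l' = lazy */
static char copy_mode = ' ';

static void init_copy_mode(void) {
    const char* s = getenv(PRIVATE_ENV_VAR_PREFIX "COPY_FILES");
    copy_mode = (s != NULL) ? s[0] : '\0';
    if (copy_mode != '\0' && copy_mode != 'e' && copy_mode != 'l') {
        fprintf(stderr, "copy_files has invalid value %c\n", copy_mode);
    }
}

static bool should_copy_eagerly(void) { return copy_mode == 'e'; }
static bool should_copy_lazily(void)  { return copy_mode == 'l'; }

int main(void) {
    init_copy_mode();
    printf("eager=%d lazy=%d\n", should_copy_eagerly(), should_copy_lazily());
    return 0;
}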
57 changes: 40 additions & 17 deletions probe_src/libprobe/src/prov_buffer.c
@@ -23,10 +23,24 @@ bool is_replace_op(struct Op op) {
return op.op_code == open_op_code && (op.data.open.flags & O_TRUNC || op.data.open.flags & O_CREAT);
}

-int copy(const struct Path* path) {
+int copy_to_store(const struct Path* path) {
    static char dst_path[PATH_MAX];
    path_to_id_string(path, dst_path);
-    return copy_file(path->dirfd_minus_at_fdcwd + AT_FDCWD, path->path, get_inodes_dirfd(), dst_path, path->size);
+    /*
+     * We take precautions to avoid calling copy(f) when copy(f) has already been called in this process.
+     * But it may already have been called in a different process,
+     * especially by the coreutils used in every script.
+     */
+
+    int dst_dirfd = get_inodes_dirfd();
+    int access = unwrapped_faccessat(dst_dirfd, dst_path, F_OK, 0);
+    if (access == 0) {
+        DEBUG("Already exists %s %d", path->path, path->inode);
+        return 0;
+    } else {
+        DEBUG("Copying %s %d", path->path, path->inode);
+        return copy_file(path->dirfd_minus_at_fdcwd + AT_FDCWD, path->path, dst_dirfd, dst_path, path->size);
+    }
}

/*
@@ -46,26 +60,35 @@ static void prov_log_try(struct Op op) {

const struct Path* path = op_to_path(&op);
if (should_copy_files() && path->path && path->stat_valid) {
-        if (is_read_op(op)) {
-            DEBUG("Reading %s %d", path->path, path->inode);
-            inode_table_put_if_not_exists(&read_inodes, path);
-        } else if (is_mutate_op(op)) {
-            if (inode_table_put_if_not_exists(&copied_or_overwritten_inodes, path)) {
-                DEBUG("Mutating, but not copying %s %d since it is copied already or overwritten", path->path, path->inode);
-            } else {
-                DEBUG("Mutating, therefore copying %s %d", path->path, path->inode);
-                copy(path);
-            }
-        } else if (is_replace_op(op)) {
-            if (inode_table_contains(&read_inodes, path)) {
-                if (inode_table_put_if_not_exists(&copied_or_overwritten_inodes, path)) {
-                    DEBUG("Mutating, but not copying %s %d since it is copied already or overwritten", path->path, path->inode);
-                } else {
-                    DEBUG("Replace after read %s %d", path->path, path->inode);
-                    copy(path);
-                }
-            } else {
-                DEBUG("Mutating, but not copying %s %d since it was never read", path->path, path->inode);
-            }
-        }
+        if (should_copy_files_lazily()) {
+            if (is_read_op(op)) {
+                DEBUG("Reading %s %d", path->path, path->inode);
+                inode_table_put_if_not_exists(&read_inodes, path);
+            } else if (is_mutate_op(op)) {
+                if (inode_table_put_if_not_exists(&copied_or_overwritten_inodes, path)) {
+                    DEBUG("Mutating, but not copying %s %d since it is copied already or overwritten", path->path, path->inode);
+                } else {
+                    DEBUG("Mutating, therefore copying %s %d", path->path, path->inode);
+                    copy_to_store(path);
+                }
+            } else if (is_replace_op(op)) {
+                if (inode_table_contains(&read_inodes, path)) {
+                    if (inode_table_put_if_not_exists(&copied_or_overwritten_inodes, path)) {
+                        DEBUG("Mutating, but not copying %s %d since it is copied already or overwritten", path->path, path->inode);
+                    } else {
+                        DEBUG("Replace after read %s %d", path->path, path->inode);
+                        copy_to_store(path);
+                    }
+                } else {
+                    DEBUG("Mutating, but not copying %s %d since it was never read", path->path, path->inode);
+                }
+            }
+        } else if (is_read_op(op) || is_mutate_op(op)) {
+            assert(should_copy_files_eagerly());
+            if (inode_table_put_if_not_exists(&copied_or_overwritten_inodes, path)) {
+                DEBUG("Not copying %s %d because already did", path->path, path->inode);
+            } else {
+                copy_to_store(path);
+            }
+        }
    }
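The branch above encodes two policies: lazy mode records reads and copies an inode only when a previously-seen version is about to be mutated (or replaced after being read), while eager mode copies on the first read or mutate. A condensed sketch of that decision, where seen_read and already_copied are hypothetical stand-ins for the lookups in read_inodes and copied_or_overwritten_inodes:

#include <stdbool.h>

typedef enum { MODE_OFF, MODE_LAZY, MODE_EAGER } CopyMode;

/* Returns true when the current op should trigger copy_to_store().
 * seen_read / already_copied stand in for the inode-table lookups. */
static bool should_copy(CopyMode mode, bool is_read, bool is_mutate,
                        bool is_replace, bool seen_read, bool already_copied) {
    if (mode == MODE_LAZY) {
        if (is_mutate)  return !already_copied;               /* save the old version */
        if (is_replace) return seen_read && !already_copied;  /* only if read before */
        return false;                                         /* reads are just recorded */
    }
    if (mode == MODE_EAGER) {
        return (is_read || is_mutate) && !already_copied;     /* copy on first touch */
    }
    return false;
}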
2 changes: 1 addition & 1 deletion probe_src/libprobe/src/prov_ops.c
@@ -21,7 +21,7 @@ static struct Path create_path_lazy(int dirfd, BORROWED const char* path, int fl
* Then again, this could happen in the tracee's code too...
* TODO: Remove this once I debug myself.
* */
-    assert(path == NULL || path[0] != '\0' || flags & AT_EMPTY_PATH);
+    //assert(path == NULL || (path[0] != '\0' || flags & AT_EMPTY_PATH));

/*
* if path == NULL, then the target is the dir specified by dirfd.
128 changes: 93 additions & 35 deletions probe_src/python/probe_py/manual/cli.py
@@ -1,22 +1,28 @@
from typing_extensions import Annotated
import typing
import pathlib
-import dataclasses
import typer
import shutil
-import json
import rich
+from probe_py.manual.scp import scp_with_provenance
import rich.console
import rich.pretty
from ..generated.parser import parse_probe_log, parse_probe_log_ctx
from . import analysis
-from .workflows import MakefileGenerator
+from .workflows import MakefileGenerator, NextflowGenerator
from .ssh_argparser import parse_ssh_args
from . import file_closure
from . import graph_utils
import subprocess
import os
import tempfile
import enum
+from .persistent_provenance_db import Process, ProcessInputs, ProcessThatWrites, get_engine
+from sqlalchemy.orm import Session
+from .analysis import ProcessNode, FileNode
+import shlex
+import datetime
+import random
+import socket


console = rich.console.Console(stderr=True)
@@ -110,6 +116,68 @@ def dataflow_graph(
dataflow_graph = analysis.provlog_to_dataflow_graph(prov_log)
graph_utils.serialize_graph(dataflow_graph, output)

+def get_host_name() -> int:
+    hostname = socket.gethostname()
+    rng = random.Random(int(datetime.datetime.now().timestamp()) ^ hash(hostname))
+    bits_per_hex_digit = 4
+    hex_digits = 8
+    random_number = rng.getrandbits(bits_per_hex_digit * hex_digits)
+    return random_number
+
+@export_app.command()
+def store_dataflow_graph(probe_log: Annotated[
+        pathlib.Path,
+        typer.Argument(help="output file written by `probe record -o $file`."),
+    ] = pathlib.Path("probe_log")) -> None:
+    prov_log = parse_probe_log(probe_log)
+    dataflow_graph = analysis.provlog_to_dataflow_graph(prov_log)
+    engine = get_engine()
+    with Session(engine) as session:
+        for node in dataflow_graph.nodes():
+            if isinstance(node, ProcessNode):
+                print(node)
+                new_process = Process(process_id=int(node.pid), parent_process_id=0, cmd=shlex.join(node.cmd), time=datetime.datetime.now())
+                session.add(new_process)
+
+        for (node1, node2) in dataflow_graph.edges():
+            if isinstance(node1, ProcessNode) and isinstance(node2, ProcessNode):
+                parent_process_id = node1.pid
+                child_process = session.get(Process, node2.pid)
+                if child_process:
+                    child_process.parent_process_id = parent_process_id
+
+            elif isinstance(node1, ProcessNode) and isinstance(node2, FileNode):
+                inode_info = node2.inodeOnDevice
+                host = get_host_name()
+                stat_info = os.stat(node2.file)
+                mtime = int(stat_info.st_mtime * 1_000_000_000)
+                size = stat_info.st_size
+                new_output_inode = ProcessThatWrites(inode=inode_info.inode, process_id=node1.pid, device_major=inode_info.device_major, device_minor=inode_info.device_minor, host=host, path=node2.file, mtime=mtime, size=size)
+                session.add(new_output_inode)
+
+            elif isinstance(node1, FileNode) and isinstance(node2, ProcessNode):
+                inode_info = node1.inodeOnDevice
+                host = get_host_name()
+                stat_info = os.stat(node1.file)
+                mtime = int(stat_info.st_mtime * 1_000_000_000)
+                size = stat_info.st_size
+                new_input_inode = ProcessInputs(inode=inode_info.inode, process_id=node2.pid, device_major=inode_info.device_major, device_minor=inode_info.device_minor, host=host, path=node1.file, mtime=mtime, size=size)
+                session.add(new_input_inode)
+
+        root_process = None
+        for node in dataflow_graph.nodes():
+            if isinstance(node, ProcessNode):
+                pid = node.pid
+                process_record = session.get(Process, pid)
+                if process_record and process_record.parent_process_id == 0:
+                    if root_process is not None:
+                        print(f"Error: Two parent processes - {pid} and {root_process}")
+                        session.rollback()
+                        return
+                    else:
+                        root_process = pid
+
+        session.commit()

@export_app.command()
def debug_text(
@@ -227,8 +295,8 @@ def ssh(
"""

    flags, destination, remote_host = parse_ssh_args(ssh_args)
-    ssh_cmd = ["ssh"] + flags
-
+    ssh_cmd = ["ssh"] + flags
+
libprobe = pathlib.Path(os.environ["__PROBE_LIB"]) / ("libprobe-dbg.so" if debug else "libprobe.so")
if not libprobe.exists():
@@ -324,45 +392,35 @@ def makefile(
script = g.generate_makefile(dataflow_graph)
output.write_text(script)


@export_app.command()
-def ops_jsonl(
+def nextflow(
+    output: Annotated[
+        pathlib.Path,
+        typer.Argument(),
+    ] = pathlib.Path("nextflow.nf"),
    probe_log: Annotated[
        pathlib.Path,
        typer.Argument(help="output file written by `probe record -o $file`."),
    ] = pathlib.Path("probe_log"),
) -> None:
    """
-    Export each op to a JSON line
+    Export the probe_log to a Nextflow workflow
    """
-    def filter_nested_dict(
-        dct: typing.Mapping[typing.Any, typing.Any],
-    ) -> typing.Mapping[typing.Any, typing.Any]:
-        return {
-            key: (
-                filter_nested_dict(val) if isinstance(val, dict) else
-                val.decode(errors="surrogateescape") if isinstance(val, bytes) else
-                val
-            )
-            for key, val in dct.items()
-        }
-    stdout_console = rich.console.Console()
    prov_log = parse_probe_log(probe_log)
-    for pid, process in prov_log.processes.items():
-        for exec_epoch_no, exec_epoch in process.exec_epochs.items():
-            for tid, thread in exec_epoch.threads.items():
-                for i, op in enumerate(thread.ops):
-                    stdout_console.print_json(json.dumps({
-                        "pid": pid,
-                        "tid": tid,
-                        "exec_epoch_no": exec_epoch_no,
-                        "i": i,
-                        "op": filter_nested_dict(
-                            dataclasses.asdict(op),
-                        ),
-                        "op_data_type": type(op.data).__name__,
-                    }))
+    dataflow_graph = analysis.provlog_to_dataflow_graph(prov_log)
+    g = NextflowGenerator()
+    output = pathlib.Path("nextflow.nf")
+    script = g.generate_workflow(dataflow_graph)
+    output.write_text(script)

+# Example: scp Desktop/sample_example.txt [email protected]:/home/remote_dir
+@app.command(
+    context_settings=dict(
+        ignore_unknown_options=True,
+    ),
+)
+def scp(cmd: list[str]) -> None:
+    scp_with_provenance(cmd)

if __name__ == "__main__":
    app()
