Skip to content

Commit

Permalink
feat: schema for persistence provenance and function to store the dat…
Browse files Browse the repository at this point in the history
…aflow graph (#75)

Co-authored-by: Sam Grayson <[email protected]>
  • Loading branch information
Shofiya2003 and charmoniumQ authored Dec 11, 2024
1 parent b13c3e9 commit 86c3dce
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 0 deletions.
5 changes: 5 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,15 @@
];
src = ./probe_src/python;
propagatedBuildInputs = [
# Packages the client will need
frontend.packages.probe-py-generated
python.pkgs.networkx
python.pkgs.pygraphviz
python.pkgs.pydot
python.pkgs.rich
python.pkgs.typer
python.pkgs.sqlalchemy
python.pkgs.xdg-base-dirs
];
nativeCheckInputs = [
frontend.packages.probe-py-generated
Expand Down Expand Up @@ -197,6 +200,8 @@
pypkgs.pydot
pypkgs.rich
pypkgs.typer
pypkgs.sqlalchemy
pypkgs.xdg-base-dirs

# probe_py.manual "dev time" requirements
pypkgs.psutil
Expand Down
69 changes: 69 additions & 0 deletions probe_src/python/probe_py/manual/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
import os
import tempfile
import enum
from .persistent_provenance_db import Process, ProcessInputs, ProcessThatWrites, get_engine
from sqlalchemy.orm import Session
from .analysis import ProcessNode, FileNode
import shlex
import datetime
import random
import socket


console = rich.console.Console(stderr=True)
Expand Down Expand Up @@ -107,6 +114,68 @@ def dataflow_graph(
dataflow_graph = analysis.provlog_to_dataflow_graph(prov_log)
graph_utils.serialize_graph(dataflow_graph, output)

def get_host_name() -> int:
    """Return a pseudo-random 32-bit identifier for this invocation.

    NOTE(review): despite the name, this does not return the host *name* —
    it returns an 8-hex-digit random number whose RNG is seeded from the
    hostname and the current wall-clock second, so successive calls on the
    same host generally differ.
    """
    seed = int(datetime.datetime.now().timestamp()) ^ hash(socket.gethostname())
    generator = random.Random(seed)
    # 8 hex digits at 4 bits apiece = 32 random bits.
    return generator.getrandbits(8 * 4)

def _add_file_edge(record_class, file_node: FileNode, pid: int, session: Session) -> None:
    # Build and stage one ProcessThatWrites/ProcessInputs row for a process<->file edge.
    inode_info = file_node.inodeOnDevice
    stat_info = os.stat(file_node.file)
    # The schema stores mtime as separate second/nanosecond columns;
    # st_mtime_ns is an exact integer, so divmod loses no precision.
    mtime_sec, mtime_nsec = divmod(stat_info.st_mtime_ns, 1_000_000_000)
    session.add(record_class(
        inode=inode_info.inode,
        process_id=pid,
        device_major=inode_info.device_major,
        device_minor=inode_info.device_minor,
        host=get_host_name(),
        path=file_node.file,
        mtime_sec=mtime_sec,
        mtime_nsec=mtime_nsec,
        size=stat_info.st_size,
    ))

@export_app.command()
def store_dataflow_graph(probe_log: Annotated[
        pathlib.Path,
        typer.Argument(help="output file written by `probe record -o $file`."),
] = pathlib.Path("probe_log"))->None:
    """Parse a probe_log, derive its dataflow graph, and persist it in the provenance DB.

    Stores one Process row per process node, one ProcessThatWrites row per
    process->file edge, and one ProcessInputs row per file->process edge.
    Rolls back (storing nothing) if the graph has more than one root process.
    """
    prov_log = parse_probe_log(probe_log)
    dataflow_graph = analysis.provlog_to_dataflow_graph(prov_log)
    engine = get_engine()
    with Session(engine) as session:
        # First pass: one row per process; parent_process_id is a placeholder
        # (0 = "no parent seen yet") until the edges are examined below.
        for node in dataflow_graph.nodes():
            if isinstance(node, ProcessNode):
                session.add(Process(
                    process_id=int(node.pid),
                    parent_process_id=0,
                    cmd=shlex.join(node.cmd),
                    time=datetime.datetime.now(),
                ))

        for (node1, node2) in dataflow_graph.edges():
            if isinstance(node1, ProcessNode) and isinstance(node2, ProcessNode):
                # process -> process edge: node1 spawned node2.
                child_process = session.get(Process, node2.pid)
                if child_process:
                    child_process.parent_process_id = node1.pid
            elif isinstance(node1, ProcessNode) and isinstance(node2, FileNode):
                # process -> file edge: node1 wrote node2.
                _add_file_edge(ProcessThatWrites, node2, node1.pid, session)
            elif isinstance(node1, FileNode) and isinstance(node2, ProcessNode):
                # file -> process edge: node2 read node1.
                _add_file_edge(ProcessInputs, node1, node2.pid, session)

        # A well-formed graph has exactly one root (a process with no parent).
        root_process = None
        for node in dataflow_graph.nodes():
            if isinstance(node, ProcessNode):
                pid = node.pid
                process_record = session.get(Process, pid)
                if process_record and process_record.parent_process_id == 0:
                    if root_process is not None:
                        print(f"Error: Two root processes - {pid} and {root_process}")
                        session.rollback()
                        return
                    root_process = pid

        session.commit()

@export_app.command()
def debug_text(
Expand Down
59 changes: 59 additions & 0 deletions probe_src/python/probe_py/manual/persistent_provenance_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from sqlalchemy import create_engine, DateTime
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped
from sqlalchemy.engine import Engine
import xdg_base_dirs
import pathlib
from datetime import datetime

class Base(DeclarativeBase):
    """Declarative base shared by every provenance ORM model in this module."""
    pass

# Module-level singleton; populated lazily on first get_engine() call.
_engine = None
def get_engine()->Engine:
    """Return the process-wide SQLAlchemy engine for the provenance database.

    On first use, creates the SQLite database file under the XDG data home
    (creating the directory if needed) and ensures all tables exist.
    """
    global _engine
    if _engine is not None:
        return _engine
    data_dir = pathlib.Path(xdg_base_dirs.xdg_data_home())
    data_dir.mkdir(parents=True, exist_ok=True)
    db_file = data_dir / "probe_log.db"
    # echo=True logs every SQL statement — presumably intentional for
    # development; TODO confirm before shipping.
    _engine = create_engine(f'sqlite:///{db_file}', echo=True)
    Base.metadata.create_all(_engine)
    return _engine

class ProcessThatWrites(Base):
    """One row per process->file write edge observed in a dataflow graph."""
    __tablename__ = 'process_that_writes'

    # Surrogate key. Fixed: SQLAlchemy's keyword is `autoincrement`, not
    # `auto_increment` — the latter raises ArgumentError at class definition
    # (compare ProcessInputs, which already uses the correct spelling).
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    inode: Mapped[int]
    process_id: Mapped[int]
    device_major: Mapped[int]
    device_minor: Mapped[int]
    host: Mapped[int]       # random per-invocation host id (see cli.get_host_name)
    path: Mapped[str]
    mtime_sec: Mapped[int]  # whole seconds of the file's mtime
    mtime_nsec: Mapped[int] # nanosecond remainder of the file's mtime
    size: Mapped[int]       # file size in bytes at time of observation


class Process(Base):
    """One row per process node in a stored dataflow graph."""
    __tablename__ = 'process'

    process_id: Mapped[int] = mapped_column(primary_key=True)
    # 0 means "no parent recorded" (the root process keeps this value).
    parent_process_id: Mapped[int]
    # Shell-quoted command line (built with shlex.join by the caller).
    cmd: Mapped[str]
    # Wall-clock time the row was stored — not the process start time.
    time: Mapped[datetime] = mapped_column(DateTime)


class ProcessInputs(Base):
    """One row per file->process read edge observed in a dataflow graph."""
    __tablename__ = 'process_inputs'

    # Surrogate key.
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    inode: Mapped[int]
    process_id: Mapped[int]
    device_major: Mapped[int]
    device_minor: Mapped[int]
    host: Mapped[int]       # random per-invocation host id (see cli.get_host_name)
    path: Mapped[str]
    mtime_sec: Mapped[int]  # whole seconds of the file's mtime
    mtime_nsec: Mapped[int] # nanosecond remainder of the file's mtime
    size: Mapped[int]       # file size in bytes at time of observation

0 comments on commit 86c3dce

Please sign in to comment.