Skip to content

Commit

Permalink
Handling cases in Nextflow (#74)
Browse files Browse the repository at this point in the history
* Handling inline commands and chained scripts

* fixing

* fix missing import

* Minor changes

---------

Co-authored-by: Sam Grayson <[email protected]>
  • Loading branch information
kyrillosishak and charmoniumQ authored Dec 11, 2024
1 parent 86c3dce commit 194570c
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 36 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
**/.nextflow
**/work
**/.pytest_cache
**/.idea

# build directories
**/target
Expand Down
25 changes: 0 additions & 25 deletions Makefile

This file was deleted.

27 changes: 24 additions & 3 deletions probe_src/python/probe_py/manual/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import rich.pretty
from ..generated.parser import parse_probe_log, parse_probe_log_ctx
from . import analysis
from .workflows import MakefileGenerator
from .workflows import MakefileGenerator, NextflowGenerator
from .ssh_argparser import parse_ssh_args
from . import file_closure
from . import graph_utils
Expand Down Expand Up @@ -293,8 +293,8 @@ def ssh(
"""

flags, destination, remote_host = parse_ssh_args(ssh_args)
ssh_cmd = ["ssh"] + flags

ssh_cmd = ["ssh"] + flags

libprobe = pathlib.Path(os.environ["__PROBE_LIB"]) / ("libprobe-dbg.so" if debug else "libprobe.so")
if not libprobe.exists():
Expand Down Expand Up @@ -390,6 +390,27 @@ def makefile(
script = g.generate_makefile(dataflow_graph)
output.write_text(script)

@export_app.command()
def nextflow(
        output: Annotated[
            pathlib.Path,
            typer.Argument(),
        ] = pathlib.Path("nextflow.nf"),
        probe_log: Annotated[
            pathlib.Path,
            typer.Argument(help="output file written by `probe record -o $file`."),
        ] = pathlib.Path("probe_log"),
) -> None:
    """
    Export the probe_log to a Nextflow workflow.

    Reads the recorded provenance log, converts it to a dataflow graph, and
    writes the generated Nextflow script to `output`.
    """
    prov_log = parse_probe_log(probe_log)
    dataflow_graph = analysis.provlog_to_dataflow_graph(prov_log)
    generator = NextflowGenerator()
    script = generator.generate_workflow(dataflow_graph)
    # Write to the user-supplied path.  (Previously `output` was
    # unconditionally reassigned to pathlib.Path("nextflow.nf"), silently
    # ignoring the CLI argument.)
    output.write_text(script)


if __name__ == "__main__":
app()
Expand Down
116 changes: 108 additions & 8 deletions probe_src/python/probe_py/manual/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,23 @@
import abc
from typing import List, Set, Optional
import pathlib

import shutil
import os
import tempfile
import subprocess
from filecmp import cmp
import re
"""
All the cases we should take care of:
1- One Input, One Output
1- One Input, One Output [x]
2- One Input, Multiple Outputs
3- Multiple Inputs, One Output
4- Multiple Inputs, Multiple Outputs
5- Chained Commands: Inline commands that directly modify the input file (e.g., using sed, awk, or similar)
6- No Input Command: Commands like ls .: Commands that don't take an explicit input file but generate output
7- Ensure that any environment variables or context-specific settings are captured and translated into the Nextflow environment
8- File and Directory Structure Assumptions (Scripts that assume a specific directory structure, Commands that change the working directory (cd))
5- Inline Commands: Inline commands that directly modify the input file (e.g., using sed, awk, or similar) [x]
6- Chained Commands: If a process node calls another script [x]
7- No Input Command: Commands like `ls .`: Commands that don't take an explicit input file but generate output [x]
8- Ensure that any environment variables or context-specific settings are captured and translated into the Nextflow environment
9- File and Directory Structure Assumptions (Scripts that assume a specific directory structure, Commands that change the working directory (cd))
...
"""
class WorkflowGenerator(abc.ABC):
Expand Down Expand Up @@ -67,6 +73,52 @@ def handle_standard_case(self, process: ProcessNode, inputs: List[FileNode], out
\"\"\"
}}"""



def handle_inline_case(self, process: ProcessNode, inputs: List[FileNode], outputs: List[FileNode]) -> str:
input_files = " ".join([f'path "{os.path.basename(file.file)}"' for file in inputs])
output_files = " ".join(
[f'path "{os.path.splitext(os.path.basename(file.file))[0]}_modified{os.path.splitext(file.file)[1]}"' for
file in inputs])

# Build inline commands for each file to perform copy, edit, and rename steps
script_commands = []
for file in inputs:
base_name = os.path.basename(file.file)
temp_name = f"temp_{base_name}"
final_name = f"{os.path.splitext(base_name)[0]}_modified{os.path.splitext(base_name)[1]}"

# Replace the original filename in the command with the temp filename
modified_cmd = []
for cmd in process.cmd:
# Substitute all occurrences of the original filename in each command
cmd_modified = re.sub(r"/(?:[a-zA-Z0-9_\-./]+/)*([a-zA-Z0-9_\-]+\.txt)", temp_name, cmd)
modified_cmd.append(cmd_modified)

script_commands.extend([
f'cp {file.file} {temp_name}', # Copy to temp file
" ".join(modified_cmd), # Apply inline edit with temp filename
f'mv {temp_name} {final_name}' # Rename temp file to final output
])

# Join script commands with newline and indentation for Nextflow process
script_block = "\n ".join(script_commands)

# Create the Nextflow process block
return f"""
process process_{id(process)} {{
input:
{input_files}
output:
{output_files}
script:
\"\"\"
{script_block}
\"\"\"
}}"""

def handle_dynamic_filenames(self, process: ProcessNode, inputs: List[FileNode], outputs: List[FileNode]) -> str:
input_files = " ".join([f'path "{file.file}"\n ' for file in inputs])
output_files = " ".join([f'path "{file.file}"\n ' for file in outputs if file.file])
Expand Down Expand Up @@ -116,11 +168,55 @@ def handle_custom_shells(self, process: ProcessNode) -> str:
\"\"\"
}}"""


def is_inline_editing_command_sandbox(self, command: str, input_files: list[FileNode]) -> bool:
"""
Determine if a command modifies any of the input files in-place, even if the content remains the same.
"""
with tempfile.TemporaryDirectory() as temp_dir:
sandbox_files = {}

# Track original modification times and create sandbox files
original_times = {}
sandbox_command = command
for input_file in input_files:
temp_file = os.path.join(temp_dir, os.path.basename(input_file.file))
shutil.copy(input_file.file, temp_file)
sandbox_files[input_file.file] = temp_file

# Save original modification time
original_times[input_file.file] = os.path.getmtime(input_file.file)
sandbox_command = sandbox_command.replace(input_file.file, temp_file)

# Run the command in the sandbox
try:
subprocess.run(sandbox_command, shell=True, check=True)
except subprocess.CalledProcessError:
print("Command failed to execute.")
return False

# Check if any of the files were modified in-place
for original_file, sandbox_file in sandbox_files.items():
# Get the modified time of the sandboxed file after command execution
sandbox_mod_time = os.path.getmtime(sandbox_file)
original_mod_time = original_times[original_file]

# Compare content and modification times
content_modified = not cmp(original_file, sandbox_file, shallow=False)
time_modified = sandbox_mod_time != original_mod_time

# If either the content or modification time has changed, it's an in-place modification
if content_modified or time_modified:
return True

# Return False if none of the files were modified
return False

def is_standard_case(self, process: ProcessNode, inputs: List[FileNode], outputs: List[FileNode]) -> bool:
return len(inputs) >= 1 and len(outputs) == 1

def is_inline_case(self, process: ProcessNode, inputs: List[FileNode], outputs: List[FileNode]) -> bool:
return self.is_inline_editing_command_sandbox(' '.join(process.cmd), inputs)

def is_multiple_output_case(self, process: ProcessNode, inputs: List[FileNode], outputs: List[FileNode]) -> bool:
return len(inputs) >= 1 and len(outputs) >= 1

Expand All @@ -139,7 +235,7 @@ def create_processes(self) -> None:
inputs = [n for n in self.graph.predecessors(node) if isinstance(n, FileNode)]
outputs = [n for n in self.graph.successors(node) if isinstance(n, FileNode)]

if self.is_standard_case(node, inputs, outputs) :
if self.is_standard_case(node, inputs, outputs):
process_script = self.handle_standard_case(node, inputs, outputs)
self.nextflow_script.append(process_script)
self.workflow.append(f"{self.escape_filename_for_nextflow(outputs[0].label)} = process_{id(node)}({', '.join([self.escape_filename_for_nextflow(i.label) for i in inputs])})")
Expand All @@ -149,6 +245,10 @@ def create_processes(self) -> None:
process_script = self.handle_dynamic_filenames(node, inputs, outputs)
elif self.is_parallel_execution(node):
process_script = self.handle_parallel_execution(node)
elif self.is_inline_case(node, inputs, outputs):
process_script = self.handle_inline_case(node, inputs, outputs)
self.nextflow_script.append(process_script)
self.workflow.append(f"process_{id(node)}({', '.join([self.escape_filename_for_nextflow(i.label) for i in inputs])})")
else:
process_script = self.handle_custom_shells(node)
self.nextflow_script.append(process_script)
Expand Down

0 comments on commit 194570c

Please sign in to comment.