Skip to content

Commit

Permalink
Editing NextflowGenerator class
Browse files Browse the repository at this point in the history
  • Loading branch information
kyrillosishak committed Sep 20, 2024
1 parent 8b6697f commit 1cccdc0
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 44 deletions.
30 changes: 14 additions & 16 deletions probe_src/python/probe_py/manual/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import pydot
from probe_py.manual.workflows import *
from probe_py.manual.analysis import *
import pickle
import enum

rich.traceback.install(show_locals=False)

Expand Down Expand Up @@ -154,10 +154,6 @@ def dataflow_graph(

dot_string, dataflow_graph = analysis.provlog_to_dataflow_graph(prov_log)
print(dot_string)
import pickle
with open(output, 'wb') as f:
pickle.dump(dataflow_graph, f)


@app.command()
def dump(
Expand All @@ -180,12 +176,15 @@ def dump(
print(pid, exid, tid, op_no, op.data)
print()

class OutputFormat(str, enum.Enum):
    """Workflow formats that can be selected for the ``export`` command.

    The ``str`` mix-in makes every member a plain string equal to its
    value, so members compare equal to the literal format names.
    """

    makefile = "makefile"
    nextflow = "nextflow"

@app.command()
def export(
input: pathlib.Path = pathlib.Path("dataflow_graph.pkl"),
makefile : bool = typer.Option(default=False, help="export in makefile format"),
nextflow : bool = typer.Option(default=True, help="export in nextflow format"),
output : pathlib.Path = pathlib.Path("workflow"),
input: pathlib.Path = pathlib.Path("probe_log"),
output_format: OutputFormat = typer.Option(OutputFormat.nextflow, help="Select output format", show_default=True),
output: pathlib.Path = pathlib.Path("workflow"),
) -> None:
"""
Export the dataflow graph to Workflow (Nextflow and Makefile).
Expand All @@ -195,18 +194,17 @@ def export(
typer.secho(f"INPUT {input} does not exist", fg=typer.colors.RED)
raise typer.Abort()

prov_log = parse_probe_log(input)
_, dataflow_graph = analysis.provlog_to_dataflow_graph(prov_log)

with open(input, 'rb') as f:
dataflow_graph = pickle.load(f)

if nextflow :
generator = NextflowGenerator(dataflow_graph)
script = generator.generate_workflow()
if output_format == OutputFormat.nextflow :
generator = NextflowGenerator()
script = generator.generate_workflow(dataflow_graph)
output_file = output / "nextflow.nf"
print(script)
with output_file.open('a') as outfile:
outfile.write(script)
if makefile :
if output_format == OutputFormat.makefile :
generator = MakefileGenerator(dataflow_graph)
script = generator.generate_makefile()
output_file = output / "Makefile"
Expand Down
40 changes: 12 additions & 28 deletions probe_src/python/probe_py/manual/workflows.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from probe_py.manual.analysis import *
import networkx as nx
import abc
import dataclasses
from dataclasses import dataclass
from typing import Tuple, List, Set, Union
Expand All @@ -16,10 +17,13 @@
8- File and Directory Structure Assumptions (Scripts that assume a specific directory structure, Commands that change the working directory (cd))
...
"""
class WorkflowGenerator(abc.ABC):
    """Abstract interface for classes that turn a graph into a workflow script."""

    @abc.abstractmethod
    def generate_workflow(self, graph: nx.DiGraph) -> str:
        """Return the workflow script generated from ``graph``.

        Concrete subclasses must override this method; the class cannot be
        instantiated while it remains abstract.
        """

class NextflowGenerator:
def __init__(self, graph: nx.DiGraph):
self.graph = graph
class NextflowGenerator(WorkflowGenerator):
def __init__(self):
self.visited: Set[ProcessNode] = set()
self.process_counter = {}
self.nextflow_script = []
Expand Down Expand Up @@ -141,7 +145,7 @@ def create_processes(self):
self.nextflow_script.append(process_script)
self.workflow.append(f"{self.escape_filename_for_nextflow(outputs[0].label)} = process_{id(node)}({', '.join([self.escape_filename_for_nextflow(i.label) for i in inputs])})")
elif self.is_multiple_output_case(node,inputs,outputs) :
raise NotImplementedError("")
raise NotImplementedError("Handling multiple outputs not implemented yet.")
elif self.is_dynamic_filename_case(node, outputs):
process_script = self.handle_dynamic_filenames(node, inputs, outputs)
elif self.is_parallel_execution(node):
Expand All @@ -152,11 +156,12 @@ def create_processes(self):
self.workflow.append(f"process_{id(node)}()")

self.visited.add(node)

def generate_workflow(self) -> str:
def generate_workflow(self, graph: nx.DiGraph) -> str:
"""
Generate the complete Nextflow workflow script from the graph.
"""
self.graph = graph
self.nextflow_script.append("nextflow.enable.dsl=2\n\n")
self.create_processes()

Expand Down Expand Up @@ -236,28 +241,7 @@ def create_rules(self):
"""
Create Makefile rules based on the dataflow graph.
"""
for node in self.graph.nodes:
if isinstance(node, ProcessNode) and node not in self.visited:
inputs = [n for n in self.graph.predecessors(node) if isinstance(n, FileNode)]
outputs = [n for n in self.graph.successors(node) if isinstance(n, FileNode)]

if self.is_standard_case(node, inputs, outputs):
#rule = self.handle_standard_case(node, inputs, outputs)
raise NotImplementedError("")
elif self.is_multiple_output_case(node, inputs, outputs):
raise NotImplementedError("")
elif self.is_dynamic_filename_case(node, outputs):
#rule = self.handle_dynamic_filenames(node, inputs, outputs)
raise NotImplementedError("")
elif self.is_parallel_execution(node):
#rule = self.handle_parallel_execution(node)
raise NotImplementedError("")
else:
#rule = self.handle_custom_shells(node)
raise NotImplementedError("")

self.makefile_rules.append(rule)
self.visited.add(node)
raise NotImplementedError("Exporting to makefile is not implemented yet.")

def generate_makefile(self) -> str:
"""
Expand Down

0 comments on commit 1cccdc0

Please sign in to comment.