Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow RDF #310

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add iterate_workflow_steps and iterate_test_workflow_steps
FynnBe committed Nov 8, 2022
commit 82f28f5c12df3243688b8070944da4bf89d2a324
66 changes: 53 additions & 13 deletions bioimageio/core/workflow/operators/_run.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dataclasses import dataclass
from os import PathLike
from typing import Any, Dict, IO, List, Optional, Sequence, Tuple, Union
from typing import Any, Dict, Generator, IO, List, Optional, Sequence, Tuple, Union

import numpy as np
import xarray as xr
def run_workflow(
    rdf_source: Union[dict, PathLike, IO, str, bytes, raw_nodes.URI, RawResourceDescription],
    inputs: Sequence = tuple(),
    options: Optional[Dict[str, Any]] = None,
) -> tuple:
    """Run the workflow described by `rdf_source` to completion.

    Args:
        rdf_source: workflow RDF in any form accepted by `load_resource_description`.
        inputs: positional workflow inputs (must match the workflow's `inputs_spec`).
        options: workflow options overriding the defaults from `options_spec`.

    Returns:
        Outputs of the final workflow step (empty tuple if the workflow yields no state).
    """
    # Drain the step iterator; only the last yielded state's outputs matter here.
    outputs: tuple = tuple()
    for state in _iterate_workflow_steps_impl(rdf_source, test_steps=False, inputs=inputs, options=options):
        outputs = state.outputs

    return outputs


def run_workflow_test(
    rdf_source: Union[dict, PathLike, IO, str, bytes, raw_nodes.URI, RawResourceDescription],
) -> tuple:
    """Execute the workflow's test steps and return the outputs of the last one.

    Args:
        rdf_source: workflow RDF in any form accepted by `load_resource_description`.

    Returns:
        Outputs of the final test step (empty tuple if no state is yielded).
    """
    final_outputs: tuple = tuple()
    # Iterate all test steps; keep only the outputs of the most recent state.
    for step_state in _iterate_workflow_steps_impl(rdf_source, test_steps=True):
        final_outputs = step_state.outputs

    return final_outputs


@dataclass
class WorkflowState:
    """Snapshot of a workflow execution, yielded once after each completed step."""

    # Resolved workflow-level inputs, keyed by input name.
    wf_inputs: Dict[str, Any]
    # Resolved workflow-level options (defaults merged with caller overrides), keyed by name.
    wf_options: Dict[str, Any]
    # Positional inputs that were passed to the step just executed.
    inputs: tuple
    # Positional outputs produced by the step just executed.
    outputs: tuple
    # Accumulated step outputs addressable as "<step id>.outputs.<output name>".
    named_outputs: Dict[str, Any]


def iterate_workflow_steps(
    rdf_source: Union[dict, PathLike, IO, str, bytes, raw_nodes.URI, RawResourceDescription],
    *,
    inputs: Sequence = tuple(),
    options: Optional[Dict[str, Any]] = None,
) -> Generator[WorkflowState, None, None]:
    """Run the workflow step by step, yielding a `WorkflowState` after each step.

    Args:
        rdf_source: workflow RDF in any form accepted by `load_resource_description`.
        inputs: positional workflow inputs (must match the workflow's `inputs_spec`).
        options: workflow options overriding the defaults from `options_spec`.

    Yields:
        `WorkflowState` after each executed step; the final state carries the
        workflow outputs.
    """
    # NOTE: `typing.Generator` requires all three type parameters (yield, send,
    # return) before Python 3.13; `Generator[WorkflowState]` raises TypeError at
    # import time on older interpreters.
    yield from _iterate_workflow_steps_impl(rdf_source, inputs=inputs, options=options, test_steps=False)


def iterate_test_workflow_steps(
    rdf_source: Union[dict, PathLike, IO, str, bytes, raw_nodes.URI, RawResourceDescription]
) -> Generator[WorkflowState, None, None]:
    """Run the workflow's test steps one by one, yielding a `WorkflowState` after each.

    Args:
        rdf_source: workflow RDF in any form accepted by `load_resource_description`.

    Yields:
        `WorkflowState` after each executed test step.
    """
    # NOTE: `typing.Generator` requires all three type parameters (yield, send,
    # return) before Python 3.13; `Generator[WorkflowState]` raises TypeError at
    # import time on older interpreters.
    yield from _iterate_workflow_steps_impl(rdf_source, test_steps=True)


def _run_workflow(
def _iterate_workflow_steps_impl(
rdf_source: Union[dict, PathLike, IO, str, bytes, raw_nodes.URI, RawResourceDescription],
*,
test_steps: bool,
inputs: Sequence = tuple(),
options: Dict[str, Any] = None,
) -> Tuple:
) -> Generator[WorkflowState]:
import bioimageio.core.workflow.operators as ops

workflow = load_resource_description(rdf_source)
assert isinstance(workflow, nodes.Workflow)
wf_options = {opt.name: opt.default for opt in workflow.options_spec}
wf_options: Dict[str, Any] = {opt.name: opt.default for opt in workflow.options_spec}
if test_steps:
assert not inputs
assert not options
wf_inputs = {}
wf_inputs: Dict[str, Any] = {}
steps = workflow.test_steps
else:
if not len(workflow.inputs_spec) == len(inputs):
@@ -115,7 +148,7 @@ def map_ref(value):

# implicit inputs to a step are the outputs of the previous step.
# For the first step these are the workflow inputs.
outputs = inputs
outputs = tuple(inputs)
for step in steps:
if not hasattr(ops, step.op):
raise NotImplementedError(f"{step.op} not implemented in {ops}")
@@ -124,8 +157,9 @@ def map_ref(value):
if step.inputs is missing:
inputs = outputs
else:
inputs = [map_ref(ipt) for ipt in step.inputs]
inputs = tuple(map_ref(ipt) for ipt in step.inputs)

assert isinstance(inputs, tuple)
options = {k: map_ref(v) for k, v in (step.options or {}).items()}
outputs = op(*inputs, **options)
if not isinstance(outputs, tuple):
@@ -141,6 +175,9 @@ def map_ref(value):

named_outputs.update({f"{step.id}.outputs.{out_name}": out for out_name, out in zip(step.outputs, outputs)})

yield WorkflowState(
wf_inputs=wf_inputs, wf_options=wf_options, inputs=inputs, outputs=outputs, named_outputs=named_outputs
)
if len(workflow.outputs_spec) != len(outputs):
raise ValueError(f"Expected {len(workflow.outputs_spec)} outputs from last step, but got {len(outputs)}.")

@@ -156,7 +193,10 @@ def tensor_as_xr(tensor, axes: Sequence[nodes.Axis]):
else:
return xr.DataArray(tensor, dims=spec_axes)

return [
outputs = tuple(
tensor_as_xr(out, out_spec.axes) if out_spec.type == "tensor" else out
for out_spec, out in zip(workflow.outputs_spec, outputs)
]
)
yield WorkflowState(
wf_inputs=wf_inputs, wf_options=wf_options, inputs=inputs, outputs=outputs, named_outputs=named_outputs
)