Skip to content

Commit

Permalink
Merge pull request #457 from PrefectHQ/skip-fixes
Browse files Browse the repository at this point in the history
Fixes for mapped skips
  • Loading branch information
cicdw authored Dec 27, 2018
2 parents 8d2de77 + 1457fe7 commit 292f68a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 6 deletions.
20 changes: 14 additions & 6 deletions src/prefect/engine/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def run(

# check if any upstream tasks skipped (and if we need to skip)
state = self.check_upstream_skipped(
state, upstream_states_set=upstream_states_set
state, upstream_states_set=upstream_states_set, mapped=mapped
)

# check if the task's trigger passes
Expand Down Expand Up @@ -380,20 +380,24 @@ def check_upstream_finished(

@call_state_handlers
def check_upstream_skipped(
self, state: State, upstream_states_set: Set[State]
self, state: State, upstream_states_set: Set[State], mapped: bool = False
) -> State:
"""
Checks if any of the upstream tasks have skipped.
Args:
- state (State): the current state of this task
- upstream_states_set: a set containing the states of any upstream tasks.
- mapped (bool): whether this task represents a parent mapped task,
in which case this check will be skipped
Returns:
- State: the state of the task after running the check
"""
if self.task.skip_on_upstream_skip and any(
s.is_skipped() for s in upstream_states_set
if (
not mapped
and self.task.skip_on_upstream_skip
and any(s.is_skipped() for s in upstream_states_set)
):
raise ENDRUN(
state=Skipped(
Expand Down Expand Up @@ -581,7 +585,9 @@ def check_upstreams_for_mapping(

## no inputs provided
if not mapped_upstreams:
raise ENDRUN(state=Skipped(message="No inputs provided to map over."))
raise ENDRUN(
state=Skipped(message="No inputs provided to map over.", result=[])
)

iterable_values = []
for value in mapped_upstreams:
Expand All @@ -594,7 +600,9 @@ def check_upstreams_for_mapping(

## check that no upstream values are empty
if any([len(v) == 0 for v in iterable_values]):
raise ENDRUN(state=Skipped(message="Empty inputs provided to map over."))
raise ENDRUN(
state=Skipped(message="Empty inputs provided to map over.", result=[])
)

return state

Expand Down
25 changes: 25 additions & 0 deletions tests/core/test_task_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,31 @@ def test_map_failures_dont_leak_out(executor):
assert isinstance(slist[0], prefect.engine.state.TriggerFailed)


@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_map_skips_dont_leak_out(executor):
ll = ListTask()

@task
def add(x):
if x == 1:
raise prefect.engine.signals.SKIP("One is no good")
else:
return x + 1

with Flow() as f:
res = add.map(add.map(ll))

s = f.run(return_tasks=f.tasks, executor=executor)
slist = s.result[res]
assert s.is_successful()
assert isinstance(slist, list)
assert len(slist) == 3
assert [r.result for r in slist][1:] == [4, 5]
assert isinstance(slist[0], prefect.engine.state.Skipped)


@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
Expand Down

0 comments on commit 292f68a

Please sign in to comment.