Skip to content

Commit

Permalink
Merge pull request #3338 from jcrist/executor-on-flow
Browse files Browse the repository at this point in the history
Support configuring executor on the flow
  • Loading branch information
cicdw authored Sep 18, 2020
2 parents 5e3d6f1 + e8ea670 commit 4ec55c6
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 1 deletion.
2 changes: 2 additions & 0 deletions changes/pr3338.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
enhancement:
- "Support configuring executor on flow, not on environment - [#3338](https://github.com/PrefectHQ/prefect/pull/3338)"
6 changes: 6 additions & 0 deletions src/prefect/core/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from prefect.core.edge import Edge
from prefect.core.parameter import Parameter
from prefect.core.task import Task
from prefect.engine.executors import Executor
from prefect.engine.result import NoResult, Result
from prefect.engine.result_handlers import ResultHandler
from prefect.engine.results import ResultHandlerResult
Expand Down Expand Up @@ -112,6 +113,9 @@ def my_task():
Args:
- name (str): The name of the flow. Cannot be `None` or an empty string
- schedule (prefect.schedules.Schedule, optional): A default schedule for the flow
- executor (prefect.engine.executors.Executor, optional): The executor that the flow
should use. If `None`, the default executor configured in the runtime environment
will be used.
- environment (prefect.environments.Environment, optional): The environment
that the flow should be run in. If `None`, a `LocalEnvironment` will be created.
- storage (prefect.environments.storage.Storage, optional): The unit of storage
Expand Down Expand Up @@ -144,6 +148,7 @@ def __init__(
self,
name: str,
schedule: prefect.schedules.Schedule = None,
executor: Executor = None,
environment: Environment = None,
storage: Storage = None,
tasks: Iterable[Task] = None,
Expand All @@ -163,6 +168,7 @@ def __init__(
self.name = name
self.logger = logging.get_logger(self.name)
self.schedule = schedule
self.executor = executor
self.environment = environment or prefect.environments.LocalEnvironment()
self.storage = storage
if result_handler:
Expand Down
7 changes: 6 additions & 1 deletion src/prefect/engine/flow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,12 @@ def run(
task_contexts = dict(task_contexts or {})
parameters = dict(parameters or {})
if executor is None:
executor = prefect.engine.get_default_executor_class()()
# Use the executor on the flow, if configured
executor = getattr(self.flow, "executor", None)
if executor is None:
executor = prefect.engine.get_default_executor_class()()

self.logger.debug("Using executor type %s", type(executor).__name__)

try:
state, task_states, context, task_contexts = self.initialize_run(
Expand Down
10 changes: 10 additions & 0 deletions tests/engine/test_flow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,16 @@ def test_run_count_tracked_via_retry_states(self):
assert isinstance(state3.result[t2], Failed)


def test_flow_runner_uses_default_executor_on_flow_if_present():
t = SuccessTask()
with Flow(name="test", executor=Executor()) as flow:
result = t()

with raise_on_exception():
with pytest.raises(NotImplementedError):
FlowRunner(flow=flow).run()


def test_flow_runner_uses_user_provided_executor():
t = SuccessTask()
with Flow(name="test") as f:
Expand Down

0 comments on commit 4ec55c6

Please sign in to comment.