Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add executor kwarg to environments #2805

Merged
merged 12 commits into from
Jun 22, 2020
7 changes: 7 additions & 0 deletions changes/pr2805.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
feature:
- Support configuring executors for `LocalEnvironment`, `KubernetesJobEnvironment`, and `FargateTaskEnvironment` - [#2805](https://github.com/PrefectHQ/prefect/pull/2805)

deprecation:
- Deprecate `RemoteEnvironment` in favor of `LocalEnvironment` - [#2805](https://github.com/PrefectHQ/prefect/pull/2805)
- Deprecate `RemoteDaskEnvironment` in favor of `LocalEnvironment` with a `DaskExecutor` - [#2805](https://github.com/PrefectHQ/prefect/pull/2805)
- Deprecate `executor_kwargs` in `KubernetesJobEnvironment` and `FargateTaskEnvironment` in favor of `executor` - [#2805](https://github.com/PrefectHQ/prefect/pull/2805)
3 changes: 1 addition & 2 deletions docs/.vuepress/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,7 @@ module.exports = {
children: [
'execution/overview',
'execution/storage_options',
'execution/remote_environment',
'execution/remote_dask_environment',
'execution/local_environment',
'execution/dask_cloud_provider_environment',
'execution/dask_k8s_environment',
'execution/k8s_job_environment',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ should be as independent as possible, e.g. each Flow could have its own docker
image, dynamically create the Dask cluster for each Flow run, etc. However, for
development and interactive testing, either using ECS (instead of Fargate) or
creating a Dask cluster manually (with Dask Cloud Provider or otherwise) and then using
`RemoteDaskEnvironment` or just `DaskExecutor` with your flows will result
`LocalEnvironment` configured with a `DaskExecutor` will result
in a much better and faster development experience.
:::

Expand Down
95 changes: 95 additions & 0 deletions docs/orchestration/execution/local_environment.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Local Environment

[[toc]]

## Overview

The Local Environment (`LocalEnvironment`) is meant to be a simple and
minimally configurable execution Environment for Flow runs, and is the default
Environment for all Flows registered with the Prefect API. The Local
Environment functions as a way to execute Flows without any pre-existing
infrastructure requirements and instead opts to run Flows directly in process.

The only needed configuration for the Local Environment is the specification
of an [Executor](/core/concepts/engine.html#executors) however if it is not
specified then it defaults to the
[LocalExecutor](/api/latest/engine/executors.html#localexecutor).

_For more information on the Local Environment visit the relevant [API
documentation](/api/latest/environments/execution.html#localenvironment)._

## Process

#### Initialization

The `LocalEnvironment` takes an optional `executor` argument. The `executor`
argument accepts an [Executor](/core/concepts/engine.html#executors) object
that should be used to run this flow. If not specified, the local default
executor is used.

#### Setup

The `LocalEnvironment` has no setup step because it has no infrastructure
requirements.

#### Execute

The `LocalEnvironment` executes the flow locally in process, using the
configured `executor`.

## Examples

#### Using a LocalExecutor

Here we configure a `LocalEnvironment` to run a flow using a `LocalExecutor`.
Note that this is the same as the default behavior - if you don't specify an
`environment` on a `Flow` the same configuration will be created for you.

```python
from prefect import Flow
from prefect.environments import LocalEnvironment
from prefect.engine.executors import LocalExecutor

flow = Flow(
"Local Executor Example",
environment=LocalEnvironment(executor=LocalExecutor()),
)
```

#### Using a DaskExecutor, with a local Dask cluster

Here we configure a `LocalEnvironment` to run a flow using a
[DaskExecutor](/api/latest/engine/executors.html#daskexecutor), connected to a
local temporary [Dask](https://dask.org") cluster. When the flow run starts, a
temporary local Dask cluster will be created just for that flow run.

```python
from prefect import Flow
from prefect.environments import LocalEnvironment
from prefect.engine.executors import DaskExecutor

flow = Flow(
"Dask Executor Example",
environment=LocalEnvironment(executor=DaskExecutor())
)
```

#### Using a DaskExecutor, with an existing Dask cluster

Here we configure a `LocalEnvironment` to run a flow using a `DaskExecutor`,
connected to an existing Dask cluster.

```python
from prefect import Flow
from prefect.environments import LocalEnvironment
from prefect.engine.executors import DaskExecutor

flow = Flow(
"Dask Executor Example",
environment=LocalEnvironment(
executor=DaskExecutor(
"tcp://address-of-the-dask-cluster:8786",
)
)
)
```
36 changes: 22 additions & 14 deletions docs/orchestration/execution/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,30 @@ While Storage objects provide a way to save and retrieve Flows, [Environments](h

### How Environments are Used

By default, Prefect attaches `RemoteEnvironment` with your local default executor to every Flow you create. To specify a different environment, provide it to your Flow at initialization:
By default, Prefect attaches `LocalEnvironment` with your local default
executor to every Flow you create. To specify a different environment, provide
it to your Flow at initialization:

```python
from prefect.environments import RemoteEnvironment
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment

f = Flow("example-env", environment=RemoteEnvironment(executor="prefect.engine.executors.LocalExecutor"))
f = Flow("example-env", environment=LocalEnvironment(executor=DaskExecutor()))
```

or assign it directly:

```python
from prefect.environments import RemoteEnvironment
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment

f = Flow("example-env")
f.environment = RemoteEnvironment(executor="prefect.engine.executors.LocalExecutor")
f.environment = LocalEnvironment(executor=DaskExecutor())
```

### Setup & Execute

The two main environment functions are `setup` and `execute`. The `setup` function is responsible for creating or prepping any infrastructure requirements before the Flow is executed e.g., spinning up a Dask cluster or checking available platform resources. The `execute` function is responsible for actually telling the Flow where and how it needs to run e.g., running the Flow in process, as per the [`RemoteEnvironment`](https://docs.prefect.io/api/latest/environments/execution.html##remoteenvironment), or registering a new Fargate task, as per the [`FargateTaskEnvironment`](https://docs.prefect.io/api/latest/environments/execution.html#fargatetaskenvironment).
The two main environment functions are `setup` and `execute`. The `setup` function is responsible for creating or prepping any infrastructure requirements before the Flow is executed e.g., spinning up a Dask cluster or checking available platform resources. The `execute` function is responsible for actually telling the Flow where and how it needs to run e.g., running the Flow in process, as per the [`LocalEnvironment`](https://docs.prefect.io/api/latest/environments/execution.html##localenvironment), or registering a new Fargate task, as per the [`FargateTaskEnvironment`](https://docs.prefect.io/api/latest/environments/execution.html#fargatetaskenvironment).

### Environment Callbacks

Expand All @@ -93,11 +97,13 @@ _For more information on the design behind Environment Callbacks visit [PIN 12](

#### Callback Example

In this example we have a function called `report_cluster_metrics` which, when run on a Kubernetes cluster, gathers information about current resource usage. We can use this to track resource usage both before and after a Flow run.
In this example we have a function called `report_cluster_metrics` which, when
run on a Kubernetes cluster, gathers information about current resource usage.
We can use this to track resource usage both before and after a Flow run.

```python
from prefect import Flow, task
from prefect.environments import RemoteEnvironment
from prefect.environments import LocalEnvironment


# Report cluster metrics that we will use before and after Flow run
Expand All @@ -124,8 +130,10 @@ def load(data):


# Attach out metrics reporting callbacks
environment = RemoteEnvironment(on_start=report_cluster_metrics,
on_exit=report_cluster_metrics)
environment = LocalEnvironment(
on_start=report_cluster_metrics,
on_exit=report_cluster_metrics
)


with Flow("Callback-Example", environment=environment) as flow:
Expand All @@ -143,10 +151,10 @@ Environments expose a configurable list of `labels`, allowing you to label your
An Agent's labels must be a superset of the labels specified on a Flow's environment. This means that if a Flow's environment labels are specified as `["dev"]` and an Agent is running with labels set to `["dev", "staging"]`, the agent will run that Flow because the _dev_ label is a subset of the labels provided to the Agent.

```python
from prefect.environments import RemoteEnvironment
from prefect.environments import LocalEnvironment

f = Flow("example-label")
f.environment = RemoteEnvironment(labels=["dev"])
f.environment = LocalEnvironment(labels=["dev"])
```

```python
Expand All @@ -160,10 +168,10 @@ LocalAgent(labels=["dev", "staging"]).start()
On the other hand if you register a flow that has environment labels set to `["dev", "staging"]` and run an Agent with the labels `["dev"]` then it will not pick up the flow because there exists labels in the environment which were not provided to the agent.

```python
from prefect.environments import RemoteEnvironment
from prefect.environments import LocalEnvironment

f = Flow("example-label")
f.environment = RemoteEnvironment(labels=["dev", "staging"])
f.environment = LocalEnvironment(labels=["dev", "staging"])
```

```python
Expand Down
92 changes: 0 additions & 92 deletions docs/orchestration/execution/remote_dask_environment.md

This file was deleted.

89 changes: 0 additions & 89 deletions docs/orchestration/execution/remote_environment.md

This file was deleted.

Loading