
Add executor kwarg to environments #2805

Merged: 12 commits merged into PrefectHQ:master from add-executor-to-environments on Jun 22, 2020

Conversation

@jcrist commented Jun 17, 2020


Fixes #2508.

- adds new tests (if appropriate)
- add a changelog entry in the `changes/` directory (if appropriate)
- updates docstrings for any new functions or function arguments, including `docs/outline.toml` for API reference docs (if appropriate)

This allows configuring an instantiated `Executor` on an `Environment`.
With the addition of support for configuring Dask clusters inside the
`Executor`, this should remove the need for any dask-specific
environments. After deprecation, only the following environments should
remain:

- `LocalEnvironment`
- `KubernetesJobEnvironment`
- `FargateTaskEnvironment`

Here's an example of running a flow where the flow-runner is a local process,
using a temporary dask cluster running on Fargate:

```python
flow.environment = LocalEnvironment(
    executor=DaskExecutor(
        cluster_class="dask_cloudprovider.FargateCluster",
        cluster_kwargs={...},
    )
)
```

One nice thing about using an *instantiated* executor is that running a flow
locally with `flow.run()` and registering it to be executed later with
`flow.register()` can both use the same object. Since an executor doesn't use
any resources until its `.start()` method is called, there should be no harm
in using an instantiated executor in the `Environment` object.
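
For illustration, here is a minimal sketch of that pattern, assuming the 0.12-era import paths (`prefect.environments.LocalEnvironment`, `prefect.engine.executors.DaskExecutor`). As noted later in this thread, `flow.run()` does not read the environment, so the shared executor is passed to it explicitly:

```python
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment

@task
def say_hello():
    print("hello from a dask worker")

with Flow("executor-example") as flow:
    say_hello()

# One executor object; it holds no resources until .start() is called,
# so it's safe to attach it to the environment at build time.
executor = DaskExecutor()  # defaults to a temporary local dask cluster

flow.environment = LocalEnvironment(executor=executor)

# Local execution: flow.run() ignores flow.environment, so pass the executor explicitly.
flow.run(executor=executor)

# Deployed execution: flow.register() stores the environment (and its executor)
# for use when an agent later runs the flow.
# flow.register(project_name="my-project")
```
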
@jcrist (Author) commented Jun 17, 2020

TODO:

- [ ] Add warnings to deprecated environments
- [ ] Add tests
- [ ] Documentation

@jcrist (Author) commented Jun 17, 2020

One wrench in this plan is propagating the image name to the executor from the flow storage. This only matters if you want your dask workers & scheduler to share the same image as your flow runner, which shouldn't strictly be necessary, but may be desired. Dask-kubernetes includes ways to template based on environment variables, so under certain circumstances we could use that, but this may not fit all use cases. I'm not certain what the best solution is here.

@jcrist (Author) commented Jun 17, 2020

One wrench in this plan is propagating the image name to the executor from the flow storage. This only matters if you want your dask workers & scheduler to share the same image as your flow runner, which shouldn't strictly be necessary, but may be desired.

A few options:

- This should be doable for all cases with existing support, but would require properly configuring a mix of:

  - Dask's config (in `~/.config/dask/` in the image, not locally)
  - Passing the right things to `cluster_kwargs`

  We could provide docs on the proper way to configure use with dask-kubernetes, dask-cloudprovider, dask-gateway, etc. Some of the configurations are a bit hairy though, so this may not be as nice as we'd like it to be.

- Currently `cluster_class` in `DaskExecutor` can be any callable, so the following does work:

  ```python
  executor = DaskExecutor(
      cluster_class=lambda: FargateCluster(image=prefect.context.image)
  )
  ```

  The downside is that we lose the ability to introspect the type of `cluster_class`, since it's just a lambda. It does allow full flexibility though.

- We may provide builtin recipes for creating clusters that play better with prefect's state complexity. Perhaps:

  ```python
  executor = DaskExecutor(
      cluster_class=prefect.recipes.dask_fargate_cluster
  )
  ```

Note that the reason we can't do:

```python
executor = DaskExecutor(
    cluster_class=FargateCluster,
    cluster_kwargs={"image": prefect.context.image},
)
```

is that we want `prefect.context.image` to resolve in the runtime environment (where the config will be set to the current image), not the registration environment (where the config will most likely be empty). It's a chicken-and-egg problem: if using a docker-storage image for your workers, we don't know the image name until we've already built the image.
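
As a side note, a hedged sketch (not part of this PR; `fargate_cluster_from_context` is a hypothetical name): using a named factory function instead of a lambda keeps some introspectability, since at least the callable's name survives:

```python
import prefect
from dask_cloudprovider import FargateCluster
from prefect.engine.executors import DaskExecutor

def fargate_cluster_from_context():
    # Look up the image inside the factory so it resolves at run time,
    # when prefect.context.image is actually populated.
    return FargateCluster(image=prefect.context.image)

executor = DaskExecutor(cluster_class=fargate_cluster_from_context)
```
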

@jcrist (Author) commented Jun 17, 2020

With hindsight on #2667, I wonder if we should have opted for a cluster kwarg that is either a created cluster or a callable that takes no args to create one, rather than splitting into cluster_class and cluster_kwargs. The current way allows more introspection (we might grab the class from the executor to display in the UI), but doesn't make it 100% clear that any callable (not necessarily a class) is fine:

```python
executor = DaskExecutor(
    cluster=lambda: FargateCluster(image=prefect.context.image)
)
```
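
For what it's worth, a rough sketch (hypothetical, not how `DaskExecutor` is implemented) of how a single `cluster` argument could accept either an already-created cluster or a zero-arg factory:

```python
def resolve_cluster(cluster):
    # A zero-arg factory (lambda, function, or cluster class) is called to build
    # the cluster when the executor starts; an existing cluster is used as-is.
    return cluster() if callable(cluster) else cluster
```
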

@jcrist (Author) commented Jun 18, 2020

After sleeping on things, I've come up with 4 possible solutions. I think one benefit of the new design is that it is super flexible, so a user could technically do any of the following - I'm not sure which to recommend though.

Currently the following patterns are easy and straightforward:

- Use a local dask cluster:

  ```python
  env = LocalEnvironment(
      executor=DaskExecutor(),
  )
  ```

- Connect to an existing dask cluster:

  ```python
  env = LocalEnvironment(
      executor=DaskExecutor("tcp://my-cluster-address:8786")
  )
  ```

- Use a fargate dask cluster with a fixed image for the workers:

  ```python
  env = LocalEnvironment(
      executor=DaskExecutor(
          cluster_class="dask_cloudprovider.FargateCluster",
          cluster_kwargs={
              "image": "myfancyimage",
              "n_workers": 5,
          }
      )
  )
  ```

The hard pattern is when you want to start a dask cluster using the same image that the flow-runner is using, and you're using Docker storage (so the image name is dynamic). This is a chicken-and-egg problem, since we don't know the image name until run time, and the different dask cluster classes (e.g. dask_kubernetes.KubeCluster, dask_cloudprovider.FargateCluster, ...) all have different ways of specifying what image to use.

The good news is that the image information will be available in the runtime environment as:

- In Python: `prefect.context.image`
- The `PREFECT__CONTEXT__IMAGE` environment variable
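
As a small illustrative sketch, here are both lookups side by side; at registration time these will typically be unset:

```python
import os
import prefect

image_from_context = prefect.context.get("image")
image_from_env = os.environ.get("PREFECT__CONTEXT__IMAGE")
```
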

A few options:

1. Pass a custom callable to `cluster_class`. This pattern already works, but may be non-obvious to users. We could document this though, and it's fairly straightforward. One downside is that it makes the `cluster_class` opaque, so we couldn't automatically grab the type of cluster to display in the UI.

   ```python
   env = FargateTaskEnvironment(
       executor=DaskExecutor(
           cluster_class=lambda: dask_cloudprovider.FargateCluster(
               image=prefect.context.image
           )
       )
   )
   ```

   ```python
   from dask_kubernetes import KubeCluster, make_pod_spec

   env = KubernetesJobEnvironment(
       executor=DaskExecutor(
           cluster_class=lambda: KubeCluster(
               make_pod_spec(
                   image=prefect.context.image,
               )
           )
       )
   )
   ```
2. Support a callable object instead of dict for `cluster_kwargs`. This is almost the same as the above, but keeps the `cluster_class` accessible. This doesn't currently work, but would be easy to support - I think it's slightly less easy to understand though.

   ```python
   env = FargateTaskEnvironment(
       executor=DaskExecutor(
           cluster_class="dask_cloudprovider.FargateCluster",
           cluster_kwargs=lambda: {
               "image": prefect.context.image
           }
       )
   )
   ```

   ```python
   env = KubernetesJobEnvironment(
       executor=DaskExecutor(
           cluster_class="dask_kubernetes.KubeCluster",
           cluster_kwargs=lambda: {
               "pod_template": make_pod_spec(
                   image=prefect.context.image,
               )
           }
       )
   )
   ```
3. Both dask_cloudprovider and dask_kubernetes support configuration through dask's config system. Often things set in the config can reference environment variables, which will be templated in before use. We might recommend users go through dask's config system, to avoid the use of custom lambdas. This doesn't currently work, but could. I'm less enthused about this solution, as it seems overly complicated and is less flexible.

   ```python
   env = FargateTaskEnvironment(
       executor=DaskExecutor(
           cluster_class="dask_cloudprovider.FargateCluster",
           dask_config={
               "cloudprovider.ecs.image": "$PREFECT__CONTEXT__IMAGE"
           }
       )
   )
   ```

   ```python
   worker_template = """
   kind: Pod
   spec:
     restartPolicy: Never
     containers:
     - image: $PREFECT__CONTEXT__IMAGE
       args: [dask-worker, --nthreads, '2']
       name: dask
       resources:
         limits:
           cpu: "2"
           memory: 6G
         requests:
           cpu: "2"
           memory: 6G
   """

   env = KubernetesJobEnvironment(
       executor=DaskExecutor(
           cluster_class="dask_kubernetes.KubeCluster",
           dask_config={
               "kubernetes.worker-template": worker_template,
           }
       )
   )
   ```
4. We might recommend users bake the dask config file into their image, and rely on that for configuring the dask cluster. This would work similar to option 3 above, except that the config lives in the image. We might add default config files to our prefect images to do this for the user (note that explicit setting of any of these fields will override the on-disk config, so users that explicitly set the image wouldn't be affected).

   ```yaml
   # ~/.config/dask/config.yaml
   cloudprovider:
     ecs:
       image: "$PREFECT__CONTEXT__IMAGE"

   kubernetes:
     worker-template:
       kind: Pod
       spec:
         containers:
         - image: $PREFECT__CONTEXT__IMAGE
           args: [dask-worker, --nthreads, '2', --no-dashboard, --memory-limit, 6GB, --death-timeout, '60']
           name: dask
           resources:
             limits:
               cpu: "2"
               memory: 6G
             requests:
               cpu: "2"
               memory: 6G
   ```

   ```python
   env = FargateTaskEnvironment(
       executor=DaskExecutor(
           cluster_class="dask_cloudprovider.FargateCluster",
       )
   )

   env = KubernetesJobEnvironment(
       executor=DaskExecutor(
           cluster_class="dask_kubernetes.KubeCluster",
       )
   )
   ```

Solutions 1 and 4 are already supported with no code changes from us. Solutions 2 and 3 would require small code changes, but I could see arguments for making those changes. The code here is designed to be flexible, to let the user fully take charge of their dask cluster config without prefect getting in the way. The main question is:

Which of these patterns is easiest for a user to understand, and should be our recommended way of handling this?

We also might do more than one. Right now I'm leaning towards documenting 1 and 4, and possibly baking some simple default configs into our provided prefect images. If a user wants something more complex they should be building their own docker image anyway.

@jlowin (Member) commented Jun 18, 2020

@jcrist is it fair to say that the major (only?) downside of option 1 is just the difficulty of introspecting the user's choice through (for example) the UI, which is what motivated option 2?

If so, perhaps one of the following is an easy way to solve it without going fully to option 2:

- have the serialized form reflect the name of the function, which would be `<lambda>` in your examples but could potentially be `prefect.executors.recipes.cool_dask_cluster` if we ship some prebaked ones
- supply a "display" kwarg or pull something from the (future) metadata payload

With the exception of the UI rendering issue, I think option 1 is best because it is straightforward and documentable without too much explanation: the way you do this is with a callable, and in recipes we ship a bunch to get you started.
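
A rough sketch of the first idea (a hypothetical helper, not existing prefect code): derive a display label from the callable's qualified name, so a shipped recipe serializes to something readable while an inline lambda still shows up as `<lambda>`:

```python
def cluster_class_label(cluster_class) -> str:
    """Best-effort display name for a cluster_class value (class, function, lambda, or import string)."""
    if isinstance(cluster_class, str):
        return cluster_class
    module = getattr(cluster_class, "__module__", "") or ""
    name = getattr(cluster_class, "__qualname__", None) or repr(cluster_class)
    return f"{module}.{name}" if module else name
```
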

@jcrist (Author) commented Jun 18, 2020

is it fair to say that the major (only?) downside of option 1 is just the difficulty of introspecting the user's choice through (for example) the UI, which is what motivated option 2?

Yeah, that's accurate. Currently we're not extracting the cluster class name at all, and I don't think we should yet (it's not clear what info we should stick in metadata automatically). For now I was planning on leaving metadata empty. I think your assessment above is accurate.

I'd like to avoid shipping recipes as part of prefect, since the recipes may also accumulate features/config options/etc... I'm hoping to avoid the need to maintain that code, and instead rely on documenting common patterns alone.

@jcrist (Author) commented Jun 19, 2020

Current Status:

- Add executor kwarg to `LocalEnvironment`, `FargateTaskEnvironment`, and `KubernetesJobEnvironment`. Deprecate `executor_kwargs` if present.
- Deprecate `RemoteEnvironment` in favor of `LocalEnvironment` everywhere
- Deprecate `RemoteDaskEnvironment` in favor of `LocalEnvironment` with a `DaskExecutor` everywhere
- Document `LocalEnvironment`
- Add tests for executor kwarg in relevant environments

I've removed the serializer changes in this PR - those would only be useful for the UI, and we can add additional info to the schema later. I'm holding off on deprecating DaskCloudproviderEnvironment and DaskKubernetesEnvironment in this PR - I want things to simmer for a bit and to improve our deployment docs before adding a warning to those.

codecov bot commented Jun 19, 2020

Codecov Report

Merging #2805 into master will decrease coverage by 0.02%.
The diff coverage is 87.17%.

- Use default executor if environment created without specifying an
  executor. This default is resolved at register time, rather than run
  time, which matches the behavior of `RemoteEnvironment`.
- Add tests for executor behavior for all affected environments
- Cleanup tests for fargate and k8s environments
@jcrist changed the title from "WIP - Add executor kwarg to environments" to "Add executor kwarg to environments" on Jun 19, 2020
@jcrist (Author) commented Jun 19, 2020

This should be ready for review.

jcrist added 2 commits June 19, 2020 16:50
@cicdw (Member) left a comment

Request that we not bake local default executor info into environments

src/prefect/environments/execution/local.py (review thread, resolved)
@jcrist (Author) commented Jun 22, 2020

I believe all concerns have been addressed.

@joshmeek left a comment

Ran some tests and all good from my end 👍

@jcrist (Author) commented Jun 22, 2020

@cicdw, good to merge?

@jcrist merged commit 5c8b3d5 into PrefectHQ:master on Jun 22, 2020
@jcrist deleted the add-executor-to-environments branch on June 22, 2020 17:37
@jameslamb mentioned this pull request on Jun 24, 2020
@gatesn commented Sep 21, 2020

I might have misunderstood this PR, but when I try running a Flow constructed with an environment, that environment's executor isn't actually used.

Should we be passing self.environment.executor as a kwarg here? https://github.com/PrefectHQ/prefect/blob/8b53155ad4dadeb11ff508174f545bd87973514f/src/prefect/core/flow.py#L922...L929

@jcrist (Author) commented Sep 21, 2020

Stuff configured on an environment is only used when executing a flow registered (flow.register()) with an orchestration backend (cloud/server). flow.run() will not use the executor configured on an environment (or anything else on the environment). In the future we're moving towards configuring the executor on the flow itself (see #3338) which will make this more uniform across both flow.run() and flow.register().

Linked issue: Decouple "Dask" and Environments (#2508)