Add `executor` kwarg to environments #2805
Conversation
This allows configuring an instantiated `Executor` on an `Environment`. With the addition of support for configuring Dask clusters inside the `Executor`, this should remove the need for any dask-specific environments. After deprecation, only the following environments should remain:

- `LocalEnvironment`
- `KubernetesJobEnvironment`
- `FargateTaskEnvironment`

Here's an example of running a flow, where the flow-runner is a local process, using a temporary dask cluster running on Fargate:

```python
flow.environment = LocalEnvironment(
    executor=DaskExecutor(
        cluster_class="dask_cloudprovider.FargateCluster",
        cluster_kwargs={...},
    )
)
```

One nice thing about using an *instantiated* executor is that running locally with `flow.run()` and registering a flow to be executed later with `flow.register()` can both use the same object. Since an executor doesn't use any resources until its `.start()` method is called, there should be no harm in using an instantiated executor in the `Environment` object.
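The "no resources until `.start()` is called" property described above is what makes it safe to attach the same executor instance to both local runs and registered flows. Here is an illustrative sketch of that lifecycle (the class and names here are hypothetical, not Prefect's actual implementation):

```python
# Illustrative sketch: an executor whose __init__ only records
# configuration, and which acquires resources only inside .start().
from contextlib import contextmanager


class LazyExecutor:
    def __init__(self, cluster_class=None, cluster_kwargs=None):
        # Just store configuration; nothing is created here.
        self.cluster_class = cluster_class
        self.cluster_kwargs = cluster_kwargs or {}
        self.started = False

    @contextmanager
    def start(self):
        # A real executor would create its Dask cluster here.
        self.started = True
        try:
            yield self
        finally:
            self.started = False


executor = LazyExecutor(cluster_class="dask_cloudprovider.FargateCluster")
assert not executor.started  # safe to register: nothing is running yet
with executor.start():
    assert executor.started   # resources exist only inside start()
assert not executor.started
```

Because construction is side-effect free, registering a flow that carries this object serializes only configuration, never live resources.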
TODO:
One wrench in this plan is propagating the image name to the executor from the flow storage. This only matters if you want your dask workers and scheduler to share the same image as your flow-runner, which shouldn't strictly be necessary but may be desired. Dask-kubernetes includes ways to template based on environment variables, so under certain circumstances we could use that, but this may not fit all use cases. I'm not certain what the best solution is here.
A few options:

```python
executor = DaskExecutor(
    cluster_class=lambda: FargateCluster(image=prefect.context.image)
)
```

The downside is that we lose the ability to introspect the type of cluster being used.

```python
executor = DaskExecutor(
    cluster_class=prefect.recipes.dask_fargate_cluster
)
```

Note that the reason we can't do:

```python
executor = DaskExecutor(
    cluster_class=FargateCluster,
    cluster_kwargs={"image": prefect.context.image},
)
```

is that we want `prefect.context.image` to be resolved lazily at runtime, not eagerly when the flow is defined.
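Several of these options pass `cluster_class` as a dotted-path string rather than a class. One plausible way such a string could be resolved to a class at runtime is via `importlib` (this is an illustrative sketch, not Prefect's actual internals; the function name is hypothetical):

```python
# Sketch: resolve a cluster class given as a class, a callable, or a
# dotted-path string like "dask_cloudprovider.FargateCluster".
import importlib


def resolve_cluster_class(spec):
    """Accept a class, a callable, or a dotted-path string."""
    if isinstance(spec, str):
        module_name, _, attr = spec.rpartition(".")
        module = importlib.import_module(module_name)
        return getattr(module, attr)
    return spec  # already a class or callable


# A stdlib class stands in for a real cluster class here:
cls = resolve_cluster_class("collections.OrderedDict")
assert cls.__name__ == "OrderedDict"
```

Deferring the import this way also means the cluster library only needs to be installed where the flow actually runs, not where it is registered.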
With hindsight on #2667, I wonder if we should have opted for a:

```python
executor = DaskExecutor(
    cluster=lambda: FargateCluster(image=prefect.context.image)
)
```
After sleeping on things, I've come up with 4 possible solutions. I think one benefit of the new design is that it is super flexible, so a user could technically do any of the following; I'm not sure which to recommend though. Currently the following patterns are easy and straightforward:

```python
env = LocalEnvironment(
    executor=DaskExecutor(),
)

env = LocalEnvironment(
    executor=DaskExecutor("tcp://my-cluster-address:8786")
)

env = LocalEnvironment(
    executor=DaskExecutor(
        cluster_class="dask_cloudprovider.FargateCluster",
        cluster_kwargs={
            "image": "myfancyimage",
            "n_workers": 5,
        }
    )
)
```

The hard pattern is when you want to start a dask cluster using the same image that the flow-runner is using. The good news is that the image information will be available in the runtime environment as `prefect.context.image`.
A few options:

Option 1: pass a callable as `cluster_class`:

```python
env = FargateTaskEnvironment(
    executor=DaskExecutor(
        cluster_class=lambda: dask_cloudprovider.FargateCluster(
            image=prefect.context.image
        )
    )
)
```

```python
from dask_kubernetes import KubeCluster, make_pod_spec

env = KubernetesJobEnvironment(
    executor=DaskExecutor(
        cluster_class=lambda: KubeCluster(
            make_pod_spec(
                image=prefect.context.image,
            )
        )
    )
)
```

Option 2: allow `cluster_kwargs` to be a callable:

```python
env = FargateTaskEnvironment(
    executor=DaskExecutor(
        cluster_class="dask_cloudprovider.FargateCluster",
        cluster_kwargs=lambda: {
            "image": prefect.context.image
        }
    )
)

env = KubernetesJobEnvironment(
    executor=DaskExecutor(
        cluster_class="dask_kubernetes.KubeCluster",
        cluster_kwargs=lambda: {
            "pod_template": make_pod_spec(
                image=prefect.context.image,
            )
        }
    )
)
```

Option 3: rely on environment-variable templating via `dask_config`:

```python
env = FargateTaskEnvironment(
    executor=DaskExecutor(
        cluster_class="dask_cloudprovider.FargateCluster",
        dask_config={
            "cloudprovider.ecs.image": "$PREFECT__CONTEXT__IMAGE"
        }
    )
)
```

```python
worker_template = """
kind: Pod
spec:
  restartPolicy: Never
  containers:
    - image: $PREFECT__CONTEXT__IMAGE
      args: [dask-worker, --nthreads, '2']
      name: dask
      resources:
        limits:
          cpu: "2"
          memory: 6G
        requests:
          cpu: "2"
          memory: 6G
"""

env = KubernetesJobEnvironment(
    executor=DaskExecutor(
        cluster_class="dask_kubernetes.KubeCluster",
        dask_config={
            "kubernetes.worker-template": worker_template,
        }
    )
)
```

Option 4: put the templating in a dask config file instead, leaving the environments unconfigured:

```yaml
# ~/.config/dask/config.yaml
cloudprovider:
  ecs:
    image: "$PREFECT__CONTEXT__IMAGE"
kubernetes:
  worker-template:
    kind: Pod
    spec:
      containers:
        - image: $PREFECT__CONTEXT__IMAGE
          args: [dask-worker, --nthreads, '2', --no-dashboard, --memory-limit, 6GB, --death-timeout, '60']
          name: dask
          resources:
            limits:
              cpu: "2"
              memory: 6G
            requests:
              cpu: "2"
              memory: 6G
```

```python
env = FargateTaskEnvironment(
    executor=DaskExecutor(
        cluster_class="dask_cloudprovider.FargateCluster",
    )
)

env = KubernetesJobEnvironment(
    executor=DaskExecutor(
        cluster_class="dask_kubernetes.KubeCluster",
    )
)
```

Solutions

Which of these patterns is easiest for a user to understand, and should be our recommended way of handling this? We also might do more than one. Right now I'm leaning towards documenting …
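The environment-variable approach relies on expanding `$PREFECT__CONTEXT__IMAGE` from the flow-runner's environment at runtime. A minimal, self-contained illustration of that substitution (using only the stdlib, independent of dask's actual config machinery):

```python
# Minimal illustration of the substitution the env-var-based options
# depend on: the flow-runner exports PREFECT__CONTEXT__IMAGE, and any
# "$PREFECT__CONTEXT__IMAGE" placeholder in config text is expanded.
import os

os.environ["PREFECT__CONTEXT__IMAGE"] = "myfancyimage:latest"

template = "image: $PREFECT__CONTEXT__IMAGE"
expanded = os.path.expandvars(template)
assert expanded == "image: myfancyimage:latest"
```

The key property is that expansion happens in the runtime process, after the image name is known, so nothing image-specific needs to be baked in at flow-definition time.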
@jcrist is it fair to say that the major (only?) downside of option 1 is just the difficulty of introspecting the user's choice through (for example) the UI, which is what motivated option 2? If so, perhaps one of the following is an easy way to solve it without going fully to option 2:
With the exception of the UI rendering issue, I think option 1 is best because it is straightforward and documentable without too much explanation: the way you do this is with a callable, and in …
Yeah, that's accurate. Currently we're not extracting the cluster class name at all, and I don't think we should yet (it's not clear what info we should stick in …). I'd like to avoid shipping …
These were strictly additive, and would only have been used in the UI. We can do this later, and may want to rethink the schemas anyway.
Current Status:
I've removed the serializer changes in this PR; those would only be useful for the UI, and we can add additional info to the schema later. I'm holding off on deprecating …
- Use the default executor if an environment is created without specifying an executor. This default is resolved at register time, rather than run time, which matches the behavior of `RemoteEnvironment`.
- Add tests for executor behavior for all affected environments.
- Clean up tests for fargate and k8s environments.
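The register-time default resolution described in the first bullet can be sketched as follows (class and function names here are illustrative placeholders, not Prefect's actual API):

```python
# Sketch: an environment whose default executor is pinned when the flow
# is registered, not when it later runs.
class LocalExecutor:
    """Stand-in for a default local executor."""


def default_executor():
    # A real implementation would read this from config.
    return LocalExecutor()


class Environment:
    def __init__(self, executor=None):
        # None means "use the default", resolved later at register time.
        self.executor = executor

    def register(self):
        # The default is filled in here, so whatever default is active
        # at registration is what the flow will run with later.
        if self.executor is None:
            self.executor = default_executor()
        return self


env = Environment()
assert env.executor is None          # unresolved before registration
env.register()
assert isinstance(env.executor, LocalExecutor)
```

Resolving at register time means later changes to the runtime's default configuration won't silently change an already-registered flow's executor.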
This should be ready for review.
Request that we not bake local default executor info into environments
I believe all concerns have been addressed.
Ran some tests and all good from my end 👍
@cicdw, good to merge?
I might have misunderstood this PR, but when I try running a Flow constructed with an environment, that environment's executor isn't actually used. Should we be passing …
Stuff configured on an environment is only used when executing a flow registered with `flow.register()`.
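The distinction above — environment configuration applies only to registered-flow runs, not to local runs — can be sketched like this (a hypothetical sketch with made-up names, not Prefect's actual implementation):

```python
# Sketch: a local run ignores the environment's executor, while
# agent-driven execution of a registered flow goes through it.
class Flow:
    def __init__(self, environment=None):
        self.environment = environment or {}

    def run(self, executor=None):
        # Local runs use the executor passed directly (or a default);
        # self.environment is not consulted.
        return executor or "default-executor"

    def execute_registered(self):
        # Registered-flow execution pulls the executor from the
        # environment configured at registration time.
        return self.environment["executor"]


flow = Flow(environment={"executor": "dask-executor"})
assert flow.run() == "default-executor"          # environment ignored
assert flow.execute_registered() == "dask-executor"
```

So seeing the default executor during `flow.run()` is expected; the environment's executor only takes effect for registered flows executed by the backend.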
Fixes #2508.
- `changes/` directory entry (if appropriate)
- `docs/outline.toml` updated for API reference docs (if appropriate)