Decouple "Dask" and Environments #2508
@jcrist The decoupling of these makes sense. It would also help to keep parity between Core and Cloud execution options, e.g. right now Dask Cloud Provider is an option for an Environment with Cloud but not as an Executor with Core. I really like this idea. (I agree that this issue should supersede #2503. I'll go comment there now, but for me it's ok to close that one in favor of this.)
@jcrist This makes way more sense to me as well. I just went through figuring out how to productionize things and writing my own environment for DaskGateway, and had to dig down through so many layers of code/objects to figure certain things out that were so much simpler when I just used the dask executor directly. Big 👍 from me.
This is awesome! I was just directed here from Prefect Slack. I want to share that I have a use case that's kind of awkward today but would be greatly improved by this change.

To that end, I want to propose a change to this statement:

> If a …

I have a use case for stopping the cluster at the end of a flow run, but I still think it should be possible to use …
My plan for this was to still support creating an executor that connects to an existing cluster. Both use cases are important, but I see complications if an executor starts a cluster but doesn't shut it down, so the defaults would be:

- If the executor connects to an existing cluster, it leaves that cluster running on exit.
- If the executor starts a cluster itself, it shuts that cluster down at the end of the flow run.

Would that satisfy your needs?
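A minimal sketch of that connect-vs-create policy, using hypothetical stand-in classes (`ClusterExecutor` and `FakeCluster` are invented names for illustration, not Prefect's actual implementation): the executor only tears down a cluster it created itself.

```python
class FakeCluster:
    """Stand-in for a dask cluster object (something with an address and .close())."""

    def __init__(self, address="tcp://10.0.0.1:8786"):
        self.address = address
        self.running = True

    def close(self):
        self.running = False


class ClusterExecutor:
    """Toy executor: owns the cluster only if it created the cluster."""

    def __init__(self, address=None, cluster_class=None):
        self.address = address
        self.cluster_class = cluster_class
        self.cluster = None
        self._owns_cluster = False

    def __enter__(self):
        if self.address is not None:
            # Connect to an existing cluster: we don't own it,
            # so we won't shut it down on exit.
            self.cluster = FakeCluster(self.address)
            self._owns_cluster = False
        else:
            # Start a new cluster: its lifetime is tied to this flow run.
            self.cluster = self.cluster_class()
            self._owns_cluster = True
        return self

    def __exit__(self, *exc):
        if self._owns_cluster:
            self.cluster.close()
```

With defaults like these, `ClusterExecutor(address=...)` never tears anything down, while `ClusterExecutor(cluster_class=...)` always cleans up after itself.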
I think it would be limiting to couple the "how I connect" and "what to do when I exit" code like that. For example, if I wanted to express some infrastructure provisioning code as a …

So my preference is for the startup and teardown behavior of the executor to not be so tightly coupled. I think those two bullet points are great default behaviors, but ideally they wouldn't be strictly required. However, I know that I don't have enough context for the complications that starting but not shutting down a cluster could have, so I'll defer to you and the other maintainers on whether supporting that flexibility is worth it.
If this is your use case, then I'd have a flow that runs on a …

```python
@task
def start_dask_cluster():
    ...

with Flow("provision-cluster") as flow:
    start_dask_cluster()
```

The trick with having an …
Makes sense to me! I think if you know you're using a kubernetes dask cluster specifically, you can find the Dask scheduler using pod names or labels, and your code could look for a running scheduler that way and say "connect if it's running, otherwise start it". So it would be possible to code around the "every time I run this flow it starts a new cluster" thing without hardcoding a scheduler address.

I get what you're saying though: putting some guardrails around this makes it manageable and prevents some traps that would be easy to fall into. I'm ok with the proposal from #2508 (comment).
This is done in #2667.
Currently prefect cloud/server flow runs are configured by 3 concepts:

- `Environment`
- `Executor`
- `Storage`

while local flow runs are configured with an `Executor` alone.

Subclasses of `Environment` and `Executor` both exist with "dask" in their name, which can make things confusing, especially since the usage differs between local and cloud/server usage. For example, if you're trying to run a flow on kubernetes using Dask:
- Locally, you'd use `dask-kubernetes` to spin up a dask cluster, then configure a `DaskExecutor` to use that cluster, then pass that executor to `flow.run`.
- On cloud/server, you'd create a `DaskKubernetesEnvironment` and attach it to your flow.

Environments also have executors, which makes this confusing, since you have a `Dask*` thing inside a `Dask*` thing.

If you then decide you want to not use a dask executor but still run on cloud, you need to change your environment to a `KubernetesJobEnvironment`. But you can still use a dask executor inside the `KubernetesJobEnvironment`? I find the current mixing of concepts a bit confusing - it feels like environments are encroaching on executors' space.

I think a potentially cleaner way of specifying flow execution would be to remove anything "dask" related from environments entirely and move it to executors alone. The separation of duties would be:
- `Environment`: Configures where and how to run the process that starts up a `FlowRunner`. This may be a kubernetes job, a docker container, a local process, etc.
- `Executor`: Configures where and how to run tasks inside a single flow run. An executor could run things without dask, with dask locally, connect to a remote dask cluster, or start a new temporary dask cluster just for this flow run. This is basically how things work now (minus the cluster creation).
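As a toy sketch of this separation of duties (stand-in classes for illustration, not Prefect's real API): the environment only launches the process that runs the flow, while the executor alone decides how tasks run, so a local run and a cloud run differ only by wrapping the same executor in an environment.

```python
class Executor:
    """Where and how to run tasks inside a single flow run."""

    def run_tasks(self, tasks):
        # A real executor might farm these out to a dask cluster.
        return [task() for task in tasks]


class Environment:
    """Where and how to run the process that starts up a flow runner."""

    def launch(self, run):
        # A real environment might submit a k8s job or a docker container;
        # here we just invoke the flow-runner process inline.
        return run()


def run_locally(executor, tasks):
    # Local flow run: an executor alone.
    return executor.run_tasks(tasks)


def run_on_cloud(environment, executor, tasks):
    # Cloud/server flow run: the same executor, plus an environment
    # that is only responsible for the initial process.
    return environment.launch(lambda: executor.run_tasks(tasks))
```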
One nice thing about this is that the only difference between running a flow locally and running a flow on cloud is the addition of the `Environment`. If you want to use a dask cluster on kubernetes to run your flow, in both cases you configure an `Executor` to spin up a dask cluster using `dask-kubernetes`. The `Environment` is only responsible for managing the initial process, not the dask processes.

If people like this plan, I think a way forward would be:
1. Create an `Executor` class that manages a dask cluster's lifetime (starting/stopping/scaling) using the standard dask cluster interface (similar to "Generic dask cluster environment", #2503, which I think is superseded by this issue). This might look like:

   …

   The lifetime of the dask cluster created by this executor would be the lifetime of the flow run. If we needed to have specific classes for e.g. `DaskKubernetesExecutor` we could, but if possible I'd prefer to keep this generic (as specified in #2503).

2. Deprecate the existing
`Dask*Environment` classes, with instructions for how to move to using the new executor. For `DaskKubernetesEnvironment`, for example, an equivalent configuration might be:

   …

Thoughts?
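For illustration, here is a hedged sketch of what user code under this proposal might look like. The `cluster_class` / `cluster_kwargs` / `adapt_kwargs` parameter names and the `worker-pod.yaml` file are assumptions made for this sketch, not a confirmed API (see #2667 for what was actually merged):

```python
# Hypothetical sketch -- parameter names here are assumptions, not a final API.
from prefect import Flow
from prefect.engine.executors import DaskExecutor

# Step 1: one generic executor that starts (and later stops) a dask cluster
# through the standard cluster interface -- here dask-kubernetes' KubeCluster.
executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs={"pod_template": "worker-pod.yaml"},
    adapt_kwargs={"minimum": 1, "maximum": 10},
)

# Step 2: a rough equivalent of DaskKubernetesEnvironment -- the environment
# (if any) only launches the flow-runner process; the executor owns the cluster.
with Flow("my-flow") as flow:
    ...

flow.run(executor=executor)  # locally, the same executor works unchanged
```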