Decouple "Dask" and Environments #2508

Closed
jcrist opened this issue May 6, 2020 · 8 comments · Fixed by #2805
@jcrist

jcrist commented May 6, 2020

Currently, Prefect Cloud/Server flow runs are configured by three concepts:

  • Environment
  • Executor
  • Storage

while local flow runs are configured with an Executor alone.

Subclasses of both Environment and Executor exist with "dask" in their names, which can make things confusing, especially since they're used differently for local runs and for cloud/server runs.

For example, if you're trying to run a flow on kubernetes using Dask:

  • If you're running locally, you'd use dask-kubernetes to spin up a dask cluster, then configure a DaskExecutor to use that cluster, then pass that executor to flow.run.
  • If you're running using cloud, you'd configure a DaskKubernetesEnvironment and attach it to your flow. Environments also have executors, which makes this confusing, since you have a Dask* thing inside a Dask* thing.

If you then decide you don't want to use a dask executor but still want to run on cloud, you need to change your environment to a KubernetesJobEnvironment. But you can still use a dask executor inside the KubernetesJobEnvironment? I find the current mixing of concepts a bit confusing - it feels like environments are encroaching on executors' space.
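
For concreteness, the two paths today look roughly like this (a sketch only - the image name, worker counts, and DaskKubernetesEnvironment kwargs are placeholders):

# Local: start a dask cluster yourself, then point a DaskExecutor at it.
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def say_hello():
    return "hello"

with Flow("k8s-dask-flow") as flow:
    say_hello()

cluster = KubeCluster(make_pod_spec(image="my-prefect-image"))  # placeholder image
cluster.scale(5)
flow.run(executor=DaskExecutor(address=cluster.scheduler_address))

# Cloud/server: attach a DaskKubernetesEnvironment to the flow instead.
from prefect.environments import DaskKubernetesEnvironment

flow.environment = DaskKubernetesEnvironment(min_workers=1, max_workers=5)
flow.register(project_name="my-project")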


I think a potentially cleaner way of specifying flow execution would be to remove anything "dask"-related from environments entirely and move that responsibility to executors alone. The separation of duties would be:

  • Environment:

    Configures where and how to run the process that starts up a FlowRunner. This may be a kubernetes job, a docker container, a local process, etc...

  • Executor:

    Configures where and how to run tasks inside a single flow run. An executor could run things without dask, with dask locally, connect to a remote dask cluster, or start a new temporary dask cluster just for this flow run. This is basically how things work now (minus the cluster creation).

One nice thing about this is that the only difference between running a flow locally and running a flow on cloud is the addition of the Environment. If you want to use a dask cluster on kubernetes to run your flow, in both cases you configure an Executor to spin up a dask cluster using dask-kubernetes. The Environment is only responsible for managing the initial process, not the dask processes.
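
In code, the hope is that this would look something like the following (a sketch only - the cluster_class/cluster_kwargs arguments are the API proposed below, not something that exists today, and the worker count is a placeholder):

from prefect import Flow, task
from prefect.engine.executors import DaskExecutor
from prefect.environments import KubernetesJobEnvironment

@task
def crunch():
    return 42

with Flow("my-flow") as flow:
    crunch()

# The executor owns everything dask-related, for both local and cloud runs.
executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs={"n_workers": 5},  # illustrative; pod spec omitted
)

# Local run:
flow.run(executor=executor)

# Cloud/server run: the only addition is the Environment, which manages the
# initial flow-runner process (here a kubernetes job), not the dask cluster.
# How the executor configuration attaches to the environment is covered in
# the proposal below.
flow.environment = KubernetesJobEnvironment()
flow.register(project_name="my-project")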


If people like this plan, I think a way forward would be:

  • Create an Executor class that manages a dask cluster lifetime (starting/stopping/scaling) using the standard dask cluster interface (similar to Generic dask cluster environment #2503, which I think is superseded by this issue). This might look like:

    executor = DaskExecutor(
        cluster_class="dask_cloudprovider.FargateCluster",
        cluster_kwargs={
            "image": "my-fancy-prefect-image",
        },
        ...
    )
    ...

    The lifetime of the dask cluster created by this executor would be the lifetime of the flow run. If we needed to have specific classes for e.g. DaskKubernetesExecutor we could, but if possible I'd prefer to keep this generic (as specified in Generic dask cluster environment #2503).

  • Deprecate the existing Dask*Environment classes, with instructions for how to move to using the new executor. For DaskKubernetesEnvironment for example, an equivalent configuration might be:

    environment = KubernetesJobEnvironment(
        executor_class=DaskExecutor,
        executor_kwargs=dict(
            cluster_class="dask_kubernetes.KubeCluster",
            cluster_kwargs={...}
        )
    )
    ...

Thoughts?

@jcrist jcrist added the "enhancement" and "environment" labels May 6, 2020
@joeschmid

@jcrist The decoupling of these makes sense. It would also help to keep parity between Core and Cloud execution options, e.g. right now Dask Cloud Provider is available as an Environment with Cloud but not as an Executor with Core. I really like this idea. (I agree that this issue should supersede #2503. I'll go comment there now, but for me it's ok to close that one in favor of this.)

@ThomasLaPiana

@jcrist this makes way more sense to me as well. I just went through figuring out how to productionize things and writing my own environment for DaskGateway, and had to dig down through so many layers of code/objects to figure out certain things that were so much simpler when I just used the dask executor directly.

big 👍 from me

@lauralorenz lauralorenz assigned lauralorenz and jcrist and unassigned lauralorenz May 19, 2020
@jameslamb

This is awesome!

I was just directed here from the Prefect Slack, and I want to share that I have a use case that's kind of awkward today but would be greatly improved by this change.

I have a flow that could benefit from parallelism, and code that can provision / start a Dask cluster in Kubernetes. I know that I want to use a DaskExecutor to speed it up.

My flow is using an IntervalSchedule. Right now I'm running this flow in a standalone Python process (just a script with flow code that ends in flow.run()), not using an agent talking to Prefect Cloud. For my use case, I'd like to start the Dask cluster at the beginning of a flow run and stop it at the end of a flow run.

If a DaskExecutor could be told to start (and optionally stop) the Dask cluster before / after each flow run, it would make it easy for me to run this in a way that ensures I'm not keeping the cluster up and running when I don't need it.
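
Roughly, the setup would be (a sketch assuming the cluster_class/cluster_kwargs DaskExecutor API proposed above, which doesn't exist yet; the task, interval, and worker count are placeholders):

from datetime import timedelta

from prefect import Flow, task
from prefect.engine.executors import DaskExecutor
from prefect.schedules import IntervalSchedule

@task
def crunch_numbers():
    ...

schedule = IntervalSchedule(interval=timedelta(hours=1))

with Flow("scheduled-flow", schedule=schedule) as flow:
    crunch_numbers()

# Each scheduled run would start a fresh dask cluster on kubernetes and tear
# it down when the run finishes, so nothing is left idle between runs.
executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs={"n_workers": 3},  # illustrative; pod spec omitted
)
flow.run(executor=executor)  # standalone script, no agent / Prefect Cloud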

To that end, I want to propose a change to this statement:

The lifetime of the dask cluster created by this executor would be the lifetime of the flow run.

I have a use case for stopping the cluster at the end of a flow run, but I still think it should be possible to use DaskExecutor and have it leave the Dask cluster up and running after a flow run ends. There is some overhead involved in creating the cluster, and for some workflows it might be preferable to have multiple flows share a cluster.

@jcrist
Author

jcrist commented May 20, 2020

I have a use case for stopping the cluster at the end of a flow run, but I still think it should be possible to use DaskExecutor and have it leave the Dask cluster up and running after a flow run ends. There is some overhead involved in creating the cluster, and for some workflows it might be preferable to have multiple flows share a cluster.

My plan for this was to still support creating an executor that connects to an existing cluster.

  • If an executor starts a cluster, it should stop it after flow completion
  • If an executor connects to a cluster, it should leave it running after flow completion

Both use cases are important, but I see complications if an executor starts a cluster but doesn't shut it down.
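
Concretely (a sketch - address= already exists on DaskExecutor today, while cluster_class/cluster_kwargs is the API proposed above):

from prefect.engine.executors import DaskExecutor

# Connect to an existing cluster: the cluster is left running afterwards.
executor = DaskExecutor(address="tcp://my-scheduler:8786")  # placeholder address

# Create a temporary cluster: it is torn down when the flow run completes.
executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs={"n_workers": 3},
)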

Would that satisfy your needs?

@jameslamb

I think it would be limiting to couple the "how I connect" and "what to do when I exit" code like that.

For example if I wanted to express some infrastructure provisioning code as a prefect flow (run one time, like you might do with Terraform or CloudFormation), then I could see that code wanting to create a Dask cluster, start it, and then leave it running.

So my preference is for the startup and teardown behavior of the executor to not be so tightly coupled. I think those two bullet points are great default behaviors, but ideally they wouldn't be strictly required.

However, I know that I don't have enough context for the complications that starting but not shutting down a cluster could have, so I'll defer to you and the other maintainers on whether supporting that flexibility is worth it.

@jcrist
Author

jcrist commented May 20, 2020

For example if I wanted to express some infrastructure provisioning code as a prefect flow (run one time, like you might do with Terraform or CloudFormation), then I could see that code wanting to create a Dask cluster, start it, and then leave it running.

If this is your use case, then I'd have a flow that runs on a LocalExecutor that provisions a dask cluster and leaves it up.

from prefect import Flow, task

@task
def start_dask_cluster():
    ...  # provision the cluster and leave it running

with Flow("provision-cluster") as flow:
    start_dask_cluster()

The trick with having an Executor start a cluster and not stop it is that it becomes unclear how others will use that cluster, or what will eventually shut it down. Dynamically provisioned clusters usually don't have static addresses, and if you hardcode an address to start a cluster at, you need to be careful not to run the flow with that hardcoded address multiple times concurrently. You also get into weird territory with how flow runs are configured: if I register a flow with an executor configured to start a cluster and leave it running, I can't rerun the same flow multiple times on failure (you'll end up with many dask clusters that aren't cleaned up, and if you've hardcoded an address you'll run into collisions). There are just too many foot-guns for me to see this flexibility as worth it. I think the problems it might solve can be better solved in a different way (such as my suggestion above).

@jameslamb

Makes sense to me!

I think if you know you're using a kubernetes dask cluster specifically, you can find the Dask scheduler using pod names or labels, and your code could look for a running scheduler that way and say "connect if it's running, otherwise start it". So it would be possible to code around the "every time I run this flow it starts a new cluster" thing without hardcoding a scheduler address.
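
For example (a sketch with the kubernetes Python client - the namespace, label selector, and port are assumptions and depend on how the cluster was created):

from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()

# Look for an already-running dask scheduler pod by label.
pods = v1.list_namespaced_pod(
    namespace="default",
    label_selector="app=dask-scheduler",  # placeholder label
)
running = [p for p in pods.items if p.status.phase == "Running"]

if running:
    # Connect to the existing scheduler instead of starting a new cluster.
    scheduler_address = f"tcp://{running[0].status.pod_ip}:8786"
else:
    # No scheduler found: start a new cluster (e.g. with dask_kubernetes.KubeCluster).
    ...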

I get what you're saying though, putting some guardrails around this makes it manageable and prevents some traps that would be easy to fall into. I'm ok with the proposal from #2508 (comment)

@jcrist
Author

jcrist commented May 28, 2020

Create an Executor class that manages a dask cluster lifetime (starting/stopping/scaling) using the standard dask cluster interface

This is done in #2667.

abrookins pushed a commit that referenced this issue Jul 27, 2022
Docs concepts updates to schedules and tasks