Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

respect executor from environment #2849

Merged
merged 5 commits into from
Jun 24, 2020
Merged

respect executor from environment #2849

merged 5 commits into from
Jun 24, 2020

Conversation

jameslamb
Copy link

created from this conversation in Prefect Community Slack

  • adds new tests (if appropriate)
  • add a changelog entry in the changes/ directory (if appropriate)
  • updates docstrings for any new functions or function arguments, including docs/outline.toml for API reference docs (if appropriate)

What does this PR change?

This PR proposes changing the behavior of Environment.run_flow(). Today, if you call this method on an instance of Environment or any child class of it, run_flow() will use the executor in self.executor if that attribute has been set.

if getattr(self, "executor", None) is not None:
executor = self.executor # type: ignore
else:

That strategy means that despite the changes from #2805 , it is not currently possible to use KubernetesJobEnvironment and DaskExecutor together. That is because flows that use KubernetesJobEnvironment end up being run with KubernetesJobEnvironment().run_flow()

"python -c 'import prefect; prefect.environments.KubernetesJobEnvironment().run_flow()'"

KubernetesJobEnvironment's constructor sets self.executor to the default executor from prefect's config if one isn't passed.

if executor is None:
executor = prefect.engine.get_default_executor_class()(
**(executor_kwargs or {})
)

Since KubernetesJobEnvironment inherits run_flow() from Environment, that means that the environment's self.executor will always be set and equal to the value from the config.

So if you use something like I tried today:

KubernetesJobEnvironment(
    executor=DaskExecutor(
        cluster_class="prefect.engine.executors.DaskExecutor"
    ),
    job_spec_file="prefect-flow-run.yaml",
)

Then you'll never get that DaskExecutor. Flows will run with the default, which is often LocalExecutor.

This PR proposes changing that behavior, so that if the environment in the flow has an executor, Environment.run_flow() uses that.

Why is this PR important?

Without this PR, the executor you pass to KubernetesJobEnvironment or FargateTaskEnvironment is ignored when flows are run.

If I'm right, that means that without this PR, the only way to use one of these environments with a non-default executor like DaskExecutor is to maybe set the default executor in a user config (I have not tried this).

I think this is extra important because the use of a different executor is silent...in my case, the job environment happened to be sized appropriately to run my flow, so even though I wanted DaskExecutor, when I got LocalExecutor the flow happily ran and completed. If my job environment had been poorly sized for the work my flow does, I would have probably hit an out of memory error and it wouldn't have been immediately obvious that this PR's root cause was to blame.

Questions for Reviewers

  • I've tested this manually in my local setup, but I think it should get Prefect unit tests so the bug isn't re-introduced. Could you point me to relevant tests in the repo that I could use as a reference to write unit tests for this change?
  • Would it be better to remove the "if no executor given, set self.executor to the default" logic in FargateTaskEnvironment and KubernetesJobEnvironment? I'm not sure what the implications of that would be, but it seems like it would be a cleaner separation of responsibilities.

Thanks for your time and consideration!

@jcrist
Copy link

jcrist commented Jun 24, 2020

Ah, good catch @jameslamb! I didn't notice that this was being run on a new instance of the environment. This should really be a classmethod instead to avoid issues like this. @joshmeek, will anything break if we switch run_flow to a classmethod (I think the answer is no)?

@joshmeek
Copy link

@jcrist I think we're good to make the switch 👍

@jcrist
Copy link

jcrist commented Jun 24, 2020

Great. @jameslamb we're hoping to get a release out in the next day or two - mind if I push to this PR to finish it up?

Aside: this is another great argument for adding some integration tests (see #2792), since this was missed by testing.

@jameslamb
Copy link
Author

@jcrist not a problem, push away!

For the record, I like the classmethod idea 😀

Previously `run_flow` was a method on the class, which led to numerous
bugs. This moves that logic to a function `load_and_run_flow()`, which
loads the flow, then calls `flow.environment.run(flow)`. This simplifies
the running logic, and ensures the proper configuration is used. Tests
have been updated to follow.
@jcrist
Copy link

jcrist commented Jun 24, 2020

Updated. It made more sense to pull run_flow() out into its own function. The code here could be cleaner (with respect to the sometimes-present run() method on environments), but to keep backwards compatibility and get this release out I've used a mixin class. Once the old environments have been deprecated this can definitely be cleaned up and streamlined.

@jameslamb
Copy link
Author

Looks awesome, thank you! This became bigger than my original PR, so you can definitely remove my name from the changes/ entry.

@jcrist
Copy link

jcrist commented Jun 24, 2020

You found the bug and instigated the change, so I'm happy to leave your contribution in.

@jcrist
Copy link

jcrist commented Jun 24, 2020

This also fixes #2850, which was due to the same issue.

@jameslamb
Copy link
Author

ok fair enough, thanks!

@codecov
Copy link

codecov bot commented Jun 24, 2020

Codecov Report

Merging #2849 into master will increase coverage by 0.02%.
The diff coverage is 97.95%.

Copy link

@joshmeek joshmeek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completed some manual QA with different environment setups to verify that this works and environment-specific configuration is respected. Looks good from my end ✅

@jcrist jcrist merged commit 1e8fd85 into PrefectHQ:master Jun 24, 2020
@jameslamb jameslamb deleted the fix/dask-executor branch September 15, 2020 20:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants