respect executor from environment #2849
Ah, good catch @jameslamb! I didn't notice that this was being run on a new instance of the environment. This should really be a |
@jcrist I think we're good to make the switch 👍
Great. @jameslamb we're hoping to get a release out in the next day or two - mind if I push to this PR to finish it up? Aside: this is another great argument for adding some integration tests (see #2792), since this was missed by testing.
@jcrist not a problem, push away! For the record, I like the classmethod idea 😀
Previously `run_flow` was a method on the class, which led to numerous bugs. This moves that logic to a function `load_and_run_flow()`, which loads the flow, then calls `flow.environment.run(flow)`. This simplifies the running logic, and ensures the proper configuration is used. Tests have been updated to follow.
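A minimal sketch of the shape that commit message describes, using toy stand-in classes rather than Prefect's actual code. Only `load_and_run_flow()` and `flow.environment.run(flow)` come from the message above; the `load_flow` helper, the in-memory `_STORAGE` registry, and the function's argument are assumptions made up for illustration.

```python
# Toy stand-ins, NOT Prefect's real classes or signatures.
class Environment:
    def __init__(self, executor=None):
        self.executor = executor

    def run(self, flow):
        # The environment attached to the flow decides how the flow runs,
        # so its own configuration (here, the executor) is what gets used.
        return f"ran {flow.name} with {self.executor}"


class Flow:
    def __init__(self, name, environment):
        self.name = name
        self.environment = environment


# Hypothetical in-memory storage standing in for wherever flows are kept.
_STORAGE = {}


def load_flow(key):
    """Hypothetical loader: look the flow up in the toy registry."""
    return _STORAGE[key]


def load_and_run_flow(key):
    """Load the flow, then delegate to the flow's own environment."""
    flow = load_flow(key)
    return flow.environment.run(flow)


_STORAGE["etl"] = Flow("etl", Environment(executor="dask"))
result = load_and_run_flow("etl")
```

Because the function is no longer a method on an environment instance, there is no second, freshly constructed environment whose defaults could shadow the one stored on the flow.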
Updated. It made more sense to pull |
Looks awesome, thank you! This became bigger than my original PR, so you can definitely remove my name from the |
You found the bug and instigated the change, so I'm happy to leave your contribution in.
This also fixes #2850, which was due to the same issue.
ok fair enough, thanks!
Completed some manual QA with different environment setups to verify that this works and environment-specific configuration is respected. Looks good from my end ✅
created from this conversation in Prefect Community Slack
`changes/` directory (if appropriate)
`docs/outline.toml` for API reference docs (if appropriate)

What does this PR change?

This PR proposes changing the behavior of `Environment.run_flow()`. Today, if you call this method on an instance of `Environment` or any child class of it, `run_flow()` will use the executor in `self.executor` if that attribute has been set.

prefect/src/prefect/environments/execution/base.py
Lines 133 to 135 in 191f429
That strategy means that despite the changes from #2805, it is not currently possible to use `KubernetesJobEnvironment` and `DaskExecutor` together. That is because flows that use `KubernetesJobEnvironment` end up being run with `KubernetesJobEnvironment().run_flow()`.

prefect/src/prefect/environments/execution/k8s/job.py
Line 247 in 191f429
`KubernetesJobEnvironment`'s constructor sets `self.executor` to the default executor from `prefect`'s config if one isn't passed.

prefect/src/prefect/environments/execution/k8s/job.py
Lines 71 to 74 in 191f429
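The mechanism described above can be sketched with toy classes. This is not Prefect's source: the class names mirror the ones discussed here, but their bodies, the `get_default_executor()` helper, and the string return value of `run_flow()` are assumptions chosen only to make the behavior visible.

```python
# Toy stand-ins that only mimic the behavior described in this PR.
class LocalExecutor:
    pass


class DaskExecutor:
    pass


def get_default_executor():
    """Hypothetical stand-in for reading the default executor from config."""
    return LocalExecutor()


class Environment:
    def __init__(self, executor=None):
        self.executor = executor

    def run_flow(self, flow):
        # Uses self.executor whenever it is set, without looking at
        # the environment attached to the flow being run.
        executor = self.executor or get_default_executor()
        return type(executor).__name__


class KubernetesJobEnvironment(Environment):
    def __init__(self, executor=None):
        # Falls back to the configured default when no executor is passed,
        # so self.executor is always set on this class.
        super().__init__(executor or get_default_executor())


class Flow:
    def __init__(self, environment):
        self.environment = environment


flow = Flow(environment=KubernetesJobEnvironment(executor=DaskExecutor()))

# The flow is run through a brand-new environment instance,
# so the executor stored on flow.environment is never consulted.
used_by_new_instance = KubernetesJobEnvironment().run_flow(flow)

# Consulting the flow's own environment picks up the executor that was passed.
used_by_flow_environment = flow.environment.run_flow(flow)
```

In this toy model the new instance reports the default executor while the flow's own environment reports the one that was passed in, which is the mismatch this PR is about.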
Since `KubernetesJobEnvironment` inherits `run_flow()` from `Environment`, that means that the environment's `self.executor` will always be set and equal to the value from the config.

So if you use something like I tried today:
Then you'll never get that `DaskExecutor`. Flows will run with the default, which is often `LocalExecutor`.

This PR proposes changing that behavior, so that if the `environment` in the flow has an `executor`, `Environment.run_flow()` uses that.

Why is this PR important?
Without this PR, the `executor` you pass to `KubernetesJobEnvironment` or `FargateTaskEnvironment` is ignored when flows are run.

If I'm right, that means that without this PR, the only way to use one of these environments with a non-default executor like `DaskExecutor` is to maybe set the default executor in a user config (I have not tried this).

I think this is extra important because the use of a different executor is silent. In my case, the job environment happened to be sized appropriately to run my flow, so even though I wanted `DaskExecutor`, when I got `LocalExecutor` the flow happily ran and completed. If my job environment had been poorly sized for the work my flow does, I would have probably hit an out-of-memory error, and it wouldn't have been immediately obvious that the problem this PR addresses was to blame.

Questions for Reviewers
`self.executor` to the default" logic in `FargateTaskEnvironment` and `KubernetesJobEnvironment`? I'm not sure what the implications of that would be, but it seems like it would be a cleaner separation of responsibilities.

Thanks for your time and consideration!