Redesign K8s agent/environment interaction #3333
Conversation
Supports loading templates from `s3`, `gcs` and local files.
The following example runs fine for me locally:

```python
import time
import os

from prefect import Flow, task
from prefect.environments.execution.k8s.job2 import KubernetesJobConfig
from prefect.environments.storage import Docker
from prefect.engine.executors import LocalDaskExecutor


@task
def hello(name):
    print(f"Hello, {name}")


@task
def echo_environment_var(name):
    value = os.environ[name]
    print(f"{name} = {value}")


with Flow("hello") as flow:
    names = ["frodo", "sam", "merry", "pippin", "bilbo"]
    hello.map(names)
    echo_environment_var("TEST")

# Use a `LocalDaskExecutor` with this flow, whether running remotely or locally
flow.executor = LocalDaskExecutor()

# Configure the k8s job to use with this flow. Here the job template
# is stored in gcs, with a small tweak applied to add an environment variable.
flow.environment = KubernetesJobConfig(
    job_template_path="gcs://jch-testing/job_templates/job_template.yaml",
    env={"TEST": "IT-WORKED"},
)

# Use docker storage
flow.storage = Docker()

flow.register("testing")
```
This is awesome to see - I'll need to think a little more about it, but I think merging these agents would be feasible.
In the short term, could we introduce `executor`s as a Flow attribute in a separate PR, and possibly update environments to use `flow.executor` as the default unless configured otherwise? That would pick off one piece of the puzzle.
I'll play with this a little more to see how else we might break this up, but this looks really promising.
```python
path = key.split(".")
for k in path[:-1]:
    d = d.setdefault(k, {})
return d.setdefault(path[-1], val)
```
This code looks similar to our plugins code here: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/utilities/plugins.py
Not sure if there's any re-use opportunity, but thought it was worth calling out.
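For context, a standalone version of what this fragment does (the `set_nested` wrapper name is mine, for illustration):

```python
def set_nested(d, key, val):
    # Walk (creating as needed) one nested dict per dotted component,
    # then set the final key; like dict.setdefault, the existing value
    # is returned (not overwritten) if the key is already present.
    path = key.split(".")
    for k in path[:-1]:
        d = d.setdefault(k, {})
    return d.setdefault(path[-1], val)

config = {}
set_nested(config, "resources.requests.cpu", "100m")
assert config == {"resources": {"requests": {"cpu": "100m"}}}
```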
```yaml
args: ["prefect execute flow-run"]
resources:
  requests:
    cpu: "100m"
```
Independently of this PR, I think we should increase these defaults by a little
What would you like them to be?
cc @cicdw ^^
I was originally thinking that we should move both the limit / request to something closer to 500m, although I don't have a specific justification for that number; we've seen a few instances where this job is evicted / freezes, and I suspect it's related to this low value.
Because this is soon going to be very easy to configure, I don't feel strongly about updating it immediately.
I'm half tempted to remove it entirely. Imposing a limit (especially one this low) on a job by default is making assumptions for the user. Compute-heavy jobs will be heavily slowed by this limit. Most helm charts (at least the ones I've used) leave the resources field empty by default, leaving it up to the user to decide the resource requirements/limits.
My gut says that for naive users, having no request/limit here won't be a problem, and for expert/enterprise-y users, they'll likely want to configure the default template themselves anyway.
Not a strong thought though, up to you.
just chiming in to say that this (0.1 limit) caused us quite a few hard-to-diagnose problems with the large mapping flow that we've discussed before... to me it really seems like the downsides to a default limit far outweigh the upsides
In #3459 I propose we remove these limits (reasoning given in the PR description).
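If the defaults do get removed, a user who wants constraints can still set them in their own job template; something like the following (values are illustrative, not recommendations):

```yaml
# Illustrative user-supplied resources block for the flow-run container;
# with no defaults, resource decisions are entirely up to the user.
resources:
  requests:
    cpu: "500m"
    memory: "256Mi"
  limits:
    cpu: "2"
    memory: "2Gi"
```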
See #3338. I'm not sure if there are any (non-breaking) changes we can make around environments. Currently all environments create a default executor on init (using the default executor defined in the flow registration environment); changing this would be a breaking change (but maybe one we're ok making?). Alternatively, we could make the executor defined on a flow (if present) override any defined on the environment. This would be confusing if a user configured it in both places, but the long term plan is to remove executors from environments entirely, so that'd go away. For now, I think I'd like to keep the change here minimal and less publicized.
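To make that precedence concrete, a sketch (my own illustration; `resolve_executor` is a hypothetical helper, not Prefect code):

```python
def resolve_executor(flow, environment):
    # Proposed precedence: an executor set directly on the flow (if any)
    # overrides the default executor the environment created on init.
    if getattr(flow, "executor", None) is not None:
        return flow.executor
    return environment.executor
```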
Alright, this has now been updated and works with PrefectHQ/server#100. Current status:
TODO:
Pending tests and docs; if people are feeling adventurous, things are starting to be reviewable now.
Docs section is still hidden (not in sidebar), since I'm not sure if we want to expose it yet.
Overall looking good! I left some comments / questions / suggestions.
For testing / rollout: if we agree on the schema for the new run config objects, we could separate out the serialization logic into its own smaller PR and merge it to get the serializers deployed to Cloud for further testing. I have a slight preference for rolling this out that way, but I'm open to other thoughts.
Since cloud is using pydantic now, wouldn't that need a separate PR to cloud/server anyway? Is that not handled by PrefectHQ/server#100?
My petty gripes with marshmallow don't outweigh the value of consistency in the codebase.
The docs test for properly formatted section headers was too strict, and prevented use of header words anywhere else in the docstring (e.g. "Example" used mid-sentence was forbidden). We now only check whether a header keyword appears on a line by itself without a trailing ":". This should catch the most common source of errors (forgetting the colon) without causing too much trouble elsewhere.
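A rough sketch of what the relaxed check amounts to (the regex and function are mine, not the actual test code):

```python
import re

# Docstring section headers that must be followed by a ":".
HEADERS = ("Args", "Returns", "Raises", "Example", "Examples")

def missing_colon_headers(docstring):
    # Flag only lines consisting solely of a header keyword with no
    # trailing ":" -- the most common mistake. A word like "Example"
    # used mid-sentence no longer triggers a failure.
    pattern = re.compile(rf"^\s*(?:{'|'.join(HEADERS)})\s*$")
    return [line for line in docstring.splitlines() if pattern.match(line)]
```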
```diff
@@ -232,6 +232,11 @@ title = "Execution Environments"
 module = "prefect.environments.execution"
 classes = ["DaskKubernetesEnvironment", "DaskCloudProviderEnvironment", "FargateTaskEnvironment", "KubernetesJobEnvironment", "LocalEnvironment"]

+[pages.run_configs]
```
This page exists in the docs now, but I didn't add it to the index. Secret docs for secret features.
This LGTM! Exciting stuff - are you holding off on a changelog entry because this is still somewhat experimental?
No strong thoughts here, could go either way. I don't think we should publicize it now, but I'll probably want to know when the feature first landed, and a changelog entry would help with that.
`.call_args` doesn't have the `.args`/`.kwargs` properties until python 3.8.
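For reference, the portable spelling the tests fall back to on older Pythons:

```python
from unittest.mock import MagicMock

m = MagicMock()
m(1, x=2)

# Python >= 3.8 only:
#   m.call_args.args   == (1,)
#   m.call_args.kwargs == {"x": 2}
# Portable (3.6/3.7 as well): index the call object instead.
assert m.call_args[0] == (1,)
assert m.call_args[1] == {"x": 2}
```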
Should we add a new "Experimental" changelog category for these sorts of situations?
Meh, I think I'll add an entry in …
Hmmm, Windows-only failure. Looking into it.
Windows paths can have an optional drive component, which complicates things. We now use a custom `parse_path` function, which handles this logic for us, instead of using `urlparse` directly. This led to a new utility module, `prefect.utilities.filesystems`. I also moved the `read_bytes_from_path` method there and relocated the tests.
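A sketch of the kind of logic involved (my illustration; the actual `parse_path` may differ):

```python
import re

def split_scheme(path):
    # Only an explicit ``scheme://`` prefix counts as remote. A bare
    # windows path like ``C:\temp\flow.yaml`` would otherwise trip up
    # urlparse, which reports the drive letter ``c`` as the scheme.
    match = re.match(r"^([a-zA-Z][a-zA-Z0-9+.-]*)://(.*)$", path)
    if match:
        return match.group(1), match.group(2)
    return "file", path

assert split_scheme("gcs://bucket/job_template.yaml") == ("gcs", "bucket/job_template.yaml")
assert split_scheme(r"C:\temp\job_template.yaml") == ("file", r"C:\temp\job_template.yaml")
```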
This is still a huge WIP, just pushing some stuff up for others to see.
The goal here is to simplify all the configuration options prefect supports when deploying on k8s (we'll apply similar techniques later to the other agents/environments). Currently, if a user wants to customize a k8s deployment they have a few options, including using a `KubernetesJobEnvironment` to fully customize the job, at the cost of kicking off a second job (a job that starts a job).

The mix of environment and agent options can be a bit confusing for new and experienced users (including myself). The kubernetes agent can run jobs using the `KubernetesJobEnvironment`, but it can also run jobs using the `LocalEnvironment`, which is unfortunately named since "local" here means inside a container.

To simplify things, the following design is used:
This includes the following changes:

- `executor` configuration now lives on the `Flow` itself, rather than in an "environment". This is nice, as it improves parity between `flow.register()` and `flow.run()`, where the same configured executor is used. Several users have been confused in the past about why `flow.run()` doesn't use the configuration in the `environment`.
- A new `KubernetesJobConfig`. I'm not sure what the final name will be, or if we'll rename environments to something new. For now this is hacked on as a subclass of `Environment`, but it doesn't need to be. An "environment" in the new design only stores data that the corresponding agent needs at run time; it should have no methods defined itself (no more `setup` and `execute`).
- A new k8s agent (`prefect agent start kubernetes2`). We could merge this back in as a separate code path in the existing k8s agent (I don't think this would be terribly difficult); it's started as a separate agent just to get things working.
When using the new `KubernetesJobConfig` (or whatever we call it), a user has a few options:

- Specify the `job_template` directly (as a nested dict, or as a yaml string), or pass a local path to `job_template_path`, which will be read in on init (see the sketch after this list). I'm not sold that this is the clearest kwarg api, but I think both methods here are likely to be used. In this case the k8s object will be stored in prefect cloud's DB.
- Pass a `job_template_path` that is loaded at runtime; this supports `gcs`, `s3`, and files local to the agent. Note that any credentials needed to access these files will need to be configured on the agent environment.
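As a sketch of the inline option (reusing the `flow` and import path from the example at the top of this thread; the template contents are illustrative):

```python
from prefect.environments.execution.k8s.job2 import KubernetesJobConfig

# An inline template travels with the flow (stored in cloud's DB) rather
# than being fetched from gcs/s3 by the agent at runtime.
job_template = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {
        "template": {
            "spec": {
                "restartPolicy": "Never",
                "containers": [{"name": "flow"}],
            }
        }
    },
}

flow.environment = KubernetesJobConfig(job_template=job_template)
```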
TODO:

- Naming. I'm not sure if we keep the `Environment` base class requirements, or if this is a `KubernetesJobEnvironment2` or something like that. Environments in this new scheme only hold data to pass to the corresponding agent; no methods need be defined on them. Potential names might be `RunConfig`/`flow.run_config`, since it's configuration for a flow run, or `AgentConfig`, since it's config corresponding to a particular agent. We might also keep the environment name and slowly deprecate old methods; no strong thoughts.