
Redesign K8s agent/environment interaction #3333

Merged
jcrist merged 22 commits into PrefectHQ:master on Sep 30, 2020

Conversation


@jcrist jcrist commented Sep 16, 2020

This is still a huge WIP, just pushing some stuff up for others to see.

The goal here is to simplify all the configuration options prefect supports when deploying on k8s (we'll apply similar techniques later to the other agents/environments). Currently, if a user wants to customize a k8s deployment, they have a few options:

  • Apply some configuration options via the agent CLI (e.g. default namespace, environment variables, etc...)
  • Change the default yaml template to use a new template via the agent
  • Use a KubernetesJobEnvironment to fully customize the job, at the cost of kicking off a second job (a job that starts a job).

The mix of environment and agent options can be a bit confusing for new and experienced users (including myself). The kubernetes agent can run jobs using the KubernetesJobEnvironment, but it can also run jobs using the LocalEnvironment, which is unfortunately named since "local" here means inside a container.

To simplify things, the following design is used:

  • One-to-one relationship between agents and environments. If you're using a k8s agent, you need a k8s environment.
  • An environment should be usable by a user with no further configuration. Admins can change what a "default" run looks like by configuring options on the agent.
  • An experienced user can also tweak all options through the specified environment as well. There's no disconnect where some options need to be set on the agent and some on the environment.
  • For users who are extremely security-minded, we need to provide an option to store all "code" (e.g. yaml manifests) outside of Prefect infrastructure. However, we also support storing manifests in prefect cloud for users who are fine with this (I suspect many won't care).

This includes the following changes:

  • The executor configuration now lives on the Flow itself, rather than in an "environment". This is nice, as it improves parity between flow.register() and flow.run(), where the same configured executor is used. Several users have been confused in the past about why flow.run() doesn't use the configuration in the environment.
  • A new "environment" is created called KubernetesJobConfig. I'm not sure what the final name will be, or if we'll rename environments to something new. For now this is hacked on as a subclass of Environment, but it doesn't need to be. An "environment" in the new design only stores data that the corresponding agent needs at run time, it should have no methods defined itself (no more setup and execute).
  • A new agent is created (can be started with prefect agent start kubernetes2). We could merge this back in as a separate code path in the existing k8s agent (I don't think this would be terribly difficult); it was started as a separate agent just to get things working.
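
For illustration, a data-only config class along these lines might look roughly like the following (a sketch only; names, fields, and defaults are placeholders, not the final API):

from typing import Optional

class KubernetesJobConfig:
    """Sketch of a data-only run configuration consumed by the k8s agent.

    Note there are no `setup`/`execute` methods; the agent interprets these
    fields at run time. The real class may differ.
    """
    def __init__(
        self,
        job_template: Optional[dict] = None,      # nested dict (or YAML string) job spec
        job_template_path: Optional[str] = None,  # local or remote (gcs/s3) path to a spec
        image: Optional[str] = None,              # common convenience options
        cpu_limit: Optional[str] = None,
        env: Optional[dict] = None,
    ):
        self.job_template = job_template
        self.job_template_path = job_template_path
        self.image = image
        self.cpu_limit = cpu_limit
        self.env = env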

When using the new KubernetesJobConfig (or whatever we call it), a user has a few options:

  • Specify nothing, and get the default behavior defined on the kubernetes agent. I suspect most users will use this.
flow.environment = KubernetesJobConfig()
  • Specify a few common options (image, resource requests, environment variables, etc...) via keyword arguments on the environment. This makes common changes simpler without requiring the user to know all about k8s specs.
flow.environment = KubernetesJobConfig(cpu_limit="4", env={"EXAMPLE": "VALUE"})
  • Specify a k8s job spec to use as a template. This can be specified directly via job_template (as a nested dict or a yaml string; see the sketch after this list), or by passing a local path to job_template_path, which will be read in on init. I'm not sold that this is the clearest kwarg API, but I think both methods here are likely to be used. In this case the k8s object will be stored in prefect cloud's DB.
flow.environment = KubernetesJobConfig(job_template=some_nested_dict_representation)
flow.environment = KubernetesJobConfig(job_template_path="/path/to/a/job.yaml")
  • Specify a remote path to a job template, which is loaded at runtime. This can be nice for users who want to update the flow templates without re-registering, or who don't want to store the template inside prefect cloud (for security or other reasons). Supported locations include gcs, s3, and files local to the agent at runtime. Note that any credentials needed to access these files will need to be configured on the agent environment.
flow.environment = KubernetesJobConfig(job_template_path="gcs://bucket/path/to/job.yaml")

TODO:

  • Figure out how we want to go about merging this (as a new agent, or merged back into the old agent? I think the latter makes more sense).
  • Figure out if we want to rename the new environment here to a new concept (dropping the old Environment base class requirements), or whether this becomes a KubernetesJobEnvironment2 or something like that. Environments in this new scheme only hold data to pass to the corresponding agent; no methods need be defined on them. Potential names might be RunConfig/flow.run_config, since it's configuration for a flow run, or AgentConfig, since it's config corresponding to a particular agent. We might also keep the environment name and slowly deprecate old methods. No strong thoughts.
  • Update serializers depending on decisions with the above. For now things are super hacked together so they still work with no changes to cloud/server.
  • Tests
  • Docs

@jcrist jcrist requested a review from joshmeek as a code owner September 16, 2020 23:31

jcrist commented Sep 16, 2020

The following example runs fine for me locally:

import time
import os

from prefect import Flow, task
from prefect.environments.execution.k8s.job2 import KubernetesJobConfig
from prefect.environments.storage import Docker
from prefect.engine.executors import LocalDaskExecutor

@task
def hello(name):
    print(f"Hello, {name}")

@task
def echo_environment_var(name):
    value = os.environ[name]
    print(f"{name} = {value}")

with Flow("hello") as flow:
    names = ["frodo", "sam", "merry", "pippin", "bilbo"]
    hello.map(names)
    echo_environment_var("TEST")

# Use a `LocalDaskExecutor` with this flow, whether running remotely or locally
flow.executor = LocalDaskExecutor()

# Configure the k8s job to use with this flow. Here the job template
# is stored in gcs, with a small tweak applied to add an environment variable.
flow.environment = KubernetesJobConfig(
    job_template_path="gcs://jch-testing/job_templates/job_template.yaml",
    env={"TEST": "IT-WORKED"}
)

# Use docker storage
flow.storage = Docker()

flow.register("testing")


codecov bot commented Sep 16, 2020

Codecov Report

Merging #3333 into master will increase coverage by 0.06%.
The diff coverage is 93.75%.


@cicdw cicdw left a comment


This is awesome to see - I'll need to think a little more about it, but I think merging these agents would be feasible.

In the short term, could we introduce executors as a Flow attribute as a separate PR and possibly update environments to use flow.executor as the default, unless configured otherwise? That would pick off one piece of the puzzle.

I'll play with this a little more to see how else we might break this up, but this looks really promising.

path = key.split(".")
for k in path[:-1]:
    d = d.setdefault(k, {})
return d.setdefault(path[-1], val)
Member


This code looks similar to our plugins code here: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/utilities/plugins.py

Not sure if there's any re-use opportunity, but thought it was worth calling out.

args: ["prefect execute flow-run"]
resources:
  requests:
    cpu: "100m"
Member


Independently of this PR, I think we should increase these defaults by a little

Author


What would you like them to be?

Author


cc @cicdw ^^

Member


I was originally thinking that we should move both the limit / request to something closer to 500m, although I don't have a specific justification for that number; we've seen a few instances where this job is evicted / freezes, and I suspect it's related to this low value.

Because this is soon going to be very easy to configure, I don't feel strongly about updating it immediately.

Author


I'm half tempted to remove it entirely. Imposing a limit (especially one this low) on a job by default is making assumptions for the user. Compute-heavy jobs will be heavily slowed by this limit. Most helm charts (at least the ones I've used) leave the resources field empty by default, leaving it up to the user to decide the resource requirements/limits.

My gut says that for naive users, having no request/limit here won't be a problem, and for expert/enterprise-y users, they'll likely want to configure the default template themselves anyway.

Not a strong thought though, up to you.



Just chiming in to say that this (0.1 CPU limit) caused us quite a few hard-to-diagnose problems with the large mapping flow that we've discussed before... to me it really seems like the downsides of a default limit far outweigh the upsides.

Author


In #3459 I propose we remove these limits (reasoning given in the PR description).


jcrist commented Sep 17, 2020

In the short term, could we introduce executors as a Flow attribute as a separate PR and possibly update environments to use flow.executor as the default, unless configured otherwise? That would pick off one piece of the puzzle.

See #3338. I'm not sure if there are any (non-breaking) changes we can make around environments. Currently all environments create a default executor on init (using the default executor defined in the flow registration environment); changing this would be a breaking change (but maybe one we're ok making?). Alternatively, we could make the executor defined on a flow (if present) override any defined on the environment. This would be confusing if a user configured it in two places, but the long-term plan is to remove executors from environments entirely, so that confusion would go away.
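
A minimal sketch of that override rule, purely illustrative (not how executor selection is currently implemented):

def choose_executor(flow, environment, default_executor):
    # Prefer an executor set directly on the flow, then one configured on the
    # environment, then the configured default.
    if getattr(flow, "executor", None) is not None:
        return flow.executor
    if getattr(environment, "executor", None) is not None:
        return environment.executor
    return default_executor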

For now, I think I'd like to keep the change here minimal and less publicized.

@jcrist jcrist marked this pull request as draft September 25, 2020 15:38

jcrist commented Sep 25, 2020

Alright, this has now been updated and works with PrefectHQ/server#100.

Current status:

  • The k8s agent can deploy flows with both the new run_config field and the old environment field. If a run_config is present, it's always used.
  • A base class RunConfig has been added for defining flow-run configurations. The only implementation so far is KubernetesJob.
  • Serializers have been updated to support serializing this new field along with the flow. Note that these look a bit different than the existing marshmallow serializers - I wanted the output json to match a certain schema, and the existing marshmallow machinery didn't support that as easily as one would hope (too much magic for my liking). In the future it should be straightforward to port this to pydantic.
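
For reference, the serialized form of a KubernetesJob run config might look roughly like the following (field names and values are illustrative; the final schema may differ):

# Illustrative only: a rough idea of the serialized run_config payload.
serialized_run_config = {
    "type": "KubernetesJob",
    "job_template": None,            # or a nested dict job spec stored with the flow
    "job_template_path": "gcs://bucket/path/to/job.yaml",  # hypothetical path
    "image": None,
    "env": {"EXAMPLE": "VALUE"},
    "cpu_limit": "4",
}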

TODO:

  • tests
  • docstrings. Since we don't want to publicize this much for a bit, docstrings alone should suffice for now, I think.

Pending tests and docs, things are starting to be reviewable now if people are feeling adventurous.

Docs section is still hidden (not in sidebar), since I'm not sure if we
want to expose it yet.

@cicdw cicdw left a comment


Overall looking good! I left some comments / questions / suggestions.

For testing / rollout, if we agree on the schema for the new run config objects, we could separate out the serialization logic into its own smaller PR and merge it to get the serializers deployed to Cloud for further testing. I have a slight preference for rolling this out that way, but I'm open to other thoughts.


jcrist commented Sep 29, 2020

For testing / rollout, if we agree on the schema for the new run config objects we could separate out the serialization logic into its own smaller PR and merge it to get the serializers deployed to Cloud for further testing

Since cloud is using pydantic now, wouldn't that require a separate PR to cloud/server anyway? Is that not handled by PrefectHQ/server#100?

My petty gripes with marshmallow don't outweigh the value of consistency
in the codebase.
The docs test for properly formatted section headers was too strict, and
prevented use of header words anywhere else in the docstring (i.e.
"Example" used in a sentence was forbidden). We now only check if a
header keyword is on a line by itself without a ":". This should catch
the most common source of errors, forgetting the colon, without causing
too much trouble elsewhere.
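
A rough sketch of that heuristic (illustrative only; the header-word set and function name are assumptions, not the actual test code):

HEADER_WORDS = {"Args", "Returns", "Raises", "Example", "Examples"}  # assumed set

def is_header_missing_colon(line: str) -> bool:
    # Flag a line that is just a header keyword by itself with no ":";
    # "Args:" passes, a bare "Args" is flagged, and "Example" mid-sentence is ignored.
    return line.strip() in HEADER_WORDS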
@jcrist jcrist changed the title WIP - Redesign K8s agent/environment interaction Redesign K8s agent/environment interaction Sep 30, 2020
@jcrist jcrist marked this pull request as ready for review September 30, 2020 17:04
@@ -232,6 +232,11 @@ title = "Execution Environments"
module = "prefect.environments.execution"
classes = ["DaskKubernetesEnvironment", "DaskCloudProviderEnvironment", "FargateTaskEnvironment", "KubernetesJobEnvironment", "LocalEnvironment"]

[pages.run_configs]
Author


This page exists in the docs now, but I didn't add it to the index. Secret docs for secret features.

cicdw
cicdw previously approved these changes Sep 30, 2020

@cicdw cicdw left a comment


This LGTM! Exciting stuff - are you holding off on a changelog entry because this is still somewhat experimental?


jcrist commented Sep 30, 2020

are you holding off on a changelog entry because this is still somewhat experimental?

No strong thoughts here, could go either way. I don't think we should publicize it now, but I'll probably want to know when the feature first landed, and a changelog entry would help with that.

`.call_args` doesn't have the `.args`/`.kwargs` properties until python
3.8.

cicdw commented Sep 30, 2020

Should we add a new "Experimental" changelog category for these sorts of situations?

joshmeek
joshmeek previously approved these changes Sep 30, 2020

jcrist commented Sep 30, 2020

Meh, I think I'll add an entry in Enhancements, but qualify it with "experimental". I'll also tweak the KubernetesJob docstring to mention that it may change.


jcrist commented Sep 30, 2020

Hmmm, Windows-only failure. Looking into it.

Windows paths can have an optional drive component, which complicates
things. We now use a custom `parse_path` function instead of directly
using `urlparse`, which handles this logic for us. This led to a new
utility module `prefect.utilities.filesystems`. I also moved the
`read_bytes_from_path` method there, and relocated the tests.
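
A minimal sketch of the idea behind such a `parse_path` helper (illustrative only; the real implementation in `prefect.utilities.filesystems` may differ):

from urllib.parse import urlparse

def parse_path(path: str):
    # On Windows, a path like "C:\\flows\\job.yaml" parses with scheme "c",
    # so treat any empty or single-letter scheme as a local file path
    # rather than a URL scheme.
    parsed = urlparse(path)
    if len(parsed.scheme) <= 1:
        return ("file", path)
    return (parsed.scheme, parsed.netloc + parsed.path)  # e.g. ("gcs", "bucket/path/to/job.yaml")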
@jcrist jcrist merged commit b18d8aa into PrefectHQ:master Sep 30, 2020
@jcrist jcrist deleted the agent-env-refactor branch September 30, 2020 22:07