Merge pull request #3333 from jcrist/agent-env-refactor
Redesign K8s agent/environment interaction
jcrist authored Sep 30, 2020
2 parents d2d3ad3 + 43ea3e3 commit b18d8aa
Showing 23 changed files with 935 additions and 204 deletions.
2 changes: 2 additions & 0 deletions changes/pr3333.yaml
@@ -0,0 +1,2 @@
enhancement:
- "Add `flow.run_config`, an *experimental* design for configuring deployed flows - [#3333](https://github.com/PrefectHQ/prefect/pull/3333)"
21 changes: 13 additions & 8 deletions docs/generate_docs.py
@@ -126,15 +126,20 @@ def wrapped(*args, **kwargs):
return wrapped


VALID_DOCSTRING_SECTIONS = [
    "Args",
    "Returns",
    "Raises",
    "Example",
    "Examples",
    "References",
]


def clean_line(line):
    line = (
        line.replace("Args:", "**Args**:")
        .replace("Returns:", "**Returns**:")
        .replace("Raises:", "**Raises**:")
        .replace("Example:", "**Example**:")
        .replace("References:", "**References**:")
        .replace(".**", ".\n\n**")
    )
    for header in VALID_DOCSTRING_SECTIONS:
        line = line.replace(f"{header}:", f"**{header}**:")
    line = line.replace(".**", ".\n\n**")
    return line.lstrip()
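A quick sketch of what the refactored `clean_line` produces, assuming the loop-based implementation above:

```
>>> clean_line("    Args:")
'**Args**:'
>>> clean_line("    Raises:")
'**Raises**:'
```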


5 changes: 5 additions & 0 deletions docs/outline.toml
@@ -232,6 +232,11 @@ title = "Execution Environments"
module = "prefect.environments.execution"
classes = ["DaskKubernetesEnvironment", "DaskCloudProviderEnvironment", "FargateTaskEnvironment", "KubernetesJobEnvironment", "LocalEnvironment"]

[pages.run_configs]
title = "Run Configuration"
module = "prefect.run_configs"
classes = ["RunConfig", "KubernetesJob"]

[pages.tasks.control_flow]
title = "Control Flow Tasks"
module = "prefect.tasks.control_flow"
88 changes: 20 additions & 68 deletions docs/test_generate_docs.py
@@ -1,3 +1,4 @@
import inspect
import re
import sys
import textwrap
@@ -22,6 +23,7 @@
    get_call_signature,
    get_class_methods,
    patch_imports,
    VALID_DOCSTRING_SECTIONS,
)

with patch_imports():
@@ -468,71 +470,21 @@ def my_doc():
    assert res.count(r">\*<") == 2


@pytest.mark.parametrize(
    "fn", [fn for page in OUTLINE for fn in page.get("functions", [])]
)
def test_sections_have_formatted_headers_for_function_docs(fn):
    doc = format_doc(fn, in_table=True)
    for section in ["Args", "Returns", "Raises", "References", "Example"]:
        option1 = ">**{}**:".format(section)
        option2 = "\n**{}**:".format(section)
        assert (section in doc) is any(
            [(o in doc) for o in (option1, option2)]
        ), "{fn.__name__} has a poorly formatted {sec} header.".format(
            fn=fn, sec=section
        )
        if (section != "Example") and section in doc:
            assert "{}**:<".format(section) in doc.replace(
                " ", ""
            ), "{fn.__name__} has a poorly formatted {sec} listing.".format(
                fn=fn, sec=section
            )


@pytest.mark.parametrize(
    "obj", [obj for page in OUTLINE for obj in page.get("classes", [])]
)
def test_sections_have_formatted_headers_for_class_docs(obj):
    doc = format_doc(obj)
    for section in ["Args", "Returns", "Raises", "References", "Example"]:
        option1 = ">**{}**:".format(section)
        option2 = "\n**{}**:".format(section)
        option3 = "**{}**:".format(section)
        assert (section in doc) is any(
            [(o in doc) for o in (option1, option2)] + [doc.startswith(option3)]
        ), "{obj.__module__}.{obj.__name__} has a poorly formatted {sec} header.".format(
            obj=obj, sec=section
        )
        if (section != "Example") and section in doc:
            assert "{}**:<".format(section) in doc.replace(
                " ", ""
            ), "{obj.__module__}.{obj.__name__} has a poorly formatted {sec} listing.".format(
                obj=obj, sec=section
            )


@pytest.mark.parametrize(
    "obj,fn",
    [
        (obj, fn)
        for page in OUTLINE
        for obj in page.get("classes", [])
        for fn in get_class_methods(obj)
    ],
)  # parametrized like this for easy reading of tests
def test_sections_have_formatted_headers_for_class_method_docs(obj, fn):
    doc = format_doc(fn, in_table=True)
    for section in ["Args", "Returns", "Raises", "Example"]:
        option1 = ">**{}**:".format(section)
        option2 = "\n**{}**:".format(section)
        assert (section in doc) is any(
            [(o in doc) for o in (option1, option2)]
        ), "{obj.__module__}.{obj.__name__}.{fn.__name__} has a poorly formatted {sec} header.".format(
            obj=obj, fn=fn, sec=section
        )
        if (section != "Example") and section in doc:
            assert "{}**:<".format(section) in doc.replace(
                " ", ""
            ), "{obj.__module__}.{obj.__name__}.{fn.__name__} has a poorly formatted {sec} listing.".format(
                obj=obj, fn=fn, sec=section
            )
all_objects = []
for page in OUTLINE:
    all_objects.extend(page.get("functions", []))
    for cls in page.get("classes", []):
        all_objects.append(cls)
        all_objects.extend(get_class_methods(cls))


@pytest.mark.parametrize("obj", all_objects)
def test_section_headers_are_properly_formatted(obj):
    doc = inspect.getdoc(obj)
    if not doc:
        return
    for section in VALID_DOCSTRING_SECTIONS:
        if re.search(f"^\\s*{section}\\s*$", doc, flags=re.M):
            assert (
                False
            ), f"{obj.__module__}.{obj.__qualname__} has a poorly formatted '{section}' header"
1 change: 1 addition & 0 deletions src/prefect/agent/agent.py
@@ -566,6 +566,7 @@ def query_flow_runs(self) -> list:
"id",
"name",
"environment",
"run_config",
"storage",
"version",
"core_version",
143 changes: 132 additions & 11 deletions src/prefect/agent/kubernetes/agent.py
@@ -1,7 +1,7 @@
import os
import time
import uuid
from typing import Iterable, List
from typing import Iterable, List, Any

import json
import yaml
@@ -10,9 +10,24 @@
from prefect import config
from prefect.agent import Agent
from prefect.engine.state import Failed
from prefect.serialization.run_config import RunConfigSchema
from prefect.utilities.agent import get_flow_image, get_flow_run_command
from prefect.utilities.filesystems import read_bytes_from_path
from prefect.utilities.graphql import GraphQLResult

DEFAULT_JOB_TEMPLATE_PATH = os.path.join(os.path.dirname(__file__), "job_template.yaml")


def _get_or_create(d: dict, key: str, val: Any = None) -> Any:
    """Get a (possibly nested) field from a dict, creating intermediate values
    if needed."""
    if val is None:
        val = {}
    path = key.split(".")
    for k in path[:-1]:
        d = d.setdefault(k, {})
    return d.setdefault(path[-1], val)
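A quick illustration of `_get_or_create` with hypothetical values (not part of the diff): intermediate dicts are created on demand, and an existing value is returned rather than overwritten:

```
job = {}
labels = _get_or_create(job, "spec.template.metadata.labels")
labels["prefect.io/identifier"] = "abc12345"
# job == {"spec": {"template": {"metadata": {"labels": {"prefect.io/identifier": "abc12345"}}}}}

# A second call returns the same nested dict
assert _get_or_create(job, "spec.template.metadata.labels") is labels
```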


class KubernetesAgent(Agent):
"""
@@ -33,7 +48,7 @@ class KubernetesAgent(Agent):
```
For details on the available environment variables for customizing the job spec,
see `help(KubernetesAgent.replace_job_spec_yaml)`.
see `help(KubernetesAgent.generate_job_spec_from_environment)`.
Specifying a namespace for the agent will create flow run jobs in that namespace:
```
@@ -46,6 +61,8 @@ class KubernetesAgent(Agent):
pulled from backend agent configuration.
- namespace (str, optional): A Kubernetes namespace to create jobs in. Defaults
to the environment variable `NAMESPACE` or `default`.
- job_template_path (str, optional): A path to a job template file to use instead
of the default.
- name (str, optional): An optional name to give this agent. Can also be set through
the environment variable `PREFECT__CLOUD__AGENT__NAME`. Defaults to "agent"
- labels (List[str], optional): a list of labels, which are arbitrary string
@@ -72,6 +89,7 @@ def __init__(
        self,
        agent_config_id: str = None,
        namespace: str = None,
        job_template_path: str = None,
        name: str = None,
        labels: Iterable[str] = None,
        env_vars: dict = None,
@@ -92,6 +110,7 @@
        )

        self.namespace = namespace or os.getenv("NAMESPACE", "default")
        self.job_template_path = job_template_path or DEFAULT_JOB_TEMPLATE_PATH
        self.volume_mounts = volume_mounts
        self.volumes = volumes

@@ -216,9 +235,7 @@ def deploy_flow(self, flow_run: GraphQLResult) -> str:

self.logger.info("Deploying flow run {}".format(flow_run.id)) # type: ignore

image = get_flow_image(flow_run=flow_run)

job_spec = self.replace_job_spec_yaml(flow_run=flow_run, image=image)
job_spec = self.generate_job_spec(flow_run=flow_run)
job_name = job_spec["metadata"]["name"]

self.logger.debug("Creating namespaced job {}".format(job_name))
@@ -248,8 +265,22 @@ def deploy_flow(self, flow_run: GraphQLResult) -> str:

return "Job {}".format(job_name)

    def replace_job_spec_yaml(
        self, flow_run: GraphQLResult, image: str, identifier: str = None
    def generate_job_spec(self, flow_run: GraphQLResult) -> dict:
        """Generate a k8s job spec for a flow run

        Args:
            - flow_run (GraphQLResult): A flow run object

        Returns:
            - dict: a dictionary representation of a k8s job for flow execution
        """
        if getattr(flow_run.flow, "run_config", None) is not None:
            return self.generate_job_spec_from_run_config(flow_run)
        else:
            return self.generate_job_spec_from_environment(flow_run)

    def generate_job_spec_from_environment(
        self, flow_run: GraphQLResult, image: str = None
    ) -> dict:
        """
        Populate a k8s job spec. This spec defines a k8s job that handles
@@ -277,14 +308,12 @@ def replace_job_spec_yaml(
        Args:
            - flow_run (GraphQLResult): A flow run object
            - image (str): The full name of an image to use for the job
            - identifier (str): A unique identifier to identify this job. If none is given,
                Prefect will create a random identifier each time a job is created.
            - image (str, optional): The full name of an image to use for the job

        Returns:
            - dict: a dictionary representation of a k8s job for flow execution
        """
        identifier = identifier or str(uuid.uuid4())[:8]
        identifier = str(uuid.uuid4())[:8]
        yaml_path = os.getenv(
            "YAML_TEMPLATE", os.path.join(os.path.dirname(__file__), "job_spec.yaml")
        )
@@ -304,6 +333,8 @@
job["spec"]["template"]["metadata"]["labels"].update(**k8s_labels)

# Use provided image for job
if image is None:
image = get_flow_image(flow_run=flow_run)
job["spec"]["template"]["spec"]["containers"][0]["image"] = image

self.logger.debug("Using image {} for job".format(image))
@@ -369,6 +400,96 @@

        return job

    def generate_job_spec_from_run_config(self, flow_run: GraphQLResult) -> dict:
        """Generate a k8s job spec for a flow run.

        Args:
            - flow_run (GraphQLResult): A flow run object

        Returns:
            - dict: a dictionary representation of a k8s job for flow execution
        """
        run_config = RunConfigSchema().load(flow_run.flow.run_config)

        if run_config.job_template:
            job = run_config.job_template
        else:
            job_template_path = run_config.job_template_path or self.job_template_path
            self.logger.debug("Loading job template from %r", job_template_path)
            template_bytes = read_bytes_from_path(job_template_path)
            job = yaml.safe_load(template_bytes)

        identifier = uuid.uuid4().hex[:8]

        job_name = f"prefect-job-{identifier}"

        # Populate job metadata for identification
        k8s_labels = {
            "prefect.io/identifier": identifier,
            "prefect.io/flow_run_id": flow_run.id,  # type: ignore
            "prefect.io/flow_id": flow_run.flow.id,  # type: ignore
        }
        _get_or_create(job, "metadata.labels")
        _get_or_create(job, "spec.template.metadata.labels")
        job["metadata"]["name"] = job_name
        job["metadata"]["labels"].update(**k8s_labels)
        job["spec"]["template"]["metadata"]["labels"].update(**k8s_labels)

        # Get the first container, which is used for the prefect job
        containers = _get_or_create(job, "spec.template.spec.containers", [])
        if not containers:
            containers.append({})
        container = containers[0]

        # Set container image
        container["image"] = get_flow_image(flow_run)

        # Set flow run command
        container["args"] = [get_flow_run_command(flow_run)]

        # Populate environment variables from the following sources,
        # with reverse precedence (later sources override):
        # - Values in the job template
        # - Values set using the `--env` CLI flag on the agent
        # - Values set on the run config
        # - Required values hardcoded below (these always override)
        env = self.env_vars.copy()
        if run_config.env:
            env.update(run_config.env)
        env.update(
            {
                "PREFECT__CLOUD__API": config.cloud.api,
                "PREFECT__CLOUD__AUTH_TOKEN": config.cloud.agent.auth_token,
                "PREFECT__CLOUD__USE_LOCAL_SECRETS": "false",
                "PREFECT__CONTEXT__FLOW_RUN_ID": flow_run.id,
                "PREFECT__CONTEXT__FLOW_ID": flow_run.flow.id,
                "PREFECT__LOGGING__LEVEL": config.logging.level,
                "PREFECT__LOGGING__LOG_TO_CLOUD": str(self.log_to_cloud).lower(),
                "PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudFlowRunner",
                "PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudTaskRunner",
            }
        )
        container_env = [{"name": k, "value": v} for k, v in env.items()]
        for entry in container.get("env", []):
            if entry["name"] not in env:
                container_env.append(entry)
        container["env"] = container_env

        # Set resource requirements if provided
        _get_or_create(container, "resources.requests")
        _get_or_create(container, "resources.limits")
        resources = container["resources"]
        if run_config.memory_request:
            resources["requests"]["memory"] = run_config.memory_request
        if run_config.memory_limit:
            resources["limits"]["memory"] = run_config.memory_limit
        if run_config.cpu_request:
            resources["requests"]["cpu"] = run_config.cpu_request
        if run_config.cpu_limit:
            resources["limits"]["cpu"] = run_config.cpu_limit

        return job
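As an illustration of the precedence rules above (hypothetical values; the hardcoded `PREFECT__*` entries are omitted): a variable defined in the job template survives only if neither the agent's `--env` values nor the run config set it:

```
agent_env = {"FOO": "from-agent-cli"}        # agent env_vars / --env
run_config_env = {"BAR": "from-run-config"}  # KubernetesJob(env=...)
template_env = [                             # env entries in the job template
    {"name": "FOO", "value": "from-template"},
    {"name": "BAZ", "value": "from-template"},
]

env = agent_env.copy()
env.update(run_config_env)
container_env = [{"name": k, "value": v} for k, v in env.items()]
for entry in template_env:
    if entry["name"] not in env:
        container_env.append(entry)
# FOO=from-agent-cli, BAR=from-run-config, BAZ=from-template
```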

    @staticmethod
    def generate_deployment_yaml(
        token: str = None,
15 changes: 15 additions & 0 deletions src/prefect/agent/kubernetes/job_template.yaml
@@ -0,0 +1,15 @@
apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: flow
          imagePullPolicy: IfNotPresent
          command: ["/bin/sh", "-c"]
          resources:
            requests:
              cpu: "100m"
            limits:
              cpu: "100m"