Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add agent registration for server users #3385

Merged
merged 19 commits into from
Sep 29, 2020
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes/pr3385.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
enhancement:
- "Enable agent registration for server users - [#3385](https://github.com/PrefectHQ/prefect/pull/3385)"
36 changes: 28 additions & 8 deletions src/prefect/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ class Agent:
environment variable or in your user configuration file.

Args:
- agent_config_id (str, optional): An optional agent ID that can be used to set configuration
joshmeek marked this conversation as resolved.
Show resolved Hide resolved
based on an agent from a backend API. If set all configuration values will be pulled
joshmeek marked this conversation as resolved.
Show resolved Hide resolved
from backend agent configuration. If not set, any manual kwargs will be used.
- name (str, optional): An optional name to give this agent. Can also be set through
the environment variable `PREFECT__CLOUD__AGENT__NAME`. Defaults to "agent"
- labels (List[str], optional): a list of labels, which are arbitrary string
Expand All @@ -106,15 +109,20 @@ class Agent:

def __init__(
self,
agent_config_id: str = None,
name: str = None,
labels: Iterable[str] = None,
env_vars: dict = None,
max_polls: int = None,
agent_address: str = None,
no_cloud_logs: bool = False,
) -> None:
self.name = name or config.cloud.agent.get("name", "agent")
# Load token and initialize client
token = config.cloud.agent.get("auth_token")
self.client = Client(api_server=config.cloud.api, api_token=token)

self.agent_config_id = agent_config_id
self.name = name or config.cloud.agent.get("name", "agent")
self.labels = labels or list(config.cloud.agent.get("labels", []))
self.env_vars = env_vars or config.cloud.agent.get("env_vars", dict())
self.max_polls = max_polls
Expand All @@ -123,6 +131,7 @@ def __init__(
self.agent_address = agent_address or config.cloud.agent.get(
"agent_address", ""
)

self._api_server = None # type: ignore
self._api_server_loop = None # type: Optional[IOLoop]
self._api_server_thread = None # type: Optional[threading.Thread]
Expand All @@ -146,12 +155,8 @@ def __init__(
self.logger.debug(f"Agent address: {self.agent_address}")
self.logger.debug(f"Log to Cloud: {self.log_to_cloud}")

token = config.cloud.agent.get("auth_token")

self.logger.debug(f"Prefect backend: {config.backend}")

self.client = Client(api_server=config.cloud.api, api_token=token)

def _verify_token(self, token: str) -> None:
"""
Checks whether a token with a `RUNNER` scope was provided
Expand All @@ -173,19 +178,34 @@ def _verify_token(self, token: str) -> None:

def _register_agent(self) -> str:
"""
Register this agent with Prefect Cloud and retrieve agent ID
Register this agent with a backend API and retrieve the ID

Returns:
- The agent ID as a string
"""
agent_id = self.client.register_agent(
agent_type=type(self).__name__, name=self.name, labels=self.labels # type: ignore
agent_type=type(self).__name__,
name=self.name,
labels=self.labels, # type: ignore
agent_config_id=self.agent_config_id,
)

self.logger.debug(f"Agent ID: {agent_id}")

if self.agent_config_id:
self._retrieve_agent_config()

return agent_id

def _retrieve_agent_config(self) -> dict:
"""
Retrieve the configuration of an agent if an agent ID is provided

Returns:
- dict: a dictionary of agent configuration
"""
return self.client.get_agent_config(self.agent_config_id) # type: ignore

def start(self, _loop_intervals: dict = None) -> None:
"""
The main entrypoint to the agent. This function loops and constantly polls for
Expand All @@ -196,7 +216,7 @@ def start(self, _loop_intervals: dict = None) -> None:
"""
if config.backend == "cloud":
self._verify_token(self.client.get_auth_token())
self.client.attach_headers({"X-PREFECT-AGENT-ID": self._register_agent()})
self.client.attach_headers({"X-PREFECT-AGENT-ID": self._register_agent()})

try:
self.setup()
Expand Down
5 changes: 5 additions & 0 deletions src/prefect/agent/docker/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class DockerAgent(Agent):
```

Args:
- agent_config_id (str, optional): An optional agent configuration ID that can be used to set
configuration based on an agent from a backend API. If set all configuration values will be
pulled from backend agent configuration.
- name (str, optional): An optional name to give this agent. Can also be set through
the environment variable `PREFECT__CLOUD__AGENT__NAME`. Defaults to "agent"
- labels (List[str], optional): a list of labels, which are arbitrary string
Expand Down Expand Up @@ -79,6 +82,7 @@ class DockerAgent(Agent):

def __init__(
self,
agent_config_id: str = None,
name: str = None,
labels: Iterable[str] = None,
env_vars: dict = None,
Expand All @@ -94,6 +98,7 @@ def __init__(
reg_allow_list: List[str] = None,
) -> None:
super().__init__(
agent_config_id=agent_config_id,
name=name,
labels=labels,
env_vars=env_vars,
Expand Down
5 changes: 5 additions & 0 deletions src/prefect/agent/fargate/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class FargateAgent(Agent):
```

Args:
- agent_config_id (str, optional): An optional agent configuration ID that can be used to set
configuration based on an agent from a backend API. If set all configuration values will be
pulled from backend agent configuration.
- name (str, optional): An optional name to give this agent. Can also be set through
the environment variable `PREFECT__CLOUD__AGENT__NAME`. Defaults to "agent"
- labels (List[str], optional): a list of labels, which are arbitrary string
Expand Down Expand Up @@ -99,6 +102,7 @@ class FargateAgent(Agent):

def __init__( # type: ignore
self,
agent_config_id: str = None,
name: str = None,
labels: Iterable[str] = None,
env_vars: dict = None,
Expand All @@ -118,6 +122,7 @@ def __init__( # type: ignore
**kwargs,
) -> None:
super().__init__(
agent_config_id=agent_config_id,
name=name,
labels=labels,
env_vars=env_vars,
Expand Down
5 changes: 5 additions & 0 deletions src/prefect/agent/kubernetes/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class KubernetesAgent(Agent):
```

Args:
- agent_config_id (str, optional): An optional agent configuration ID that can be used to set
configuration based on an agent from a backend API. If set all configuration values will be
pulled from backend agent configuration.
- namespace (str, optional): A Kubernetes namespace to create jobs in. Defaults
to the environment variable `NAMESPACE` or `default`.
- name (str, optional): An optional name to give this agent. Can also be set through
Expand All @@ -67,6 +70,7 @@ class KubernetesAgent(Agent):

def __init__(
self,
agent_config_id: str = None,
namespace: str = None,
name: str = None,
labels: Iterable[str] = None,
Expand All @@ -78,6 +82,7 @@ def __init__(
volumes: List[dict] = None,
) -> None:
super().__init__(
agent_config_id=agent_config_id,
name=name,
labels=labels,
env_vars=env_vars,
Expand Down
5 changes: 5 additions & 0 deletions src/prefect/agent/local/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class LocalAgent(Agent):
```

Args:
- agent_config_id (str, optional): An optional agent configuration ID that can be used to set
configuration based on an agent from a backend API. If set all configuration values will be
pulled from backend agent configuration.
- name (str, optional): An optional name to give this agent. Can also be set through
the environment variable `PREFECT__CLOUD__AGENT__NAME`. Defaults to "agent"
- labels (List[str], optional): a list of labels, which are arbitrary string
Expand All @@ -56,6 +59,7 @@ class LocalAgent(Agent):

def __init__(
self,
agent_config_id: str = None,
name: str = None,
labels: Iterable[str] = None,
env_vars: dict = None,
Expand All @@ -70,6 +74,7 @@ def __init__(
self.import_paths = import_paths or []
self.show_flow_logs = show_flow_logs
super().__init__(
agent_config_id=agent_config_id,
name=name,
labels=labels,
env_vars=env_vars,
Expand Down
50 changes: 29 additions & 21 deletions src/prefect/cli/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def agent():
"--token", "-t", required=False, help="A Prefect Cloud API token.", hidden=True
)
@click.option("--api", "-a", required=False, help="A Prefect API URL.", hidden=True)
@click.option("--agent-config-id", required=False, help="An agent ID", hidden=True)
@click.option(
"--name",
"-n",
Expand Down Expand Up @@ -141,6 +142,7 @@ def start(
agent_option,
token,
api,
agent_config_id,
name,
verbose,
label,
Expand All @@ -167,27 +169,28 @@ def start(

\b
Options:
--token, -t TEXT A Prefect Cloud API token with RUNNER scope
--api, -a TEXT A Prefect API URL
--name, -n TEXT A name to use for the agent
--verbose, -v Enable verbose agent DEBUG logs
Defaults to INFO level logging
--label, -l TEXT Labels the agent will use to query for flow runs
Multiple values supported e.g. `-l label1 -l label2`
--env, -e TEXT Environment variables to set on each submitted flow
run.
Note that equal signs in environment variable values
are not currently supported from the CLI. Multiple
values supported.
e.g. `-e AUTH=token -e PKG_SETTING=true`
--max-polls INT Maximum number of times the agent should poll the
Prefect API for flow runs. Will run forever if not
specified.
--no-cloud-logs Turn off logging to the Prefect API for all flow runs
Defaults to `False`
--agent-address TEXT The address to server internal api at. Currently this
is just health checks for use by an orchestration layer
(e.g. kubernetes). Leave blank for no api server (default).
--token, -t TEXT A Prefect Cloud API token with RUNNER scope
--api, -a TEXT A Prefect API URL
--agent-config--id TEXT An agent ID to link this agent instance with
--name, -n TEXT A name to use for the agent
--verbose, -v Enable verbose agent DEBUG logs
Defaults to INFO level logging
--label, -l TEXT Labels the agent will use to query for flow runs
Multiple values supported e.g. `-l label1 -l label2`
--env, -e TEXT Environment variables to set on each submitted flow
run.
Note that equal signs in environment variable values
are not currently supported from the CLI. Multiple
values supported.
e.g. `-e AUTH=token -e PKG_SETTING=true`
--max-polls INT Maximum number of times the agent should poll the
Prefect API for flow runs. Will run forever if not
specified.
--no-cloud-logs Turn off logging to the Prefect API for all flow runs
Defaults to `False`
--agent-address TEXT The address to server internal api at. Currently this
is just health checks for use by an orchestration layer
(e.g. kubernetes). Leave blank for no api server (default).

\b
Local Agent:
Expand Down Expand Up @@ -253,6 +256,7 @@ def start(

if agent_option == "local":
from_qualified_name(retrieved_agent)(
agent_config_id=agent_config_id,
name=name,
labels=labels,
env_vars=env_vars,
Expand All @@ -264,6 +268,7 @@ def start(
).start()
elif agent_option == "docker":
from_qualified_name(retrieved_agent)(
agent_config_id=agent_config_id,
name=name,
labels=labels,
env_vars=env_vars,
Expand All @@ -278,6 +283,7 @@ def start(
).start()
elif agent_option == "fargate":
from_qualified_name(retrieved_agent)(
agent_config_id=agent_config_id,
name=name,
labels=labels,
env_vars=env_vars,
Expand All @@ -287,6 +293,7 @@ def start(
).start()
elif agent_option == "kubernetes":
from_qualified_name(retrieved_agent)(
agent_config_id=agent_config_id,
namespace=namespace,
name=name,
labels=labels,
Expand All @@ -296,6 +303,7 @@ def start(
).start()
else:
from_qualified_name(retrieved_agent)(
agent_config_id=agent_config_id,
name=name,
labels=labels,
env_vars=env_vars,
Expand Down
40 changes: 37 additions & 3 deletions src/prefect/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1462,16 +1462,21 @@ def write_run_logs(self, logs: List[Dict]) -> None:
raise ValueError("Writing logs failed.")

def register_agent(
self, agent_type: str, name: str = None, labels: List[str] = None
self,
agent_type: str,
name: str = None,
labels: List[str] = None,
agent_config_id: str = None,
) -> str:
"""
Register an agent with Cloud
Register an agent with a backend API

Args:
- agent_type (str): The type of agent being registered
- name: (str, optional): The name of the agent being registered
- labels (List[str], optional): A list of any present labels on the agent
being registered
- agent_config_id (str, optional): The ID of an agent configuration to register with

Returns:
- The agent ID as a string
Expand All @@ -1484,10 +1489,39 @@ def register_agent(

result = self.graphql(
mutation,
variables=dict(input=dict(type=agent_type, name=name, labels=labels)),
variables=dict(
input=dict(
type=agent_type,
name=name,
labels=labels or [],
tenant_id=self._active_tenant_id,
agent_config_id=agent_config_id,
)
),
)

if not result.data.register_agent.id:
raise ValueError("Error registering agent")

return result.data.register_agent.id

def get_agent_config(self, agent_config_id: str) -> dict:
"""
Get agent config settings

Args:
- agent_config_id (str): The ID of an agent configuration to retrieve

Returns:
- dict: the agent configuration's `settings`
"""
query = {
"query": {
with_args(
"agent_config", {"where": {"id": {"_eq": agent_config_id}}}
): {"settings": True}
}
}

result = self.graphql(query) # type: Any
return result.data.agent_config[0].settings
Loading