Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose ui/api urls in prefect.runtime.task_run #16879

Merged
merged 1 commit into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 37 additions & 9 deletions src/prefect/runtime/task_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,21 @@
from __future__ import annotations

import os
from typing import Any, Callable, Dict, List, Optional
from typing import Any, Callable

from prefect.context import TaskRunContext
from prefect.settings import get_current_settings

__all__ = ["id", "tags", "name", "parameters", "run_count", "task_name"]
__all__ = [
"id",
"tags",
"name",
"parameters",
"run_count",
"task_name",
"api_url",
"ui_url",
]


type_cast: dict[
Expand Down Expand Up @@ -72,17 +82,17 @@ def __getattr__(name: str) -> Any:
return real_value


def __dir__() -> List[str]:
def __dir__() -> list[str]:
return sorted(__all__)


def get_id() -> str:
def get_id() -> str | None:
task_run_ctx = TaskRunContext.get()
if task_run_ctx is not None:
return str(task_run_ctx.task_run.id)


def get_tags() -> List[str]:
def get_tags() -> list[str]:
task_run_ctx = TaskRunContext.get()
if task_run_ctx is None:
return []
Expand All @@ -98,35 +108,53 @@ def get_run_count() -> int:
return task_run_ctx.task_run.run_count


def get_name() -> Optional[str]:
def get_name() -> str | None:
task_run_ctx = TaskRunContext.get()
if task_run_ctx is None:
return None
else:
return task_run_ctx.task_run.name


def get_task_name() -> Optional[str]:
def get_task_name() -> str | None:
task_run_ctx = TaskRunContext.get()
if task_run_ctx is None:
return None
else:
return task_run_ctx.task.name


def get_parameters() -> Dict[str, Any]:
def get_parameters() -> dict[str, Any]:
task_run_ctx = TaskRunContext.get()
if task_run_ctx is not None:
return task_run_ctx.parameters
else:
return {}


FIELDS: dict[str, Callable[[], Any]] = {
def get_task_run_api_url() -> str | None:
if (api_url := get_current_settings().api.url) is None:
return None
if (task_run_id := get_id()) is None:
return None
return f"{api_url}/runs/task-run/{task_run_id}"
cicdw marked this conversation as resolved.
Show resolved Hide resolved


def get_task_run_ui_url() -> str | None:
if (ui_url := get_current_settings().ui_url) is None:
return None
if (task_run_id := get_id()) is None:
return None
return f"{ui_url}/runs/task-run/{task_run_id}"


FIELDS: dict[str, Callable[[], Any | None]] = {
"id": get_id,
"tags": get_tags,
"name": get_name,
"parameters": get_parameters,
"run_count": get_run_count,
"task_name": get_task_name,
"api_url": get_task_run_api_url,
"ui_url": get_task_run_ui_url,
}
68 changes: 44 additions & 24 deletions tests/runtime/test_flow_run.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import dataclasses
import datetime
from typing import Any

import pendulum
import pytest
Expand All @@ -20,13 +21,13 @@ async def test_access_unknown_attribute_fails(self):

async def test_import_unknown_attribute_fails(self):
with pytest.raises(ImportError, match="boop"):
from prefect.runtime.flow_run import boop # noqa
from prefect.runtime.flow_run import boop # noqa # type: ignore

async def test_known_attributes_autocomplete(self):
assert "id" in dir(flow_run)
assert "foo" not in dir(flow_run)

async def test_new_attribute_via_env_var(self, monkeypatch):
async def test_new_attribute_via_env_var(self, monkeypatch: pytest.MonkeyPatch):
monkeypatch.setenv(name="PREFECT__RUNTIME__FLOW_RUN__NEW_KEY", value="foobar")
assert flow_run.new_key == "foobar"

Expand All @@ -47,7 +48,12 @@ async def test_new_attribute_via_env_var(self, monkeypatch):
],
)
async def test_attribute_override_via_env_var(
self, monkeypatch, attribute_name, attribute_value, env_value, expected_value
self,
monkeypatch: pytest.MonkeyPatch,
attribute_name: str,
attribute_value: Any,
env_value: str,
expected_value: Any,
):
# mock attribute_name to be a function that generates attribute_value
monkeypatch.setitem(flow_run.FIELDS, attribute_name, lambda: attribute_value)
Expand All @@ -71,7 +77,7 @@ async def test_attribute_override_via_env_var(
],
)
async def test_attribute_override_via_env_var_not_allowed(
self, monkeypatch, attribute_name, attribute_value
self, monkeypatch: pytest.MonkeyPatch, attribute_name: str, attribute_value: Any
):
# mock attribute_name to be a function that generates attribute_value
monkeypatch.setitem(flow_run.FIELDS, attribute_name, lambda: attribute_value)
Expand All @@ -96,12 +102,12 @@ async def test_id_is_attribute(self):
async def test_id_is_none_when_not_set(self):
assert flow_run.id is None

async def test_id_uses_env_var_when_set(self, monkeypatch):
async def test_id_uses_env_var_when_set(self, monkeypatch: pytest.MonkeyPatch):
monkeypatch.setenv(name="PREFECT__FLOW_RUN_ID", value="foo")
assert flow_run.id == "foo"

async def test_id_prioritizes_context_info_over_env_var_dynamically(
self, monkeypatch
self, monkeypatch: pytest.MonkeyPatch
):
monkeypatch.setenv(name="PREFECT__FLOW_RUN_ID", value="foo")

Expand Down Expand Up @@ -142,7 +148,9 @@ def run_with_tags():

assert flow_run.tags == []

async def test_tags_pulls_from_api_when_needed(self, monkeypatch, prefect_client):
async def test_tags_pulls_from_api_when_needed(
self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient
):
run = await prefect_client.create_flow_run(
flow=flow(lambda: None, name="test"), tags=["red", "green"]
)
Expand Down Expand Up @@ -170,7 +178,9 @@ async def test_run_count_returns_run_count_when_present_dynamically(self):

assert flow_run.run_count == 0

async def test_run_count_from_api(self, monkeypatch, prefect_client):
async def test_run_count_from_api(
self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient
):
run = await prefect_client.create_flow_run(
flow=flow(lambda: None, name="test", retries=5)
)
Expand All @@ -193,7 +203,7 @@ async def test_scheduled_start_time_is_timestamp_when_not_set(self):
assert isinstance(flow_run.scheduled_start_time, datetime.datetime)

async def test_scheduled_start_time_pulls_from_api_when_needed(
self, monkeypatch, prefect_client
self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient
):
TIMESTAMP = pendulum.now("utc").add(days=7)
run = await prefect_client.create_flow_run(
Expand Down Expand Up @@ -224,7 +234,9 @@ async def test_name_returns_name_when_present_dynamically(self):

assert flow_run.name is None

async def test_name_pulls_from_api_when_needed(self, monkeypatch, prefect_client):
async def test_name_pulls_from_api_when_needed(
self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient
):
run = await prefect_client.create_flow_run(
flow=flow(lambda: None, name="test"), name="foo"
)
Expand Down Expand Up @@ -253,7 +265,7 @@ async def test_flow_name_returns_flow_name_when_present_dynamically(self):
assert flow_run.flow_name is None

async def test_flow_name_pulls_from_api_when_needed(
self, monkeypatch, prefect_client
self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient
):
run = await prefect_client.create_flow_run(
flow=flow(lambda: None, name="foo"), name="bar"
Expand All @@ -279,7 +291,9 @@ async def test_parameters_from_context(self):
):
assert flow_run.parameters == {"x": "foo", "y": "bar"}

async def test_parameters_from_api(self, monkeypatch, prefect_client):
async def test_parameters_from_api(
self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient
):
run = await prefect_client.create_flow_run(
flow=flow(lambda: None, name="foo"), parameters={"x": "foo", "y": "bar"}
)
Expand All @@ -300,7 +314,9 @@ def my_flow(x):

assert my_flow(foo) == {"x": foo}

async def test_outside_flow_run_uses_serialized_parameters(self, monkeypatch):
async def test_outside_flow_run_uses_serialized_parameters(
self, monkeypatch: pytest.MonkeyPatch
):
@dataclasses.dataclass
class Foo:
y: int
Expand All @@ -325,7 +341,7 @@ async def test_parent_flow_run_id_is_empty_when_not_set(self):
assert flow_run.parent_flow_run_id is None

async def test_parent_flow_run_id_returns_parent_flow_run_id_when_present_dynamically(
self, prefect_client
self, prefect_client: PrefectClient
):
assert flow_run.parent_flow_run_id is None

Expand Down Expand Up @@ -360,7 +376,7 @@ def foo():
assert flow_run.parent_flow_run_id is None

async def test_parent_flow_run_id_pulls_from_api_when_needed(
self, monkeypatch, prefect_client
self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient
):
assert flow_run.parent_flow_run_id is None

Expand Down Expand Up @@ -402,7 +418,7 @@ async def test_parent_deployment_id_is_empty_when_not_set(self):
assert flow_run.parent_deployment_id is None

async def test_parent_deployment_id_returns_parent_deployment_id_when_present_dynamically(
self, prefect_client
self, prefect_client: PrefectClient
):
assert flow_run.parent_deployment_id is None

Expand Down Expand Up @@ -460,7 +476,7 @@ def foo():
assert flow_run.parent_deployment_id is None

async def test_parent_deployment_id_pulls_from_api_when_needed(
self, monkeypatch, prefect_client: PrefectClient
self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient
):
assert flow_run.parent_deployment_id is None

Expand Down Expand Up @@ -536,7 +552,7 @@ async def test_root_flow_run_id_is_empty_when_not_set(self):
assert flow_run.root_flow_run_id is None

async def test_root_flow_run_id_pulls_from_api_when_needed(
self, monkeypatch, prefect_client
self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient
):
assert flow_run.root_flow_run_id is None

Expand Down Expand Up @@ -591,11 +607,11 @@ def child_task():

class TestURL:
@pytest.mark.parametrize("url_type", ["api_url", "ui_url"])
async def test_url_is_attribute(self, url_type):
async def test_url_is_attribute(self, url_type: str):
assert url_type in dir(flow_run)

@pytest.mark.parametrize("url_type", ["api_url", "ui_url"])
async def test_url_is_none_when_id_not_set(self, url_type):
async def test_url_is_none_when_id_not_set(self, url_type: str):
assert getattr(flow_run, url_type) is None

@pytest.mark.parametrize(
Expand All @@ -604,13 +620,15 @@ async def test_url_is_none_when_id_not_set(self, url_type):
)
async def test_url_returns_correct_url_when_id_present(
self,
url_type,
url_type: str,
):
test_id = "12345"
if url_type == "api_url":
base_url_value = PREFECT_API_URL.value()
elif url_type == "ui_url":
base_url_value = PREFECT_UI_URL.value()
else:
raise ValueError(f"Invalid url_type: {url_type}")

expected_url = f"{base_url_value}/flow-runs/flow-run/{test_id}"

Expand All @@ -627,9 +645,9 @@ async def test_url_returns_correct_url_when_id_present(
)
async def test_url_pulls_from_api_when_needed(
self,
monkeypatch,
prefect_client,
url_type,
monkeypatch: pytest.MonkeyPatch,
prefect_client: PrefectClient,
url_type: str,
):
run = await prefect_client.create_flow_run(flow=flow(lambda: None, name="test"))

Expand All @@ -639,6 +657,8 @@ async def test_url_pulls_from_api_when_needed(
base_url_value = PREFECT_API_URL.value()
elif url_type == "ui_url":
base_url_value = PREFECT_UI_URL.value()
else:
raise ValueError(f"Invalid url_type: {url_type}")

expected_url = f"{base_url_value}/flow-runs/flow-run/{str(run.id)}"

Expand Down
Loading