diff --git a/changes/pr103.yaml b/changes/pr103.yaml new file mode 100644 index 00000000..38f8a4f4 --- /dev/null +++ b/changes/pr103.yaml @@ -0,0 +1,5 @@ +feature: + - "Add ability to configure labels on individual flow runs - [#103](https://github.com/PrefectHQ/server/pull/103)" + +migration: + - "Add labels column to flow run table with sensible defaults - [#103](https://github.com/PrefectHQ/server/pull/103)" diff --git a/services/hasura/migrations/versions/metadata-24f10aeee83e.yaml b/services/hasura/migrations/versions/metadata-24f10aeee83e.yaml new file mode 100644 index 00000000..94611d11 --- /dev/null +++ b/services/hasura/migrations/versions/metadata-24f10aeee83e.yaml @@ -0,0 +1,265 @@ +functions: +- function: + name: downstream_tasks + schema: utility +- function: + name: upstream_tasks + schema: utility +tables: +- array_relationships: + - name: agents + using: + foreign_key_constraint_on: + column: agent_config_id + table: + name: agent + schema: public + table: + name: agent_config + schema: public +- array_relationships: + - name: downstream_edges + using: + foreign_key_constraint_on: + column: upstream_task_id + table: + name: edge + schema: public + - name: task_runs + using: + foreign_key_constraint_on: + column: task_id + table: + name: task_run + schema: public + - name: upstream_edges + using: + foreign_key_constraint_on: + column: downstream_task_id + table: + name: edge + schema: public + object_relationships: + - name: flow + using: + foreign_key_constraint_on: flow_id + table: + name: task + schema: public +- array_relationships: + - name: edges + using: + foreign_key_constraint_on: + column: flow_id + table: + name: edge + schema: public + - name: flow_runs + using: + foreign_key_constraint_on: + column: flow_id + table: + name: flow_run + schema: public + - name: tasks + using: + foreign_key_constraint_on: + column: flow_id + table: + name: task + schema: public + object_relationships: + - name: flow_group + using: + foreign_key_constraint_on: flow_group_id + - name: project + using: + foreign_key_constraint_on: project_id + - name: tenant + using: + foreign_key_constraint_on: tenant_id + table: + name: flow + schema: public +- array_relationships: + - name: flow_groups + using: + foreign_key_constraint_on: + column: tenant_id + table: + name: flow_group + schema: public + - name: flows + using: + foreign_key_constraint_on: + column: tenant_id + table: + name: flow + schema: public + - name: projects + using: + foreign_key_constraint_on: + column: tenant_id + table: + name: project + schema: public + table: + name: tenant + schema: public +- array_relationships: + - name: flow_runs + using: + foreign_key_constraint_on: + column: agent_id + table: + name: flow_run + schema: public + object_relationships: + - name: agent_config + using: + foreign_key_constraint_on: agent_config_id + table: + name: agent + schema: public +- array_relationships: + - name: flows + using: + foreign_key_constraint_on: + column: flow_group_id + table: + name: flow + schema: public + object_relationships: + - name: tenant + using: + foreign_key_constraint_on: tenant_id + table: + name: flow_group + schema: public +- array_relationships: + - name: flows + using: + foreign_key_constraint_on: + column: project_id + table: + name: flow + schema: public + object_relationships: + - name: tenant + using: + foreign_key_constraint_on: tenant_id + table: + name: project + schema: public +- array_relationships: + - name: logs + using: + foreign_key_constraint_on: + column: flow_run_id + table: + name: log + schema: public + - name: states + using: + foreign_key_constraint_on: + column: flow_run_id + table: + name: flow_run_state + schema: public + - name: task_runs + using: + foreign_key_constraint_on: + column: flow_run_id + table: + name: task_run + schema: public + object_relationships: + - name: agent + using: + foreign_key_constraint_on: agent_id + - name: flow + using: + foreign_key_constraint_on: flow_id + - name: tenant + using: + foreign_key_constraint_on: tenant_id + table: + name: flow_run + schema: public +- array_relationships: + - name: logs + using: + foreign_key_constraint_on: + column: task_run_id + table: + name: log + schema: public + - name: states + using: + foreign_key_constraint_on: + column: task_run_id + table: + name: task_run_state + schema: public + object_relationships: + - name: flow_run + using: + foreign_key_constraint_on: flow_run_id + - name: task + using: + foreign_key_constraint_on: task_id + - name: tenant + using: + foreign_key_constraint_on: tenant_id + table: + name: task_run + schema: public +- object_relationships: + - name: downstream_task + using: + foreign_key_constraint_on: downstream_task_id + - name: flow + using: + foreign_key_constraint_on: flow_id + - name: upstream_task + using: + foreign_key_constraint_on: upstream_task_id + table: + name: edge + schema: public +- object_relationships: + - name: flow_run + using: + foreign_key_constraint_on: flow_run_id + table: + name: flow_run_state + schema: public +- object_relationships: + - name: task + using: + manual_configuration: + column_mapping: + task_id: id + remote_table: + name: task + schema: public + table: + name: traversal + schema: utility +- object_relationships: + - name: task_run + using: + foreign_key_constraint_on: task_run_id + table: + name: task_run_state + schema: public +- table: + name: cloud_hook + schema: public +- table: + name: log + schema: public +- table: + name: message + schema: public +version: 2 diff --git a/services/postgres/alembic/versions/2020-09-30T123806_add_label_column_to_flow_runs.py b/services/postgres/alembic/versions/2020-09-30T123806_add_label_column_to_flow_runs.py new file mode 100644 index 00000000..0d695964 --- /dev/null +++ b/services/postgres/alembic/versions/2020-09-30T123806_add_label_column_to_flow_runs.py @@ -0,0 +1,54 @@ +""" +Add label column to flow runs + +Revision ID: 24f10aeee83e +Revises: 850b76d44332 +Create Date: 2020-09-30 12:38:06.915340 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB, UUID + + +# revision identifiers, used by Alembic. +revision = "24f10aeee83e" +down_revision = "850b76d44332" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "flow_run", sa.Column("labels", JSONB, nullable=False, server_default="[]") + ) + # this query looks at all currently scheduled flow runs, or more generally any flow + # run that has been updated in the last day, and applies our standard label logic to it. + # This ensures that active flow runs don't incorrectly receive `[]` as their label set, + # which would cause delayed work and user frustration. + # Note that flow runs not touched by this query will have `[]` as their labels regardless of + # their flow label properties. + op.execute( + """ + WITH runs_to_update AS ( + SELECT + flow_run.id, + COALESCE( + flow_group.labels, + flow.environment -> 'labels', + '[]')::JSONB as new_labels + from flow_run + join flow on flow_run.flow_id = flow.id + join flow_group on flow.flow_group_id = flow_group.id + where flow_run.state = 'Scheduled' OR flow_run.updated > NOW() - interval '1 day' + ) + update flow_run + set labels = runs_to_update.new_labels + from runs_to_update + where flow_run.id = runs_to_update.id; + """ + ) + + +def downgrade(): + op.drop_column("flow_run", "labels") diff --git a/src/prefect_server/api/cloud_hooks.py b/src/prefect_server/api/cloud_hooks.py index ba744e28..f7b63889 100644 --- a/src/prefect_server/api/cloud_hooks.py +++ b/src/prefect_server/api/cloud_hooks.py @@ -5,7 +5,6 @@ import httpx from box import Box -from pydantic import BaseModel import prefect from prefect import api, models diff --git a/src/prefect_server/api/runs.py b/src/prefect_server/api/runs.py index a7e184d1..d6b3d1de 100644 --- a/src/prefect_server/api/runs.py +++ b/src/prefect_server/api/runs.py @@ -27,6 +27,7 @@ async def create_flow_run( flow_run_name: str = None, version_group_id: str = None, idempotency_key: str = None, + labels: List[str] = None, ) -> Any: """ Creates a new flow run for an existing flow. @@ -42,6 +43,7 @@ async def create_flow_run( recent unarchived version of the group - idempotency_key (str, optional): An optional idempotency key to prevent duplicate run creation. Idempotency keys are only respected for 24 hours after a flow is created. + - labels (List[str], optional): a list of labels to apply to this individual flow run """ if idempotency_key is not None: @@ -67,6 +69,7 @@ async def create_flow_run( scheduled_start_time=scheduled_start_time, flow_run_name=flow_run_name, version_group_id=version_group_id, + labels=labels, ) if idempotency_key is not None: @@ -77,6 +80,18 @@ async def create_flow_run( return flow_run_id +@register_api("runs.set_flow_run_labels") +async def set_flow_run_labels(flow_run_id: str, labels: List[str]) -> bool: + if flow_run_id is None: + raise ValueError("Invalid flow run ID") + elif labels is None: + raise ValueError("Invalid labels") + result = await models.FlowRun.where(id=flow_run_id).update( + {"labels": sorted(labels)} + ) + return bool(result.affected_rows) + + @register_api("runs.set_flow_run_name") async def set_flow_run_name(flow_run_id: str, name: str) -> bool: if flow_run_id is None: @@ -105,6 +120,7 @@ async def _create_flow_run( scheduled_start_time: datetime.datetime = None, flow_run_name: str = None, version_group_id: str = None, + labels: List[str] = None, ) -> Any: """ Creates a new flow run for an existing flow. @@ -118,6 +134,7 @@ async def _create_flow_run( - flow_run_name (str, optional): An optional string representing this flow run - version_group_id (str, optional): An optional version group ID; if provided, will run the most recent unarchived version of the group + - labels (List[str], optional): a list of labels to apply to this individual flow run """ if flow_id is None and version_group_id is None: @@ -138,9 +155,11 @@ async def _create_flow_run( "id": True, "archived": True, "tenant_id": True, + "environment": True, + "run_config": True, "parameters": True, "flow_group_id": True, - "flow_group": {"default_parameters": True}, + "flow_group": {"default_parameters": True, "labels": True}, }, order_by={"version": EnumValue("desc")}, ) # type: Any @@ -155,6 +174,17 @@ async def _create_flow_run( elif flow.archived: raise ValueError(f"Flow {flow.id} is archived.") + # set labels + if labels is not None: + run_labels = labels + elif flow.flow_group.labels is not None: + run_labels = flow.flow_group.labels + elif flow.run_config is not None: + run_labels = flow.run_config.get("labels") or [] + else: + run_labels = flow.environment.get("labels") or [] + run_labels.sort() + # check parameters run_parameters = flow.flow_group.default_parameters run_parameters.update((parameters or {})) @@ -167,6 +197,7 @@ async def _create_flow_run( run = models.FlowRun( tenant_id=flow.tenant_id, flow_id=flow_id or flow.id, + labels=run_labels, parameters=run_parameters, context=context or {}, scheduled_start_time=scheduled_start_time, @@ -426,6 +457,7 @@ async def get_runs_in_queue( "run_config": True, "flow_group": {"labels": True}, }, + "labels": True, }, order_by=[{"state_start_time": EnumValue("asc")}], # get extra in case labeled runs don't show up at the top @@ -439,16 +471,7 @@ async def get_runs_in_queue( if counter == config.queued_runs_returned_limit: continue - # critical line: if flow_group labels are None that means use the flow labels - if flow_run.flow.flow_group.labels is not None: - run_labels = flow_run.flow.flow_group.labels - else: - # Use labels from `run_config` if present, otherwise use `environment` - run_labels = ( - flow_run.flow.run_config.get("labels") - if flow_run.flow.run_config - else flow_run.flow.environment.get("labels") - ) or [] + run_labels = flow_run.labels # if the run labels are a superset of the provided labels, skip if set(run_labels) - set(labels): diff --git a/src/prefect_server/api/states.py b/src/prefect_server/api/states.py index b9a96843..882e148f 100644 --- a/src/prefect_server/api/states.py +++ b/src/prefect_server/api/states.py @@ -5,7 +5,6 @@ import uuid import pendulum -from box import Box import prefect from prefect import api, models diff --git a/src/prefect_server/cli/dev.py b/src/prefect_server/cli/dev.py index 03855ecd..e6e64b5a 100644 --- a/src/prefect_server/cli/dev.py +++ b/src/prefect_server/cli/dev.py @@ -1,6 +1,5 @@ import asyncio import atexit -import glob import os import shutil import signal diff --git a/src/prefect_server/database/models.py b/src/prefect_server/database/models.py index d19e39bf..ed41283f 100644 --- a/src/prefect_server/database/models.py +++ b/src/prefect_server/database/models.py @@ -126,6 +126,7 @@ class FlowRun(HasuraModel): tenant_id: UUIDString = None flow_id: UUIDString = None parameters: Dict[str, Any] = None + labels: List[str] = None context: Dict[str, Any] = None version: int = None heartbeat: datetime.datetime = None diff --git a/src/prefect_server/database/orm.py b/src/prefect_server/database/orm.py index cee98e46..82bbc9b4 100644 --- a/src/prefect_server/database/orm.py +++ b/src/prefect_server/database/orm.py @@ -8,7 +8,6 @@ import pydantic import prefect -import prefect_server from prefect.utilities.graphql import with_args from prefect_server.database.hasura import GQLObjectTypes diff --git a/src/prefect_server/graphql/agents.py b/src/prefect_server/graphql/agents.py index b8c6cb1c..2e13c0ec 100644 --- a/src/prefect_server/graphql/agents.py +++ b/src/prefect_server/graphql/agents.py @@ -1,9 +1,8 @@ -from typing import Any, List +from typing import Any from graphql import GraphQLResolveInfo from prefect_server import api -from prefect_server.utilities import context from prefect_server.utilities.graphql import mutation diff --git a/src/prefect_server/graphql/runs.py b/src/prefect_server/graphql/runs.py index 38940320..819bf486 100644 --- a/src/prefect_server/graphql/runs.py +++ b/src/prefect_server/graphql/runs.py @@ -20,7 +20,7 @@ async def resolve_mapped_children( Retrieve details about a task run's mapped children """ query = r""" - SELECT + SELECT min(task_run.start_time) AS min_start_time, max(task_run.end_time) AS max_end_time, task_run.state, @@ -29,7 +29,7 @@ async def resolve_mapped_children( JOIN task_run AS reference ON task_run.flow_run_id = reference.flow_run_id AND task_run.task_id = reference.task_id - WHERE + WHERE reference.id = $1 AND reference.map_index < 0 AND task_run.map_index >= 0 @@ -77,6 +77,7 @@ async def resolve_create_flow_run( flow_run_name=input.get("flow_run_name"), version_group_id=input.get("version_group_id"), idempotency_key=input.get("idempotency_key"), + labels=input.get("labels"), ) } @@ -84,6 +85,17 @@ async def resolve_create_flow_run( return result +@mutation.field("set_flow_run_labels") +async def resolve_set_flow_run_labels( + obj: Any, info: GraphQLResolveInfo, input: dict +) -> dict: + return { + "success": await api.runs.set_flow_run_labels( + flow_run_id=input["flow_run_id"], labels=input["labels"] + ) + } + + @mutation.field("set_flow_run_name") async def resolve_set_flow_run_name( obj: Any, info: GraphQLResolveInfo, input: dict diff --git a/src/prefect_server/graphql/schema/runs.graphql b/src/prefect_server/graphql/schema/runs.graphql index 462e68d0..ba769597 100644 --- a/src/prefect_server/graphql/schema/runs.graphql +++ b/src/prefect_server/graphql/schema/runs.graphql @@ -14,6 +14,9 @@ extend type Mutation { "Rename a task run" set_task_run_name(input: set_task_run_name_input!): success_payload + "Update the labels for a flow run" + set_flow_run_labels(input: set_flow_run_labels_input!): success_payload + """ Create a new task run, if it doesn't exist already. This is useful when dealing with dynamic task runs, such as those created by mapping, when it is not known if the task run has been @@ -52,6 +55,8 @@ input create_flow_run_input { "A JSON payload specifying parameter values for any parameters in this flow" parameters: JSON "A JSON payload of key / value pairs to be placed in Prefect context for this run" + labels: [String!] + "A list of labels to apply to the newly created run" context: JSON "An optional start time to schedule this run for - if not provided, will be scheduled immediately" scheduled_start_time: DateTime @@ -63,6 +68,13 @@ input create_flow_run_input { version_group_id: String } +input set_flow_run_labels_input { + "The ID of the flow run" + flow_run_id: UUID! + "The new labels" + labels: [String!]! +} + input set_flow_run_name_input { "The ID of the flow run" flow_run_id: UUID! @@ -127,4 +139,4 @@ type mapped_children_payload { min_start_time: DateTime max_end_time: DateTime state_counts: JSON -} \ No newline at end of file +} diff --git a/src/prefect_server/services/towel/lazarus.py b/src/prefect_server/services/towel/lazarus.py index 8c754d13..95da9299 100644 --- a/src/prefect_server/services/towel/lazarus.py +++ b/src/prefect_server/services/towel/lazarus.py @@ -1,6 +1,6 @@ import asyncio import datetime -from typing import Any, Dict, List +from typing import Any, Dict import pendulum diff --git a/src/prefect_server/services/towel/zombie_killer.py b/src/prefect_server/services/towel/zombie_killer.py index ff1bd3c3..b283957f 100644 --- a/src/prefect_server/services/towel/zombie_killer.py +++ b/src/prefect_server/services/towel/zombie_killer.py @@ -1,6 +1,6 @@ import asyncio import datetime -from typing import Any, Dict, List +from typing import Any, Dict import pendulum diff --git a/tests/api/test_runs.py b/tests/api/test_runs.py index ac8acfe7..1a2c99a0 100644 --- a/tests/api/test_runs.py +++ b/tests/api/test_runs.py @@ -31,6 +31,60 @@ async def test_create_flow_run(self, simple_flow_id): flow_run_id = await api.runs.create_flow_run(flow_id=simple_flow_id) assert await models.FlowRun.exists(flow_run_id) + async def test_create_flow_run_accepts_labels(self, simple_flow_id): + flow_run_id = await api.runs.create_flow_run( + flow_id=simple_flow_id, labels=["one", "two"] + ) + flow_run = await models.FlowRun.where(id=flow_run_id).first({"labels"}) + assert flow_run.labels == ["one", "two"] + + async def test_create_flow_run_respects_flow_group_labels( + self, + tenant_id, + labeled_flow_id, + ): + # update the flow group's labels + labels = ["meep", "morp"] + labeled_flow = await models.Flow.where(id=labeled_flow_id).first( + {"flow_group_id"} + ) + await api.flow_groups.set_flow_group_labels( + flow_group_id=labeled_flow.flow_group_id, labels=labels + ) + # create a run + flow_run_id = await api.runs.create_flow_run(flow_id=labeled_flow_id) + flow_run = await models.FlowRun.where(id=flow_run_id).first({"labels"}) + assert flow_run.labels == ["meep", "morp"] + + async def test_create_flow_run_respects_flow_labels( + self, + tenant_id, + labeled_flow_id, + ): + labeled_flow = await models.Flow.where(id=labeled_flow_id).first( + {"environment"} + ) + # create a flow run + flow_run_id = await api.runs.create_flow_run(flow_id=labeled_flow_id) + flow_run = await models.FlowRun.where(id=flow_run_id).first({"labels"}) + assert flow_run.labels == sorted(labeled_flow.environment["labels"]) + + async def test_create_flow_run_respects_empty_flow_group_labels( + self, + tenant_id, + labeled_flow_id, + ): + labeled_flow = await models.Flow.where(id=labeled_flow_id).first( + {"flow_group_id"} + ) + await api.flow_groups.set_flow_group_labels( + flow_group_id=labeled_flow.flow_group_id, labels=[] + ) + # create a run + flow_run_id = await api.runs.create_flow_run(flow_id=labeled_flow_id) + flow_run = await models.FlowRun.where(id=flow_run_id).first({"labels"}) + assert flow_run.labels == [] + async def test_create_flow_run_with_version_group_id(self, project_id): flow_ids = [] for _ in range(15): @@ -763,6 +817,23 @@ async def test_get_flow_run_in_queue_uses_labels( assert labeled_flow_run_id in flow_runs assert flow_run_id not in flow_runs + async def test_get_flow_run_in_queue_uses_run_labels( + self, + tenant_id, + flow_id, + labeled_flow_run_id, + ): + + flow_run_id = await api.runs.create_flow_run( + flow_id=flow_id, labels=["dev", "staging"] + ) + + flow_runs = await api.runs.get_runs_in_queue( + tenant_id=tenant_id, labels=["dev", "staging"] + ) + assert flow_run_id in flow_runs + assert labeled_flow_run_id not in flow_runs + async def test_get_flow_run_in_queue_works_if_environment_labels_are_none( self, tenant_id, flow_run_id, flow_id ): @@ -1035,42 +1106,30 @@ async def test_get_runs_in_queue_ignores_task_runs_with_start_time_none( assert flow_run_id not in flow_runs -class TestGetRunsInQueueFlowGroupLabels: - async def test_get_flow_runs_in_queue_respects_flow_group_labels( - self, tenant_id, labeled_flow_id, labeled_flow_run_id - ): - # update the flow group's labels - labels = ["meep", "morp"] - labeled_flow = await models.Flow.where(id=labeled_flow_id).first( - {"flow_group_id"} - ) - await api.flow_groups.set_flow_group_labels( - flow_group_id=labeled_flow.flow_group_id, labels=labels - ) - # get runs in queue - flow_runs = await api.runs.get_runs_in_queue(tenant_id=tenant_id, labels=labels) - # confirm we could retrieve with the new labels - assert labeled_flow_run_id in flow_runs +class TestSetFlowRunLabels: + async def test_set_flow_run_labels(self, flow_run_id): + fr = await models.FlowRun.where(id=flow_run_id).first({"labels"}) + assert fr.labels == [] - async def test_get_flow_run_in_queue_respects_empty_flow_group_labels( - self, tenant_id, labeled_flow_id, labeled_flow_run_id - ): - labeled_flow = await models.Flow.where(id=labeled_flow_id).first( - {"flow_group_id"} - ) - await api.flow_groups.set_flow_group_labels( - flow_group_id=labeled_flow.flow_group_id, labels=[] - ) - flow_runs = await api.runs.get_runs_in_queue(tenant_id=tenant_id, labels=[]) - # confirm we could retrieve the run without labels - assert labeled_flow_run_id in flow_runs + await api.runs.set_flow_run_labels(flow_run_id=flow_run_id, labels=["a", "b"]) - # set flow group labels to none and confirm the run isn't retrieved - await api.flow_groups.set_flow_group_labels( - flow_group_id=labeled_flow.flow_group_id, labels=None + fr = await models.FlowRun.where(id=flow_run_id).first({"labels"}) + assert fr.labels == ["a", "b"] + + async def test_set_flow_run_labels_must_have_value(self, flow_run_id): + with pytest.raises(ValueError, match="Invalid labels"): + await api.runs.set_flow_run_labels(flow_run_id=flow_run_id, labels=None) + + async def test_set_flow_run_id_invalid(self): + assert not await api.runs.set_flow_run_labels( + flow_run_id=str(uuid.uuid4()), labels=["a"] ) - flow_runs = await api.runs.get_runs_in_queue(tenant_id=tenant_id, labels=[]) - assert labeled_flow_run_id not in flow_runs + + async def test_set_flow_run_id_none(self): + with pytest.raises(ValueError, match="Invalid flow run ID"): + assert not await api.runs.set_flow_run_labels( + flow_run_id=None, labels=["a"] + ) class TestSetFlowRunName: diff --git a/tests/database/test_orm.py b/tests/database/test_orm.py index 249a747a..0f9017d5 100644 --- a/tests/database/test_orm.py +++ b/tests/database/test_orm.py @@ -143,6 +143,7 @@ async def test_nested_insert_array(self, flow_id): """ insert nested objects as an array""" flow_run_id = await models.FlowRun( flow_id=flow_id, + labels=[], states=[ models.FlowRunState(state="test", serialized_state={}), models.FlowRunState(state="test", serialized_state={}), @@ -160,6 +161,7 @@ async def test_nested_insert_array_dicts(self, flow_id): """ insert nested objects as an array""" flow_run_id = await models.FlowRun( flow_id=flow_id, + labels=[], states=[ dict(state="test", serialized_state={}), dict(state="test", serialized_state={}), diff --git a/tests/fixtures/database_fixtures.py b/tests/fixtures/database_fixtures.py index ad23220f..7b98368d 100644 --- a/tests/fixtures/database_fixtures.py +++ b/tests/fixtures/database_fixtures.py @@ -77,7 +77,7 @@ async def flow_group_id(flow_id): async def labeled_flow_id(project_id): flow = prefect.Flow( name="Labeled Flow", - environment=prefect.environments.execution.remote.RemoteEnvironment( + environment=prefect.environments.execution.local.LocalEnvironment( labels=["foo", "bar"] ), schedule=prefect.schedules.IntervalSchedule( diff --git a/tests/graphql/test_runs.py b/tests/graphql/test_runs.py index aae3f968..c3b6d100 100644 --- a/tests/graphql/test_runs.py +++ b/tests/graphql/test_runs.py @@ -38,14 +38,35 @@ async def test_create_flow_run(self, run_query, flow_id): "scheduled_start_time", "auto_scheduled", "context", + "labels", } ) assert fr.flow_id == flow_id + assert fr.labels == [] assert fr.scheduled_start_time == dt assert fr.parameters == dict(x=1) assert fr.auto_scheduled is False assert fr.context == {"a": 2} + async def test_create_flow_run_with_labels(self, run_query, flow_id): + result = await run_query( + query=self.mutation, + variables=dict( + input=dict( + flow_id=flow_id, + labels=["a", "b", "c"], + ) + ), + ) + fr = await models.FlowRun.where(id=result.data.create_flow_run.id).first( + { + "flow_id", + "labels", + } + ) + assert fr.flow_id == flow_id + assert fr.labels == ["a", "b", "c"] + async def test_create_flow_run_with_version_group_id(self, run_query, flow_id): dt = pendulum.now("utc").add(hours=1) f = await models.Flow.where(id=flow_id).first({"version_group_id"}) @@ -528,6 +549,42 @@ async def test_multiple_runs_in_queue_before_certain_time( assert len(result.data.get_runs_in_queue.flow_run_ids) == i + 1 +class TestSetFlowRunLabels: + mutation = """ + mutation($input: set_flow_run_labels_input!) { + set_flow_run_labels(input: $input) { + success + } + } + """ + + async def test_set_flow_run_labels(self, run_query, flow_run_id): + + fr = await models.FlowRun.where(id=flow_run_id).first({"labels"}) + assert fr.labels == [] + + result = await run_query( + query=self.mutation, + variables=dict(input=dict(flow_run_id=flow_run_id, labels=["big", "boo"])), + ) + + fr = await models.FlowRun.where(id=flow_run_id).first({"labels"}) + assert fr.labels == ["big", "boo"] + + async def test_set_flow_run_labels_to_empty(self, run_query, labeled_flow_run_id): + + fr = await models.FlowRun.where(id=labeled_flow_run_id).first({"labels"}) + assert fr.labels + + result = await run_query( + query=self.mutation, + variables=dict(input=dict(flow_run_id=labeled_flow_run_id, labels=[])), + ) + + fr = await models.FlowRun.where(id=labeled_flow_run_id).first({"labels"}) + assert fr.labels == [] + + class TestSetFlowRunName: mutation = """ mutation($input: set_flow_run_name_input!) {