diff --git a/changes/pr166.yaml b/changes/pr166.yaml new file mode 100644 index 00000000..b46dd4e5 --- /dev/null +++ b/changes/pr166.yaml @@ -0,0 +1,5 @@ +migration: + - "Add `flow_group.run_config` and `flow_run.run_config` - [#166](https://github.com/PrefectHQ/server/pull/166)" + +feature: + - "Support per-flow-run and per-flow-group `run_config` overrides - [#166](https://github.com/PrefectHQ/server/pull/166)" diff --git a/services/hasura/migrations/versions/metadata-7ca57ea2fdff.yaml b/services/hasura/migrations/versions/metadata-7ca57ea2fdff.yaml new file mode 100644 index 00000000..72d9c44a --- /dev/null +++ b/services/hasura/migrations/versions/metadata-7ca57ea2fdff.yaml @@ -0,0 +1,272 @@ +functions: +- function: + name: downstream_tasks + schema: utility +- function: + name: upstream_tasks + schema: utility +tables: +- array_relationships: + - name: agents + using: + foreign_key_constraint_on: + column: agent_config_id + table: + name: agent + schema: public + table: + name: agent_config + schema: public +- array_relationships: + - name: downstream_edges + using: + foreign_key_constraint_on: + column: upstream_task_id + table: + name: edge + schema: public + - name: task_runs + using: + foreign_key_constraint_on: + column: task_id + table: + name: task_run + schema: public + - name: upstream_edges + using: + foreign_key_constraint_on: + column: downstream_task_id + table: + name: edge + schema: public + object_relationships: + - name: flow + using: + foreign_key_constraint_on: flow_id + table: + name: task + schema: public +- array_relationships: + - name: edges + using: + foreign_key_constraint_on: + column: flow_id + table: + name: edge + schema: public + - name: flow_runs + using: + foreign_key_constraint_on: + column: flow_id + table: + name: flow_run + schema: public + - name: tasks + using: + foreign_key_constraint_on: + column: flow_id + table: + name: task + schema: public + object_relationships: + - name: flow_group + using: + foreign_key_constraint_on: flow_group_id + - name: project + using: + foreign_key_constraint_on: project_id + - name: tenant + using: + foreign_key_constraint_on: tenant_id + table: + name: flow + schema: public +- array_relationships: + - name: flow_groups + using: + foreign_key_constraint_on: + column: tenant_id + table: + name: flow_group + schema: public + - name: flows + using: + foreign_key_constraint_on: + column: tenant_id + table: + name: flow + schema: public + - name: projects + using: + foreign_key_constraint_on: + column: tenant_id + table: + name: project + schema: public + table: + name: tenant + schema: public +- array_relationships: + - name: flow_runs + using: + foreign_key_constraint_on: + column: agent_id + table: + name: flow_run + schema: public + object_relationships: + - name: agent_config + using: + foreign_key_constraint_on: agent_config_id + table: + name: agent + schema: public +- array_relationships: + - name: flows + using: + foreign_key_constraint_on: + column: flow_group_id + table: + name: flow + schema: public + object_relationships: + - name: tenant + using: + foreign_key_constraint_on: tenant_id + table: + name: flow_group + schema: public +- array_relationships: + - name: flows + using: + foreign_key_constraint_on: + column: project_id + table: + name: flow + schema: public + object_relationships: + - name: tenant + using: + foreign_key_constraint_on: tenant_id + table: + name: project + schema: public +- array_relationships: + - name: logs + using: + foreign_key_constraint_on: + column: flow_run_id + table: + name: log + schema: public + - name: states + using: + foreign_key_constraint_on: + column: flow_run_id + table: + name: flow_run_state + schema: public + - name: task_runs + using: + foreign_key_constraint_on: + column: flow_run_id + table: + name: task_run + schema: public + object_relationships: + - name: agent + using: + foreign_key_constraint_on: agent_id + - name: flow + using: + foreign_key_constraint_on: flow_id + - name: tenant + using: + foreign_key_constraint_on: tenant_id + table: + name: flow_run + schema: public +- array_relationships: + - name: logs + using: + foreign_key_constraint_on: + column: task_run_id + table: + name: log + schema: public + - name: states + using: + foreign_key_constraint_on: + column: task_run_id + table: + name: task_run_state + schema: public + object_relationships: + - name: flow_run + using: + foreign_key_constraint_on: flow_run_id + - name: task + using: + foreign_key_constraint_on: task_id + - name: tenant + using: + foreign_key_constraint_on: tenant_id + table: + name: task_run + schema: public +- object_relationships: + - name: downstream_task + using: + foreign_key_constraint_on: downstream_task_id + - name: flow + using: + foreign_key_constraint_on: flow_id + - name: upstream_task + using: + foreign_key_constraint_on: upstream_task_id + table: + name: edge + schema: public +- object_relationships: + - name: flow_run + using: + foreign_key_constraint_on: flow_run_id + table: + name: flow_run_state + schema: public +- object_relationships: + - name: task + using: + manual_configuration: + column_mapping: + task_id: id + remote_table: + name: task + schema: public + table: + name: traversal + schema: utility +- object_relationships: + - name: task_run + using: + foreign_key_constraint_on: task_run_id + table: + name: task_run_artifact + schema: public +- object_relationships: + - name: task_run + using: + foreign_key_constraint_on: task_run_id + table: + name: task_run_state + schema: public +- table: + name: cloud_hook + schema: public +- table: + name: log + schema: public +- table: + name: message + schema: public +version: 2 diff --git a/services/postgres/alembic/versions/2020-12-28T150027_add_run_config_to_flow_runs.py b/services/postgres/alembic/versions/2020-12-28T150027_add_run_config_to_flow_runs.py new file mode 100644 index 00000000..fbb7c02b --- /dev/null +++ b/services/postgres/alembic/versions/2020-12-28T150027_add_run_config_to_flow_runs.py @@ -0,0 +1,50 @@ +""" +Add run_config to flow runs and flow groups + +Revision ID: 7ca57ea2fdff +Revises: 57ac2cb01ac1 +Create Date: 2020-12-28 15:00:27.573816 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB + + +# revision identifiers, used by Alembic. +revision = "7ca57ea2fdff" +down_revision = "57ac2cb01ac1" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "flow_run", sa.Column("run_config", JSONB, nullable=True, server_default=None) + ) + op.add_column( + "flow_group", sa.Column("run_config", JSONB, nullable=True, server_default=None) + ) + # This query updates all currently scheduled flow runs to add the run_config field + # from the corresponding flow. + op.execute( + """ + WITH runs_to_update AS ( + SELECT + flow_run.id, + flow.run_config + from flow_run + join flow on flow_run.flow_id = flow.id + where flow_run.state = 'Scheduled' OR flow_run.updated > NOW() - interval '1 day' + ) + update flow_run + set run_config = runs_to_update.run_config + from runs_to_update + where flow_run.id = runs_to_update.id; + """ + ) + + +def downgrade(): + op.drop_column("flow_run", "run_config") + op.drop_column("flow_group", "run_config") diff --git a/src/prefect_server/api/flow_groups.py b/src/prefect_server/api/flow_groups.py index 2e460142..9918aad7 100644 --- a/src/prefect_server/api/flow_groups.py +++ b/src/prefect_server/api/flow_groups.py @@ -1,4 +1,4 @@ -from typing import List +from typing import List, Dict, Any from prefect import api, models from prefect.serialization.schedule import ClockSchema @@ -159,6 +159,33 @@ async def set_flow_group_labels(flow_group_id: str, labels: List[str] = None) -> return bool(result.affected_rows) +@register_api("flow_groups.set_flow_group_run_config") +async def set_flow_group_run_config( + flow_group_id: str, run_config: Dict[str, Any] = None +) -> bool: + """ + Sets run_config for a flow group. + + Args: + - flow_group_id (str): the ID of the flow group to update + - run_config (dict, optional): a run-config override for a flow + group. Providing `None` defaults any previous flow group + `run_config` setting. + + Returns: + - bool: whether setting `run_config` for the flow group was successful + + Raises: + - ValueError: if flow group ID isn't provided + """ + if not flow_group_id: + raise ValueError("Invalid flow group ID") + result = await models.FlowGroup.where(id=flow_group_id).update( + set=dict(run_config=run_config) + ) + return bool(result.affected_rows) + + @register_api("flow_groups.enable_heartbeat") async def enable_heartbeat_for_flow(flow_group_id: str) -> bool: """ diff --git a/src/prefect_server/api/runs.py b/src/prefect_server/api/runs.py index 46afa829..066c1d72 100644 --- a/src/prefect_server/api/runs.py +++ b/src/prefect_server/api/runs.py @@ -28,6 +28,7 @@ async def create_flow_run( version_group_id: str = None, idempotency_key: str = None, labels: List[str] = None, + run_config: dict = None, ) -> Any: """ Creates a new flow run for an existing flow. @@ -44,6 +45,7 @@ async def create_flow_run( - idempotency_key (str, optional): An optional idempotency key to prevent duplicate run creation. Idempotency keys are only respected for 24 hours after a flow is created. - labels (List[str], optional): a list of labels to apply to this individual flow run + - run-config (dict, optional): A run-config override for this flow run. """ if idempotency_key is not None: @@ -69,6 +71,7 @@ async def create_flow_run( flow_run_name=flow_run_name, version_group_id=version_group_id, labels=labels, + run_config=run_config, ) if idempotency_key is not None: @@ -120,6 +123,7 @@ async def _create_flow_run( flow_run_name: str = None, version_group_id: str = None, labels: List[str] = None, + run_config: dict = None, ) -> Any: """ Creates a new flow run for an existing flow. @@ -134,6 +138,7 @@ async def _create_flow_run( - version_group_id (str, optional): An optional version group ID; if provided, will run the most recent unarchived version of the group - labels (List[str], optional): a list of labels to apply to this individual flow run + - run-config (dict, optional): A run-config override for this flow run. """ if flow_id is None and version_group_id is None: @@ -158,7 +163,11 @@ async def _create_flow_run( "run_config": True, "parameters": True, "flow_group_id": True, - "flow_group": {"default_parameters": True, "labels": True}, + "flow_group": { + "default_parameters": True, + "labels": True, + "run_config": True, + }, }, order_by={"version": EnumValue("desc")}, ) # type: Any @@ -173,11 +182,15 @@ async def _create_flow_run( elif flow.archived: raise ValueError(f"Flow {flow.id} is archived.") - # set labels + # determine active labels if labels is not None: run_labels = labels + elif run_config is not None: + run_labels = run_config.get("labels") or [] elif flow.flow_group.labels is not None: run_labels = flow.flow_group.labels + elif flow.flow_group.run_config is not None: + run_labels = flow.flow_group.run_config.get("labels") or [] elif flow.run_config is not None: run_labels = flow.run_config.get("labels") or [] elif flow.environment is not None: @@ -186,6 +199,13 @@ async def _create_flow_run( run_labels = [] run_labels.sort() + # determine active run_config + if run_config is None: + if flow.flow_group.run_config is not None: + run_config = flow.flow_group.run_config + else: + run_config = flow.run_config + # check parameters run_parameters = flow.flow_group.default_parameters run_parameters.update((parameters or {})) @@ -200,6 +220,7 @@ async def _create_flow_run( flow_id=flow_id or flow.id, labels=run_labels, parameters=run_parameters, + run_config=run_config, context=context or {}, scheduled_start_time=scheduled_start_time, name=flow_run_name or names.generate_slug(2), @@ -465,11 +486,6 @@ async def get_runs_in_queue( ).get( { "id": True, - "flow": { - "environment": True, - "run_config": True, - "flow_group": {"labels": True}, - }, "labels": True, }, order_by=[{"state_start_time": EnumValue("asc")}], diff --git a/src/prefect_server/database/models.py b/src/prefect_server/database/models.py index 6d532d69..0e2455ba 100644 --- a/src/prefect_server/database/models.py +++ b/src/prefect_server/database/models.py @@ -127,6 +127,7 @@ class FlowRun(HasuraModel): flow_id: UUIDString = None parameters: Dict[str, Any] = None labels: List[str] = None + run_config: Dict[str, Any] = None context: Dict[str, Any] = None version: int = None heartbeat: datetime.datetime = None @@ -312,6 +313,7 @@ class FlowGroup(HasuraModel): default_parameters: dict = None schedule: dict = None labels: List[str] = None + run_config: Dict[str, Any] = None # relationships flows: List["Flow"] = None diff --git a/src/prefect_server/graphql/flow_groups.py b/src/prefect_server/graphql/flow_groups.py index c28601b3..30f5d1c9 100644 --- a/src/prefect_server/graphql/flow_groups.py +++ b/src/prefect_server/graphql/flow_groups.py @@ -21,7 +21,17 @@ async def resolve_set_flow_group_labels( obj: Any, info: GraphQLResolveInfo, input: dict ) -> dict: result = await api.flow_groups.set_flow_group_labels( - flow_group_id=input["flow_group_id"], labels=input["labels"] + flow_group_id=input["flow_group_id"], labels=input.get("labels") + ) + return {"success": result} + + +@mutation.field("set_flow_group_run_config") +async def resolve_set_flow_group_run_config( + obj: Any, info: GraphQLResolveInfo, input: dict +) -> dict: + result = await api.flow_groups.set_flow_group_run_config( + flow_group_id=input["flow_group_id"], run_config=input.get("run_config") ) return {"success": result} diff --git a/src/prefect_server/graphql/runs.py b/src/prefect_server/graphql/runs.py index 16994b80..36eb87aa 100644 --- a/src/prefect_server/graphql/runs.py +++ b/src/prefect_server/graphql/runs.py @@ -116,6 +116,7 @@ async def resolve_create_flow_run( version_group_id=input.get("version_group_id"), idempotency_key=input.get("idempotency_key"), labels=input.get("labels"), + run_config=input.get("run_config"), ) } diff --git a/src/prefect_server/graphql/schema/flows.graphql b/src/prefect_server/graphql/schema/flows.graphql index 40d51734..23449cbd 100644 --- a/src/prefect_server/graphql/schema/flows.graphql +++ b/src/prefect_server/graphql/schema/flows.graphql @@ -41,7 +41,10 @@ extend type Mutation { "Set labels for a flow group." set_flow_group_labels(input: set_flow_group_labels_input!): success_payload - + + "Set run_config for a flow group." + set_flow_group_run_config(input: set_flow_group_run_config_input!): success_payload + "Set a schedule for a flow group." set_flow_group_schedule( input: set_flow_group_schedule_input! @@ -151,7 +154,14 @@ input set_flow_group_labels_input { labels: [String!] } - input set_flow_group_schedule_input { +input set_flow_group_run_config_input { + "The ID of the flow group to update" + flow_group_id: UUID! + "The serialized run config to associate with the flow group" + run_config: JSON +} + +input set_flow_group_schedule_input { "The ID of the flow group to update" flow_group_id: UUID! "A list of cron clocks for the schedule" diff --git a/src/prefect_server/graphql/schema/runs.graphql b/src/prefect_server/graphql/schema/runs.graphql index fe0cae26..01e477a0 100644 --- a/src/prefect_server/graphql/schema/runs.graphql +++ b/src/prefect_server/graphql/schema/runs.graphql @@ -63,9 +63,11 @@ input create_flow_run_input { flow_id: UUID "A JSON payload specifying parameter values for any parameters in this flow" parameters: JSON - "A JSON payload of key / value pairs to be placed in Prefect context for this run" - labels: [String!] "A list of labels to apply to the newly created run" + labels: [String!] + "A run config override to apply to the newly created run" + run_config: JSON + "A JSON payload of key / value pairs to be placed in Prefect context for this run" context: JSON "An optional start time to schedule this run for - if not provided, will be scheduled immediately" scheduled_start_time: DateTime diff --git a/tests/api/test_flow_groups.py b/tests/api/test_flow_groups.py index dbcd5a41..8c907c7d 100644 --- a/tests/api/test_flow_groups.py +++ b/tests/api/test_flow_groups.py @@ -119,6 +119,39 @@ async def test_set_flow_group_labels_for_none_flow_group(self): ) +class TestSetFlowGroupRunConfig: + @pytest.mark.parametrize( + "run_config", [None, {"type": "UniversalRun", "labels": ["a"]}] + ) + async def test_set_flow_group_run_config(self, flow_group_id, run_config): + flow_group = await models.FlowGroup.where(id=flow_group_id).first( + {"run_config"} + ) + assert flow_group.run_config is None + + success = await api.flow_groups.set_flow_group_run_config( + flow_group_id=flow_group_id, run_config=run_config + ) + assert success is True + + flow_group = await models.FlowGroup.where(id=flow_group_id).first( + {"run_config"} + ) + assert flow_group.run_config == run_config + + async def test_set_flow_group_run_config_for_invalid_flow_group(self): + success = await api.flow_groups.set_flow_group_run_config( + flow_group_id=str(uuid.uuid4()), run_config=None + ) + assert success is False + + async def test_set_flow_group_run_config_for_none_flow_group(self): + with pytest.raises(ValueError, match="Invalid flow group ID"): + await api.flow_groups.set_flow_group_run_config( + flow_group_id=None, run_config=None + ) + + class TestSetFlowGroupSchedule: @pytest.mark.parametrize( "clock", diff --git a/tests/api/test_runs.py b/tests/api/test_runs.py index 47ecf91d..1545adb9 100644 --- a/tests/api/test_runs.py +++ b/tests/api/test_runs.py @@ -5,6 +5,7 @@ import prefect from prefect import api, models +from prefect.run_configs import UniversalRun from prefect.engine.state import ( Failed, Finished, @@ -85,6 +86,62 @@ async def test_create_flow_run_respects_empty_flow_group_labels( flow_run = await models.FlowRun.where(id=flow_run_id).first({"labels"}) assert flow_run.labels == [] + @pytest.mark.parametrize("set_run_config", [False, True]) + @pytest.mark.parametrize("set_group_run_config", [False, True]) + @pytest.mark.parametrize("set_labels", [False, True]) + @pytest.mark.parametrize("set_group_labels", [False, True]) + async def test_create_flow_run_run_config_and_labels( + self, + tenant_id, + project_id, + set_run_config, + set_group_run_config, + set_labels, + set_group_labels, + ): + """Check that a flow-run's run config and labels take the following precedence: + - run_config: flow run, flow group, flow + - labels: flow run, flow run run_config, flow group, flow group run_config, + flow run_config + """ + labels = ["from-flow"] + flow_id = await api.flows.create_flow( + project_id=project_id, + serialized_flow=prefect.Flow( + name="test", run_config=UniversalRun(labels=labels) + ).serialize(), + ) + flow = await models.Flow.where(id=flow_id).first( + {"flow_group_id", "run_config"} + ) + run_config = flow.run_config + run_kwargs = {} + if set_group_run_config: + labels = ["from-group-run-config"] + run_config = UniversalRun(labels=labels).serialize() + await api.flow_groups.set_flow_group_run_config( + flow_group_id=flow.flow_group_id, run_config=run_config + ) + if set_group_labels: + labels = ["from-group"] + await api.flow_groups.set_flow_group_labels( + flow_group_id=flow.flow_group_id, labels=labels + ) + if set_run_config: + labels = ["from-run-config"] + run_kwargs["run_config"] = run_config = UniversalRun( + labels=labels + ).serialize() + if set_labels: + run_kwargs["labels"] = labels = ["from-run"] + # create a run + flow_run_id = await api.runs.create_flow_run(flow_id=flow_id, **run_kwargs) + flow_run = await models.FlowRun.where(id=flow_run_id).first( + {"labels", "run_config"} + ) + assert flow_run.labels == labels + assert flow_run.run_config == run_config + async def test_create_flow_run_with_version_group_id(self, project_id): flow_ids = [] for _ in range(15): diff --git a/tests/graphql/test_flow_groups.py b/tests/graphql/test_flow_groups.py index d7c0e95a..af87f069 100644 --- a/tests/graphql/test_flow_groups.py +++ b/tests/graphql/test_flow_groups.py @@ -1,6 +1,6 @@ import pytest -from prefect import api, models +from prefect import models from prefect.serialization.schedule import ScheduleSchema @@ -50,6 +50,34 @@ async def test_clear_flow_group_labels(self, run_query, flow_group_id): assert flow_group.labels is None +class TestSetFlowGroupRunConfig: + mutation = """ + mutation($input: set_flow_group_run_config_input!) { + set_flow_group_run_config(input: $input) { + success + } + } + """ + + @pytest.mark.parametrize( + "run_config", [None, {"type": "UniversalRun", "labels": ["a"]}] + ) + async def test_set_flow_group_run_config( + self, run_query, flow_group_id, run_config + ): + result = await run_query( + query=self.mutation, + variables=dict( + input=dict(flow_group_id=flow_group_id, run_config=run_config) + ), + ) + assert result.data.set_flow_group_run_config.success is True + flow_group = await models.FlowGroup.where(id=flow_group_id).first( + {"run_config"} + ) + assert flow_group.run_config == run_config + + class TestSetFlowGroupSchedule: mutation = """ mutation($input: set_flow_group_schedule_input!) { diff --git a/tests/graphql/test_runs.py b/tests/graphql/test_runs.py index 1c3592fc..08c441d3 100644 --- a/tests/graphql/test_runs.py +++ b/tests/graphql/test_runs.py @@ -6,7 +6,7 @@ import prefect from prefect import api, models -from prefect.engine.state import Pending, Scheduled, Success +from prefect.engine.state import Scheduled, Success class TestCreateFlowRun: @@ -67,6 +67,21 @@ async def test_create_flow_run_with_labels(self, run_query, flow_id): assert fr.flow_id == flow_id assert fr.labels == ["a", "b", "c"] + async def test_create_flow_run_with_run_config(self, run_query, flow_id): + run_config = {"type": "UniversalRun", "labels": []} + result = await run_query( + query=self.mutation, + variables=dict(input=dict(flow_id=flow_id, run_config=run_config)), + ) + fr = await models.FlowRun.where(id=result.data.create_flow_run.id).first( + { + "flow_id", + "run_config", + } + ) + assert fr.flow_id == flow_id + assert fr.run_config == run_config + async def test_create_flow_run_with_version_group_id(self, run_query, flow_id): dt = pendulum.now("utc").add(hours=1) f = await models.Flow.where(id=flow_id).first({"version_group_id"})