Skip to content

Commit

Permalink
Merge pull request #166 from jcrist/per-flow-run-run-config
Browse files Browse the repository at this point in the history
Per flow-run run-configs
  • Loading branch information
jcrist authored Dec 29, 2020
2 parents 7a7e856 + d47f0c5 commit 773179c
Show file tree
Hide file tree
Showing 14 changed files with 543 additions and 15 deletions.
5 changes: 5 additions & 0 deletions changes/pr166.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
migration:
- "Add `flow_group.run_config` and `flow_run.run_config` - [#166](https://github.com/PrefectHQ/server/pull/166)"

feature:
- "Support per-flow-run and per-flow-group `run_config` overrides - [#166](https://github.com/PrefectHQ/server/pull/166)"
272 changes: 272 additions & 0 deletions services/hasura/migrations/versions/metadata-7ca57ea2fdff.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
functions:
- function:
name: downstream_tasks
schema: utility
- function:
name: upstream_tasks
schema: utility
tables:
- array_relationships:
- name: agents
using:
foreign_key_constraint_on:
column: agent_config_id
table:
name: agent
schema: public
table:
name: agent_config
schema: public
- array_relationships:
- name: downstream_edges
using:
foreign_key_constraint_on:
column: upstream_task_id
table:
name: edge
schema: public
- name: task_runs
using:
foreign_key_constraint_on:
column: task_id
table:
name: task_run
schema: public
- name: upstream_edges
using:
foreign_key_constraint_on:
column: downstream_task_id
table:
name: edge
schema: public
object_relationships:
- name: flow
using:
foreign_key_constraint_on: flow_id
table:
name: task
schema: public
- array_relationships:
- name: edges
using:
foreign_key_constraint_on:
column: flow_id
table:
name: edge
schema: public
- name: flow_runs
using:
foreign_key_constraint_on:
column: flow_id
table:
name: flow_run
schema: public
- name: tasks
using:
foreign_key_constraint_on:
column: flow_id
table:
name: task
schema: public
object_relationships:
- name: flow_group
using:
foreign_key_constraint_on: flow_group_id
- name: project
using:
foreign_key_constraint_on: project_id
- name: tenant
using:
foreign_key_constraint_on: tenant_id
table:
name: flow
schema: public
- array_relationships:
- name: flow_groups
using:
foreign_key_constraint_on:
column: tenant_id
table:
name: flow_group
schema: public
- name: flows
using:
foreign_key_constraint_on:
column: tenant_id
table:
name: flow
schema: public
- name: projects
using:
foreign_key_constraint_on:
column: tenant_id
table:
name: project
schema: public
table:
name: tenant
schema: public
- array_relationships:
- name: flow_runs
using:
foreign_key_constraint_on:
column: agent_id
table:
name: flow_run
schema: public
object_relationships:
- name: agent_config
using:
foreign_key_constraint_on: agent_config_id
table:
name: agent
schema: public
- array_relationships:
- name: flows
using:
foreign_key_constraint_on:
column: flow_group_id
table:
name: flow
schema: public
object_relationships:
- name: tenant
using:
foreign_key_constraint_on: tenant_id
table:
name: flow_group
schema: public
- array_relationships:
- name: flows
using:
foreign_key_constraint_on:
column: project_id
table:
name: flow
schema: public
object_relationships:
- name: tenant
using:
foreign_key_constraint_on: tenant_id
table:
name: project
schema: public
- array_relationships:
- name: logs
using:
foreign_key_constraint_on:
column: flow_run_id
table:
name: log
schema: public
- name: states
using:
foreign_key_constraint_on:
column: flow_run_id
table:
name: flow_run_state
schema: public
- name: task_runs
using:
foreign_key_constraint_on:
column: flow_run_id
table:
name: task_run
schema: public
object_relationships:
- name: agent
using:
foreign_key_constraint_on: agent_id
- name: flow
using:
foreign_key_constraint_on: flow_id
- name: tenant
using:
foreign_key_constraint_on: tenant_id
table:
name: flow_run
schema: public
- array_relationships:
- name: logs
using:
foreign_key_constraint_on:
column: task_run_id
table:
name: log
schema: public
- name: states
using:
foreign_key_constraint_on:
column: task_run_id
table:
name: task_run_state
schema: public
object_relationships:
- name: flow_run
using:
foreign_key_constraint_on: flow_run_id
- name: task
using:
foreign_key_constraint_on: task_id
- name: tenant
using:
foreign_key_constraint_on: tenant_id
table:
name: task_run
schema: public
- object_relationships:
- name: downstream_task
using:
foreign_key_constraint_on: downstream_task_id
- name: flow
using:
foreign_key_constraint_on: flow_id
- name: upstream_task
using:
foreign_key_constraint_on: upstream_task_id
table:
name: edge
schema: public
- object_relationships:
- name: flow_run
using:
foreign_key_constraint_on: flow_run_id
table:
name: flow_run_state
schema: public
- object_relationships:
- name: task
using:
manual_configuration:
column_mapping:
task_id: id
remote_table:
name: task
schema: public
table:
name: traversal
schema: utility
- object_relationships:
- name: task_run
using:
foreign_key_constraint_on: task_run_id
table:
name: task_run_artifact
schema: public
- object_relationships:
- name: task_run
using:
foreign_key_constraint_on: task_run_id
table:
name: task_run_state
schema: public
- table:
name: cloud_hook
schema: public
- table:
name: log
schema: public
- table:
name: message
schema: public
version: 2
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""
Add run_config to flow runs and flow groups
Revision ID: 7ca57ea2fdff
Revises: 57ac2cb01ac1
Create Date: 2020-12-28 15:00:27.573816
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB


# revision identifiers, used by Alembic.
revision = "7ca57ea2fdff"
down_revision = "57ac2cb01ac1"
branch_labels = None
depends_on = None


def upgrade():
op.add_column(
"flow_run", sa.Column("run_config", JSONB, nullable=True, server_default=None)
)
op.add_column(
"flow_group", sa.Column("run_config", JSONB, nullable=True, server_default=None)
)
# This query updates all currently scheduled flow runs to add the run_config field
# from the corresponding flow.
op.execute(
"""
WITH runs_to_update AS (
SELECT
flow_run.id,
flow.run_config
from flow_run
join flow on flow_run.flow_id = flow.id
where flow_run.state = 'Scheduled' OR flow_run.updated > NOW() - interval '1 day'
)
update flow_run
set run_config = runs_to_update.run_config
from runs_to_update
where flow_run.id = runs_to_update.id;
"""
)


def downgrade():
op.drop_column("flow_run", "run_config")
op.drop_column("flow_group", "run_config")
29 changes: 28 additions & 1 deletion src/prefect_server/api/flow_groups.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List
from typing import List, Dict, Any

from prefect import api, models
from prefect.serialization.schedule import ClockSchema
Expand Down Expand Up @@ -159,6 +159,33 @@ async def set_flow_group_labels(flow_group_id: str, labels: List[str] = None) ->
return bool(result.affected_rows)


@register_api("flow_groups.set_flow_group_run_config")
async def set_flow_group_run_config(
flow_group_id: str, run_config: Dict[str, Any] = None
) -> bool:
"""
Sets run_config for a flow group.
Args:
- flow_group_id (str): the ID of the flow group to update
- run_config (dict, optional): a run-config override for a flow
group. Providing `None` defaults any previous flow group
`run_config` setting.
Returns:
- bool: whether setting `run_config` for the flow group was successful
Raises:
- ValueError: if flow group ID isn't provided
"""
if not flow_group_id:
raise ValueError("Invalid flow group ID")
result = await models.FlowGroup.where(id=flow_group_id).update(
set=dict(run_config=run_config)
)
return bool(result.affected_rows)


@register_api("flow_groups.enable_heartbeat")
async def enable_heartbeat_for_flow(flow_group_id: str) -> bool:
"""
Expand Down
Loading

0 comments on commit 773179c

Please sign in to comment.