Skip to content

Commit

Permalink
Merge pull request PrefectHQ#103 from PrefectHQ/label-refactor
Browse files Browse the repository at this point in the history
Run Labels refactor
  • Loading branch information
cicdw authored Oct 1, 2020
2 parents 153e342 + ce41534 commit 1f41b5a
Show file tree
Hide file tree
Showing 18 changed files with 541 additions and 56 deletions.
5 changes: 5 additions & 0 deletions changes/pr103.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
feature:
- "Add ability to configure labels on individual flow runs - [#103](https://github.com/PrefectHQ/server/pull/103)"

migration:
- "Add labels column to flow run table with sensible defaults - [#103](https://github.com/PrefectHQ/server/pull/103)"
265 changes: 265 additions & 0 deletions services/hasura/migrations/versions/metadata-24f10aeee83e.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
functions:
- function:
name: downstream_tasks
schema: utility
- function:
name: upstream_tasks
schema: utility
tables:
- array_relationships:
- name: agents
using:
foreign_key_constraint_on:
column: agent_config_id
table:
name: agent
schema: public
table:
name: agent_config
schema: public
- array_relationships:
- name: downstream_edges
using:
foreign_key_constraint_on:
column: upstream_task_id
table:
name: edge
schema: public
- name: task_runs
using:
foreign_key_constraint_on:
column: task_id
table:
name: task_run
schema: public
- name: upstream_edges
using:
foreign_key_constraint_on:
column: downstream_task_id
table:
name: edge
schema: public
object_relationships:
- name: flow
using:
foreign_key_constraint_on: flow_id
table:
name: task
schema: public
- array_relationships:
- name: edges
using:
foreign_key_constraint_on:
column: flow_id
table:
name: edge
schema: public
- name: flow_runs
using:
foreign_key_constraint_on:
column: flow_id
table:
name: flow_run
schema: public
- name: tasks
using:
foreign_key_constraint_on:
column: flow_id
table:
name: task
schema: public
object_relationships:
- name: flow_group
using:
foreign_key_constraint_on: flow_group_id
- name: project
using:
foreign_key_constraint_on: project_id
- name: tenant
using:
foreign_key_constraint_on: tenant_id
table:
name: flow
schema: public
- array_relationships:
- name: flow_groups
using:
foreign_key_constraint_on:
column: tenant_id
table:
name: flow_group
schema: public
- name: flows
using:
foreign_key_constraint_on:
column: tenant_id
table:
name: flow
schema: public
- name: projects
using:
foreign_key_constraint_on:
column: tenant_id
table:
name: project
schema: public
table:
name: tenant
schema: public
- array_relationships:
- name: flow_runs
using:
foreign_key_constraint_on:
column: agent_id
table:
name: flow_run
schema: public
object_relationships:
- name: agent_config
using:
foreign_key_constraint_on: agent_config_id
table:
name: agent
schema: public
- array_relationships:
- name: flows
using:
foreign_key_constraint_on:
column: flow_group_id
table:
name: flow
schema: public
object_relationships:
- name: tenant
using:
foreign_key_constraint_on: tenant_id
table:
name: flow_group
schema: public
- array_relationships:
- name: flows
using:
foreign_key_constraint_on:
column: project_id
table:
name: flow
schema: public
object_relationships:
- name: tenant
using:
foreign_key_constraint_on: tenant_id
table:
name: project
schema: public
- array_relationships:
- name: logs
using:
foreign_key_constraint_on:
column: flow_run_id
table:
name: log
schema: public
- name: states
using:
foreign_key_constraint_on:
column: flow_run_id
table:
name: flow_run_state
schema: public
- name: task_runs
using:
foreign_key_constraint_on:
column: flow_run_id
table:
name: task_run
schema: public
object_relationships:
- name: agent
using:
foreign_key_constraint_on: agent_id
- name: flow
using:
foreign_key_constraint_on: flow_id
- name: tenant
using:
foreign_key_constraint_on: tenant_id
table:
name: flow_run
schema: public
- array_relationships:
- name: logs
using:
foreign_key_constraint_on:
column: task_run_id
table:
name: log
schema: public
- name: states
using:
foreign_key_constraint_on:
column: task_run_id
table:
name: task_run_state
schema: public
object_relationships:
- name: flow_run
using:
foreign_key_constraint_on: flow_run_id
- name: task
using:
foreign_key_constraint_on: task_id
- name: tenant
using:
foreign_key_constraint_on: tenant_id
table:
name: task_run
schema: public
- object_relationships:
- name: downstream_task
using:
foreign_key_constraint_on: downstream_task_id
- name: flow
using:
foreign_key_constraint_on: flow_id
- name: upstream_task
using:
foreign_key_constraint_on: upstream_task_id
table:
name: edge
schema: public
- object_relationships:
- name: flow_run
using:
foreign_key_constraint_on: flow_run_id
table:
name: flow_run_state
schema: public
- object_relationships:
- name: task
using:
manual_configuration:
column_mapping:
task_id: id
remote_table:
name: task
schema: public
table:
name: traversal
schema: utility
- object_relationships:
- name: task_run
using:
foreign_key_constraint_on: task_run_id
table:
name: task_run_state
schema: public
- table:
name: cloud_hook
schema: public
- table:
name: log
schema: public
- table:
name: message
schema: public
version: 2
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""
Add label column to flow runs
Revision ID: 24f10aeee83e
Revises: 850b76d44332
Create Date: 2020-09-30 12:38:06.915340
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB, UUID


# revision identifiers, used by Alembic.
revision = "24f10aeee83e"
down_revision = "850b76d44332"
branch_labels = None
depends_on = None


def upgrade():
op.add_column(
"flow_run", sa.Column("labels", JSONB, nullable=False, server_default="[]")
)
# this query looks at all currently scheduled flow runs, or more generally any flow
# run that has been updated in the last day, and applies our standard label logic to it.
# This ensures that active flow runs don't incorrectly receive `[]` as their label set,
# which would cause delayed work and user frustration.
# Note that flow runs not touched by this query will have `[]` as their labels regardless of
# their flow label properties.
op.execute(
"""
WITH runs_to_update AS (
SELECT
flow_run.id,
COALESCE(
flow_group.labels,
flow.environment -> 'labels',
'[]')::JSONB as new_labels
from flow_run
join flow on flow_run.flow_id = flow.id
join flow_group on flow.flow_group_id = flow_group.id
where flow_run.state = 'Scheduled' OR flow_run.updated > NOW() - interval '1 day'
)
update flow_run
set labels = runs_to_update.new_labels
from runs_to_update
where flow_run.id = runs_to_update.id;
"""
)


def downgrade():
op.drop_column("flow_run", "labels")
1 change: 0 additions & 1 deletion src/prefect_server/api/cloud_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import httpx
from box import Box
from pydantic import BaseModel

import prefect
from prefect import api, models
Expand Down
Loading

0 comments on commit 1f41b5a

Please sign in to comment.