Skip to content

Commit

Permalink
Merge pull request #9 from PrefectHQ/projects
Browse files Browse the repository at this point in the history
  • Loading branch information
jlowin authored Jun 26, 2020
2 parents e9291f8 + 44081f7 commit d112409
Show file tree
Hide file tree
Showing 14 changed files with 134 additions and 1,023 deletions.
4 changes: 2 additions & 2 deletions src/prefect_server/_api/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async def set_flow_run_state(
)

if not flow_run:
raise ValueError(f"State update failed for flow run {flow_run_id}")
raise ValueError(f"State update failed for flow run ID {flow_run_id}")

# --------------------------------------------------------
# insert the new state in the database
Expand Down Expand Up @@ -185,7 +185,7 @@ async def set_task_run_state(
)

if not task_run:
raise ValueError(f"State update failed for task run {task_run_id}")
raise ValueError(f"State update failed for task run ID {task_run_id}")

# ------------------------------------------------------
# if the state is running, ensure the flow run is also running
Expand Down
22 changes: 14 additions & 8 deletions src/prefect_server/graphql/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,6 @@ async def resolve_create_flow(obj: Any, info: GraphQLResolveInfo, input: dict) -
if flow:
new_version_group = False

# perform license check
tenant_id = context.get_context().get("tenant_id", None)
if new_version_group and await api.license_checks.is_tenant_at_flow_limit(
tenant_id=tenant_id
):
raise ValueError("This tenant already has the maximum number of flows.")

flow_id = await api.flows.create_flow(
project_id=project_id,
serialized_flow=serialized_flow,
Expand All @@ -75,7 +68,6 @@ async def resolve_create_flow(obj: Any, info: GraphQLResolveInfo, input: dict) -
if version_group_id:
all_other_unarchived_versions = await models.Flow.where(
{
"tenant_id": {"_eq": tenant_id},
"version_group_id": {"_eq": version_group_id},
"id": {"_neq": flow_id},
"archived": {"_eq": False},
Expand Down Expand Up @@ -182,3 +174,17 @@ async def resolve_disable_flow_version_lock(
flow_id=input["flow_id"]
)
}


@mutation.field("set_schedule_active")
async def resolve_set_schedule_active(
obj: Any, info: GraphQLResolveInfo, input: dict
) -> dict:
return {"success": await api.flows.set_schedule_active(flow_id=input["flow_id"])}


@mutation.field("set_schedule_inactive")
async def resolve_set_schedule_inactive(
obj: Any, info: GraphQLResolveInfo, input: dict
) -> dict:
return {"success": await api.flows.set_schedule_inactive(flow_id=input["flow_id"])}
6 changes: 1 addition & 5 deletions src/prefect_server/graphql/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,9 @@
async def resolve_create_project(
obj: Any, info: GraphQLResolveInfo, input: dict
) -> dict:
cloud_context = context.get_context()

return {
"id": await api.projects.create_project(
tenant_id=cloud_context.get("tenant_id"),
name=input["name"],
description=input.get("description"),
name=input["name"], description=input.get("description"),
)
}

Expand Down
21 changes: 8 additions & 13 deletions src/prefect_server/graphql/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,10 @@

@mutation.field("create_flow_run")
async def resolve_create_flow_run(
obj: Any, info: GraphQLResolveInfo, input: dict
obj: Any, info: GraphQLResolveInfo, input: dict = None
) -> dict:
"""
Input Args:
- flow_id: the flow id
- parameters: the run parameters
- context: the run context
- scheduled_start_time: the scheduled start time; defaults to now
- flow_run_name: the flow run name
- version_group_id: the version group ID of the flow to run; if provided, will run the only
unarchived version of the provided ID
"""

input = input or {}

flow_identifier = input.get("flow_id") or input.get("version_group_id")
if not flow_identifier:
Expand All @@ -39,6 +31,7 @@ async def resolve_create_flow_run(
scheduled_start_time=input.get("scheduled_start_time"),
flow_run_name=input.get("flow_run_name"),
version_group_id=input.get("version_group_id"),
idempotency_key=input.get("idempotency_key"),
)
}

Expand All @@ -59,6 +52,7 @@ async def resolve_get_or_create_task_run(
}


@mutation.field("get_or_create_mapped_task_run_children")
async def resolve_get_or_create_mapped_task_run_children(
obj: Any, info: GraphQLResolveInfo, input: dict
) -> List[dict]:
Expand Down Expand Up @@ -101,11 +95,12 @@ async def resolve_update_task_run_heartbeat(

@mutation.field("get_runs_in_queue")
async def resolve_get_runs_in_queue(
obj: Any, info: GraphQLResolveInfo, input: dict
obj: Any, info: GraphQLResolveInfo, input: dict = None
) -> dict:
input = input or {}
labels = input.get("labels", [])
labels.sort()
result = await api.runs.get_runs_in_queue(
tenant_id=tenant_id, before=input.get("before"), labels=labels,
before=input.get("before"), labels=labels,
)
return {"flow_run_ids": result}
11 changes: 2 additions & 9 deletions src/prefect_server/graphql/scalars.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import uuid

import ariadne
import graphql
import pendulum


JSONScalar = ariadne.ScalarType("JSON")


Expand All @@ -29,14 +29,7 @@ def json_literal_parser(ast, variable_values=None):
"""
When a JSON scalar is passed as part of the literal query string, it needs to be loaded.
"""
# if the ast node has a value, it was provided as a raw string
breakpoint()
if hasattr(ast, "value"):
value = ast.value
# otherwise it was provided as a graphql "object" that we convert to a string representation
else:
value = graphql.print_ast(ast)
return json.loads(value)
return json.loads(ast.value)


DateTimeScalar = ariadne.ScalarType("DateTime")
Expand Down
84 changes: 3 additions & 81 deletions src/prefect_server/graphql/schema/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,6 @@ type Query {
}

type Mutation {
# ------------------------------------------------------
# tenants
# ------------------------------------------------------

"Create a new tenant."
create_tenant(input: create_tenant_input!): tenant_id_payload

"Delete a tenant."
delete_tenant(input: delete_tenant_input!): success_payload

"Update a tenant's name."
update_tenant_name(input: update_tenant_name_input!): tenant_id_payload

"Update a tenant's slug."
update_tenant_slug(input: update_tenant_slug_input!): tenant_id_payload

"Update a tenant's settings."
update_tenant_settings(
input: update_tenant_settings_input!
): tenant_id_payload

"Update a tenant's admin settings."
update_tenant_prefect_admin_settings(
input: update_tenant_prefect_admin_settings_input!
): tenant_id_payload


# ------------------------------------------------------
# flows
Expand Down Expand Up @@ -98,8 +72,6 @@ type Mutation {
"""
set_schedule_inactive(input: set_schedule_inactive_input!): success_payload

"Schedules new flow runs, if possible."
schedule_flow_runs(input: schedule_flow_runs_input!): [flow_run_id_payload!]

# ------------------------------------------------------
# runs
Expand Down Expand Up @@ -135,7 +107,7 @@ type Mutation {
"Delete a flow run."
delete_flow_run(input: delete_flow_run_input!): success_payload

get_runs_in_queue(input: get_runs_in_queue_input!): runs_in_queue_payload
get_runs_in_queue(input: get_runs_in_queue_input): runs_in_queue_payload

# ------------------------------------------------------
# cloud hooks
Expand Down Expand Up @@ -172,37 +144,6 @@ type Mutation {
delete_message(input: delete_message_input!): success_payload

}
# ------------------------------------------------------
# tenant inputs
# ------------------------------------------------------

input create_tenant_input {
name: String!
slug: String!
}

input delete_tenant_input {
confirm: Boolean!
tenant_id: UUID
}

input update_tenant_name_input {
name: String!
}

input update_tenant_slug_input {
slug: String!
}

input update_tenant_settings_input {
settings: JSON
}

input update_tenant_prefect_admin_settings_input {
tenant_id: UUID!
settings: JSON
}


# ------------------------------------------------------
# flow inputs
Expand Down Expand Up @@ -286,26 +227,13 @@ input disable_flow_version_lock_input {
# ------------------------------------------------------

input set_schedule_active_input {
"DEPRECATED: The ID of the schedule to set to active (if provided, interpreted as a flow ID)"
schedule_id: UUID
"A flow ID whose schedule will be set to active"
flow_id: UUID
flow_id: UUID!
}

input set_schedule_inactive_input {
"DEPRECATED: The ID of the schedule to set to active (if provided, interpreted as a flow ID)"
schedule_id: UUID
"A flow ID whose schedule will be set to active"
flow_id: UUID
}

input schedule_flow_runs_input {
"DEPRECATED: The ID of the schedule to set to active (if provided, interpreted as a flow ID)"
schedule_id: UUID
"A flow ID whose schedule will be set to active"
flow_id: UUID
"An optional number of runs to schedule"
max_runs: Int
flow_id: UUID!
}

# ------------------------------------------------------
Expand Down Expand Up @@ -350,7 +278,6 @@ input update_task_run_heartbeat_input {
}

input get_runs_in_queue_input {
tenant_id: UUID
before: DateTime
labels: [String!]
}
Expand Down Expand Up @@ -416,11 +343,6 @@ type success_payload {
error: String
}

type tenant_id_payload {
id: UUID
}


type flow_id_payload {
id: UUID
}
Expand Down
1 change: 0 additions & 1 deletion tests/graphql/test_cloud_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,6 @@ async def test_prefect_message_cloud_hook(
"""
await run_query(
query=set_flow_run_state_mutation,

variables=dict(
input=dict(
states=[
Expand Down
Loading

0 comments on commit d112409

Please sign in to comment.