Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add idempotency key to flows.create_flow #116

Merged
merged 16 commits into from
Oct 28, 2020
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes/pr116.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
feature:
- "Add idempotency keys to `flows.create_flow` - [#116](https://github.com/PrefectHQ/server/pull/116)"
45 changes: 42 additions & 3 deletions src/prefect_server/api/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from prefect import api, models
from prefect.serialization.schedule import ScheduleSchema
from prefect.utilities.graphql import with_args
from prefect.utilities.graphql import with_args, EnumValue
from prefect.utilities.plugins import register_api
from prefect_server import config
from prefect_server.utilities import logging
Expand Down Expand Up @@ -120,6 +120,7 @@ async def create_flow(
version_group_id: str = None,
set_schedule_active: bool = True,
description: str = None,
idempotency_key: str = None,
) -> str:
"""
Add a flow to the database.
Expand All @@ -130,6 +131,9 @@ async def create_flow(
- version_group_id (str): A version group to add the Flow to
- set_schedule_active (bool): Whether to set the flow's schedule to active
- description (str): a description of the flow being created
- idempotency_key (optional, str): a key that, if matching the most recent call
to `create_flow` for this flow group, will prevent the creation of another
flow version

Returns:
str: The id of the new flow
Expand Down Expand Up @@ -177,15 +181,22 @@ async def create_flow(
"version_group_id": {"_eq": version_group_id},
"tenant_id": {"_eq": tenant_id},
}
# set up a flow group if it's not already in the system

# lookup the associated flow group (may not exist yet)
flow_group = await models.FlowGroup.where(
{
"_and": [
{"tenant_id": {"_eq": tenant_id}},
{"name": {"_eq": version_group_id}},
]
}
).first({"id", "schedule"})
).first({"id", "schedule", "settings"})

# create the flow group or check for the idempotency key in the existing group
# Note: The key is stashed in the `settings` dict of the flow group which makes
# idempotency only apply to the most flow creation. If we want to avoid
zanieb marked this conversation as resolved.
Show resolved Hide resolved
# re-registering old flow versions we'll need to add a column to the Flow table
zanieb marked this conversation as resolved.
Show resolved Hide resolved
# TODO: Should we leave empty idempotency_keys as `None` or generate a uuid4?
zanieb marked this conversation as resolved.
Show resolved Hide resolved
if flow_group is None:
flow_group_id = await models.FlowGroup(
tenant_id=tenant_id,
Expand All @@ -194,11 +205,39 @@ async def create_flow(
"heartbeat_enabled": True,
"lazarus_enabled": True,
"version_locking_enabled": False,
"idempotency_key": idempotency_key,
},
).insert()
else:
flow_group_id = flow_group.id

# check idempotency key and exit early if we find a matching key and flow,
# otherwise update the key for the group
last_idempotency_key = flow_group.settings.get("idempotency_key", None)
if (
last_idempotency_key
and idempotency_key
and last_idempotency_key == idempotency_key
):
# get the most recent unarchived version, there should only be one
# unarchived flow at a time but it is safer not to presume
flow = await models.Flow.where(
{
"version_group_id": {"_eq": version_group_id},
"archived": {"_eq": False},
}
).first(order_by={"version": EnumValue("desc")})
if flow:
return flow.id
# otherwise, despite the key matching we don't have a valid flow to return
# and will continue as though the key did not match
cicdw marked this conversation as resolved.
Show resolved Hide resolved

settings = flow_group.settings
settings["idempotency_key"] = idempotency_key
await models.FlowGroup.where({"id": {"_eq": flow_group.id}}).update(
set={"settings": settings}
)
joshmeek marked this conversation as resolved.
Show resolved Hide resolved

version = (await models.Flow.where(version_where).max({"version"}))["version"] or 0

# if there is no referenceable schedule for this Flow,
Expand Down
2 changes: 2 additions & 0 deletions src/prefect_server/graphql/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ async def resolve_create_flow(obj: Any, info: GraphQLResolveInfo, input: dict) -
version_group_id = input.get("version_group_id", None)
set_schedule_active = input.get("set_schedule_active", True)
description = input.get("description", None)
idempotency_key = input.get("idempotency_key", None)

if project_id is None:
raise ValueError("Invalid project ID")
Expand Down Expand Up @@ -59,6 +60,7 @@ async def resolve_create_flow(obj: Any, info: GraphQLResolveInfo, input: dict) -
version_group_id=version_group_id,
set_schedule_active=set_schedule_active,
description=description,
idempotency_key=idempotency_key,
)

# archive all other versions
Expand Down
4 changes: 4 additions & 0 deletions src/prefect_server/graphql/schema/flows.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ input create_flow_input {
set_schedule_active: Boolean
"An optional description of this Flow"
description: String
"An optional idempotency key for this flow to prevent multiple sequential creations within the version group"
idempotency_key: String
}

input create_flow_from_compressed_string_input {
Expand All @@ -90,6 +92,8 @@ input create_flow_from_compressed_string_input {
set_schedule_active: Boolean
"An optional description of this Flow"
description: String
"An optional idempotency key for this flow to prevent multiple sequential creations within the version group"
idempotency_key: String
}

input delete_flow_input {
Expand Down
65 changes: 65 additions & 0 deletions tests/api/test_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,71 @@ async def test_flows_can_be_safely_created_twice(self, project_id, flow):
== len(flow.tasks) * 2
)

async def test_flows_not_added_with_same_idempotency_key(self, project_id, flow):
flow_id_1 = await api.flows.create_flow(
project_id=project_id,
serialized_flow=flow.serialize(),
idempotency_key="foo",
)
flow_model_1 = await models.Flow.where({"id": {"_eq": flow_id_1}}).first(
{"version", "version_group_id"}
)
flow_id_2 = await api.flows.create_flow(
project_id=project_id,
serialized_flow=flow.serialize(),
version_group_id=flow_model_1.version_group_id,
idempotency_key="foo",
)
flow_model_2 = await models.Flow.where({"id": {"_eq": flow_id_2}}).first(
{"version", "version_group_id"}
)

assert flow_id_1 == flow_id_2
assert flow_model_1.version == flow_model_2.version

# Verify that the flow is not duplicated
assert await models.Flow.where({"id": {"_eq": flow_id_1}}).count() == 1
# Verify that the tasks are not duplicated
assert await models.Task.where({"flow_id": {"_eq": flow_id_1}}).count() == len(
flow.tasks
)
zanieb marked this conversation as resolved.
Show resolved Hide resolved

@pytest.mark.parametrize("idempotency_keys", [("foo", "bar"), (None, None)])
async def test_flows_added_with_different_idempotency_key(
self, project_id, flow, idempotency_keys
):
flow_id_1 = await api.flows.create_flow(
project_id=project_id,
serialized_flow=flow.serialize(),
idempotency_key=idempotency_keys[0],
)
flow_model_1 = await models.Flow.where({"id": {"_eq": flow_id_1}}).first(
{"version", "version_group_id"}
)
flow_id_2 = await api.flows.create_flow(
project_id=project_id,
serialized_flow=flow.serialize(),
version_group_id=flow_model_1.version_group_id,
idempotency_key=idempotency_keys[1],
)
flow_model_2 = await models.Flow.where({"id": {"_eq": flow_id_2}}).first(
{"version"}
)

assert flow_id_1 != flow_id_2
assert flow_model_1.version == flow_model_2.version - 1

assert (
await models.Flow.where({"id": {"_in": [flow_id_1, flow_id_2]}}).count()
== 2
)
assert (
await models.Task.where(
{"flow_id": {"_in": [flow_id_1, flow_id_2]}}
).count()
== len(flow.tasks) * 2
)

async def test_create_flow_with_schedule(self, project_id):
flow = prefect.Flow(
name="test", schedule=prefect.schedules.CronSchedule("0 0 * * *")
Expand Down
18 changes: 18 additions & 0 deletions tests/graphql/test_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,24 @@ async def test_create_flow_with_description(self, run_query, project_id):
)
assert flow.description == description

async def test_create_flow_with_idempotency_key(self, run_query, project_id):
serialized_flow = prefect.Flow(name="test").serialize(build=False)
idempotency_key = "test"
result = await run_query(
query=self.create_flow_mutation,
variables=dict(
input=dict(
serialized_flow=serialized_flow,
project_id=project_id,
idempotency_key=idempotency_key,
)
),
)
flow = await models.Flow.where(id=result.data.create_flow.id).first(
{"flow_group": {"settings"}}
)
assert flow.flow_group.settings["idempotency_key"] == idempotency_key

async def test_create_flow_autodetects_version_group(
self, run_query, project_id, project_id_2
):
Expand Down