From ec087e36508cc2eb7b31b7e156f1afc9ea108404 Mon Sep 17 00:00:00 2001 From: Michael Adkins Date: Tue, 27 Oct 2020 10:50:01 -0500 Subject: [PATCH 01/13] Add idempotency_key to create_flow_run Naive first implementation --- src/prefect_server/api/flows.py | 40 ++++++++++++++++++- src/prefect_server/graphql/flows.py | 2 + .../graphql/schema/flows.graphql | 4 ++ 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/src/prefect_server/api/flows.py b/src/prefect_server/api/flows.py index 2bd85be3..474752bc 100644 --- a/src/prefect_server/api/flows.py +++ b/src/prefect_server/api/flows.py @@ -120,6 +120,7 @@ async def create_flow( version_group_id: str = None, set_schedule_active: bool = True, description: str = None, + idempotency_key: str = None, ) -> str: """ Add a flow to the database. @@ -130,6 +131,9 @@ async def create_flow( - version_group_id (str): A version group to add the Flow to - set_schedule_active (bool): Whether to set the flow's schedule to active - description (str): a description of the flow being created + - idempotency_key (optional, str): a key that, if matching the most recent call + to `create_flow` for this flow group, will prevent the creation of another + flow version Returns: str: The id of the new flow @@ -177,7 +181,8 @@ async def create_flow( "version_group_id": {"_eq": version_group_id}, "tenant_id": {"_eq": tenant_id}, } - # set up a flow group if it's not already in the system + + # lookup the associated flow group (may not exist yet) flow_group = await models.FlowGroup.where( { "_and": [ @@ -186,6 +191,12 @@ async def create_flow( ] } ).first({"id", "schedule"}) + + # create the flow group or check for the idempotency key in the existing group + # Note: The key is stashed in the `settings` dict of the flow group which makes + # idempotency only apply to the most flow creation. If we want to avoid + # re-registering old flow versions we'll need to add a column to the Flow table + # TODO: Should we leave empty idempotency_keys as `None` or generate a uuid4? if flow_group is None: flow_group_id = await models.FlowGroup( tenant_id=tenant_id, @@ -194,11 +205,38 @@ async def create_flow( "heartbeat_enabled": True, "lazarus_enabled": True, "version_locking_enabled": False, + "idempotency_key": idempotency_key, }, ).insert() else: flow_group_id = flow_group.id + # check idempotency key and early if we find a matching key and flow, + # otherwise update the key for the group + last_idempotency_key = flow_group.settings.get("idempotency_key", None) + if ( + last_idempotency_key + and idempotency_key + and last_idempotency_key == idempotency_key + ): + # get the most recent unarchived version, there should only be one + # unarchived flow at a time but it is safer not to presume + flow = await models.Flow.where( + { + "version_group_id": {"_eq": version_group_id}, + "archived": {"_eq": False}, + "order_by": {"version": "desc"}, + } + ).first() + if flow: + return flow.id + # otherwise, despite the key matching we don't have a valid flow to return + # and will continue as though the key did not match + + settings = flow_group.settings + settings["idempotency_key"] = idempotency_key + await flow_group.update(set={"settings": settings}) + version = (await models.Flow.where(version_where).max({"version"}))["version"] or 0 # if there is no referenceable schedule for this Flow, diff --git a/src/prefect_server/graphql/flows.py b/src/prefect_server/graphql/flows.py index 648bf800..df1cb88e 100644 --- a/src/prefect_server/graphql/flows.py +++ b/src/prefect_server/graphql/flows.py @@ -26,6 +26,7 @@ async def resolve_create_flow(obj: Any, info: GraphQLResolveInfo, input: dict) - version_group_id = input.get("version_group_id", None) set_schedule_active = input.get("set_schedule_active", True) description = input.get("description", None) + idempotency_key = input.get("idempotency_key", None) if project_id is None: raise ValueError("Invalid project ID") @@ -59,6 +60,7 @@ async def resolve_create_flow(obj: Any, info: GraphQLResolveInfo, input: dict) - version_group_id=version_group_id, set_schedule_active=set_schedule_active, description=description, + idempotency_key=idempotency_key, ) # archive all other versions diff --git a/src/prefect_server/graphql/schema/flows.graphql b/src/prefect_server/graphql/schema/flows.graphql index 2911bc9e..a80165bf 100644 --- a/src/prefect_server/graphql/schema/flows.graphql +++ b/src/prefect_server/graphql/schema/flows.graphql @@ -77,6 +77,8 @@ input create_flow_input { set_schedule_active: Boolean "An optional description of this Flow" description: String + "An optional idempotency key for this flow to prevent multiple subsequent creations" + idempotency_key: String } input create_flow_from_compressed_string_input { @@ -90,6 +92,8 @@ input create_flow_from_compressed_string_input { set_schedule_active: Boolean "An optional description of this Flow" description: String + "An optional idempotency key for this flow to prevent multiple subsequent creations" + idempotency_key: String } input delete_flow_input { From df86365efe219a688ff6eb28a34f2fbfb29e7c93 Mon Sep 17 00:00:00 2001 From: Michael Adkins Date: Tue, 27 Oct 2020 12:29:18 -0500 Subject: [PATCH 02/13] Pull flow group settings and update correctly --- src/prefect_server/api/flows.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/prefect_server/api/flows.py b/src/prefect_server/api/flows.py index 474752bc..403ba95c 100644 --- a/src/prefect_server/api/flows.py +++ b/src/prefect_server/api/flows.py @@ -190,7 +190,7 @@ async def create_flow( {"name": {"_eq": version_group_id}}, ] } - ).first({"id", "schedule"}) + ).first({"id", "schedule", "settings"}) # create the flow group or check for the idempotency key in the existing group # Note: The key is stashed in the `settings` dict of the flow group which makes @@ -235,7 +235,9 @@ async def create_flow( settings = flow_group.settings settings["idempotency_key"] = idempotency_key - await flow_group.update(set={"settings": settings}) + await models.FlowGroup.where({"id": {"_eq": flow_group.id}}).update( + set={"settings": settings} + ) version = (await models.Flow.where(version_where).max({"version"}))["version"] or 0 From a29fbbd2f7782809bcc0a8745272b78e8a5010ea Mon Sep 17 00:00:00 2001 From: Michael Adkins Date: Tue, 27 Oct 2020 12:56:11 -0500 Subject: [PATCH 03/13] Add test for graphql idempotency key creation --- tests/graphql/test_flows.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/graphql/test_flows.py b/tests/graphql/test_flows.py index 18674139..0821fb67 100644 --- a/tests/graphql/test_flows.py +++ b/tests/graphql/test_flows.py @@ -119,6 +119,24 @@ async def test_create_flow_with_description(self, run_query, project_id): ) assert flow.description == description + async def test_create_flow_with_idempotency_key(self, run_query, project_id): + serialized_flow = prefect.Flow(name="test").serialize(build=False) + idempotency_key = "test" + result = await run_query( + query=self.create_flow_mutation, + variables=dict( + input=dict( + serialized_flow=serialized_flow, + project_id=project_id, + idempotency_key=idempotency_key, + ) + ), + ) + flow = await models.Flow.where(id=result.data.create_flow.id).first( + {"flow_group": {"settings"}} + ) + assert flow.flow_group.settings["idempotency_key"] == idempotency_key + async def test_create_flow_autodetects_version_group( self, run_query, project_id, project_id_2 ): From 57126d31aa0443fb2e2ffb763bc66e099c2187c6 Mon Sep 17 00:00:00 2001 From: Michael Adkins Date: Tue, 27 Oct 2020 12:58:30 -0500 Subject: [PATCH 04/13] Fix order_by clause --- src/prefect_server/api/flows.py | 5 ++--- tests/api/test_flows.py | 27 +++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/src/prefect_server/api/flows.py b/src/prefect_server/api/flows.py index 403ba95c..38eac461 100644 --- a/src/prefect_server/api/flows.py +++ b/src/prefect_server/api/flows.py @@ -10,7 +10,7 @@ from prefect import api, models from prefect.serialization.schedule import ScheduleSchema -from prefect.utilities.graphql import with_args +from prefect.utilities.graphql import with_args, EnumValue from prefect.utilities.plugins import register_api from prefect_server import config from prefect_server.utilities import logging @@ -225,9 +225,8 @@ async def create_flow( { "version_group_id": {"_eq": version_group_id}, "archived": {"_eq": False}, - "order_by": {"version": "desc"}, } - ).first() + ).first(order_by={"version": EnumValue("desc")}) if flow: return flow.id # otherwise, despite the key matching we don't have a valid flow to return diff --git a/tests/api/test_flows.py b/tests/api/test_flows.py index e35feef5..0a8574b4 100644 --- a/tests/api/test_flows.py +++ b/tests/api/test_flows.py @@ -261,6 +261,33 @@ async def test_flows_can_be_safely_created_twice(self, project_id, flow): == len(flow.tasks) * 2 ) + async def test_flows_not_duplicated_with_same_idempotency_key( + self, project_id, flow + ): + flow_id_1 = await api.flows.create_flow( + project_id=project_id, + serialized_flow=flow.serialize(), + idempotency_key="foo", + ) + flow_model = await models.Flow.where({"id": {"_eq": flow_id_1}}).first( + {"version_group_id"} + ) + flow_id_2 = await api.flows.create_flow( + project_id=project_id, + serialized_flow=flow.serialize(), + version_group_id=flow_model.version_group_id, + idempotency_key="foo", + ) + + assert flow_id_1 == flow_id_2 + + # Verify that the flow is not duplicated + assert await models.Flow.where({"id": {"_eq": flow_id_1}}).count() == 1 + # Verify that the tasks are not duplicated + assert await models.Task.where({"flow_id": {"_eq": flow_id_1}}).count() == len( + flow.tasks + ) + async def test_create_flow_with_schedule(self, project_id): flow = prefect.Flow( name="test", schedule=prefect.schedules.CronSchedule("0 0 * * *") From a70f2cc887818edf5fb61c0dc3b73f088db808d1 Mon Sep 17 00:00:00 2001 From: Michael Adkins Date: Tue, 27 Oct 2020 13:08:10 -0500 Subject: [PATCH 05/13] Add test for different idempotency keys Also add coverage for versions to first test --- tests/api/test_flows.py | 47 +++++++++++++++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/tests/api/test_flows.py b/tests/api/test_flows.py index 0a8574b4..3213aecd 100644 --- a/tests/api/test_flows.py +++ b/tests/api/test_flows.py @@ -261,25 +261,27 @@ async def test_flows_can_be_safely_created_twice(self, project_id, flow): == len(flow.tasks) * 2 ) - async def test_flows_not_duplicated_with_same_idempotency_key( - self, project_id, flow - ): + async def test_flows_not_added_with_same_idempotency_key(self, project_id, flow): flow_id_1 = await api.flows.create_flow( project_id=project_id, serialized_flow=flow.serialize(), idempotency_key="foo", ) - flow_model = await models.Flow.where({"id": {"_eq": flow_id_1}}).first( - {"version_group_id"} + flow_model_1 = await models.Flow.where({"id": {"_eq": flow_id_1}}).first( + {"version", "version_group_id"} ) flow_id_2 = await api.flows.create_flow( project_id=project_id, serialized_flow=flow.serialize(), - version_group_id=flow_model.version_group_id, + version_group_id=flow_model_1.version_group_id, idempotency_key="foo", ) + flow_model_2 = await models.Flow.where({"id": {"_eq": flow_id_2}}).first( + {"version", "version_group_id"} + ) assert flow_id_1 == flow_id_2 + assert flow_model_1.version == flow_model_2.version # Verify that the flow is not duplicated assert await models.Flow.where({"id": {"_eq": flow_id_1}}).count() == 1 @@ -288,6 +290,39 @@ async def test_flows_not_duplicated_with_same_idempotency_key( flow.tasks ) + async def test_flows_added_with_different_idempotency_key(self, project_id, flow): + flow_id_1 = await api.flows.create_flow( + project_id=project_id, + serialized_flow=flow.serialize(), + idempotency_key="foo", + ) + flow_model_1 = await models.Flow.where({"id": {"_eq": flow_id_1}}).first( + {"version", "version_group_id"} + ) + flow_id_2 = await api.flows.create_flow( + project_id=project_id, + serialized_flow=flow.serialize(), + version_group_id=flow_model_1.version_group_id, + idempotency_key="bar", + ) + flow_model_2 = await models.Flow.where({"id": {"_eq": flow_id_2}}).first( + {"version"} + ) + + assert flow_id_1 != flow_id_2 + assert flow_model_1.version == flow_model_2.version - 1 + + assert ( + await models.Flow.where({"id": {"_in": [flow_id_1, flow_id_2]}}).count() + == 2 + ) + assert ( + await models.Task.where( + {"flow_id": {"_in": [flow_id_1, flow_id_2]}} + ).count() + == len(flow.tasks) * 2 + ) + async def test_create_flow_with_schedule(self, project_id): flow = prefect.Flow( name="test", schedule=prefect.schedules.CronSchedule("0 0 * * *") From d076a0d482f3549dbd4e1fd91c293aa264da7012 Mon Sep 17 00:00:00 2001 From: Michael Adkins Date: Tue, 27 Oct 2020 13:11:43 -0500 Subject: [PATCH 06/13] Add test coverage for `None` keys --- tests/api/test_flows.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/api/test_flows.py b/tests/api/test_flows.py index 3213aecd..b192d568 100644 --- a/tests/api/test_flows.py +++ b/tests/api/test_flows.py @@ -290,11 +290,14 @@ async def test_flows_not_added_with_same_idempotency_key(self, project_id, flow) flow.tasks ) - async def test_flows_added_with_different_idempotency_key(self, project_id, flow): + @pytest.mark.parametrize("idempotency_keys", [("foo", "bar"), (None, None)]) + async def test_flows_added_with_different_idempotency_key( + self, project_id, flow, idempotency_keys + ): flow_id_1 = await api.flows.create_flow( project_id=project_id, serialized_flow=flow.serialize(), - idempotency_key="foo", + idempotency_key=idempotency_keys[0], ) flow_model_1 = await models.Flow.where({"id": {"_eq": flow_id_1}}).first( {"version", "version_group_id"} @@ -303,7 +306,7 @@ async def test_flows_added_with_different_idempotency_key(self, project_id, flow project_id=project_id, serialized_flow=flow.serialize(), version_group_id=flow_model_1.version_group_id, - idempotency_key="bar", + idempotency_key=idempotency_keys[1], ) flow_model_2 = await models.Flow.where({"id": {"_eq": flow_id_2}}).first( {"version"} From ae5bcc157be7fed1a63d3e137119c887911b7013 Mon Sep 17 00:00:00 2001 From: Michael Adkins Date: Tue, 27 Oct 2020 13:21:33 -0500 Subject: [PATCH 07/13] Add changes/ --- changes/pr116.yaml | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 changes/pr116.yaml diff --git a/changes/pr116.yaml b/changes/pr116.yaml new file mode 100644 index 00000000..a62f3ea0 --- /dev/null +++ b/changes/pr116.yaml @@ -0,0 +1,2 @@ +feature: + - "Add idempotency keys to `flows.create_flow` - [#116](https://github.com/PrefectHQ/server/pull/116)" From e908cc5194186c4237c4f22894e05c188886dec0 Mon Sep 17 00:00:00 2001 From: Michael Adkins Date: Tue, 27 Oct 2020 13:25:42 -0500 Subject: [PATCH 08/13] Improve graphql description of idempotency key --- src/prefect_server/graphql/schema/flows.graphql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/prefect_server/graphql/schema/flows.graphql b/src/prefect_server/graphql/schema/flows.graphql index a80165bf..40d51734 100644 --- a/src/prefect_server/graphql/schema/flows.graphql +++ b/src/prefect_server/graphql/schema/flows.graphql @@ -77,7 +77,7 @@ input create_flow_input { set_schedule_active: Boolean "An optional description of this Flow" description: String - "An optional idempotency key for this flow to prevent multiple subsequent creations" + "An optional idempotency key for this flow to prevent multiple sequential creations within the version group" idempotency_key: String } @@ -92,7 +92,7 @@ input create_flow_from_compressed_string_input { set_schedule_active: Boolean "An optional description of this Flow" description: String - "An optional idempotency key for this flow to prevent multiple subsequent creations" + "An optional idempotency key for this flow to prevent multiple sequential creations within the version group" idempotency_key: String } From f463f3f4fcd7777725684aac1fabb0bdc79f0bf8 Mon Sep 17 00:00:00 2001 From: Michael Adkins Date: Tue, 27 Oct 2020 14:37:33 -0700 Subject: [PATCH 09/13] Fix missing word in comment --- src/prefect_server/api/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prefect_server/api/flows.py b/src/prefect_server/api/flows.py index 38eac461..514cea3f 100644 --- a/src/prefect_server/api/flows.py +++ b/src/prefect_server/api/flows.py @@ -211,7 +211,7 @@ async def create_flow( else: flow_group_id = flow_group.id - # check idempotency key and early if we find a matching key and flow, + # check idempotency key and exit early if we find a matching key and flow, # otherwise update the key for the group last_idempotency_key = flow_group.settings.get("idempotency_key", None) if ( From b1a0ead0e7aa8879fd75dc6e0f74ef9ac356c54a Mon Sep 17 00:00:00 2001 From: Michael Adkins Date: Wed, 28 Oct 2020 09:00:06 -0700 Subject: [PATCH 10/13] Fix comment typo Co-authored-by: Chris White --- src/prefect_server/api/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prefect_server/api/flows.py b/src/prefect_server/api/flows.py index 514cea3f..08b5878a 100644 --- a/src/prefect_server/api/flows.py +++ b/src/prefect_server/api/flows.py @@ -194,7 +194,7 @@ async def create_flow( # create the flow group or check for the idempotency key in the existing group # Note: The key is stashed in the `settings` dict of the flow group which makes - # idempotency only apply to the most flow creation. If we want to avoid + # idempotency only apply to the most recent flow creation. If we want to avoid # re-registering old flow versions we'll need to add a column to the Flow table # TODO: Should we leave empty idempotency_keys as `None` or generate a uuid4? if flow_group is None: From 7e47b8204f5d8174548ba9f16a38fb6a9eaf8ed1 Mon Sep 17 00:00:00 2001 From: Michael Adkins Date: Wed, 28 Oct 2020 11:01:52 -0500 Subject: [PATCH 11/13] Remove extra note --- src/prefect_server/api/flows.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/prefect_server/api/flows.py b/src/prefect_server/api/flows.py index 08b5878a..0d003eae 100644 --- a/src/prefect_server/api/flows.py +++ b/src/prefect_server/api/flows.py @@ -192,11 +192,8 @@ async def create_flow( } ).first({"id", "schedule", "settings"}) - # create the flow group or check for the idempotency key in the existing group - # Note: The key is stashed in the `settings` dict of the flow group which makes - # idempotency only apply to the most recent flow creation. If we want to avoid - # re-registering old flow versions we'll need to add a column to the Flow table - # TODO: Should we leave empty idempotency_keys as `None` or generate a uuid4? + # create the flow group or check for the idempotency key in the existing flow group + # settings if flow_group is None: flow_group_id = await models.FlowGroup( tenant_id=tenant_id, From 43cd55083a78af9fc8cb1c57965058a888149bd7 Mon Sep 17 00:00:00 2001 From: Michael Adkins Date: Wed, 28 Oct 2020 11:27:29 -0500 Subject: [PATCH 12/13] Add test coverage for archived/deleted cases And fix accidental override of flow local in that case --- src/prefect_server/api/flows.py | 6 ++--- tests/api/test_flows.py | 43 +++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/src/prefect_server/api/flows.py b/src/prefect_server/api/flows.py index 0d003eae..2f20f316 100644 --- a/src/prefect_server/api/flows.py +++ b/src/prefect_server/api/flows.py @@ -218,14 +218,14 @@ async def create_flow( ): # get the most recent unarchived version, there should only be one # unarchived flow at a time but it is safer not to presume - flow = await models.Flow.where( + flow_model = await models.Flow.where( { "version_group_id": {"_eq": version_group_id}, "archived": {"_eq": False}, } ).first(order_by={"version": EnumValue("desc")}) - if flow: - return flow.id + if flow_model: + return flow_model.id # otherwise, despite the key matching we don't have a valid flow to return # and will continue as though the key did not match diff --git a/tests/api/test_flows.py b/tests/api/test_flows.py index b192d568..df53865d 100644 --- a/tests/api/test_flows.py +++ b/tests/api/test_flows.py @@ -326,6 +326,49 @@ async def test_flows_added_with_different_idempotency_key( == len(flow.tasks) * 2 ) + @pytest.mark.parametrize("no_flow_case", ["archived", "deleted"]) + async def test_flows_added_with_same_idempotency_key_but_no_valid_flows( + self, + project_id, + flow, + no_flow_case, + ): + idempotency_key = "foo" + flow_id_1 = await api.flows.create_flow( + project_id=project_id, + serialized_flow=flow.serialize(), + idempotency_key=idempotency_key, + ) + flow_model_1 = await models.Flow.where({"id": {"_eq": flow_id_1}}).first( + {"version", "version_group_id"} + ) + + if no_flow_case == "deleted": + await models.Flow.where({"id": {"_eq": flow_id_1}}).delete() + elif no_flow_case == "archived": + await models.Flow.where({"id": {"_eq": flow_id_1}}).update( + set={"archived": True} + ) + + flow_id_2 = await api.flows.create_flow( + project_id=project_id, + serialized_flow=flow.serialize(), + version_group_id=flow_model_1.version_group_id, + idempotency_key=idempotency_key, + ) + flow_model_2 = await models.Flow.where({"id": {"_eq": flow_id_2}}).first( + {"version"} + ) + + assert flow_id_1 != flow_id_2 + if no_flow_case == "archived": + # in the deleted case, the version will start over + assert flow_model_1.version == flow_model_2.version - 1 + + assert await models.Flow.where( + {"id": {"_in": [flow_id_1, flow_id_2]}} + ).count() == (2 if no_flow_case == "archived" else 1) + async def test_create_flow_with_schedule(self, project_id): flow = prefect.Flow( name="test", schedule=prefect.schedules.CronSchedule("0 0 * * *") From 50066507d22026daf103ff0d69217758871161f2 Mon Sep 17 00:00:00 2001 From: Michael Adkins Date: Wed, 28 Oct 2020 11:56:24 -0500 Subject: [PATCH 13/13] Expand test coverage for different keys Including X -> Y -> X and X -> None -> X --- tests/api/test_flows.py | 79 +++++++++++++++++++++++++---------------- 1 file changed, 49 insertions(+), 30 deletions(-) diff --git a/tests/api/test_flows.py b/tests/api/test_flows.py index df53865d..37312835 100644 --- a/tests/api/test_flows.py +++ b/tests/api/test_flows.py @@ -290,41 +290,60 @@ async def test_flows_not_added_with_same_idempotency_key(self, project_id, flow) flow.tasks ) - @pytest.mark.parametrize("idempotency_keys", [("foo", "bar"), (None, None)]) - async def test_flows_added_with_different_idempotency_key( + @pytest.mark.parametrize( + "idempotency_keys", + [ + pytest.param(("foo", "bar"), id="simple different keys"), + pytest.param((None, None), id="sequential empty keys"), + pytest.param(("foo", None, "foo"), id="same key with empty between"), + pytest.param(("foo", "bar", "foo"), id="same key with different between"), + ], + ) + async def test_flows_added_with_different_idempotency_keys( self, project_id, flow, idempotency_keys ): - flow_id_1 = await api.flows.create_flow( - project_id=project_id, - serialized_flow=flow.serialize(), - idempotency_key=idempotency_keys[0], - ) - flow_model_1 = await models.Flow.where({"id": {"_eq": flow_id_1}}).first( - {"version", "version_group_id"} - ) - flow_id_2 = await api.flows.create_flow( - project_id=project_id, - serialized_flow=flow.serialize(), - version_group_id=flow_model_1.version_group_id, - idempotency_key=idempotency_keys[1], - ) - flow_model_2 = await models.Flow.where({"id": {"_eq": flow_id_2}}).first( - {"version"} - ) + """ + Allows testing any number of keys but expects/asserts that all test cases + successfully create a flow per key + """ + flow_ids = [] + flow_models = [] + + for idempotency_key in idempotency_keys: + flow_id = await api.flows.create_flow( + project_id=project_id, + serialized_flow=flow.serialize(), + idempotency_key=idempotency_key, + # Create the flows within the same group as the first + version_group_id=( + flow_models[0].version_group_id if flow_models else None + ), + ) + flow_ids.append(flow_id) + flow_models.append( + await models.Flow.where({"id": {"_eq": flow_id}}).first( + {"version", "version_group_id"} + ) + ) - assert flow_id_1 != flow_id_2 - assert flow_model_1.version == flow_model_2.version - 1 + # We should have the same number of IDs as keys + assert len(flow_ids) == len(idempotency_keys) - assert ( - await models.Flow.where({"id": {"_in": [flow_id_1, flow_id_2]}}).count() - == 2 - ) - assert ( - await models.Task.where( - {"flow_id": {"_in": [flow_id_1, flow_id_2]}} - ).count() - == len(flow.tasks) * 2 + # All ids should be unique + assert len(flow_ids) == len(set(flow_ids)) + + # Versions should be increasing + for i in range(len(flow_models) - 1): + assert flow_models[i].version == flow_models[i + 1].version - 1 + + # There should be N flows in the Flows table + assert await models.Flow.where({"id": {"_in": flow_ids}}).count() == len( + flow_ids ) + # There should be N * n_tasks_per_flow tasks in the Tasks table + assert await models.Task.where({"flow_id": {"_in": flow_ids}}).count() == len( + flow.tasks + ) * len(flow_ids) @pytest.mark.parametrize("no_flow_case", ["archived", "deleted"]) async def test_flows_added_with_same_idempotency_key_but_no_valid_flows(