From a5ee959cb65e3e9fddcd08baf4bc486e931b7aec Mon Sep 17 00:00:00 2001 From: Chris White Date: Fri, 5 Feb 2021 14:11:14 -0800 Subject: [PATCH 1/4] Cancel adhoc runs for archived flows instead of deleting them --- src/prefect_server/api/flows.py | 20 ++++++++++++++++++-- src/prefect_server/api/states.py | 8 +++++--- tests/api/test_flows.py | 31 ++++++++++++++++++++++++++++++- 3 files changed, 53 insertions(+), 6 deletions(-) diff --git a/src/prefect_server/api/flows.py b/src/prefect_server/api/flows.py index 2f20f316..bf8c4c43 100644 --- a/src/prefect_server/api/flows.py +++ b/src/prefect_server/api/flows.py @@ -332,7 +332,8 @@ async def archive_flow(flow_id: str) -> bool: Archives a flow. Archiving a flow prevents it from scheduling new runs. It also: - - deletes any currently scheduled runs + - deletes any currently auto-scheduled runs + - cancels any ad-hoc created scheduled runs - resets the "last scheduled run time" of any schedules Args: @@ -355,9 +356,24 @@ async def archive_flow(flow_id: str) -> bool: # delete scheduled flow runs await models.FlowRun.where( - {"flow_id": {"_eq": flow_id}, "state": {"_eq": "Scheduled"}} + { + "flow_id": {"_eq": flow_id}, + "state": {"_eq": "Scheduled"}, + "auto_scheduled": {"_eq": True}, + } ).delete() + # cancel adhoc scheduled flow runs + fr_ids = await models.FlowRun.where( + { + "flow_id": {"_eq": flow_id}, + "state": {"_eq": "Scheduled"}, + "auto_scheduled": {"_eq": False}, + } + ).get({"id"}) + msg = f"Flow {flow_id} was archived." + for flow_run in fr_ids: + await api.states.cancel_flow_run(flow_run.id, state_message=msg) return True diff --git a/src/prefect_server/api/states.py b/src/prefect_server/api/states.py index 310808fb..d88f2df5 100644 --- a/src/prefect_server/api/states.py +++ b/src/prefect_server/api/states.py @@ -207,7 +207,9 @@ async def set_task_run_state( @register_api("states.cancel_flow_run") -async def cancel_flow_run(flow_run_id: str) -> models.FlowRun: +async def cancel_flow_run( + flow_run_id: str, state_message: str = None +) -> models.FlowRun: """ Cancel a flow run. @@ -231,7 +233,7 @@ async def cancel_flow_run(flow_run_id: str) -> models.FlowRun: return flow_run else: if state.is_running(): - state = Cancelling("Flow run is cancelling") + state = Cancelling(state_message or "Flow run is cancelling") else: - state = Cancelled("Flow run is cancelled") + state = Cancelled(state_message or "Flow run is cancelled") return await set_flow_run_state(flow_run_id=flow_run_id, state=state) diff --git a/tests/api/test_flows.py b/tests/api/test_flows.py index d787f733..1b9cfca4 100644 --- a/tests/api/test_flows.py +++ b/tests/api/test_flows.py @@ -746,7 +746,7 @@ async def test_archive_flow_twice(self, flow_id): flow = await models.Flow.where(id=flow_id).first({"archived"}) assert flow.archived - async def test_archive_flow_deletes_scheduled_runs(self, flow_id): + async def test_archive_flow_deletes_auto_scheduled_runs(self, flow_id): # create scheduled api.runs since the fixture doesn't await api.flows.schedule_flow_runs(flow_id=flow_id) @@ -765,6 +765,35 @@ async def test_archive_flow_deletes_scheduled_runs(self, flow_id): == 0 ) + async def test_archive_flow_cancels_ad_hoc_runs(self, flow_id): + # create one ad hoc run and 10 auto scheduled runs + flow_run_id = await api.runs.create_flow_run(flow_id) + await api.flows.schedule_flow_runs(flow_id=flow_id) + + scheduled_runs = await models.FlowRun.where( + { + "flow_id": {"_eq": flow_id}, + "state": {"_eq": "Scheduled"}, + "auto_scheduled": {"_eq": True}, + } + ).get({"id"}) + assert scheduled_runs + + await api.flows.archive_flow(flow_id) + + # all auto scheduled deleted + assert ( + await models.FlowRun.where( + {"id": {"_in": [r.id for r in scheduled_runs]}} + ).count() + == 0 + ) + ad_hoc_state = await models.FlowRun.where(id=flow_run_id).first( + {"state", "state_message"} + ) + assert ad_hoc_state.state == "Cancelled" + assert "archived" in ad_hoc_state.state_message + async def test_archive_flow_with_bad_id(self, flow_id): assert not await api.flows.archive_flow(str(uuid.uuid4())) From 1a49b668aaea5b78dd185a949074ac6da6b03928 Mon Sep 17 00:00:00 2001 From: Chris White Date: Fri, 5 Feb 2021 14:12:17 -0800 Subject: [PATCH 2/4] Add changelog entry --- changes/pr185.yaml | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 changes/pr185.yaml diff --git a/changes/pr185.yaml b/changes/pr185.yaml new file mode 100644 index 00000000..a1ddde40 --- /dev/null +++ b/changes/pr185.yaml @@ -0,0 +1,2 @@ +enhancement: + - "Cancel ad-hoc created flow runs instead of deleting them when archiving a flow - [#185](https://github.com/PrefectHQ/server/pull/185)" From 28153e9c31b658469cb0b36ca8fc47f1f5195842 Mon Sep 17 00:00:00 2001 From: Chris White Date: Mon, 8 Feb 2021 12:30:21 -0800 Subject: [PATCH 3/4] Switch to using gather to handle exceptions better --- src/prefect_server/api/flows.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/prefect_server/api/flows.py b/src/prefect_server/api/flows.py index bf8c4c43..34ead9dc 100644 --- a/src/prefect_server/api/flows.py +++ b/src/prefect_server/api/flows.py @@ -372,8 +372,10 @@ async def archive_flow(flow_id: str) -> bool: } ).get({"id"}) msg = f"Flow {flow_id} was archived." + tasks = [] for flow_run in fr_ids: - await api.states.cancel_flow_run(flow_run.id, state_message=msg) + tasks.append(api.states.cancel_flow_run(flow_run.id, state_message=msg)) + await asyncio.gather(*tasks) return True From 67aeb3db00356f991622c5d35b34650356100179 Mon Sep 17 00:00:00 2001 From: Chris White Date: Mon, 8 Feb 2021 12:58:10 -0800 Subject: [PATCH 4/4] Use list comprehension instead, and import asyncio --- src/prefect_server/api/flows.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/prefect_server/api/flows.py b/src/prefect_server/api/flows.py index 34ead9dc..de11371e 100644 --- a/src/prefect_server/api/flows.py +++ b/src/prefect_server/api/flows.py @@ -1,3 +1,4 @@ +import asyncio import datetime import hashlib import json @@ -372,10 +373,12 @@ async def archive_flow(flow_id: str) -> bool: } ).get({"id"}) msg = f"Flow {flow_id} was archived." - tasks = [] - for flow_run in fr_ids: - tasks.append(api.states.cancel_flow_run(flow_run.id, state_message=msg)) - await asyncio.gather(*tasks) + await asyncio.gather( + *[ + api.states.cancel_flow_run(flow_run.id, state_message=msg) + for flow_run in fr_ids + ] + ) return True