Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better archiving experience #185

Merged
merged 4 commits into from
Feb 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes/pr185.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
enhancement:
- "Cancel ad-hoc created flow runs instead of deleting them when archiving a flow - [#185](https://github.com/PrefectHQ/server/pull/185)"
25 changes: 23 additions & 2 deletions src/prefect_server/api/flows.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import datetime
import hashlib
import json
Expand Down Expand Up @@ -332,7 +333,8 @@ async def archive_flow(flow_id: str) -> bool:
Archives a flow.

Archiving a flow prevents it from scheduling new runs. It also:
- deletes any currently scheduled runs
- deletes any currently auto-scheduled runs
- cancels any ad-hoc created scheduled runs
- resets the "last scheduled run time" of any schedules

Args:
Expand All @@ -355,9 +357,28 @@ async def archive_flow(flow_id: str) -> bool:

# delete scheduled flow runs
await models.FlowRun.where(
{"flow_id": {"_eq": flow_id}, "state": {"_eq": "Scheduled"}}
{
"flow_id": {"_eq": flow_id},
"state": {"_eq": "Scheduled"},
"auto_scheduled": {"_eq": True},
}
).delete()

# cancel adhoc scheduled flow runs
fr_ids = await models.FlowRun.where(
{
"flow_id": {"_eq": flow_id},
"state": {"_eq": "Scheduled"},
"auto_scheduled": {"_eq": False},
}
).get({"id"})
msg = f"Flow {flow_id} was archived."
await asyncio.gather(
*[
api.states.cancel_flow_run(flow_run.id, state_message=msg)
for flow_run in fr_ids
]
)
return True


Expand Down
8 changes: 5 additions & 3 deletions src/prefect_server/api/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ async def set_task_run_state(


@register_api("states.cancel_flow_run")
async def cancel_flow_run(flow_run_id: str) -> models.FlowRun:
async def cancel_flow_run(
flow_run_id: str, state_message: str = None
) -> models.FlowRun:
"""
Cancel a flow run.

Expand All @@ -231,7 +233,7 @@ async def cancel_flow_run(flow_run_id: str) -> models.FlowRun:
return flow_run
else:
if state.is_running():
state = Cancelling("Flow run is cancelling")
state = Cancelling(state_message or "Flow run is cancelling")
zanieb marked this conversation as resolved.
Show resolved Hide resolved
else:
state = Cancelled("Flow run is cancelled")
state = Cancelled(state_message or "Flow run is cancelled")
return await set_flow_run_state(flow_run_id=flow_run_id, state=state)
31 changes: 30 additions & 1 deletion tests/api/test_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ async def test_archive_flow_twice(self, flow_id):
flow = await models.Flow.where(id=flow_id).first({"archived"})
assert flow.archived

async def test_archive_flow_deletes_scheduled_runs(self, flow_id):
async def test_archive_flow_deletes_auto_scheduled_runs(self, flow_id):
# create scheduled api.runs since the fixture doesn't

await api.flows.schedule_flow_runs(flow_id=flow_id)
Expand All @@ -765,6 +765,35 @@ async def test_archive_flow_deletes_scheduled_runs(self, flow_id):
== 0
)

async def test_archive_flow_cancels_ad_hoc_runs(self, flow_id):
# create one ad hoc run and 10 auto scheduled runs
flow_run_id = await api.runs.create_flow_run(flow_id)
await api.flows.schedule_flow_runs(flow_id=flow_id)

scheduled_runs = await models.FlowRun.where(
{
"flow_id": {"_eq": flow_id},
"state": {"_eq": "Scheduled"},
"auto_scheduled": {"_eq": True},
}
).get({"id"})
assert scheduled_runs

await api.flows.archive_flow(flow_id)

# all auto scheduled deleted
assert (
await models.FlowRun.where(
{"id": {"_in": [r.id for r in scheduled_runs]}}
).count()
== 0
)
ad_hoc_state = await models.FlowRun.where(id=flow_run_id).first(
{"state", "state_message"}
)
assert ad_hoc_state.state == "Cancelled"
assert "archived" in ad_hoc_state.state_message

async def test_archive_flow_with_bad_id(self, flow_id):
assert not await api.flows.archive_flow(str(uuid.uuid4()))

Expand Down