From 95a3adb56362030e932695bfed6b4090f6b947b7 Mon Sep 17 00:00:00 2001 From: Chris White Date: Thu, 10 Dec 2020 20:01:47 -0800 Subject: [PATCH 1/4] Refactor zombie killer to rely entirely on flow run heartbeats --- .../services/towel/zombie_killer.py | 4 +- tests/services/test_zombie_killer.py | 70 +++++-------------- 2 files changed, 21 insertions(+), 53 deletions(-) diff --git a/src/prefect_server/services/towel/zombie_killer.py b/src/prefect_server/services/towel/zombie_killer.py index b283957f..cf9bb5c8 100644 --- a/src/prefect_server/services/towel/zombie_killer.py +++ b/src/prefect_server/services/towel/zombie_killer.py @@ -100,8 +100,8 @@ async def get_task_runs_where_clause( return { # the task run is RUNNING "state": {"_eq": "Running"}, - # ... but the heartbeat is stale - "heartbeat": {"_lte": str(heartbeat_cutoff)}, + # ... but the flow run heartbeat is stale + "flow_run": {"heartbeat": {"_lte": str(heartbeat_cutoff)}}, # ... and the flow has heartbeats enabled "task": { "flow": { diff --git a/tests/services/test_zombie_killer.py b/tests/services/test_zombie_killer.py index 29a2fe71..7c64344e 100644 --- a/tests/services/test_zombie_killer.py +++ b/tests/services/test_zombie_killer.py @@ -20,7 +20,7 @@ async def test_zombie_killer_fails_task_run(running_flow_run_id, task_run_id): ) # set old heartbeat - await models.TaskRun.where(id=task_run_id).update( + await models.FlowRun.where(id=running_flow_run_id).update( set={"heartbeat": pendulum.now("utc").subtract(hours=1)} ) @@ -32,29 +32,6 @@ async def test_zombie_killer_fails_task_run(running_flow_run_id, task_run_id): assert flow_run.state == "Running" -async def test_zombie_killer_does_not_fail_dead_flow_run_if_task_still_heartbeating( - running_flow_run_id, task_run_id -): - await api.states.set_task_run_state( - task_run_id, state=prefect.engine.state.Running() - ) - - # set old heartbeat on flow run, but recent one on task run - await models.FlowRun.where(id=running_flow_run_id).update( - set={"heartbeat": pendulum.now("utc").subtract(hours=1)} - ) - await models.TaskRun.where(id=task_run_id).update( - set={"heartbeat": pendulum.now("utc").subtract(seconds=1)} - ) - - assert await ZombieKiller().reap_zombie_task_runs() == 0 - - task_run = await models.TaskRun.where(id=task_run_id).first({"state"}) - flow_run = await models.FlowRun.where(id=running_flow_run_id).first({"state"}) - assert task_run.state == "Running" - assert flow_run.state == "Running" - - async def test_zombie_killer_does_not_fail_flow_run_if_heartbeat_disabled( flow_id, flow_group_id, running_flow_run_id, task_run_id ): @@ -71,14 +48,11 @@ async def test_zombie_killer_does_not_fail_flow_run_if_heartbeat_disabled( await models.FlowRun.where(id=running_flow_run_id).update( set={"heartbeat": pendulum.now("utc").subtract(hours=1)} ) - await models.TaskRun.where(id=task_run_id).update( - set={"heartbeat": pendulum.now("utc").subtract(hours=1)} - ) assert await ZombieKiller().reap_zombie_task_runs() == 0 -async def test_zombie_killer_fails_flow_run_if_heartbeat_setting_set_but_not_disabled( +async def test_zombie_killer_fails_task_run_if_heartbeat_setting_set_but_not_disabled( flow_id, flow_group_id, running_flow_run_id, task_run_id ): @@ -94,16 +68,13 @@ async def test_zombie_killer_fails_flow_run_if_heartbeat_setting_set_but_not_dis await models.FlowRun.where(id=running_flow_run_id).update( set={"heartbeat": pendulum.now("utc").subtract(hours=1)} ) - await models.TaskRun.where(id=task_run_id).update( - set={"heartbeat": pendulum.now("utc").subtract(hours=1)} - ) assert await ZombieKiller().reap_zombie_task_runs() == 1 task_run = await models.TaskRun.where(id=task_run_id).first({"state"}) assert task_run.state == "Failed" -async def test_zombie_killer_fails_flow_run_if_heartbeat_setting_not_set( +async def test_zombie_killer_fails_task_run_if_heartbeat_setting_not_set( flow_id, flow_group_id, running_flow_run_id, task_run_id ): await models.FlowGroup.where(id=flow_group_id).update({"settings": {}}) @@ -120,9 +91,6 @@ async def test_zombie_killer_fails_flow_run_if_heartbeat_setting_not_set( await models.FlowRun.where(id=running_flow_run_id).update( set={"heartbeat": pendulum.now("utc").subtract(hours=1)} ) - await models.TaskRun.where(id=task_run_id).update( - set={"heartbeat": pendulum.now("utc").subtract(hours=1)} - ) assert await ZombieKiller().reap_zombie_task_runs() == 1 task_run = await models.TaskRun.where(id=task_run_id).first({"state"}) @@ -145,9 +113,6 @@ async def test_zombie_killer_does_not_apply_if_task_run_is_scheduled( await models.FlowRun.where(id=running_flow_run_id).update( set={"heartbeat": pendulum.now("utc").subtract(hours=1)} ) - await models.TaskRun.where(id=task_run_id).update( - set={"heartbeat": pendulum.now("utc").subtract(hours=1)} - ) assert await ZombieKiller().reap_zombie_task_runs() == 0 @@ -159,6 +124,9 @@ async def test_zombie_killer_does_not_apply_if_heartbeat_is_recent( task_run_id, state=prefect.engine.state.Running() ) + await models.FlowRun.where(id=running_flow_run_id).update( + set={"heartbeat": pendulum.now("utc")} + ) assert await ZombieKiller().reap_zombie_task_runs() == 0 task_run = await models.TaskRun.where(id=task_run_id).first({"state"}) @@ -173,7 +141,7 @@ async def test_zombie_killer_creates_logs(running_flow_run_id, task_run_id): ) # set old heartbeat - await models.TaskRun.where(id=task_run_id).update( + await models.FlowRun.where(id=running_flow_run_id).update( set={"heartbeat": pendulum.now("utc").subtract(hours=1)} ) @@ -196,7 +164,7 @@ async def test_zombie_killer_creates_logs(running_flow_run_id, task_run_id): class TestZombieKillerRetries: async def test_zombie_killer_retries_if_max_retries_greater_than_0( - running_flow_run_id, task_id, task_run_id + self, running_flow_run_id, task_id, task_run_id ): await models.Task.where(id=task_id).update( @@ -208,7 +176,7 @@ async def test_zombie_killer_retries_if_max_retries_greater_than_0( ) # set old heartbeat - await models.TaskRun.where(id=task_run_id).update( + await models.FlowRun.where(id=running_flow_run_id).update( set={"heartbeat": pendulum.now("utc").subtract(hours=1)} ) @@ -221,7 +189,7 @@ async def test_zombie_killer_retries_if_max_retries_greater_than_0( assert task_run.state_start_time < pendulum.now() async def test_zombie_killer_retries_if_retry_delay_missing( - running_flow_run_id, task_id, task_run_id + self, running_flow_run_id, task_id, task_run_id ): await models.Task.where(id=task_id).update( @@ -233,7 +201,7 @@ async def test_zombie_killer_retries_if_retry_delay_missing( ) # set old heartbeat - await models.TaskRun.where(id=task_run_id).update( + await models.FlowRun.where(id=running_flow_run_id).update( set={"heartbeat": pendulum.now("utc").subtract(hours=1)} ) @@ -246,7 +214,7 @@ async def test_zombie_killer_retries_if_retry_delay_missing( assert task_run.state_start_time < pendulum.now() async def test_zombie_killer_respects_retry_delay( - running_flow_run_id, task_id, task_run_id + self, running_flow_run_id, task_id, task_run_id ): await models.Task.where(id=task_id).update( @@ -258,7 +226,7 @@ async def test_zombie_killer_respects_retry_delay( ) # set old heartbeat - await models.TaskRun.where(id=task_run_id).update( + await models.FlowRun.where(id=running_flow_run_id).update( set={"heartbeat": pendulum.now("utc").subtract(hours=1)} ) @@ -275,7 +243,7 @@ async def test_zombie_killer_respects_retry_delay( ) async def test_zombie_killer_respects_retry_delay_in_postgres_readable_syntax( - running_flow_run_id, task_id, task_run_id + self, running_flow_run_id, task_id, task_run_id ): await models.Task.where(id=task_id).update( @@ -287,7 +255,7 @@ async def test_zombie_killer_respects_retry_delay_in_postgres_readable_syntax( ) # set old heartbeat - await models.TaskRun.where(id=task_run_id).update( + await models.FlowRun.where(id=running_flow_run_id).update( set={"heartbeat": pendulum.now("utc").subtract(hours=1)} ) @@ -304,7 +272,7 @@ async def test_zombie_killer_respects_retry_delay_in_postgres_readable_syntax( ) async def test_zombie_killer_stops_retrying_if_max_retries_exceeded( - running_flow_run_id, task_id, task_run_id + self, running_flow_run_id, task_id, task_run_id ): await models.Task.where(id=task_id).update( @@ -316,7 +284,7 @@ async def test_zombie_killer_stops_retrying_if_max_retries_exceeded( ) # set old heartbeat - await models.TaskRun.where(id=task_run_id).update( + await models.FlowRun.where(id=running_flow_run_id).update( set={"heartbeat": pendulum.now("utc").subtract(hours=1)} ) @@ -331,7 +299,7 @@ async def test_zombie_killer_stops_retrying_if_max_retries_exceeded( task_run_id, state=prefect.engine.state.Running() ) - await models.TaskRun.where(id=task_run_id).update( + await models.FlowRun.where(id=running_flow_run_id).update( set={"heartbeat": pendulum.now("utc").subtract(hours=1)} ) @@ -357,7 +325,7 @@ async def test_zombie_killer_does_not_retry_if_flow_run_is_not_running( await models.Task.where(id=task_id).update( {"max_retries": 1, "retry_delay": "00:00:00"} ) - await models.TaskRun.where(id=task_run_id).update( + await models.FlowRun.where(id=flow_run_id).update( set={"heartbeat": pendulum.now("utc").subtract(hours=1)} ) From cdf6ffa3098828719467e6d6e04f092c4dadca01 Mon Sep 17 00:00:00 2001 From: Chris White Date: Thu, 10 Dec 2020 20:03:54 -0800 Subject: [PATCH 2/4] Add changelog entry --- changes/pr158.yaml | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 changes/pr158.yaml diff --git a/changes/pr158.yaml b/changes/pr158.yaml new file mode 100644 index 00000000..0aecac5a --- /dev/null +++ b/changes/pr158.yaml @@ -0,0 +1,2 @@ +enhancement: + - "Refactor the Zombie Killer to rely entirely on flow run heartbeats - [#158](https://github.com/PrefectHQ/server/pull/158)" From fcc993ab746f3b44db56cd06b5cdab7d12d4fe1d Mon Sep 17 00:00:00 2001 From: Chris White Date: Thu, 10 Dec 2020 20:14:41 -0800 Subject: [PATCH 3/4] Bound prefect from above as 0.14 changes roll out --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 545aff23..7022cc85 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ import versioneer install_requires = [ - "prefect >= 0.13.0", + "prefect < 0.14.0", # temporary "alembic >= 1.2, < 2.0", "ariadne >= 0.8.0, < 0.12.0", "asyncpg >= 0.20, < 0.21", From 9a5f4c07acd850b95f866a0020bf0aa57ea1241c Mon Sep 17 00:00:00 2001 From: Chris White Date: Thu, 10 Dec 2020 20:30:09 -0800 Subject: [PATCH 4/4] Pin to 0.13.19 in the right place --- .circleci/config.yml | 4 ++-- setup.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 140672fe..2378774a 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -3,8 +3,8 @@ aliases: run: name: Set environment variables command: | - # set prefect tag -- currently pinning to master across the board - echo 'export PREFECT_VERSION=master' >> $BASH_ENV + # set prefect tag -- currently pinning to 0.13.19 as 0.14 is prepared + echo 'export PREFECT_VERSION=0.13.19' >> $BASH_ENV - &install_prefect_server run: diff --git a/setup.py b/setup.py index 7022cc85..545aff23 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ import versioneer install_requires = [ - "prefect < 0.14.0", # temporary + "prefect >= 0.13.0", "alembic >= 1.2, < 2.0", "ariadne >= 0.8.0, < 0.12.0", "asyncpg >= 0.20, < 0.21",