Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task run heartbeats refactor #158

Merged
merged 4 commits into from
Dec 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ aliases:
run:
name: Set environment variables
command: |
# set prefect tag -- currently pinning to master across the board
echo 'export PREFECT_VERSION=master' >> $BASH_ENV
# set prefect tag -- currently pinned to 0.13.19 while 0.14 is being prepared
echo 'export PREFECT_VERSION=0.13.19' >> $BASH_ENV

- &install_prefect_server
run:
Expand Down
2 changes: 2 additions & 0 deletions changes/pr158.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
enhancement:
- "Refactor the Zombie Killer to rely entirely on flow run heartbeats - [#158](https://github.com/PrefectHQ/server/pull/158)"
4 changes: 2 additions & 2 deletions src/prefect_server/services/towel/zombie_killer.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ async def get_task_runs_where_clause(
return {
# the task run is RUNNING
"state": {"_eq": "Running"},
# ... but the heartbeat is stale
"heartbeat": {"_lte": str(heartbeat_cutoff)},
# ... but the flow run heartbeat is stale
"flow_run": {"heartbeat": {"_lte": str(heartbeat_cutoff)}},
# ... and the flow has heartbeats enabled
"task": {
"flow": {
Expand Down
70 changes: 19 additions & 51 deletions tests/services/test_zombie_killer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async def test_zombie_killer_fails_task_run(running_flow_run_id, task_run_id):
)

# set old heartbeat
await models.TaskRun.where(id=task_run_id).update(
await models.FlowRun.where(id=running_flow_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(hours=1)}
)

Expand All @@ -32,29 +32,6 @@ async def test_zombie_killer_fails_task_run(running_flow_run_id, task_run_id):
assert flow_run.state == "Running"


async def test_zombie_killer_does_not_fail_dead_flow_run_if_task_still_heartbeating(
running_flow_run_id, task_run_id
):
await api.states.set_task_run_state(
task_run_id, state=prefect.engine.state.Running()
)

# set old heartbeat on flow run, but recent one on task run
await models.FlowRun.where(id=running_flow_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(hours=1)}
)
await models.TaskRun.where(id=task_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(seconds=1)}
)

assert await ZombieKiller().reap_zombie_task_runs() == 0

task_run = await models.TaskRun.where(id=task_run_id).first({"state"})
flow_run = await models.FlowRun.where(id=running_flow_run_id).first({"state"})
assert task_run.state == "Running"
assert flow_run.state == "Running"


async def test_zombie_killer_does_not_fail_flow_run_if_heartbeat_disabled(
flow_id, flow_group_id, running_flow_run_id, task_run_id
):
Expand All @@ -71,14 +48,11 @@ async def test_zombie_killer_does_not_fail_flow_run_if_heartbeat_disabled(
await models.FlowRun.where(id=running_flow_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(hours=1)}
)
await models.TaskRun.where(id=task_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(hours=1)}
)

assert await ZombieKiller().reap_zombie_task_runs() == 0


async def test_zombie_killer_fails_flow_run_if_heartbeat_setting_set_but_not_disabled(
async def test_zombie_killer_fails_task_run_if_heartbeat_setting_set_but_not_disabled(
flow_id, flow_group_id, running_flow_run_id, task_run_id
):

Expand All @@ -94,16 +68,13 @@ async def test_zombie_killer_fails_flow_run_if_heartbeat_setting_set_but_not_dis
await models.FlowRun.where(id=running_flow_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(hours=1)}
)
await models.TaskRun.where(id=task_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(hours=1)}
)

assert await ZombieKiller().reap_zombie_task_runs() == 1
task_run = await models.TaskRun.where(id=task_run_id).first({"state"})
assert task_run.state == "Failed"


async def test_zombie_killer_fails_flow_run_if_heartbeat_setting_not_set(
async def test_zombie_killer_fails_task_run_if_heartbeat_setting_not_set(
flow_id, flow_group_id, running_flow_run_id, task_run_id
):
await models.FlowGroup.where(id=flow_group_id).update({"settings": {}})
Expand All @@ -120,9 +91,6 @@ async def test_zombie_killer_fails_flow_run_if_heartbeat_setting_not_set(
await models.FlowRun.where(id=running_flow_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(hours=1)}
)
await models.TaskRun.where(id=task_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(hours=1)}
)

assert await ZombieKiller().reap_zombie_task_runs() == 1
task_run = await models.TaskRun.where(id=task_run_id).first({"state"})
Expand All @@ -145,9 +113,6 @@ async def test_zombie_killer_does_not_apply_if_task_run_is_scheduled(
await models.FlowRun.where(id=running_flow_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(hours=1)}
)
await models.TaskRun.where(id=task_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(hours=1)}
)

assert await ZombieKiller().reap_zombie_task_runs() == 0

Expand All @@ -159,6 +124,9 @@ async def test_zombie_killer_does_not_apply_if_heartbeat_is_recent(
task_run_id, state=prefect.engine.state.Running()
)

await models.FlowRun.where(id=running_flow_run_id).update(
set={"heartbeat": pendulum.now("utc")}
)
assert await ZombieKiller().reap_zombie_task_runs() == 0

task_run = await models.TaskRun.where(id=task_run_id).first({"state"})
Expand All @@ -173,7 +141,7 @@ async def test_zombie_killer_creates_logs(running_flow_run_id, task_run_id):
)

# set old heartbeat
await models.TaskRun.where(id=task_run_id).update(
await models.FlowRun.where(id=running_flow_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(hours=1)}
)

Expand All @@ -196,7 +164,7 @@ async def test_zombie_killer_creates_logs(running_flow_run_id, task_run_id):

class TestZombieKillerRetries:
async def test_zombie_killer_retries_if_max_retries_greater_than_0(
running_flow_run_id, task_id, task_run_id
self, running_flow_run_id, task_id, task_run_id
):

await models.Task.where(id=task_id).update(
Expand All @@ -208,7 +176,7 @@ async def test_zombie_killer_retries_if_max_retries_greater_than_0(
)

# set old heartbeat
await models.TaskRun.where(id=task_run_id).update(
await models.FlowRun.where(id=running_flow_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(hours=1)}
)

Expand All @@ -221,7 +189,7 @@ async def test_zombie_killer_retries_if_max_retries_greater_than_0(
assert task_run.state_start_time < pendulum.now()

async def test_zombie_killer_retries_if_retry_delay_missing(
running_flow_run_id, task_id, task_run_id
self, running_flow_run_id, task_id, task_run_id
):

await models.Task.where(id=task_id).update(
Expand All @@ -233,7 +201,7 @@ async def test_zombie_killer_retries_if_retry_delay_missing(
)

# set old heartbeat
await models.TaskRun.where(id=task_run_id).update(
await models.FlowRun.where(id=running_flow_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(hours=1)}
)

Expand All @@ -246,7 +214,7 @@ async def test_zombie_killer_retries_if_retry_delay_missing(
assert task_run.state_start_time < pendulum.now()

async def test_zombie_killer_respects_retry_delay(
running_flow_run_id, task_id, task_run_id
self, running_flow_run_id, task_id, task_run_id
):

await models.Task.where(id=task_id).update(
Expand All @@ -258,7 +226,7 @@ async def test_zombie_killer_respects_retry_delay(
)

# set old heartbeat
await models.TaskRun.where(id=task_run_id).update(
await models.FlowRun.where(id=running_flow_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(hours=1)}
)

Expand All @@ -275,7 +243,7 @@ async def test_zombie_killer_respects_retry_delay(
)

async def test_zombie_killer_respects_retry_delay_in_postgres_readable_syntax(
running_flow_run_id, task_id, task_run_id
self, running_flow_run_id, task_id, task_run_id
):

await models.Task.where(id=task_id).update(
Expand All @@ -287,7 +255,7 @@ async def test_zombie_killer_respects_retry_delay_in_postgres_readable_syntax(
)

# set old heartbeat
await models.TaskRun.where(id=task_run_id).update(
await models.FlowRun.where(id=running_flow_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(hours=1)}
)

Expand All @@ -304,7 +272,7 @@ async def test_zombie_killer_respects_retry_delay_in_postgres_readable_syntax(
)

async def test_zombie_killer_stops_retrying_if_max_retries_exceeded(
running_flow_run_id, task_id, task_run_id
self, running_flow_run_id, task_id, task_run_id
):

await models.Task.where(id=task_id).update(
Expand All @@ -316,7 +284,7 @@ async def test_zombie_killer_stops_retrying_if_max_retries_exceeded(
)

# set old heartbeat
await models.TaskRun.where(id=task_run_id).update(
await models.FlowRun.where(id=running_flow_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(hours=1)}
)

Expand All @@ -331,7 +299,7 @@ async def test_zombie_killer_stops_retrying_if_max_retries_exceeded(
task_run_id, state=prefect.engine.state.Running()
)

await models.TaskRun.where(id=task_run_id).update(
await models.FlowRun.where(id=running_flow_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(hours=1)}
)

Expand All @@ -357,7 +325,7 @@ async def test_zombie_killer_does_not_retry_if_flow_run_is_not_running(
await models.Task.where(id=task_id).update(
{"max_retries": 1, "retry_delay": "00:00:00"}
)
await models.TaskRun.where(id=task_run_id).update(
await models.FlowRun.where(id=flow_run_id).update(
set={"heartbeat": pendulum.now("utc").subtract(hours=1)}
)

Expand Down