Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove flow runner heartbeat tests #458

Merged
merged 1 commit into from
Dec 27, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 17 additions & 103 deletions tests/client/test_cloud.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
import time
from unittest.mock import MagicMock

import prefect
Expand Down Expand Up @@ -170,124 +171,37 @@ class TestHeartBeats:
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_task_runner_has_a_heartbeat(self, executor, monkeypatch):
glob_dict = {}

def heartbeat(self):
glob_dict["was_called"] = True

monkeypatch.setattr("prefect.engine.task_runner.Client", MagicMock())
client = MagicMock()
monkeypatch.setattr(
"prefect.engine.task_runner.TaskRunner._heartbeat", heartbeat
"prefect.engine.task_runner.Client", MagicMock(return_value=client)
)
task = prefect.Task(name="test")
res = TaskRunner(task=task).run(executor=executor)
assert glob_dict.get("was_called") is True

@prefect.task
def sleeper():
time.sleep(2)

with set_temporary_config({"cloud.heartbeat_interval": 1.0}):
res = TaskRunner(task=sleeper).run(executor=executor)

assert res.is_successful()
assert client.update_task_run_heartbeat.called
assert client.update_task_run_heartbeat.call_count >= 2

@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_task_runner_has_a_heartbeat_with_task_run_id(self, executor, monkeypatch):
glob_dict = {}

def update_heartbeat(*args):
glob_dict["call_args"] = args

get_task_run_info = MagicMock(return_value=MagicMock(id="1234", version=0))
client = MagicMock(
get_task_run_info=get_task_run_info,
update_task_run_heartbeat=update_heartbeat,
)
client = MagicMock(get_task_run_info=get_task_run_info)
monkeypatch.setattr(
"prefect.engine.task_runner.Client", MagicMock(return_value=client)
)
task = prefect.Task(name="test")
with set_temporary_config({"prefect_cloud": True}):
res = TaskRunner(task=task).run(executor=executor)

assert "call_args" in glob_dict
assert glob_dict["call_args"][0] == "1234"

@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_flow_runner_has_a_heartbeat(self, executor, monkeypatch):
glob_dict = {}

def heartbeat(self):
glob_dict["was_called"] = True

monkeypatch.setattr("prefect.engine.flow_runner.Client", MagicMock())
monkeypatch.setattr(
"prefect.engine.flow_runner.FlowRunner._heartbeat", heartbeat
)
flow = prefect.Flow(name="test", tasks=[prefect.Task()])
res = FlowRunner(flow=flow).run(executor=executor)
assert glob_dict.get("was_called") is True

@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_task_runner_has_a_heartbeat_even_when_things_go_wrong(
self, executor, monkeypatch
):
glob_dict = {}

def heartbeat(self):
glob_dict["was_called"] = True

monkeypatch.setattr("prefect.engine.task_runner.Client", MagicMock())
monkeypatch.setattr(
"prefect.engine.task_runner.TaskRunner._heartbeat", heartbeat
)

@prefect.task
def raise_me():
raise AttributeError("Doesn't exist")

res = TaskRunner(task=raise_me).run(executor=executor)
assert res.is_failed()
assert glob_dict.get("was_called") is True

@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_flow_runner_has_a_heartbeat_even_when_things_go_wrong(
self, executor, monkeypatch
):
glob_dict = {}

def heartbeat(self):
glob_dict["was_called"] = True

monkeypatch.setattr("prefect.engine.flow_runner.Client", MagicMock())
monkeypatch.setattr(
"prefect.engine.flow_runner.FlowRunner._heartbeat", heartbeat
)

class BadTaskRunner(TaskRunner):
def get_task_run_state(self, *args, **kwargs):
raise RuntimeError("I represent bad code in the task runner.")

flow = prefect.Flow(name="test", tasks=[prefect.Task()])
res = FlowRunner(flow=flow, task_runner_cls=BadTaskRunner).run(
executor=executor
)
assert res.is_failed()
assert glob_dict.get("was_called") is True

@pytest.mark.parametrize("executor", ["local", "sync"], indirect=True)
def test_all_task_runners_have_heartbeats_within_flows(self, executor, monkeypatch):
"""Because MagicMock()'s don't persist across multiple processes / threads, this test
can only test the local and synchronous executors"""
heartbeat = MagicMock()
flow = prefect.Flow(tasks=[prefect.Task(), prefect.Task(), prefect.Task()])
monkeypatch.setattr("prefect.engine.flow_runner.Client", MagicMock())
monkeypatch.setattr("prefect.engine.task_runner.Client", MagicMock())
monkeypatch.setattr(
"prefect.engine.task_runner.TaskRunner._heartbeat", heartbeat
)
res = FlowRunner(flow=flow).run(executor=executor, state=Pending())
assert heartbeat.call_count == 3
assert res.is_successful()
assert client.update_task_run_heartbeat.call_args[0][0] == "1234"


def test_client_is_always_called_even_during_failures(monkeypatch):
Expand Down