Skip to content

Commit

Permalink
Merge pull request #458 from PrefectHQ/remove-some-tests
Browse files Browse the repository at this point in the history
Remove flow runner heartbeat tests
  • Loading branch information
cicdw authored Dec 27, 2018
2 parents 292f68a + 577ab50 commit e96f75e
Showing 1 changed file with 17 additions and 103 deletions.
120 changes: 17 additions & 103 deletions tests/client/test_cloud.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
import time
from unittest.mock import MagicMock

import prefect
Expand Down Expand Up @@ -170,124 +171,37 @@ class TestHeartBeats:
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_task_runner_has_a_heartbeat(self, executor, monkeypatch):
glob_dict = {}

def heartbeat(self):
glob_dict["was_called"] = True

monkeypatch.setattr("prefect.engine.task_runner.Client", MagicMock())
client = MagicMock()
monkeypatch.setattr(
"prefect.engine.task_runner.TaskRunner._heartbeat", heartbeat
"prefect.engine.task_runner.Client", MagicMock(return_value=client)
)
task = prefect.Task(name="test")
res = TaskRunner(task=task).run(executor=executor)
assert glob_dict.get("was_called") is True

@prefect.task
def sleeper():
time.sleep(2)

with set_temporary_config({"cloud.heartbeat_interval": 1.0}):
res = TaskRunner(task=sleeper).run(executor=executor)

assert res.is_successful()
assert client.update_task_run_heartbeat.called
assert client.update_task_run_heartbeat.call_count >= 2

@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_task_runner_has_a_heartbeat_with_task_run_id(self, executor, monkeypatch):
glob_dict = {}

def update_heartbeat(*args):
glob_dict["call_args"] = args

get_task_run_info = MagicMock(return_value=MagicMock(id="1234", version=0))
client = MagicMock(
get_task_run_info=get_task_run_info,
update_task_run_heartbeat=update_heartbeat,
)
client = MagicMock(get_task_run_info=get_task_run_info)
monkeypatch.setattr(
"prefect.engine.task_runner.Client", MagicMock(return_value=client)
)
task = prefect.Task(name="test")
with set_temporary_config({"prefect_cloud": True}):
res = TaskRunner(task=task).run(executor=executor)

assert "call_args" in glob_dict
assert glob_dict["call_args"][0] == "1234"

@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_flow_runner_has_a_heartbeat(self, executor, monkeypatch):
glob_dict = {}

def heartbeat(self):
glob_dict["was_called"] = True

monkeypatch.setattr("prefect.engine.flow_runner.Client", MagicMock())
monkeypatch.setattr(
"prefect.engine.flow_runner.FlowRunner._heartbeat", heartbeat
)
flow = prefect.Flow(name="test", tasks=[prefect.Task()])
res = FlowRunner(flow=flow).run(executor=executor)
assert glob_dict.get("was_called") is True

@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_task_runner_has_a_heartbeat_even_when_things_go_wrong(
self, executor, monkeypatch
):
glob_dict = {}

def heartbeat(self):
glob_dict["was_called"] = True

monkeypatch.setattr("prefect.engine.task_runner.Client", MagicMock())
monkeypatch.setattr(
"prefect.engine.task_runner.TaskRunner._heartbeat", heartbeat
)

@prefect.task
def raise_me():
raise AttributeError("Doesn't exist")

res = TaskRunner(task=raise_me).run(executor=executor)
assert res.is_failed()
assert glob_dict.get("was_called") is True

@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_flow_runner_has_a_heartbeat_even_when_things_go_wrong(
self, executor, monkeypatch
):
glob_dict = {}

def heartbeat(self):
glob_dict["was_called"] = True

monkeypatch.setattr("prefect.engine.flow_runner.Client", MagicMock())
monkeypatch.setattr(
"prefect.engine.flow_runner.FlowRunner._heartbeat", heartbeat
)

class BadTaskRunner(TaskRunner):
def get_task_run_state(self, *args, **kwargs):
raise RuntimeError("I represent bad code in the task runner.")

flow = prefect.Flow(name="test", tasks=[prefect.Task()])
res = FlowRunner(flow=flow, task_runner_cls=BadTaskRunner).run(
executor=executor
)
assert res.is_failed()
assert glob_dict.get("was_called") is True

@pytest.mark.parametrize("executor", ["local", "sync"], indirect=True)
def test_all_task_runners_have_heartbeats_within_flows(self, executor, monkeypatch):
"""Because MagicMock()'s don't persist across multiple processes / threads, this test
can only test the local and synchronous executors"""
heartbeat = MagicMock()
flow = prefect.Flow(tasks=[prefect.Task(), prefect.Task(), prefect.Task()])
monkeypatch.setattr("prefect.engine.flow_runner.Client", MagicMock())
monkeypatch.setattr("prefect.engine.task_runner.Client", MagicMock())
monkeypatch.setattr(
"prefect.engine.task_runner.TaskRunner._heartbeat", heartbeat
)
res = FlowRunner(flow=flow).run(executor=executor, state=Pending())
assert heartbeat.call_count == 3
assert res.is_successful()
assert client.update_task_run_heartbeat.call_args[0][0] == "1234"


def test_client_is_always_called_even_during_failures(monkeypatch):
Expand Down

0 comments on commit e96f75e

Please sign in to comment.