Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 fixing flaky test test_checked_once_task_is_auto_removed #4527

Merged
merged 6 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ def get_task_status(
done = task.done()

return TaskStatus.parse_obj(
dict(
task_progress=tracked_task.task_progress,
done=done,
started=tracked_task.started,
)
{
"task_progress": tracked_task.task_progress,
"done": done,
"started": tracked_task.started,
}
)

def get_task_result(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

import asyncio
import urllib.parse
from collections.abc import AsyncIterator
from datetime import datetime
from typing import AsyncIterator, Final
from typing import Any, Final

import pytest
from faker import Faker
Expand All @@ -24,17 +25,18 @@
TaskStatus,
)
from servicelib.long_running_tasks._task import TasksManager, start_task
from tenacity import TryAgain
from tenacity._asyncio import AsyncRetrying
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_fixed

_RETRY_PARAMS = dict(
reraise=True,
wait=wait_fixed(0.1),
stop=stop_after_delay(60),
retry=retry_if_exception_type(AssertionError),
)
_RETRY_PARAMS: dict[str, Any] = {
"reraise": True,
"wait": wait_fixed(0.1),
"stop": stop_after_delay(60),
"retry": retry_if_exception_type((AssertionError, TryAgain)),
}


async def a_background_task(
Expand All @@ -47,7 +49,8 @@ async def a_background_task(
await asyncio.sleep(1)
task_progress.update(percent=ProgressPercent((i + 1) / total_sleep))
if raise_when_finished:
raise RuntimeError("raised this error as instructed")
msg = "raised this error as instructed"
raise RuntimeError(msg)

return 42

Expand All @@ -59,7 +62,8 @@ async def fast_background_task(task_progress: TaskProgress) -> int:

async def failing_background_task(task_progress: TaskProgress):
"""this task does nothing and returns a constant"""
raise RuntimeError("failing asap")
msg = "failing asap"
raise RuntimeError(msg)


TEST_CHECK_STALE_INTERVAL_S: Final[float] = 1
Expand All @@ -75,42 +79,37 @@ async def tasks_manager() -> AsyncIterator[TasksManager]:
await tasks_manager.close()


async def test_unchecked_task_is_auto_removed(tasks_manager: TasksManager):
@pytest.mark.parametrize("check_task_presence_before", [True, False])
async def test_task_is_auto_removed(
tasks_manager: TasksManager, check_task_presence_before: bool
):
task_id = start_task(
tasks_manager,
a_background_task,
raise_when_finished=False,
total_sleep=10 * TEST_CHECK_STALE_INTERVAL_S,
)
await asyncio.sleep(2 * TEST_CHECK_STALE_INTERVAL_S + 1)
async for attempt in AsyncRetrying(**_RETRY_PARAMS):
with attempt:
with pytest.raises(TaskNotFoundError):
tasks_manager.get_task_status(task_id, with_task_context=None)
with pytest.raises(TaskNotFoundError):
tasks_manager.get_task_result(task_id, with_task_context=None)
with pytest.raises(TaskNotFoundError):
tasks_manager.get_task_result_old(task_id)

if check_task_presence_before:
# immediately after starting the task is still there
task_status = tasks_manager.get_task_status(task_id, with_task_context=None)
assert task_status

async def test_checked_once_task_is_auto_removed(tasks_manager: TasksManager):
task_id = start_task(
tasks_manager,
a_background_task,
raise_when_finished=False,
total_sleep=10 * TEST_CHECK_STALE_INTERVAL_S,
)
# check once (different branch in code)
tasks_manager.get_task_status(task_id, with_task_context=None)
await asyncio.sleep(2 * TEST_CHECK_STALE_INTERVAL_S + 1)
# wait for task to be automatically removed
# meaning no calls via the manager methods are received
async for attempt in AsyncRetrying(**_RETRY_PARAMS):
with attempt:
with pytest.raises(TaskNotFoundError):
tasks_manager.get_task_status(task_id, with_task_context=None)
with pytest.raises(TaskNotFoundError):
tasks_manager.get_task_result(task_id, with_task_context=None)
with pytest.raises(TaskNotFoundError):
tasks_manager.get_task_result_old(task_id)
for tasks in tasks_manager._tasks_groups.values(): # noqa: SLF001
if task_id in tasks:
msg = "wait till no element is found any longer"
raise TryAgain(msg)

with pytest.raises(TaskNotFoundError):
tasks_manager.get_task_status(task_id, with_task_context=None)
with pytest.raises(TaskNotFoundError):
tasks_manager.get_task_result(task_id, with_task_context=None)
with pytest.raises(TaskNotFoundError):
tasks_manager.get_task_result_old(task_id)


async def test_checked_task_is_not_auto_removed(tasks_manager: TasksManager):
Expand Down Expand Up @@ -182,7 +181,9 @@ async def not_unique_task(task_progress: TaskProgress):


def test_get_task_id():
assert TasksManager._create_task_id("") != TasksManager._create_task_id("")
obj1 = TasksManager._create_task_id("") # noqa: SLF001
obj2 = TasksManager._create_task_id("") # noqa: SLF001
assert obj1 != obj2


async def test_get_status(tasks_manager: TasksManager):
Expand Down Expand Up @@ -248,7 +249,7 @@ async def test_get_result_old_finished_with_error(tasks_manager: TasksManager):
assert task_result.result is None
assert task_result.error is not None
assert task_result.error.startswith(f"Task {task_id} finished with exception:")
assert 'raise RuntimeError("failing asap")' in task_result.error
assert "failing asap" in task_result.error


async def test_get_result_task_was_cancelled_multiple_times(
Expand Down Expand Up @@ -364,7 +365,7 @@ async def test_list_tasks(tasks_manager: TasksManager):
NUM_TASKS = 10
task_ids = []
for _ in range(NUM_TASKS):
task_ids.append(
task_ids.append( # noqa: PERF401
start_task(
tasks_manager=tasks_manager,
task=a_background_task,
Expand Down