Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Robust Runner #109

Merged
merged 91 commits into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
91 commits
Select commit Hold shift + click to select a range
a9b88a3
cleaned up manual tests
maxfischer2781 Feb 28, 2022
b31fd50
made MetaRunner.runners private
maxfischer2781 Mar 1, 2022
ee8da7a
documented runner public entry point
maxfischer2781 Mar 1, 2022
2d65b95
black
maxfischer2781 Mar 1, 2022
9aaf46e
added asyncio.run backport
maxfischer2781 Mar 1, 2022
68acf06
simplified MetaRunner rerun protection
maxfischer2781 Mar 1, 2022
5cf960d
AsyncioRunner now accepts existing event loop
maxfischer2781 Mar 1, 2022
7f23a13
draft for running runners via asyncio
maxfischer2781 Mar 1, 2022
a7a27d3
launching runners in asyncio
maxfischer2781 Mar 1, 2022
216c023
queueing payloads in meta runner
maxfischer2781 Mar 1, 2022
0ee3239
removed queueing and sync from base runner
maxfischer2781 Mar 1, 2022
413dc3d
expanded BaseRunner interface
maxfischer2781 Mar 1, 2022
de94d3e
expanded async helpers
maxfischer2781 Mar 1, 2022
4a96cf4
changed AsyncioRunner to asyncio implementation
maxfischer2781 Mar 1, 2022
f45b8b5
all runners receive the asyncio loop
maxfischer2781 Mar 1, 2022
37f1060
all runners are created equal
maxfischer2781 Mar 1, 2022
23f5cd7
moved stop to base runner since it is generic enough
maxfischer2781 Mar 1, 2022
b527f5e
switched ThreadRunner to asyncio
maxfischer2781 Mar 1, 2022
b3cf0f8
removed old asyncio start code
maxfischer2781 Mar 1, 2022
d07e1d1
drafted trio on asyncio
maxfischer2781 Mar 1, 2022
25034d3
switched asyncio runner to queue model
maxfischer2781 Mar 1, 2022
de856b1
passing trio coroutines properly
maxfischer2781 Mar 1, 2022
95ae7f5
removed unused running flag
maxfischer2781 Mar 1, 2022
0aa7d32
added method to check if runner is ready
maxfischer2781 Mar 1, 2022
caa5123
initialising asyncio resources early
maxfischer2781 Mar 1, 2022
db7c260
trio signals ready
maxfischer2781 Mar 1, 2022
fbd6e1b
stopping a runner is idempotent
maxfischer2781 Mar 1, 2022
5830e4c
removed outdated test
maxfischer2781 Mar 1, 2022
c81ece7
removed faulty indirection in test
maxfischer2781 Mar 1, 2022
eeba44a
removed timing issue when starting trio runner
maxfischer2781 Mar 1, 2022
9c2621e
returning value
maxfischer2781 Mar 2, 2022
fe64d29
missed awaiting coroutine
maxfischer2781 Mar 2, 2022
97f6ebf
added waitable event to meta runner
maxfischer2781 Mar 2, 2022
f989c51
properly running meta runner during tests
maxfischer2781 Mar 2, 2022
fa08944
properly wrapping threads
maxfischer2781 Mar 2, 2022
99e25c1
metarunner shuts down children gracefully
maxfischer2781 Mar 2, 2022
cbb03f8
made runner aclose idempotent
maxfischer2781 Mar 2, 2022
0984656
the future is black
maxfischer2781 Mar 2, 2022
cc7cf94
3.6 has no means to shutdown executors
maxfischer2781 Mar 2, 2022
916442b
cleaned up imports
maxfischer2781 Mar 2, 2022
c3951e6
documented quadrupel wrap
maxfischer2781 Mar 2, 2022
582e39f
using original asyncio.run filter
maxfischer2781 Mar 2, 2022
08baa32
removed sleep obsoleted by ready mechanic
maxfischer2781 Mar 2, 2022
4a5e8dd
testing for running state directly
maxfischer2781 Mar 2, 2022
47405b4
moved public functions to top
maxfischer2781 Mar 2, 2022
c452a80
simplified metarunner stopping
maxfischer2781 Mar 2, 2022
69fe060
split code by responsibilities
maxfischer2781 Mar 2, 2022
1d689e7
removed dead code
maxfischer2781 Mar 2, 2022
e1d5716
split async tools to respective usages
maxfischer2781 Mar 2, 2022
793a9f6
removed outdated initialisation guards
maxfischer2781 Mar 2, 2022
d49247f
added explanations for all runners
maxfischer2781 Mar 2, 2022
988cece
updated codecov for src layout?
maxfischer2781 Mar 3, 2022
139d69a
revert coverage change, was the bot?
maxfischer2781 Mar 3, 2022
1bf99b8
Merge branch 'master' into maintenance/robust_runner
maxfischer2781 Mar 3, 2022
01d318d
removed paranoid safety check
maxfischer2781 Mar 3, 2022
92e7914
added test for payload failure
maxfischer2781 Mar 3, 2022
c2c50b1
added description to payload failure
maxfischer2781 Mar 3, 2022
0e62e61
unqueueing only when watching runners
maxfischer2781 Mar 3, 2022
70193e3
logging and discarding payloads during shutdown
maxfischer2781 Mar 3, 2022
8730f31
split failure test cases
maxfischer2781 Mar 3, 2022
e62fd0b
fixed race when service runner is shutdown before starting
maxfischer2781 Mar 3, 2022
1e3c008
cancelling trio task cancels trio
maxfischer2781 Mar 3, 2022
6f3a75b
tracked asyncio tasks no longer supress cancellation
maxfischer2781 Mar 3, 2022
313343c
removed duplicate handling of KI
maxfischer2781 Mar 3, 2022
fc3bceb
added long-running background task to cancel tests
maxfischer2781 Mar 3, 2022
d854a70
logging runner exceptions thoroughly
maxfischer2781 Mar 4, 2022
d981a6a
propagating cancellation out of trio runner
maxfischer2781 Mar 4, 2022
956f0e4
runner shutdown is shielded from cancellation
maxfischer2781 Mar 4, 2022
de0cf3d
accurately reproducing KI
maxfischer2781 Mar 4, 2022
5ce0f07
removed a stain from this world
maxfischer2781 Mar 4, 2022
b8fd282
expanded exceptions to log
maxfischer2781 Mar 4, 2022
bcce674
removed unused special method
maxfischer2781 Mar 4, 2022
f776e02
removed incomplete compat layer to turn awaitables into coroutines
maxfischer2781 Mar 4, 2022
782c6c9
the future is black
maxfischer2781 Mar 4, 2022
3f9f7d0
added background task to failure test
maxfischer2781 Mar 4, 2022
422b8f8
handling cancellation as finishing
maxfischer2781 Mar 4, 2022
e9e39ec
added descriptive names to test threads
maxfischer2781 Mar 4, 2022
37748ad
asyncio runner also cleans up if tasks remain
maxfischer2781 Mar 4, 2022
2b74a7e
service runner is cancelled more quietly
maxfischer2781 Mar 4, 2022
2cb23fc
Apply suggestions from code review
maxfischer2781 Mar 22, 2022
3f96852
clarified docstrings
maxfischer2781 Mar 22, 2022
d657959
enforced and documented method overrides in subclasses
maxfischer2781 Mar 22, 2022
993709e
Merge branch 'maintenance/robust_runner' of github.com:MatterMiners/c…
maxfischer2781 Mar 22, 2022
da0bfe5
documented internal running meaning
maxfischer2781 Mar 22, 2022
087c089
directly applying queued payloads
maxfischer2781 Mar 22, 2022
5e9be46
transmit failures via Future
maxfischer2781 Mar 22, 2022
c0faa08
pulled wrapper into class
maxfischer2781 Mar 22, 2022
825665b
testing thread service behaviour
maxfischer2781 Mar 22, 2022
2ac054b
removed KI indirection in asyncio runner
maxfischer2781 Mar 22, 2022
6d7c46a
suppress trio cancellation during asyncio shutdown
maxfischer2781 Mar 22, 2022
68b8369
Apply suggestions from review
maxfischer2781 Mar 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 53 additions & 11 deletions cobald_tests/daemon/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import asyncio
import contextlib
import logging
import signal
import os

import pytest

Expand All @@ -25,6 +27,7 @@ def accept(payload: ServiceRunner, name=None):
)
thread.start()
if not payload.running.wait(1):
payload.shutdown()
raise RuntimeError("%s failed to start" % payload)
try:
yield
Expand All @@ -33,24 +36,30 @@ def accept(payload: ServiceRunner, name=None):
thread.join()


class TestServiceRunner(object):
def test_no_tainting(self):
"""Assert that no payloads may be scheduled before starting"""
def sync_raise(what):
logging.info(f"raising {what}")
raise what

def payload():
return

runner = ServiceRunner()
runner._meta_runner.register_payload(payload, flavour=threading)
with pytest.raises(RuntimeError):
runner.accept()
async def async_raise(what):
sync_raise(what)


def sync_raise_signal(what):
logging.info(f"signal {what}")
os.kill(os.getpid(), what)


async def async_raise_signal(what):
sync_raise_signal(what)


class TestServiceRunner(object):
def test_unique_reaper(self):
"""Assert that no two runners may fetch services"""
with accept(ServiceRunner(accept_delay=0.1), name="outer"):
with pytest.raises(RuntimeError):
with accept(ServiceRunner(accept_delay=0.1), name="inner"):
pass
ServiceRunner(accept_delay=0.1).accept()

def test_service(self):
"""Test running service classes automatically"""
Expand Down Expand Up @@ -133,3 +142,36 @@ async def co_pingpong(what=default):
break
else:
assert len(reply_store) == 9

@pytest.mark.parametrize(
"flavour, do_sleep, do_raise",
(
(asyncio, asyncio.sleep, async_raise),
(trio, trio.sleep, async_raise),
(threading, time.sleep, sync_raise),
),
)
def test_error_reporting(self, flavour, do_sleep, do_raise):
"""Test that fatal errors do not pass silently"""
# errors should fail the entire runtime
runner = ServiceRunner(accept_delay=0.1)
runner.adopt(do_sleep, 5, flavour=flavour)
runner.adopt(do_raise, LookupError, flavour=flavour)
with pytest.raises(RuntimeError):
runner.accept()

@pytest.mark.parametrize(
"flavour, do_sleep, do_raise",
(
(asyncio, asyncio.sleep, async_raise_signal),
(trio, trio.sleep, async_raise_signal),
(threading, time.sleep, sync_raise_signal),
),
)
def test_interrupt(self, flavour, do_sleep, do_raise):
"""Test that KeyboardInterrupt/^C is graceful shutdown"""
runner = ServiceRunner(accept_delay=0.1)
runner.adopt(do_sleep, 5, flavour=flavour)
# signal.SIGINT == KeyboardInterrupt
runner.adopt(do_raise, signal.SIGINT, flavour=flavour)
runner.accept()
62 changes: 23 additions & 39 deletions cobald_tests/utility/concurrent/test_meta_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import pytest
import time
import asyncio
import contextlib

import trio

Expand All @@ -13,36 +14,22 @@ class TerminateRunner(Exception):
pass


def run_in_thread(payload, name, daemon=True):
thread = threading.Thread(target=payload, name=name, daemon=daemon)
@contextlib.contextmanager
def threaded_run(name=None):
runner = MetaRunner()
thread = threading.Thread(target=runner.run, name=name or str(runner), daemon=True)
thread.start()
time.sleep(0.0)
if not runner.running.wait(1):
runner.stop()
raise RuntimeError("%s failed to start" % runner)
try:
yield runner
finally:
runner.stop()
thread.join()


class TestMetaRunner(object):
def test_bool_payloads(self):
def subroutine():
time.sleep(0.5)

async def a_coroutine():
await asyncio.sleep(0.5)

async def t_coroutine():
await trio.sleep(0.5)

for flavour, payload in (
(threading, subroutine),
(asyncio, a_coroutine),
(trio, t_coroutine),
):
runner = MetaRunner()
assert not bool(runner)
runner.register_payload(payload, flavour=flavour)
assert bool(runner)
run_in_thread(runner.run, name="test_bool_payloads %s" % flavour)
assert bool(runner)
runner.stop()

@pytest.mark.parametrize("flavour", (threading,))
def test_run_subroutine(self, flavour):
"""Test executing a subroutine"""
Expand All @@ -53,11 +40,11 @@ def with_return():
def with_raise():
raise KeyError("expected exception")

runner = MetaRunner()
result = runner.run_payload(with_return, flavour=flavour)
assert result == with_return()
with pytest.raises(KeyError):
runner.run_payload(with_raise, flavour=flavour)
with threaded_run("test_run_subroutine") as runner:
result = runner.run_payload(with_return, flavour=flavour)
assert result == with_return()
with pytest.raises(KeyError):
runner.run_payload(with_raise, flavour=flavour)

@pytest.mark.parametrize("flavour", (asyncio, trio))
def test_run_coroutine(self, flavour):
Expand All @@ -69,13 +56,11 @@ async def with_return():
async def with_raise():
raise KeyError("expected exception")

runner = MetaRunner()
run_in_thread(runner.run, name="test_run_coroutine %s" % flavour)
result = runner.run_payload(with_return, flavour=flavour)
assert result == trio.run(with_return)
with pytest.raises(KeyError):
runner.run_payload(with_raise, flavour=flavour)
runner.stop()
with threaded_run("test_run_coroutine") as runner:
result = runner.run_payload(with_return, flavour=flavour)
assert result == trio.run(with_return)
with pytest.raises(KeyError):
runner.run_payload(with_raise, flavour=flavour)

@pytest.mark.parametrize("flavour", (threading,))
def test_return_subroutine(self, flavour):
Expand Down Expand Up @@ -151,7 +136,6 @@ async def loop():
await flavour.sleep(0)

runner = MetaRunner()

runner.register_payload(noop, loop, flavour=flavour)
runner.register_payload(abort, flavour=flavour)
with pytest.raises(RuntimeError) as exc:
Expand Down
51 changes: 51 additions & 0 deletions src/cobald/daemon/runners/_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import sys
import asyncio
import inspect


if sys.version_info >= (3, 7):
asyncio_run = asyncio.run
else:
# almost literal backport of asyncio.run
def asyncio_run(main, *, debug=None):
assert inspect.iscoroutine(main)
loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(loop)
if debug is not None:
loop.set_debug(debug)
return loop.run_until_complete(main)
finally:
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
asyncio.set_event_loop(None)
loop.close()

def _cancel_all_tasks(loop):
to_cancel = asyncio.Task.all_tasks(loop)
if not to_cancel:
return
for task in to_cancel:
task.cancel()
loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True))
for task in to_cancel:
if task.cancelled():
continue
if task.exception() is not None:
loop.call_exception_handler(
{
"message": "unhandled exception during asyncio.run() shutdown",
"exception": task.exception(),
"task": task,
}
)


if sys.version_info >= (3, 7):
asyncio_current_task = asyncio.current_task
else:

def asyncio_current_task() -> asyncio.Task:
return asyncio.Task.current_task()
41 changes: 0 additions & 41 deletions src/cobald/daemon/runners/async_tools.py

This file was deleted.

Loading