From a9b88a3cb5337680134e4e141a444bfea6b0259f Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Mon, 28 Feb 2022 18:28:16 +0100 Subject: [PATCH 01/89] cleaned up manual tests --- src/cobald/daemon/runners/meta_runner.py | 38 ------------------------ 1 file changed, 38 deletions(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index f6627c75..ccf4d70b 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -1,6 +1,5 @@ import logging import threading -import trio from types import ModuleType @@ -80,40 +79,3 @@ def _stop_runners(self): continue runner.stop() self.runners[threading].stop() - - -if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) - import time - import asyncio - - runner = MetaRunner() - - async def trio_sleeper(): - for i in range(3): - print("trio\t", i) - await trio.sleep(0.1) - - runner.register_payload(trio_sleeper, flavour=trio) - - async def asyncio_sleeper(): - for i in range(3): - print("asyncio\t", i) - await asyncio.sleep(0.1) - - runner.register_payload(asyncio_sleeper, flavour=asyncio) - - def thread_sleeper(): - for i in range(3): - print("thread\t", i) - time.sleep(0.1) - - runner.register_payload(thread_sleeper, flavour=threading) - - async def teardown(): - await trio.sleep(5) - raise SystemExit("Abort from trio runner") - - runner.register_payload(teardown, flavour=trio) - - runner.run() From b31fd50529d1eae38010d480c47e444880424cd4 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 09:52:35 +0100 Subject: [PATCH 02/89] made MetaRunner.runners private --- src/cobald/daemon/runners/meta_runner.py | 25 ++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index ccf4d70b..593475a9 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -1,5 +1,6 @@ import logging import threading +import warnings from types import ModuleType @@ -22,15 +23,23 @@ class MetaRunner(object): def __init__(self): self._logger = logging.getLogger("cobald.runtime.runner.meta") - self.runners = { + self._runners = { runner.flavour: runner() for runner in self.runner_types } # type: dict[ModuleType, BaseRunner] self._lock = threading.Lock() self.running = threading.Event() self.running.clear() + @property + def runners(self): + warnings.warn(DeprecationWarning( + "Accessing 'MetaRunner.runners' directly is deprecated. " + "Use register_payload or run_payload with the correct flavour instead." + )) + return self._runners + def __bool__(self): - return any(bool(runner) for runner in self.runners.values()) + return any(bool(runner) for runner in self._runners.values()) def register_payload(self, *payloads, flavour: ModuleType): """Queue one or more payload for execution after its runner is started""" @@ -38,11 +47,11 @@ def register_payload(self, *payloads, flavour: ModuleType): self._logger.debug( "registering payload %s (%s)", NameRepr(payload), NameRepr(flavour) ) - self.runners[flavour].register_payload(payload) + self._runners[flavour].register_payload(payload) def run_payload(self, payload, *, flavour: ModuleType): """Execute one payload after its runner is started and return its output""" - return self.runners[flavour].run_payload(payload) + return self._runners[flavour].run_payload(payload) def run(self): """Run all runners, blocking until completion or error""" @@ -51,8 +60,8 @@ def run(self): with self._lock: assert not self.running.set(), "cannot re-run: %s" % self self.running.set() - thread_runner = self.runners[threading] - for runner in self.runners.values(): + thread_runner = self._runners[threading] + for runner in self._runners.values(): if runner is not thread_runner: thread_runner.register_payload(runner.run) if threading.current_thread() == threading.main_thread(): @@ -74,8 +83,8 @@ def stop(self): self._stop_runners() def _stop_runners(self): - for runner in self.runners.values(): + for runner in self._runners.values(): if runner.flavour == threading: continue runner.stop() - self.runners[threading].stop() + self._runners[threading].stop() From ee8da7a1f605e710bd9748cf84f38dfdbbc4d441 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 10:56:55 +0100 Subject: [PATCH 03/89] documented runner public entry point --- src/cobald/daemon/runners/service.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/cobald/daemon/runners/service.py b/src/cobald/daemon/runners/service.py index fa10ecc0..4a714f7b 100644 --- a/src/cobald/daemon/runners/service.py +++ b/src/cobald/daemon/runners/service.py @@ -110,6 +110,15 @@ def __new_service__(cls, *args, **kwargs): class ServiceRunner(object): """ Runner for coroutines, subroutines and services + + The service runner provides safe concurrency by tracking concurrent tasks + to prevent silent failures. If any task fails with an exception or provides + unexpected output values, this is registered as an error; the runner will + gracefully shut down all tasks in this case. + + In order to provide ``async`` concurrency, the runner also manages common + ``async`` event loops and tracks them for failures as well. As a result, + ``async`` code should usually use the "current" event loop directly. """ def __init__(self, accept_delay: float = 1): From 2d65b95d1807d60065ad210f798363e42cda03f9 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 11:33:35 +0100 Subject: [PATCH 04/89] black --- src/cobald/daemon/runners/meta_runner.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index 593475a9..bb33dcc9 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -32,10 +32,12 @@ def __init__(self): @property def runners(self): - warnings.warn(DeprecationWarning( - "Accessing 'MetaRunner.runners' directly is deprecated. " - "Use register_payload or run_payload with the correct flavour instead." - )) + warnings.warn( + DeprecationWarning( + "Accessing 'MetaRunner.runners' directly is deprecated. " + "Use register_payload or run_payload with the correct flavour instead." + ) + ) return self._runners def __bool__(self): From 9aaf46e4f103749a60a7ece7ebbf8e4b9c275532 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 11:36:03 +0100 Subject: [PATCH 05/89] added asyncio.run backport --- src/cobald/daemon/runners/_compat.py | 42 ++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 src/cobald/daemon/runners/_compat.py diff --git a/src/cobald/daemon/runners/_compat.py b/src/cobald/daemon/runners/_compat.py new file mode 100644 index 00000000..1a7708f3 --- /dev/null +++ b/src/cobald/daemon/runners/_compat.py @@ -0,0 +1,42 @@ +import sys +import asyncio +import inspect + + +if sys.version_info >= (3, 7): + asyncio_run = asyncio.run +else: + # almost literal backport of asyncio.run + def asyncio_run(main, *, debug=None): + assert inspect.iscoroutine(main) + loop = asyncio.new_event_loop() + try: + asyncio.set_event_loop(loop) + if debug is not None: + loop.set_debug(debug) + return loop.run_until_complete(main) + finally: + try: + _cancel_all_tasks(loop) + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.run_until_complete(loop.shutdown_default_executor()) + finally: + asyncio.set_event_loop(None) + loop.close() + + def _cancel_all_tasks(loop): + to_cancel = asyncio.Task.all_tasks(loop) + if not to_cancel: + return + for task in to_cancel: + task.cancel() + loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True)) + for task in to_cancel: + if task.exception() is not None and not task.cancelled(): + loop.call_exception_handler( + { + "message": "unhandled exception during asyncio.run() shutdown", + "exception": task.exception(), + "task": task, + } + ) From 68acf0633503c76795a419f03da4ed0daf91b124 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 12:05:11 +0100 Subject: [PATCH 06/89] simplified MetaRunner rerun protection --- src/cobald/daemon/runners/meta_runner.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index bb33dcc9..6493f138 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -27,8 +27,6 @@ def __init__(self): runner.flavour: runner() for runner in self.runner_types } # type: dict[ModuleType, BaseRunner] self._lock = threading.Lock() - self.running = threading.Event() - self.running.clear() @property def runners(self): @@ -58,10 +56,8 @@ def run_payload(self, payload, *, flavour: ModuleType): def run(self): """Run all runners, blocking until completion or error""" self._logger.info("starting all runners") + self._lock.acquire() try: - with self._lock: - assert not self.running.set(), "cannot re-run: %s" % self - self.running.set() thread_runner = self._runners[threading] for runner in self._runners.values(): if runner is not thread_runner: @@ -78,7 +74,7 @@ def run(self): finally: self._stop_runners() self._logger.info("stopped all runners") - self.running.clear() + self._lock.release() def stop(self): """Stop all runners""" From 5cf960dafb3fc41bc7b6c3ee9a3c01278b2cfdef Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 12:09:20 +0100 Subject: [PATCH 07/89] AsyncioRunner now accepts existing event loop --- src/cobald/daemon/runners/asyncio_runner.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index f71697bc..7511d62e 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -1,3 +1,4 @@ +from typing import Optional import asyncio from functools import partial @@ -10,9 +11,9 @@ class AsyncioRunner(BaseRunner): flavour = asyncio - def __init__(self): + def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None): super().__init__() - self.event_loop = asyncio.new_event_loop() + self.event_loop = loop if loop is not None else asyncio.new_event_loop() self._tasks = set() def register_payload(self, payload): From 7f23a13a250c2ea44240002c08be8c4674e85b8a Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 12:41:24 +0100 Subject: [PATCH 08/89] draft for running runners via asyncio --- src/cobald/daemon/runners/meta_runner.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index 6493f138..11594676 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -1,6 +1,7 @@ import logging import threading import warnings +import asyncio from types import ModuleType @@ -53,6 +54,21 @@ def run_payload(self, payload, *, flavour: ModuleType): """Execute one payload after its runner is started and return its output""" return self._runners[flavour].run_payload(payload) + async def _launch_runners(self): + """Launch all runners inside an `asyncio` event loop and wait for them""" + asyncio_loop = asyncio.get_event_loop() + # we are already running asyncio – just wrap it as a runner + runners = {asyncio: AsyncioRunner(asyncio_loop)} + # launch other runners in asyncio's thread pool + # this blocks some threads of the pool, but we have only very few runners + runner_tasks = [] + for runner_type in self.runner_types: + if runner_type.flavour in runners: + continue + runner = runners[runner_type.flavour] = runner_type() + runner_tasks.append(asyncio_loop.run_in_executor(None, runner.run)) + await asyncio.gather(*runner_tasks) + def run(self): """Run all runners, blocking until completion or error""" self._logger.info("starting all runners") From a7a27d337bd54abf2c6e6c028cef0d6016d6c3a8 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 13:49:42 +0100 Subject: [PATCH 09/89] launching runners in asyncio --- src/cobald/daemon/runners/meta_runner.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index 11594676..3cd0ab15 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -9,7 +9,7 @@ from .trio_runner import TrioRunner from .asyncio_runner import AsyncioRunner from .thread_runner import ThreadRunner -from .asyncio_watcher import asyncio_main_run +from ._compat import asyncio_run from cobald.daemon.debug import NameRepr @@ -74,14 +74,7 @@ def run(self): self._logger.info("starting all runners") self._lock.acquire() try: - thread_runner = self._runners[threading] - for runner in self._runners.values(): - if runner is not thread_runner: - thread_runner.register_payload(runner.run) - if threading.current_thread() == threading.main_thread(): - asyncio_main_run(root_runner=thread_runner) - else: - thread_runner.run() + asyncio_run(self._launch_runners()) except KeyboardInterrupt: self._logger.info("runner interrupted") except Exception as err: From 216c023838cf8f89ceabde3dbff54bb209a869b4 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 13:50:00 +0100 Subject: [PATCH 10/89] queueing payloads in meta runner --- src/cobald/daemon/runners/meta_runner.py | 44 ++++++++++++++++++------ 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index 3cd0ab15..1c81464d 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -1,3 +1,4 @@ +from typing import Dict, Any import logging import threading import warnings @@ -24,9 +25,8 @@ class MetaRunner(object): def __init__(self): self._logger = logging.getLogger("cobald.runtime.runner.meta") - self._runners = { - runner.flavour: runner() for runner in self.runner_types - } # type: dict[ModuleType, BaseRunner] + self._runners: Dict[ModuleType, BaseRunner] = {} + self._runner_queues: Dict[ModuleType, Any] = {} self._lock = threading.Lock() @property @@ -43,17 +43,39 @@ def __bool__(self): return any(bool(runner) for runner in self._runners.values()) def register_payload(self, *payloads, flavour: ModuleType): - """Queue one or more payload for execution after its runner is started""" - for payload in payloads: - self._logger.debug( - "registering payload %s (%s)", NameRepr(payload), NameRepr(flavour) - ) - self._runners[flavour].register_payload(payload) + """Queue one or more payloads for execution after its runner is started""" + try: + runner = self._runners[flavour] + except KeyError: + if self._runners: + raise RuntimeError(f"unknown runner {NameRepr(flavour)}") from None + self._runner_queues.setdefault(flavour, []).extend(payloads) + else: + for payload in payloads: + self._logger.debug( + "registering payload %s (%s)", NameRepr(payload), NameRepr(flavour) + ) + runner.register_payload(payload) def run_payload(self, payload, *, flavour: ModuleType): - """Execute one payload after its runner is started and return its output""" + """ + Execute one payload and return its output + + This method will block until the payload is completed. To avoid deadlocks, + it is an error to call it during initialisation before the runners are started. + """ return self._runners[flavour].run_payload(payload) + async def _unqueue_payloads(self): + """Register payloads once runners are started""" + assert self._runners, "runners must be launched before unqueueing" + await asyncio.sleep(0) + # runners are started already, so no new payloads can be registered + for flavour, queue in self._runner_queues.items(): + self.register_payload(*queue, flavour=flavour) + queue.clear() + self._runner_queues.clear() + async def _launch_runners(self): """Launch all runners inside an `asyncio` event loop and wait for them""" asyncio_loop = asyncio.get_event_loop() @@ -67,6 +89,8 @@ async def _launch_runners(self): continue runner = runners[runner_type.flavour] = runner_type() runner_tasks.append(asyncio_loop.run_in_executor(None, runner.run)) + self._runners = runners + await self._unqueue_payloads() await asyncio.gather(*runner_tasks) def run(self): From 0ee3239136b4d0e5c83a014d7cd7bac07502f0a0 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 14:13:41 +0100 Subject: [PATCH 11/89] removed queueing and sync from base runner --- src/cobald/daemon/runners/base_runner.py | 49 ++++++++---------------- 1 file changed, 16 insertions(+), 33 deletions(-) diff --git a/src/cobald/daemon/runners/base_runner.py b/src/cobald/daemon/runners/base_runner.py index 2bff4a88..bc527b00 100644 --- a/src/cobald/daemon/runners/base_runner.py +++ b/src/cobald/daemon/runners/base_runner.py @@ -12,74 +12,57 @@ def __init__(self): self._logger = logging.getLogger( "cobald.runtime.runner.%s" % NameRepr(self.flavour) ) - self._payloads = [] - self._lock = threading.Lock() - #: signal that runner should keep in running - self.running = threading.Event() - #: signal that runner has stopped + #: signal that runner is stopped self._stopped = threading.Event() - self.running.clear() self._stopped.set() - def __bool__(self): - with self._lock: - return bool(self._payloads) or self.running.is_set() - def register_payload(self, payload): """ - Register ``payload`` for asynchronous execution + Register ``payload`` for background execution in a threadsafe manner This runs ``payload`` as an orphaned background task as soon as possible. It is an error for ``payload`` to return or raise anything without handling it. """ - with self._lock: - self._payloads.append(payload) + raise NotImplementedError def run_payload(self, payload): """ - Register ``payload`` for synchronous execution + Register ``payload`` for direct execution in a threadsafe manner This runs ``payload`` as soon as possible, blocking until completion. Should ``payload`` return or raise anything, it is propagated to the caller. """ raise NotImplementedError - def run(self): + async def run(self): """ - Execute all current and future payloads + Execute all current and future payloads in an `asyncio` task Blocks and executes payloads until :py:meth:`stop` is called. It is an error for any orphaned payload to return or raise. + + Implementations should override :py:meth:`~.run_payloads` + to customize their specific parts. """ self._logger.info("runner started: %s", self) + self._stopped.clear() try: - with self._lock: - assert not self.running.is_set() and self._stopped.is_set(), ( - "cannot re-run: %s" % self - ) - self.running.set() - self._stopped.clear() - self._run() + await self.run_payloads() except Exception: self._logger.exception("runner aborted: %s", self) raise else: self._logger.info("runner stopped: %s", self) finally: - with self._lock: - self.running.clear() - self._stopped.set() + self._stopped.set() - def _run(self): + async def run_payloads(self): raise NotImplementedError def stop(self): - """Stop execution of all current and future payloads""" - if not self.running.wait(0.2): - return - self._logger.debug("runner disabled: %s", self) - with self._lock: - self.running.clear() + """ + Stop execution of all current and future payloads and wait for success + """ self._stopped.wait() From 413dc3d924250e0049050e17a1af300bc6122597 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 15:18:30 +0100 Subject: [PATCH 12/89] expanded BaseRunner interface --- src/cobald/daemon/runners/base_runner.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/cobald/daemon/runners/base_runner.py b/src/cobald/daemon/runners/base_runner.py index bc527b00..ecf68b13 100644 --- a/src/cobald/daemon/runners/base_runner.py +++ b/src/cobald/daemon/runners/base_runner.py @@ -12,7 +12,7 @@ def __init__(self): self._logger = logging.getLogger( "cobald.runtime.runner.%s" % NameRepr(self.flavour) ) - #: signal that runner is stopped + self._running = threading.Event() self._stopped = threading.Event() self._stopped.set() @@ -41,24 +41,29 @@ async def run(self): Blocks and executes payloads until :py:meth:`stop` is called. It is an error for any orphaned payload to return or raise. - Implementations should override :py:meth:`~.run_payloads` + Implementations should override :py:meth:`~.manage_payloads` to customize their specific parts. """ self._logger.info("runner started: %s", self) self._stopped.clear() + self._running.set() try: - await self.run_payloads() + await self.manage_payloads() except Exception: self._logger.exception("runner aborted: %s", self) raise else: self._logger.info("runner stopped: %s", self) finally: + self._running.clear() self._stopped.set() - async def run_payloads(self): + async def manage_payloads(self): raise NotImplementedError + async def aclose(self): + """Shut down this runner""" + def stop(self): """ Stop execution of all current and future payloads and wait for success From de94d3e4113dbb5569fdb80ece84f8037542b7bb Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 15:18:50 +0100 Subject: [PATCH 13/89] expanded async helpers --- src/cobald/daemon/runners/async_tools.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/cobald/daemon/runners/async_tools.py b/src/cobald/daemon/runners/async_tools.py index e2f29f2b..8fb67691 100644 --- a/src/cobald/daemon/runners/async_tools.py +++ b/src/cobald/daemon/runners/async_tools.py @@ -1,14 +1,27 @@ +from typing import Callable, Awaitable, Coroutine import threading + from .base_runner import OrphanedReturn -async def raise_return(payload): +async def raise_return(payload: Callable[[], Awaitable]) -> None: """Wrapper to raise exception on unhandled return values""" value = await payload() if value is not None: raise OrphanedReturn(payload, value) +def ensure_coroutine(awaitable: Awaitable) -> Coroutine: + """Ensure that ``awaitable`` is a coroutine and wrap it otherwise""" + if isinstance(awaitable, Coroutine): + return awaitable + + async def wrapper(): + return await awaitable + + return wrapper() + + class AsyncExecution(object): def __init__(self, payload): self.payload = payload From 4a96cf4e4cb7a5362c0fe95b2e84e9e81ef8b83c Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 15:19:32 +0100 Subject: [PATCH 14/89] changed AsyncioRunner to asyncio implementation --- src/cobald/daemon/runners/asyncio_runner.py | 100 ++++++++------------ 1 file changed, 37 insertions(+), 63 deletions(-) diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index 7511d62e..89721e47 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -1,9 +1,8 @@ -from typing import Optional +from typing import Optional, Callable, Awaitable import asyncio -from functools import partial from .base_runner import BaseRunner -from .async_tools import raise_return, AsyncExecution +from .async_tools import raise_return, ensure_coroutine class AsyncioRunner(BaseRunner): @@ -16,67 +15,42 @@ def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None): self.event_loop = loop if loop is not None else asyncio.new_event_loop() self._tasks = set() - def register_payload(self, payload): - super().register_payload(partial(raise_return, payload)) - - def run_payload(self, payload): - execution = AsyncExecution(payload) - super().register_payload(execution.coroutine) - return execution.wait() - - def _run(self): - asyncio.set_event_loop(self.event_loop) - self.event_loop.run_until_complete(self._run_payloads()) - - async def _run_payloads(self): - """Async component of _run""" - delay = 0.0 - try: - while self.running.is_set(): - await self._start_payloads() - await self._reap_payloads() - await asyncio.sleep(delay) - delay = min(delay + 0.1, 1.0) - except Exception: - await self._cancel_payloads() - raise - - async def _start_payloads(self): - """Start all queued payloads""" - with self._lock: - for coroutine in self._payloads: - task = self.event_loop.create_task(coroutine()) - self._tasks.add(task) - self._payloads.clear() - await asyncio.sleep(0) - - async def _reap_payloads(self): - """Clean up all finished payloads""" - for task in self._tasks.copy(): - if task.done(): - self._tasks.remove(task) - if task.exception() is not None: - raise task.exception() - await asyncio.sleep(0) - - async def _cancel_payloads(self): - """Cancel all remaining payloads""" - for task in self._tasks: - task.cancel() - await asyncio.sleep(0) - for task in self._tasks: - while not task.done(): - await asyncio.sleep(0.1) + def register_payload(self, payload: Callable[[], Awaitable]): + self.event_loop.call_soon_threadsafe( + lambda: self._tasks.add(self.event_loop.create_task(raise_return(payload))) + ) + + def run_payload(self, payload: Callable[[], Awaitable]): + future = asyncio.run_coroutine_threadsafe( + ensure_coroutine(payload()), self.event_loop + ) + return future.result() + + async def manage_payloads(self): + # Remove tracked tasks and raise if tasks leak + while self._tasks or self._running.is_set(): + # let asyncio efficiently wait for errors + # we only force wake up via timeout every now and then to clean up + done, pending = await asyncio.wait( + self._tasks, timeout=60, return_when=asyncio.FIRST_EXCEPTION + ) + self._tasks.difference_update(done) + for task in done: + # re-raise any exceptions + task.result() + + async def aclose(self): + self._running.clear() + while self._tasks: + for task in self._tasks.copy(): + if task.done(): + self._tasks.remove(task) task.cancel() + await asyncio.sleep(0.5) def stop(self): - if not self.running.wait(0.2): + if not self._running.wait(0.2): return - self._logger.debug("runner disabled: %s", self) - with self._lock: - self.running.clear() - for task in self._tasks: - task.cancel() - self._stopped.wait() - self.event_loop.stop() - self.event_loop.close() + # the loop exists independently of this runner, we can use it to shut down + closed = asyncio.run_coroutine_threadsafe(self.aclose(), self.event_loop) + closed.result() From f45b8b5f05eed57af8f087e0bfbb6378ac30da88 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 17:01:34 +0100 Subject: [PATCH 15/89] all runners receive the asyncio loop --- src/cobald/daemon/runners/asyncio_runner.py | 13 ++++++------- src/cobald/daemon/runners/base_runner.py | 5 ++++- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index 89721e47..63ad0475 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -10,19 +10,18 @@ class AsyncioRunner(BaseRunner): flavour = asyncio - def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None): - super().__init__() - self.event_loop = loop if loop is not None else asyncio.new_event_loop() + def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): + super().__init__(asyncio_loop) self._tasks = set() def register_payload(self, payload: Callable[[], Awaitable]): - self.event_loop.call_soon_threadsafe( - lambda: self._tasks.add(self.event_loop.create_task(raise_return(payload))) + self.asyncio_loop.call_soon_threadsafe( + lambda: self._tasks.add(self.asyncio_loop.create_task(raise_return(payload))) ) def run_payload(self, payload: Callable[[], Awaitable]): future = asyncio.run_coroutine_threadsafe( - ensure_coroutine(payload()), self.event_loop + ensure_coroutine(payload()), self.asyncio_loop ) return future.result() @@ -52,5 +51,5 @@ def stop(self): if not self._running.wait(0.2): return # the loop exists independently of this runner, we can use it to shut down - closed = asyncio.run_coroutine_threadsafe(self.aclose(), self.event_loop) + closed = asyncio.run_coroutine_threadsafe(self.aclose(), self.asyncio_loop) closed.result() diff --git a/src/cobald/daemon/runners/base_runner.py b/src/cobald/daemon/runners/base_runner.py index ecf68b13..c6204bb9 100644 --- a/src/cobald/daemon/runners/base_runner.py +++ b/src/cobald/daemon/runners/base_runner.py @@ -1,3 +1,4 @@ +import asyncio import logging import threading from typing import Any @@ -6,9 +7,11 @@ class BaseRunner(object): + """Concurrency backend on top of `asyncio`""" flavour = None # type: Any - def __init__(self): + def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): + self.asyncio_loop = asyncio_loop self._logger = logging.getLogger( "cobald.runtime.runner.%s" % NameRepr(self.flavour) ) From 37f1060c3418d64c7522579a26dbe8f79c466797 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 17:04:03 +0100 Subject: [PATCH 16/89] all runners are created equal --- src/cobald/daemon/runners/meta_runner.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index 1c81464d..e3ae96a9 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -79,17 +79,11 @@ async def _unqueue_payloads(self): async def _launch_runners(self): """Launch all runners inside an `asyncio` event loop and wait for them""" asyncio_loop = asyncio.get_event_loop() - # we are already running asyncio – just wrap it as a runner - runners = {asyncio: AsyncioRunner(asyncio_loop)} - # launch other runners in asyncio's thread pool - # this blocks some threads of the pool, but we have only very few runners + self._runners = {} runner_tasks = [] for runner_type in self.runner_types: - if runner_type.flavour in runners: - continue - runner = runners[runner_type.flavour] = runner_type() - runner_tasks.append(asyncio_loop.run_in_executor(None, runner.run)) - self._runners = runners + runner = self._runners[runner_type.flavour] = runner_type(asyncio_loop) + runner_tasks.append(asyncio_loop.create_task(runner.run())) await self._unqueue_payloads() await asyncio.gather(*runner_tasks) From 23f5cd71c4664f60ebf61c83da949545c90bfdda Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 17:25:20 +0100 Subject: [PATCH 17/89] moved stop to base runner since it is generic enough --- src/cobald/daemon/runners/asyncio_runner.py | 7 ------- src/cobald/daemon/runners/base_runner.py | 11 +++++++---- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index 63ad0475..099039bc 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -46,10 +46,3 @@ async def aclose(self): self._tasks.remove(task) task.cancel() await asyncio.sleep(0.5) - - def stop(self): - if not self._running.wait(0.2): - return - # the loop exists independently of this runner, we can use it to shut down - closed = asyncio.run_coroutine_threadsafe(self.aclose(), self.asyncio_loop) - closed.result() diff --git a/src/cobald/daemon/runners/base_runner.py b/src/cobald/daemon/runners/base_runner.py index c6204bb9..c46d5f83 100644 --- a/src/cobald/daemon/runners/base_runner.py +++ b/src/cobald/daemon/runners/base_runner.py @@ -68,10 +68,13 @@ async def aclose(self): """Shut down this runner""" def stop(self): - """ - Stop execution of all current and future payloads and wait for success - """ - self._stopped.wait() + """Stop execution of all current and future payloads and block until success""" + if not self._running.wait(0.2): + return + # the loop exists independently of all runners, we can use it to shut down + closed = asyncio.run_coroutine_threadsafe(self.aclose(), self.asyncio_loop) + closed.result() + class OrphanedReturn(Exception): From b527f5efd7425d93cdf9815e504a0143fcc5c0f8 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 17:25:40 +0100 Subject: [PATCH 18/89] switched ThreadRunner to asyncio --- src/cobald/daemon/runners/thread_runner.py | 57 +++++++++++----------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/src/cobald/daemon/runners/thread_runner.py b/src/cobald/daemon/runners/thread_runner.py index df5b6641..a4e2b024 100644 --- a/src/cobald/daemon/runners/thread_runner.py +++ b/src/cobald/daemon/runners/thread_runner.py @@ -1,5 +1,6 @@ +from typing import Optional import threading -import time +import asyncio from ..debug import NameRepr from .base_runner import BaseRunner, OrphanedReturn @@ -42,9 +43,13 @@ class ThreadRunner(BaseRunner): flavour = threading - def __init__(self): - super().__init__() - self._threads = set() + def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): + super().__init__(asyncio_loop) + self._failure_queue: Optional[asyncio.Queue] = None + + def register_payload(self, payload): + thread = threading.Thread(target=self.run_payload, args=(payload,), daemon=True) + thread.start() def run_payload(self, payload): # - run_payload has to block until payload is done @@ -52,30 +57,24 @@ def run_payload(self, payload): # we just block this thread by running payload directly return payload() - def _run(self): - delay = 0.0 - while self.running.is_set(): - self._start_payloads() - self._reap_payloads() - time.sleep(delay) - delay = min(delay + 0.1, 1.0) + def _monitor_payload(self, payload): + try: + result = payload() + except BaseException as e: + failure = e + else: + if result is None: + return + failure = OrphanedReturn(payload, result) + assert self._failure_queue is not None + self.asyncio_loop.call_soon_threadsafe(self._failure_queue.put_nowait, failure) - def _start_payloads(self): - """Start all queued payloads""" - with self._lock: - payloads = self._payloads.copy() - self._payloads.clear() - for subroutine in payloads: - thread = CapturingThread(target=subroutine) - thread.start() - self._threads.add(thread) - self._logger.debug("booted thread %s", thread) - time.sleep(0) + async def manage_payloads(self): + self._failure_queue = asyncio.Queue() + failure = await self._failure_queue.get() + if failure is not None: + raise failure - def _reap_payloads(self): - """Clean up all finished payloads""" - for thread in self._threads.copy(): - # CapturingThread.join will throw - if thread.join(timeout=0): - self._threads.remove(thread) - self._logger.debug("reaped thread %s", thread) + async def aclose(self): + self._running.clear() + await self._failure_queue.put(None) From b3cf0f81dd0e00186380ade681804cd59b296bdd Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 17:26:53 +0100 Subject: [PATCH 19/89] removed old asyncio start code --- src/cobald/daemon/runners/asyncio_watcher.py | 41 -------------------- src/cobald/daemon/runners/thread_runner.py | 33 ---------------- 2 files changed, 74 deletions(-) delete mode 100644 src/cobald/daemon/runners/asyncio_watcher.py diff --git a/src/cobald/daemon/runners/asyncio_watcher.py b/src/cobald/daemon/runners/asyncio_watcher.py deleted file mode 100644 index f859198f..00000000 --- a/src/cobald/daemon/runners/asyncio_watcher.py +++ /dev/null @@ -1,41 +0,0 @@ -import asyncio -import threading -import sys - -from .base_runner import BaseRunner -from .thread_runner import CapturingThread - - -async def awaitable_runner(runner: BaseRunner): - """Execute a runner without blocking the event loop""" - runner_thread = CapturingThread(target=runner.run) - runner_thread.start() - delay = 0.0 - while not runner_thread.join(timeout=0): - await asyncio.sleep(delay) - delay = min(delay + 0.1, 1.0) - - -def asyncio_main_run(root_runner: BaseRunner): - """ - Create an ``asyncio`` event loop running in the main thread and watching runners - - Using ``asyncio`` to handle subprocesses requires a specific loop type - to run in the main thread. - This function sets up and runs the correct loop in a portable way. - In addition, it runs a single :py:class:`~.BaseRunner` until completion - or failure. - - .. seealso:: The `issue #8 `_ - for details. - """ - assert ( - threading.current_thread() == threading.main_thread() - ), "only main thread can accept asyncio subprocesses" - if sys.platform == "win32": - event_loop = asyncio.ProactorEventLoop() - asyncio.set_event_loop(event_loop) - else: - event_loop = asyncio.get_event_loop() - asyncio.get_child_watcher().attach_loop(event_loop) - event_loop.run_until_complete(awaitable_runner(root_runner)) diff --git a/src/cobald/daemon/runners/thread_runner.py b/src/cobald/daemon/runners/thread_runner.py index a4e2b024..315a25fb 100644 --- a/src/cobald/daemon/runners/thread_runner.py +++ b/src/cobald/daemon/runners/thread_runner.py @@ -2,42 +2,9 @@ import threading import asyncio -from ..debug import NameRepr from .base_runner import BaseRunner, OrphanedReturn -class CapturingThread(threading.Thread): - """ - Daemonic threads that capture any return value or exception from their ``target`` - """ - - def __init__(self, **kwargs): - super().__init__(**kwargs, daemon=True) - self._exception = None - self._name = str(NameRepr(self._target)) - - def join(self, timeout=None): - super().join(timeout=timeout) - if self._started.is_set() and not self.is_alive(): - if self._exception is not None: - raise self._exception - return not self.is_alive() - - def run(self): - """Modified ``run`` that captures return value and exceptions from ``target``""" - try: - if self._target: - return_value = self._target(*self._args, **self._kwargs) - if return_value is not None: - self._exception = OrphanedReturn(self, return_value) - except BaseException as err: - self._exception = err - finally: - # Avoid a refcycle if the thread is running a function with - # an argument that has a member that points to the thread. - del self._target, self._args, self._kwargs - - class ThreadRunner(BaseRunner): """Runner for subroutines with :py:mod:`threading`""" From d07e1d15c3d3c5e40c4158fdbd1ecedc85b69bd6 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 18:15:45 +0100 Subject: [PATCH 20/89] drafted trio on asyncio --- src/cobald/daemon/runners/trio_runner.py | 80 ++++++++++++++---------- 1 file changed, 46 insertions(+), 34 deletions(-) diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index 6d6c11df..eccd5e5f 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -1,9 +1,10 @@ -import trio +from typing import Optional, Callable, Awaitable +import asyncio from functools import partial - +import trio from .base_runner import BaseRunner -from .async_tools import raise_return, AsyncExecution +from .async_tools import raise_return class TrioRunner(BaseRunner): @@ -11,37 +12,48 @@ class TrioRunner(BaseRunner): flavour = trio - def __init__(self): - self._nursery = None - super().__init__() - - def register_payload(self, payload): - super().register_payload(partial(raise_return, payload)) - - def run_payload(self, payload): - execution = AsyncExecution(payload) - super().register_payload(execution.coroutine) - return execution.wait() - - def _run(self): - return trio.run(self._await_all) - - async def _await_all(self): - """Async component of _run""" - delay = 0.0 - # we run a top-level nursery that automatically reaps/cancels for us + def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): + super().__init__(asyncio_loop) + self._trio_token: Optional[trio.lowlevel.TrioToken] = None + self._submit_tasks: Optional[trio.MemorySendChannel] = None + + def register_payload(self, payload: Callable[[], Awaitable]): + assert self._trio_token is not None and self._submit_tasks is not None + trio.from_thread.run( + self._submit_tasks.send, payload, trio_token=self._trio_token + ) + + def run_payload(self, payload: Callable[[], Awaitable]): + assert self._trio_token is not None and self._submit_tasks is not None + trio.from_thread.run( + payload, trio_token=self._trio_token + ) + + async def manage_payloads(self): + # this blocks one thread of the asyncio event loop + await self.asyncio_loop.run_in_executor(None, self._run_trio_blocking) + + def _run_trio_blocking(self): + return trio.run(self._manage_payloads_trio) + + async def _manage_payloads_trio(self): + self._trio_token = trio.lowlevel.current_trio_token() + # buffer of 256 is somewhat arbitrary but should be large enough to rarely stall + # and small enough to smooth out explosive backlog. + self._submit_tasks, receive_tasks = trio.open_memory_channel(256) async with trio.open_nursery() as nursery: - while self.running.is_set(): - await self._start_payloads(nursery=nursery) - await trio.sleep(delay) - delay = min(delay + 0.1, 1.0) - # cancel the scope to cancel all payloads + async for task in receive_tasks: + nursery.start_soon(raise_return(task)) + # shutting down: cancel the scope to cancel all payloads nursery.cancel_scope.cancel() - async def _start_payloads(self, nursery): - """Start all queued payloads""" - with self._lock: - for coroutine in self._payloads: - nursery.start_soon(coroutine) - self._payloads.clear() - await trio.sleep(0) + async def _aclose_trio(self): + self._running.clear() + await self._submit_tasks.aclose() + + async def aclose(self): + await self.asyncio_loop.run_in_executor( + None, partial( + trio.from_thread.run, self._aclose_trio, trio_token=self._trio_token + ) + ) From 25034d3c6d1d2693c4e6959daff14118c5d69e2e Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 18:35:17 +0100 Subject: [PATCH 21/89] switched asyncio runner to queue model --- src/cobald/daemon/runners/_compat.py | 7 +++ src/cobald/daemon/runners/asyncio_runner.py | 50 ++++++++++++++------- src/cobald/daemon/runners/thread_runner.py | 7 ++- src/cobald/daemon/runners/trio_runner.py | 6 ++- 4 files changed, 51 insertions(+), 19 deletions(-) diff --git a/src/cobald/daemon/runners/_compat.py b/src/cobald/daemon/runners/_compat.py index 1a7708f3..93e66627 100644 --- a/src/cobald/daemon/runners/_compat.py +++ b/src/cobald/daemon/runners/_compat.py @@ -40,3 +40,10 @@ def _cancel_all_tasks(loop): "task": task, } ) + + +if sys.version_info >= (3, 7): + asyncio_current_task = asyncio.current_task +else: + def asyncio_current_task() -> asyncio.Task: + return asyncio.Task.current_task() diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index 099039bc..a964d0e7 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -2,22 +2,26 @@ import asyncio from .base_runner import BaseRunner -from .async_tools import raise_return, ensure_coroutine +from .async_tools import OrphanedReturn, ensure_coroutine +from ._compat import asyncio_current_task class AsyncioRunner(BaseRunner): - """Runner for coroutines with :py:mod:`asyncio`""" + """ + Runner for coroutines with :py:mod:`asyncio` + + All active payloads are actively cancelled when the runner is closed. + """ flavour = asyncio def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): super().__init__(asyncio_loop) self._tasks = set() + self._failure_queue: Optional[asyncio.Queue] = None def register_payload(self, payload: Callable[[], Awaitable]): - self.asyncio_loop.call_soon_threadsafe( - lambda: self._tasks.add(self.asyncio_loop.create_task(raise_return(payload))) - ) + self.asyncio_loop.call_soon_threadsafe(self._setup_payload, payload) def run_payload(self, payload: Callable[[], Awaitable]): future = asyncio.run_coroutine_threadsafe( @@ -25,24 +29,36 @@ def run_payload(self, payload: Callable[[], Awaitable]): ) return future.result() + def _setup_payload(self, payload: Callable[[], Awaitable]): + task = self.asyncio_loop.create_task(self._monitor_payload(payload)) + self._tasks.add(task) + + async def _monitor_payload(self, payload: Callable[[], Awaitable]): + try: + result = payload() + except BaseException as e: + failure = e + else: + if result is None: + return + failure = OrphanedReturn(payload, result) + finally: + self._tasks.discard(asyncio_current_task()) + assert self._failure_queue is not None + await self._failure_queue.put(failure) + async def manage_payloads(self): - # Remove tracked tasks and raise if tasks leak - while self._tasks or self._running.is_set(): - # let asyncio efficiently wait for errors - # we only force wake up via timeout every now and then to clean up - done, pending = await asyncio.wait( - self._tasks, timeout=60, return_when=asyncio.FIRST_EXCEPTION - ) - self._tasks.difference_update(done) - for task in done: - # re-raise any exceptions - task.result() + self._failure_queue = asyncio.Queue() + failure = await self._failure_queue.get() + if failure is not None: + raise failure async def aclose(self): self._running.clear() + await self._failure_queue.put(None) while self._tasks: for task in self._tasks.copy(): if task.done(): - self._tasks.remove(task) + self._tasks.discard(task) task.cancel() await asyncio.sleep(0.5) diff --git a/src/cobald/daemon/runners/thread_runner.py b/src/cobald/daemon/runners/thread_runner.py index 315a25fb..09fb8866 100644 --- a/src/cobald/daemon/runners/thread_runner.py +++ b/src/cobald/daemon/runners/thread_runner.py @@ -6,7 +6,12 @@ class ThreadRunner(BaseRunner): - """Runner for subroutines with :py:mod:`threading`""" + """ + Runner for subroutines with :py:mod:`threading` + + All active payloads are *not* cancelled when the runner is closed. + Only program termination forcefully cancels leftover payloads. + """ flavour = threading diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index eccd5e5f..eb56bc50 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -8,7 +8,11 @@ class TrioRunner(BaseRunner): - """Runner for coroutines with :py:mod:`trio`""" + """ + Runner for coroutines with :py:mod:`trio` + + All active payloads are actively cancelled when the runner is closed. + """ flavour = trio From de856b10979b40bfec28a69082cf906366f2750e Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 18:36:52 +0100 Subject: [PATCH 22/89] passing trio coroutines properly --- src/cobald/daemon/runners/trio_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index eb56bc50..48144076 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -47,7 +47,7 @@ async def _manage_payloads_trio(self): self._submit_tasks, receive_tasks = trio.open_memory_channel(256) async with trio.open_nursery() as nursery: async for task in receive_tasks: - nursery.start_soon(raise_return(task)) + nursery.start_soon(raise_return, task) # shutting down: cancel the scope to cancel all payloads nursery.cancel_scope.cancel() From 95ae7f585b2d923eb76a943a1e10a206ab67f123 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 18:49:05 +0100 Subject: [PATCH 23/89] removed unused running flag --- src/cobald/daemon/runners/asyncio_runner.py | 1 - src/cobald/daemon/runners/base_runner.py | 5 ----- src/cobald/daemon/runners/thread_runner.py | 1 - src/cobald/daemon/runners/trio_runner.py | 1 - 4 files changed, 8 deletions(-) diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index a964d0e7..3f14abc8 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -54,7 +54,6 @@ async def manage_payloads(self): raise failure async def aclose(self): - self._running.clear() await self._failure_queue.put(None) while self._tasks: for task in self._tasks.copy(): diff --git a/src/cobald/daemon/runners/base_runner.py b/src/cobald/daemon/runners/base_runner.py index c46d5f83..6e9fb0bf 100644 --- a/src/cobald/daemon/runners/base_runner.py +++ b/src/cobald/daemon/runners/base_runner.py @@ -15,7 +15,6 @@ def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): self._logger = logging.getLogger( "cobald.runtime.runner.%s" % NameRepr(self.flavour) ) - self._running = threading.Event() self._stopped = threading.Event() self._stopped.set() @@ -49,7 +48,6 @@ async def run(self): """ self._logger.info("runner started: %s", self) self._stopped.clear() - self._running.set() try: await self.manage_payloads() except Exception: @@ -58,7 +56,6 @@ async def run(self): else: self._logger.info("runner stopped: %s", self) finally: - self._running.clear() self._stopped.set() async def manage_payloads(self): @@ -69,8 +66,6 @@ async def aclose(self): def stop(self): """Stop execution of all current and future payloads and block until success""" - if not self._running.wait(0.2): - return # the loop exists independently of all runners, we can use it to shut down closed = asyncio.run_coroutine_threadsafe(self.aclose(), self.asyncio_loop) closed.result() diff --git a/src/cobald/daemon/runners/thread_runner.py b/src/cobald/daemon/runners/thread_runner.py index 09fb8866..898452f7 100644 --- a/src/cobald/daemon/runners/thread_runner.py +++ b/src/cobald/daemon/runners/thread_runner.py @@ -48,5 +48,4 @@ async def manage_payloads(self): raise failure async def aclose(self): - self._running.clear() await self._failure_queue.put(None) diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index 48144076..ad237d4d 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -52,7 +52,6 @@ async def _manage_payloads_trio(self): nursery.cancel_scope.cancel() async def _aclose_trio(self): - self._running.clear() await self._submit_tasks.aclose() async def aclose(self): From 0aa7d32e3305c4b48e9ff761df3cb894234a15aa Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 19:01:23 +0100 Subject: [PATCH 24/89] added method to check if runner is ready --- src/cobald/daemon/runners/base_runner.py | 8 +++++++- src/cobald/daemon/runners/meta_runner.py | 2 ++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/cobald/daemon/runners/base_runner.py b/src/cobald/daemon/runners/base_runner.py index 6e9fb0bf..2b5f1d84 100644 --- a/src/cobald/daemon/runners/base_runner.py +++ b/src/cobald/daemon/runners/base_runner.py @@ -8,6 +8,7 @@ class BaseRunner(object): """Concurrency backend on top of `asyncio`""" + flavour = None # type: Any def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): @@ -36,6 +37,12 @@ def run_payload(self, payload): """ raise NotImplementedError + async def ready(self): + """Wait until the runner is ready to accept payloads""" + assert ( + not self._stopped.is_set() + ), "runner must be .run before waiting until it is ready" + async def run(self): """ Execute all current and future payloads in an `asyncio` task @@ -71,7 +78,6 @@ def stop(self): closed.result() - class OrphanedReturn(Exception): """A runnable returned a value without anyone to receive it""" diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index e3ae96a9..557c740c 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -84,6 +84,8 @@ async def _launch_runners(self): for runner_type in self.runner_types: runner = self._runners[runner_type.flavour] = runner_type(asyncio_loop) runner_tasks.append(asyncio_loop.create_task(runner.run())) + for runner in self._runners.values(): + await runner.ready() await self._unqueue_payloads() await asyncio.gather(*runner_tasks) From caa51239804a90eb2cdecd0e5895600cecd59c9b Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 19:03:12 +0100 Subject: [PATCH 25/89] initialising asyncio resources early --- src/cobald/daemon/runners/asyncio_runner.py | 3 +-- src/cobald/daemon/runners/thread_runner.py | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index 3f14abc8..b7a144fe 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -18,7 +18,7 @@ class AsyncioRunner(BaseRunner): def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): super().__init__(asyncio_loop) self._tasks = set() - self._failure_queue: Optional[asyncio.Queue] = None + self._failure_queue = asyncio.Queue() def register_payload(self, payload: Callable[[], Awaitable]): self.asyncio_loop.call_soon_threadsafe(self._setup_payload, payload) @@ -48,7 +48,6 @@ async def _monitor_payload(self, payload: Callable[[], Awaitable]): await self._failure_queue.put(failure) async def manage_payloads(self): - self._failure_queue = asyncio.Queue() failure = await self._failure_queue.get() if failure is not None: raise failure diff --git a/src/cobald/daemon/runners/thread_runner.py b/src/cobald/daemon/runners/thread_runner.py index 898452f7..de9154a9 100644 --- a/src/cobald/daemon/runners/thread_runner.py +++ b/src/cobald/daemon/runners/thread_runner.py @@ -17,7 +17,7 @@ class ThreadRunner(BaseRunner): def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): super().__init__(asyncio_loop) - self._failure_queue: Optional[asyncio.Queue] = None + self._failure_queue = asyncio.Queue() def register_payload(self, payload): thread = threading.Thread(target=self.run_payload, args=(payload,), daemon=True) @@ -42,7 +42,6 @@ def _monitor_payload(self, payload): self.asyncio_loop.call_soon_threadsafe(self._failure_queue.put_nowait, failure) async def manage_payloads(self): - self._failure_queue = asyncio.Queue() failure = await self._failure_queue.get() if failure is not None: raise failure From db7c2606b7d4bfc5405dc6fc4c19893b51653ab3 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 19:07:46 +0100 Subject: [PATCH 26/89] trio signals ready --- src/cobald/daemon/runners/trio_runner.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index ad237d4d..6f045e19 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -18,6 +18,7 @@ class TrioRunner(BaseRunner): def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): super().__init__(asyncio_loop) + self._ready = asyncio.Event() self._trio_token: Optional[trio.lowlevel.TrioToken] = None self._submit_tasks: Optional[trio.MemorySendChannel] = None @@ -33,6 +34,9 @@ def run_payload(self, payload: Callable[[], Awaitable]): payload, trio_token=self._trio_token ) + async def ready(self): + await self._ready.wait() + async def manage_payloads(self): # this blocks one thread of the asyncio event loop await self.asyncio_loop.run_in_executor(None, self._run_trio_blocking) @@ -45,6 +49,7 @@ async def _manage_payloads_trio(self): # buffer of 256 is somewhat arbitrary but should be large enough to rarely stall # and small enough to smooth out explosive backlog. self._submit_tasks, receive_tasks = trio.open_memory_channel(256) + self._ready.set() async with trio.open_nursery() as nursery: async for task in receive_tasks: nursery.start_soon(raise_return, task) From fbd6e1b717413a9b777fa705988cc631e3d93e7d Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 19:16:23 +0100 Subject: [PATCH 27/89] stopping a runner is idempotent --- src/cobald/daemon/runners/base_runner.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/cobald/daemon/runners/base_runner.py b/src/cobald/daemon/runners/base_runner.py index 2b5f1d84..73cfb0a8 100644 --- a/src/cobald/daemon/runners/base_runner.py +++ b/src/cobald/daemon/runners/base_runner.py @@ -73,6 +73,8 @@ async def aclose(self): def stop(self): """Stop execution of all current and future payloads and block until success""" + if self._stopped.is_set(): + return # the loop exists independently of all runners, we can use it to shut down closed = asyncio.run_coroutine_threadsafe(self.aclose(), self.asyncio_loop) closed.result() From 5830e4caf9fbd5115f11e2c49bfabeee0897b7da Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 19:23:11 +0100 Subject: [PATCH 28/89] removed outdated test --- cobald_tests/daemon/test_service.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/cobald_tests/daemon/test_service.py b/cobald_tests/daemon/test_service.py index c1bc3833..19170e53 100644 --- a/cobald_tests/daemon/test_service.py +++ b/cobald_tests/daemon/test_service.py @@ -34,17 +34,6 @@ def accept(payload: ServiceRunner, name=None): class TestServiceRunner(object): - def test_no_tainting(self): - """Assert that no payloads may be scheduled before starting""" - - def payload(): - return - - runner = ServiceRunner() - runner._meta_runner.register_payload(payload, flavour=threading) - with pytest.raises(RuntimeError): - runner.accept() - def test_unique_reaper(self): """Assert that no two runners may fetch services""" with accept(ServiceRunner(accept_delay=0.1), name="outer"): From c81ece7c126d762a4c44d1cf3bef86e9132e11f3 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 19:23:24 +0100 Subject: [PATCH 29/89] removed faulty indirection in test --- cobald_tests/daemon/test_service.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cobald_tests/daemon/test_service.py b/cobald_tests/daemon/test_service.py index 19170e53..5db30baf 100644 --- a/cobald_tests/daemon/test_service.py +++ b/cobald_tests/daemon/test_service.py @@ -38,8 +38,7 @@ def test_unique_reaper(self): """Assert that no two runners may fetch services""" with accept(ServiceRunner(accept_delay=0.1), name="outer"): with pytest.raises(RuntimeError): - with accept(ServiceRunner(accept_delay=0.1), name="inner"): - pass + ServiceRunner(accept_delay=0.1).accept() def test_service(self): """Test running service classes automatically""" From eeba44ad5816f7a5be865806e1d1c08b8946888e Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 1 Mar 2022 19:38:31 +0100 Subject: [PATCH 30/89] removed timing issue when starting trio runner --- src/cobald/daemon/runners/trio_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index 6f045e19..c017310c 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -49,7 +49,7 @@ async def _manage_payloads_trio(self): # buffer of 256 is somewhat arbitrary but should be large enough to rarely stall # and small enough to smooth out explosive backlog. self._submit_tasks, receive_tasks = trio.open_memory_channel(256) - self._ready.set() + self.asyncio_loop.call_soon_threadsafe(self._ready.set) async with trio.open_nursery() as nursery: async for task in receive_tasks: nursery.start_soon(raise_return, task) From 9c2621e34768ef10da6505385624d089da15b038 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 10:18:21 +0100 Subject: [PATCH 31/89] returning value --- src/cobald/daemon/runners/trio_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index c017310c..1a6e8207 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -30,7 +30,7 @@ def register_payload(self, payload: Callable[[], Awaitable]): def run_payload(self, payload: Callable[[], Awaitable]): assert self._trio_token is not None and self._submit_tasks is not None - trio.from_thread.run( + return trio.from_thread.run( payload, trio_token=self._trio_token ) From fe64d29531e0d0e1b0f297973bd6f533b36a747c Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 10:32:55 +0100 Subject: [PATCH 32/89] missed awaiting coroutine --- src/cobald/daemon/runners/asyncio_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index b7a144fe..6f156172 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -35,7 +35,7 @@ def _setup_payload(self, payload: Callable[[], Awaitable]): async def _monitor_payload(self, payload: Callable[[], Awaitable]): try: - result = payload() + result = await payload() except BaseException as e: failure = e else: From 97f6ebf6e1a3b3474a050f47f840f2238997d069 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 11:41:48 +0100 Subject: [PATCH 33/89] added waitable event to meta runner --- src/cobald/daemon/runners/meta_runner.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index 557c740c..4f15802f 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -27,7 +27,7 @@ def __init__(self): self._logger = logging.getLogger("cobald.runtime.runner.meta") self._runners: Dict[ModuleType, BaseRunner] = {} self._runner_queues: Dict[ModuleType, Any] = {} - self._lock = threading.Lock() + self.running = threading.Event() @property def runners(self): @@ -87,12 +87,15 @@ async def _launch_runners(self): for runner in self._runners.values(): await runner.ready() await self._unqueue_payloads() - await asyncio.gather(*runner_tasks) + self.running.set() + try: + await asyncio.gather(*runner_tasks) + finally: + self.running.clear() def run(self): """Run all runners, blocking until completion or error""" self._logger.info("starting all runners") - self._lock.acquire() try: asyncio_run(self._launch_runners()) except KeyboardInterrupt: @@ -103,7 +106,6 @@ def run(self): finally: self._stop_runners() self._logger.info("stopped all runners") - self._lock.release() def stop(self): """Stop all runners""" From f989c5185bcf751f421ea93598ee8d9ece9d755b Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 11:42:10 +0100 Subject: [PATCH 34/89] properly running meta runner during tests --- .../utility/concurrent/test_meta_runner.py | 63 +++++++------------ 1 file changed, 24 insertions(+), 39 deletions(-) diff --git a/cobald_tests/utility/concurrent/test_meta_runner.py b/cobald_tests/utility/concurrent/test_meta_runner.py index 85dbe392..dbfd661c 100644 --- a/cobald_tests/utility/concurrent/test_meta_runner.py +++ b/cobald_tests/utility/concurrent/test_meta_runner.py @@ -2,6 +2,7 @@ import pytest import time import asyncio +import contextlib import trio @@ -13,36 +14,23 @@ class TerminateRunner(Exception): pass -def run_in_thread(payload, name, daemon=True): - thread = threading.Thread(target=payload, name=name, daemon=daemon) +@contextlib.contextmanager +def threaded_run(name=None): + runner = MetaRunner() + thread = threading.Thread( + target=runner.run, name=name or str(runner), daemon=True + ) thread.start() - time.sleep(0.0) + if not runner.running.wait(1): + raise RuntimeError("%s failed to start" % runner) + try: + yield runner + finally: + runner.stop() + thread.join() class TestMetaRunner(object): - def test_bool_payloads(self): - def subroutine(): - time.sleep(0.5) - - async def a_coroutine(): - await asyncio.sleep(0.5) - - async def t_coroutine(): - await trio.sleep(0.5) - - for flavour, payload in ( - (threading, subroutine), - (asyncio, a_coroutine), - (trio, t_coroutine), - ): - runner = MetaRunner() - assert not bool(runner) - runner.register_payload(payload, flavour=flavour) - assert bool(runner) - run_in_thread(runner.run, name="test_bool_payloads %s" % flavour) - assert bool(runner) - runner.stop() - @pytest.mark.parametrize("flavour", (threading,)) def test_run_subroutine(self, flavour): """Test executing a subroutine""" @@ -53,11 +41,11 @@ def with_return(): def with_raise(): raise KeyError("expected exception") - runner = MetaRunner() - result = runner.run_payload(with_return, flavour=flavour) - assert result == with_return() - with pytest.raises(KeyError): - runner.run_payload(with_raise, flavour=flavour) + with threaded_run() as runner: + result = runner.run_payload(with_return, flavour=flavour) + assert result == with_return() + with pytest.raises(KeyError): + runner.run_payload(with_raise, flavour=flavour) @pytest.mark.parametrize("flavour", (asyncio, trio)) def test_run_coroutine(self, flavour): @@ -69,13 +57,11 @@ async def with_return(): async def with_raise(): raise KeyError("expected exception") - runner = MetaRunner() - run_in_thread(runner.run, name="test_run_coroutine %s" % flavour) - result = runner.run_payload(with_return, flavour=flavour) - assert result == trio.run(with_return) - with pytest.raises(KeyError): - runner.run_payload(with_raise, flavour=flavour) - runner.stop() + with threaded_run() as runner: + result = runner.run_payload(with_return, flavour=flavour) + assert result == trio.run(with_return) + with pytest.raises(KeyError): + runner.run_payload(with_raise, flavour=flavour) @pytest.mark.parametrize("flavour", (threading,)) def test_return_subroutine(self, flavour): @@ -151,7 +137,6 @@ async def loop(): await flavour.sleep(0) runner = MetaRunner() - runner.register_payload(noop, loop, flavour=flavour) runner.register_payload(abort, flavour=flavour) with pytest.raises(RuntimeError) as exc: From fa08944d6c04e6b5d00209171e1a7c4fd9d4014a Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 11:55:22 +0100 Subject: [PATCH 35/89] properly wrapping threads --- src/cobald/daemon/runners/thread_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cobald/daemon/runners/thread_runner.py b/src/cobald/daemon/runners/thread_runner.py index de9154a9..28da92e6 100644 --- a/src/cobald/daemon/runners/thread_runner.py +++ b/src/cobald/daemon/runners/thread_runner.py @@ -20,7 +20,7 @@ def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): self._failure_queue = asyncio.Queue() def register_payload(self, payload): - thread = threading.Thread(target=self.run_payload, args=(payload,), daemon=True) + thread = threading.Thread(target=self._monitor_payload, args=(payload,), daemon=True) thread.start() def run_payload(self, payload): From 99e25c148f7f9017793f54058c6f3ad22a29e1bd Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 12:14:43 +0100 Subject: [PATCH 36/89] metarunner shuts down children gracefully --- src/cobald/daemon/runners/meta_runner.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index 4f15802f..4e5e578b 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -90,6 +90,10 @@ async def _launch_runners(self): self.running.set() try: await asyncio.gather(*runner_tasks) + except BaseException: + for runner in self._runners.values(): + await runner.aclose() + raise finally: self.running.clear() From cbb03f818432f54a1ceb6cad6d7cd6ba25eb75a9 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 12:19:04 +0100 Subject: [PATCH 37/89] made runner aclose idempotent --- src/cobald/daemon/runners/asyncio_runner.py | 2 ++ src/cobald/daemon/runners/thread_runner.py | 2 ++ src/cobald/daemon/runners/trio_runner.py | 2 ++ 3 files changed, 6 insertions(+) diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index 6f156172..77a9b83e 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -53,6 +53,8 @@ async def manage_payloads(self): raise failure async def aclose(self): + if self._stopped.is_set(): + return await self._failure_queue.put(None) while self._tasks: for task in self._tasks.copy(): diff --git a/src/cobald/daemon/runners/thread_runner.py b/src/cobald/daemon/runners/thread_runner.py index 28da92e6..789530ff 100644 --- a/src/cobald/daemon/runners/thread_runner.py +++ b/src/cobald/daemon/runners/thread_runner.py @@ -47,4 +47,6 @@ async def manage_payloads(self): raise failure async def aclose(self): + if self._stopped.is_set(): + return await self._failure_queue.put(None) diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index 1a6e8207..8af067ce 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -60,6 +60,8 @@ async def _aclose_trio(self): await self._submit_tasks.aclose() async def aclose(self): + if self._stopped.is_set(): + return await self.asyncio_loop.run_in_executor( None, partial( trio.from_thread.run, self._aclose_trio, trio_token=self._trio_token From 09846566afee7813f831a4d9f64aa1e283a2f7b7 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 12:19:52 +0100 Subject: [PATCH 38/89] the future is black --- cobald_tests/utility/concurrent/test_meta_runner.py | 4 +--- src/cobald/daemon/runners/_compat.py | 1 + src/cobald/daemon/runners/meta_runner.py | 2 +- src/cobald/daemon/runners/thread_runner.py | 4 +++- src/cobald/daemon/runners/trio_runner.py | 9 ++++----- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cobald_tests/utility/concurrent/test_meta_runner.py b/cobald_tests/utility/concurrent/test_meta_runner.py index dbfd661c..b58df80e 100644 --- a/cobald_tests/utility/concurrent/test_meta_runner.py +++ b/cobald_tests/utility/concurrent/test_meta_runner.py @@ -17,9 +17,7 @@ class TerminateRunner(Exception): @contextlib.contextmanager def threaded_run(name=None): runner = MetaRunner() - thread = threading.Thread( - target=runner.run, name=name or str(runner), daemon=True - ) + thread = threading.Thread(target=runner.run, name=name or str(runner), daemon=True) thread.start() if not runner.running.wait(1): raise RuntimeError("%s failed to start" % runner) diff --git a/src/cobald/daemon/runners/_compat.py b/src/cobald/daemon/runners/_compat.py index 93e66627..63109e80 100644 --- a/src/cobald/daemon/runners/_compat.py +++ b/src/cobald/daemon/runners/_compat.py @@ -45,5 +45,6 @@ def _cancel_all_tasks(loop): if sys.version_info >= (3, 7): asyncio_current_task = asyncio.current_task else: + def asyncio_current_task() -> asyncio.Task: return asyncio.Task.current_task() diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index 4e5e578b..1a7fe932 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -26,7 +26,7 @@ class MetaRunner(object): def __init__(self): self._logger = logging.getLogger("cobald.runtime.runner.meta") self._runners: Dict[ModuleType, BaseRunner] = {} - self._runner_queues: Dict[ModuleType, Any] = {} + self._runner_queues: Dict[ModuleType, Any] = {} self.running = threading.Event() @property diff --git a/src/cobald/daemon/runners/thread_runner.py b/src/cobald/daemon/runners/thread_runner.py index 789530ff..fcace89e 100644 --- a/src/cobald/daemon/runners/thread_runner.py +++ b/src/cobald/daemon/runners/thread_runner.py @@ -20,7 +20,9 @@ def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): self._failure_queue = asyncio.Queue() def register_payload(self, payload): - thread = threading.Thread(target=self._monitor_payload, args=(payload,), daemon=True) + thread = threading.Thread( + target=self._monitor_payload, args=(payload,), daemon=True + ) thread.start() def run_payload(self, payload): diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index 8af067ce..d3e5b5c8 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -30,9 +30,7 @@ def register_payload(self, payload: Callable[[], Awaitable]): def run_payload(self, payload: Callable[[], Awaitable]): assert self._trio_token is not None and self._submit_tasks is not None - return trio.from_thread.run( - payload, trio_token=self._trio_token - ) + return trio.from_thread.run(payload, trio_token=self._trio_token) async def ready(self): await self._ready.wait() @@ -63,7 +61,8 @@ async def aclose(self): if self._stopped.is_set(): return await self.asyncio_loop.run_in_executor( - None, partial( + None, + partial( trio.from_thread.run, self._aclose_trio, trio_token=self._trio_token - ) + ), ) From cc7cf94052eadd8af8a764fc590d0f9a0d05a3b9 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 12:24:06 +0100 Subject: [PATCH 39/89] 3.6 has no means to shutdown executors --- src/cobald/daemon/runners/_compat.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/cobald/daemon/runners/_compat.py b/src/cobald/daemon/runners/_compat.py index 63109e80..2ebfb295 100644 --- a/src/cobald/daemon/runners/_compat.py +++ b/src/cobald/daemon/runners/_compat.py @@ -19,7 +19,6 @@ def asyncio_run(main, *, debug=None): try: _cancel_all_tasks(loop) loop.run_until_complete(loop.shutdown_asyncgens()) - loop.run_until_complete(loop.shutdown_default_executor()) finally: asyncio.set_event_loop(None) loop.close() From 916442b762dd70b69fd2aec669704e0f2c12639f Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 12:28:27 +0100 Subject: [PATCH 40/89] cleaned up imports --- src/cobald/daemon/runners/asyncio_runner.py | 2 +- src/cobald/daemon/runners/base_runner.py | 2 +- src/cobald/daemon/runners/meta_runner.py | 4 +--- src/cobald/daemon/runners/thread_runner.py | 1 - src/cobald/daemon/runners/trio_runner.py | 1 + 5 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index 77a9b83e..f1007d98 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -1,4 +1,4 @@ -from typing import Optional, Callable, Awaitable +from typing import Callable, Awaitable import asyncio from .base_runner import BaseRunner diff --git a/src/cobald/daemon/runners/base_runner.py b/src/cobald/daemon/runners/base_runner.py index 73cfb0a8..48cc4e62 100644 --- a/src/cobald/daemon/runners/base_runner.py +++ b/src/cobald/daemon/runners/base_runner.py @@ -3,7 +3,7 @@ import threading from typing import Any -from cobald.daemon.debug import NameRepr +from ..debug import NameRepr class BaseRunner(object): diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index 1a7fe932..2f716a75 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -3,7 +3,6 @@ import threading import warnings import asyncio - from types import ModuleType from .base_runner import BaseRunner @@ -12,8 +11,7 @@ from .thread_runner import ThreadRunner from ._compat import asyncio_run - -from cobald.daemon.debug import NameRepr +from ..debug import NameRepr class MetaRunner(object): diff --git a/src/cobald/daemon/runners/thread_runner.py b/src/cobald/daemon/runners/thread_runner.py index fcace89e..7444ce26 100644 --- a/src/cobald/daemon/runners/thread_runner.py +++ b/src/cobald/daemon/runners/thread_runner.py @@ -1,4 +1,3 @@ -from typing import Optional import threading import asyncio diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index d3e5b5c8..f33aad70 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -1,6 +1,7 @@ from typing import Optional, Callable, Awaitable import asyncio from functools import partial + import trio from .base_runner import BaseRunner From c3951e6f8eaab37c07ca168db6bf3347b987716f Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 12:32:45 +0100 Subject: [PATCH 41/89] documented quadrupel wrap --- src/cobald/daemon/runners/trio_runner.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index f33aad70..46ee78ba 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -61,6 +61,8 @@ async def _aclose_trio(self): async def aclose(self): if self._stopped.is_set(): return + # Trio only allows us an *synchronously blocking* call it from other threads. + # Use an executor thread to make that *asynchronously* blocking for asyncio. await self.asyncio_loop.run_in_executor( None, partial( From 582e39f55d211b44582c24271987cade9402a176 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 12:44:05 +0100 Subject: [PATCH 42/89] using original asyncio.run filter --- src/cobald/daemon/runners/_compat.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/cobald/daemon/runners/_compat.py b/src/cobald/daemon/runners/_compat.py index 2ebfb295..0243c25a 100644 --- a/src/cobald/daemon/runners/_compat.py +++ b/src/cobald/daemon/runners/_compat.py @@ -31,7 +31,9 @@ def _cancel_all_tasks(loop): task.cancel() loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True)) for task in to_cancel: - if task.exception() is not None and not task.cancelled(): + if task.cancelled(): + continue + if task.exception() is not None: loop.call_exception_handler( { "message": "unhandled exception during asyncio.run() shutdown", From 08baa32330b82cdfbfd15f1de7e9aa9ea21bbcb8 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 13:20:31 +0100 Subject: [PATCH 43/89] removed sleep obsoleted by ready mechanic --- src/cobald/daemon/runners/meta_runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index 2f716a75..4b78c44e 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -67,7 +67,6 @@ def run_payload(self, payload, *, flavour: ModuleType): async def _unqueue_payloads(self): """Register payloads once runners are started""" assert self._runners, "runners must be launched before unqueueing" - await asyncio.sleep(0) # runners are started already, so no new payloads can be registered for flavour, queue in self._runner_queues.items(): self.register_payload(*queue, flavour=flavour) From 4a5e8dd94a6dce6163cbe3257ceb38d0c984baec Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 13:31:14 +0100 Subject: [PATCH 44/89] testing for running state directly --- src/cobald/daemon/runners/meta_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index 4b78c44e..e92d08db 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -45,7 +45,7 @@ def register_payload(self, *payloads, flavour: ModuleType): try: runner = self._runners[flavour] except KeyError: - if self._runners: + if self.running.is_set(): raise RuntimeError(f"unknown runner {NameRepr(flavour)}") from None self._runner_queues.setdefault(flavour, []).extend(payloads) else: From 47405b478b6e1a6af7f95992d7742c27d19b1493 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 13:45:55 +0100 Subject: [PATCH 45/89] moved public functions to top --- src/cobald/daemon/runners/meta_runner.py | 54 ++++++++++++------------ 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index e92d08db..0f18f4ad 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -24,6 +24,7 @@ class MetaRunner(object): def __init__(self): self._logger = logging.getLogger("cobald.runtime.runner.meta") self._runners: Dict[ModuleType, BaseRunner] = {} + # queue to store payloads submitted before the runner is started self._runner_queues: Dict[ModuleType, Any] = {} self.running = threading.Event() @@ -64,10 +65,35 @@ def run_payload(self, payload, *, flavour: ModuleType): """ return self._runners[flavour].run_payload(payload) + def run(self): + """Run all runners, blocking until completion or error""" + self._logger.info("starting all runners") + try: + asyncio_run(self._launch_runners()) + except KeyboardInterrupt: + self._logger.info("runner interrupted") + except Exception as err: + self._logger.exception("runner terminated: %s", err) + raise RuntimeError from err + finally: + self._stop_runners() + self._logger.info("stopped all runners") + + def stop(self): + """Stop all runners""" + self._stop_runners() + + def _stop_runners(self): + for runner in self._runners.values(): + if runner.flavour == threading: + continue + runner.stop() + self._runners[threading].stop() + async def _unqueue_payloads(self): """Register payloads once runners are started""" assert self._runners, "runners must be launched before unqueueing" - # runners are started already, so no new payloads can be registered + # runners are started, so re-registering payloads does not them queue again for flavour, queue in self._runner_queues.items(): self.register_payload(*queue, flavour=flavour) queue.clear() @@ -86,6 +112,7 @@ async def _launch_runners(self): await self._unqueue_payloads() self.running.set() try: + # wait for all runners to either stop gracefully or propagate errors await asyncio.gather(*runner_tasks) except BaseException: for runner in self._runners.values(): @@ -93,28 +120,3 @@ async def _launch_runners(self): raise finally: self.running.clear() - - def run(self): - """Run all runners, blocking until completion or error""" - self._logger.info("starting all runners") - try: - asyncio_run(self._launch_runners()) - except KeyboardInterrupt: - self._logger.info("runner interrupted") - except Exception as err: - self._logger.exception("runner terminated: %s", err) - raise RuntimeError from err - finally: - self._stop_runners() - self._logger.info("stopped all runners") - - def stop(self): - """Stop all runners""" - self._stop_runners() - - def _stop_runners(self): - for runner in self._runners.values(): - if runner.flavour == threading: - continue - runner.stop() - self._runners[threading].stop() From c452a8021314fca7d3b8fb85b2164284e7cb5aee Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 13:46:29 +0100 Subject: [PATCH 46/89] simplified metarunner stopping --- src/cobald/daemon/runners/meta_runner.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index 0f18f4ad..140b18f8 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -76,19 +76,13 @@ def run(self): self._logger.exception("runner terminated: %s", err) raise RuntimeError from err finally: - self._stop_runners() + self.stop() self._logger.info("stopped all runners") def stop(self): """Stop all runners""" - self._stop_runners() - - def _stop_runners(self): for runner in self._runners.values(): - if runner.flavour == threading: - continue runner.stop() - self._runners[threading].stop() async def _unqueue_payloads(self): """Register payloads once runners are started""" From 69fe060b1653686a0e29dfbd765b924f7a161f4b Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 13:50:46 +0100 Subject: [PATCH 47/89] split code by responsibilities --- src/cobald/daemon/runners/meta_runner.py | 47 +++++++++++++----------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index 140b18f8..9215ddbd 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -1,4 +1,4 @@ -from typing import Dict, Any +from typing import Dict, List, Any import logging import threading import warnings @@ -69,7 +69,7 @@ def run(self): """Run all runners, blocking until completion or error""" self._logger.info("starting all runners") try: - asyncio_run(self._launch_runners()) + asyncio_run(self._manage_runners()) except KeyboardInterrupt: self._logger.info("runner interrupted") except Exception as err: @@ -84,25 +84,9 @@ def stop(self): for runner in self._runners.values(): runner.stop() - async def _unqueue_payloads(self): - """Register payloads once runners are started""" - assert self._runners, "runners must be launched before unqueueing" - # runners are started, so re-registering payloads does not them queue again - for flavour, queue in self._runner_queues.items(): - self.register_payload(*queue, flavour=flavour) - queue.clear() - self._runner_queues.clear() - - async def _launch_runners(self): - """Launch all runners inside an `asyncio` event loop and wait for them""" - asyncio_loop = asyncio.get_event_loop() - self._runners = {} - runner_tasks = [] - for runner_type in self.runner_types: - runner = self._runners[runner_type.flavour] = runner_type(asyncio_loop) - runner_tasks.append(asyncio_loop.create_task(runner.run())) - for runner in self._runners.values(): - await runner.ready() + async def _manage_runners(self): + """Manage all runners inside the current `asyncio` event loop""" + runner_tasks = await self._launch_runners() await self._unqueue_payloads() self.running.set() try: @@ -114,3 +98,24 @@ async def _launch_runners(self): raise finally: self.running.clear() + + async def _launch_runners(self) -> List[asyncio.Task]: + """Launch all runners inside the current `asyncio` event loop""" + asyncio_loop = asyncio.get_event_loop() + self._runners = {} + runner_tasks = [] + for runner_type in self.runner_types: + runner = self._runners[runner_type.flavour] = runner_type(asyncio_loop) + runner_tasks.append(asyncio_loop.create_task(runner.run())) + for runner in self._runners.values(): + await runner.ready() + return runner_tasks + + async def _unqueue_payloads(self) -> None: + """Register payloads once runners are started""" + assert self._runners, "runners must be launched before unqueueing" + # runners are started, so re-registering payloads does not them queue again + for flavour, queue in self._runner_queues.items(): + self.register_payload(*queue, flavour=flavour) + queue.clear() + self._runner_queues.clear() From 1d689e7f548cf9c45a32adc53e16e027491287ce Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 13:56:45 +0100 Subject: [PATCH 48/89] removed dead code --- src/cobald/daemon/runners/async_tools.py | 32 ------------------------ 1 file changed, 32 deletions(-) diff --git a/src/cobald/daemon/runners/async_tools.py b/src/cobald/daemon/runners/async_tools.py index 8fb67691..12ea143d 100644 --- a/src/cobald/daemon/runners/async_tools.py +++ b/src/cobald/daemon/runners/async_tools.py @@ -20,35 +20,3 @@ async def wrapper(): return await awaitable return wrapper() - - -class AsyncExecution(object): - def __init__(self, payload): - self.payload = payload - self._result = None - self._done = threading.Event() - self._done.clear() - - # explicit coroutine for libraries that type check - async def coroutine(self): - await self - - def __await__(self): - try: - value = yield from self.payload().__await__() - except Exception as err: - self._result = None, err - else: - self._result = value, None - self._done.set() - - def wait(self): - self._done.wait() - value, exception = self._result - if exception is None: - return value - else: - raise exception - - def __repr__(self): - return "%s(%s)" % (self.__class__.__name__, self.payload) From e1d571602723b3a5670f5252895f8a107a0a10e6 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 13:58:13 +0100 Subject: [PATCH 49/89] split async tools to respective usages --- src/cobald/daemon/runners/async_tools.py | 22 --------------------- src/cobald/daemon/runners/asyncio_runner.py | 15 ++++++++++++-- src/cobald/daemon/runners/trio_runner.py | 10 ++++++++-- 3 files changed, 21 insertions(+), 26 deletions(-) delete mode 100644 src/cobald/daemon/runners/async_tools.py diff --git a/src/cobald/daemon/runners/async_tools.py b/src/cobald/daemon/runners/async_tools.py deleted file mode 100644 index 12ea143d..00000000 --- a/src/cobald/daemon/runners/async_tools.py +++ /dev/null @@ -1,22 +0,0 @@ -from typing import Callable, Awaitable, Coroutine -import threading - -from .base_runner import OrphanedReturn - - -async def raise_return(payload: Callable[[], Awaitable]) -> None: - """Wrapper to raise exception on unhandled return values""" - value = await payload() - if value is not None: - raise OrphanedReturn(payload, value) - - -def ensure_coroutine(awaitable: Awaitable) -> Coroutine: - """Ensure that ``awaitable`` is a coroutine and wrap it otherwise""" - if isinstance(awaitable, Coroutine): - return awaitable - - async def wrapper(): - return await awaitable - - return wrapper() diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index f1007d98..f66ac5e2 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -1,11 +1,22 @@ -from typing import Callable, Awaitable +from typing import Callable, Awaitable, Coroutine import asyncio from .base_runner import BaseRunner -from .async_tools import OrphanedReturn, ensure_coroutine +from .async_tools import OrphanedReturn from ._compat import asyncio_current_task +def ensure_coroutine(awaitable: Awaitable) -> Coroutine: + """Ensure that ``awaitable`` is a coroutine and wrap it otherwise""" + if isinstance(awaitable, Coroutine): + return awaitable + + async def wrapper(): + return await awaitable + + return wrapper() + + class AsyncioRunner(BaseRunner): """ Runner for coroutines with :py:mod:`asyncio` diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index 46ee78ba..423368f4 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -4,8 +4,14 @@ import trio -from .base_runner import BaseRunner -from .async_tools import raise_return +from .base_runner import BaseRunner, OrphanedReturn + + +async def raise_return(payload: Callable[[], Awaitable]) -> None: + """Wrapper to raise exception on unhandled return values""" + value = await payload() + if value is not None: + raise OrphanedReturn(payload, value) class TrioRunner(BaseRunner): From 793a9f6a20bf798b3f04c23f9f2239dc8a364f12 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 14:01:13 +0100 Subject: [PATCH 50/89] removed outdated initialisation guards --- src/cobald/daemon/runners/asyncio_runner.py | 4 +--- src/cobald/daemon/runners/thread_runner.py | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index f66ac5e2..00ee6412 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -1,8 +1,7 @@ from typing import Callable, Awaitable, Coroutine import asyncio -from .base_runner import BaseRunner -from .async_tools import OrphanedReturn +from .base_runner import BaseRunner, OrphanedReturn from ._compat import asyncio_current_task @@ -55,7 +54,6 @@ async def _monitor_payload(self, payload: Callable[[], Awaitable]): failure = OrphanedReturn(payload, result) finally: self._tasks.discard(asyncio_current_task()) - assert self._failure_queue is not None await self._failure_queue.put(failure) async def manage_payloads(self): diff --git a/src/cobald/daemon/runners/thread_runner.py b/src/cobald/daemon/runners/thread_runner.py index 7444ce26..ac9f55a5 100644 --- a/src/cobald/daemon/runners/thread_runner.py +++ b/src/cobald/daemon/runners/thread_runner.py @@ -39,7 +39,6 @@ def _monitor_payload(self, payload): if result is None: return failure = OrphanedReturn(payload, result) - assert self._failure_queue is not None self.asyncio_loop.call_soon_threadsafe(self._failure_queue.put_nowait, failure) async def manage_payloads(self): From d49247fb18d08ed3931bea5797daffaea9670567 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 2 Mar 2022 14:16:34 +0100 Subject: [PATCH 51/89] added explanations for all runners --- src/cobald/daemon/runners/asyncio_runner.py | 9 ++++++++- src/cobald/daemon/runners/thread_runner.py | 3 +++ src/cobald/daemon/runners/trio_runner.py | 5 +++++ 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index 00ee6412..a579abcf 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -25,6 +25,11 @@ class AsyncioRunner(BaseRunner): flavour = asyncio + # This runner directly uses asyncio.Task to run payloads. + # To detect errors, each payload is wrapped; errors and unexpected return values + # are pushed to a queue from which the main task re-raises. + # Tasks are registered in a container to allow cancelling them. The payload wrapper + # takes care of adding/removing tasks. def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): super().__init__(asyncio_loop) self._tasks = set() @@ -64,10 +69,12 @@ async def manage_payloads(self): async def aclose(self): if self._stopped.is_set(): return + # let the manage task wake up and exit await self._failure_queue.put(None) while self._tasks: for task in self._tasks.copy(): if task.done(): self._tasks.discard(task) - task.cancel() + else: + task.cancel() await asyncio.sleep(0.5) diff --git a/src/cobald/daemon/runners/thread_runner.py b/src/cobald/daemon/runners/thread_runner.py index ac9f55a5..b0d10caa 100644 --- a/src/cobald/daemon/runners/thread_runner.py +++ b/src/cobald/daemon/runners/thread_runner.py @@ -14,6 +14,9 @@ class ThreadRunner(BaseRunner): flavour = threading + # This runner directly uses threading.Thread to run payloads. + # To detect errors, each payload is wrapped; errors and unexpected return values + # are pushed to a queue from which the main task re-raises. def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): super().__init__(asyncio_loop) self._failure_queue = asyncio.Queue() diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index 423368f4..df929ab9 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -23,6 +23,11 @@ class TrioRunner(BaseRunner): flavour = trio + # This runner uses a trio loop in a separate thread to run payloads. + # Tracking payloads and errors is handled by a trio nursery. A queue ("channel") + # is used to move payloads into the trio loop. + # Since the trio loop runs in its own thread, all public methods have to move + # payloads/tasks into that thread. def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): super().__init__(asyncio_loop) self._ready = asyncio.Event() From 988cece888556af17965fd25a71f46ad044d447b Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 3 Mar 2022 09:24:17 +0100 Subject: [PATCH 52/89] updated codecov for src layout? --- .coveragerc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.coveragerc b/.coveragerc index 0fc9f392..650d3eee 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,5 +1,5 @@ [run] -source = cobald +source = src/cobald branch = TRUE cover_pylib = FALSE parallel = False From 139d69af019b322a1a02782fa557b62195e627f2 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 3 Mar 2022 09:43:24 +0100 Subject: [PATCH 53/89] revert coverage change, was the bot? --- .coveragerc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.coveragerc b/.coveragerc index 650d3eee..0fc9f392 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,5 +1,5 @@ [run] -source = src/cobald +source = cobald branch = TRUE cover_pylib = FALSE parallel = False From 01d318dd1f31968f2b9312b79786ce220e50f0ae Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 3 Mar 2022 11:56:42 +0100 Subject: [PATCH 54/89] removed paranoid safety check --- src/cobald/daemon/runners/service.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/cobald/daemon/runners/service.py b/src/cobald/daemon/runners/service.py index 4a714f7b..bacfbf8a 100644 --- a/src/cobald/daemon/runners/service.py +++ b/src/cobald/daemon/runners/service.py @@ -159,8 +159,6 @@ def accept(self): Since services are globally defined, only one :py:class:`ServiceRunner` may :py:meth:`accept` payloads at any time. """ - if self._meta_runner: - raise RuntimeError("payloads scheduled for %s before being started" % self) self._must_shutdown = False self._logger.info("%s starting", self.__class__.__name__) # force collecting objects so that defunct, From 92e7914e85b132dac9961d8d325fba21b6aa4094 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 3 Mar 2022 18:01:13 +0100 Subject: [PATCH 55/89] added test for payload failure --- cobald_tests/daemon/test_service.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/cobald_tests/daemon/test_service.py b/cobald_tests/daemon/test_service.py index 5db30baf..dcec960d 100644 --- a/cobald_tests/daemon/test_service.py +++ b/cobald_tests/daemon/test_service.py @@ -121,3 +121,20 @@ async def co_pingpong(what=default): break else: assert len(reply_store) == 9 + + @pytest.mark.parametrize("flavour", (asyncio, trio)) + def test_error_reporting(self, flavour): + """Test that fatal errors do not pass silently""" + def async_raise(what): + raise what + + # errors should fail the entire runtime + runner = ServiceRunner(accept_delay=0.1) + runner.adopt(async_raise, LookupError, flavour=flavour) + with pytest.raises(RuntimeError): + runner.accept() + + # KeyboardInterrupt/^C is graceful shutdown + runner = ServiceRunner(accept_delay=0.1) + runner.adopt(async_raise, KeyboardInterrupt, flavour=flavour) + runner.accept() From c2c50b1a3df6e13814c2d95283ad23d189cae917 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 3 Mar 2022 18:01:31 +0100 Subject: [PATCH 56/89] added description to payload failure --- src/cobald/daemon/runners/meta_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index 9215ddbd..6fa86af4 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -74,7 +74,7 @@ def run(self): self._logger.info("runner interrupted") except Exception as err: self._logger.exception("runner terminated: %s", err) - raise RuntimeError from err + raise RuntimeError("background task failed") from err finally: self.stop() self._logger.info("stopped all runners") From 0e62e617766163dffced7226bb6dee77dcb61725 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 3 Mar 2022 18:26:51 +0100 Subject: [PATCH 57/89] unqueueing only when watching runners --- src/cobald/daemon/runners/meta_runner.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index 6fa86af4..ba816880 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -87,11 +87,12 @@ def stop(self): async def _manage_runners(self): """Manage all runners inside the current `asyncio` event loop""" runner_tasks = await self._launch_runners() - await self._unqueue_payloads() self.running.set() try: # wait for all runners to either stop gracefully or propagate errors - await asyncio.gather(*runner_tasks) + # we only unqueue payloads *while* watching runners as payloads could + # cause the runners to fail – we need to stop unqueueing then as well. + await asyncio.gather(*runner_tasks, self._unqueue_payloads()) except BaseException: for runner in self._runners.values(): await runner.aclose() @@ -116,6 +117,8 @@ async def _unqueue_payloads(self) -> None: assert self._runners, "runners must be launched before unqueueing" # runners are started, so re-registering payloads does not them queue again for flavour, queue in self._runner_queues.items(): - self.register_payload(*queue, flavour=flavour) + for payload in queue: + self.register_payload(payload, flavour=flavour) + await asyncio.sleep(0) queue.clear() self._runner_queues.clear() From 70193e33751933ff4d34b385e8b2adc8bf362102 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 3 Mar 2022 18:27:45 +0100 Subject: [PATCH 58/89] logging and discarding payloads during shutdown --- src/cobald/daemon/runners/trio_runner.py | 26 ++++++++++++++++-------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index df929ab9..14ac42f2 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -36,9 +36,13 @@ def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): def register_payload(self, payload: Callable[[], Awaitable]): assert self._trio_token is not None and self._submit_tasks is not None - trio.from_thread.run( - self._submit_tasks.send, payload, trio_token=self._trio_token - ) + try: + trio.from_thread.run( + self._submit_tasks.send, payload, trio_token=self._trio_token + ) + except trio.RunFinishedError: + self._logger.warning(f"discarding payload {payload} during shutdown") + return def run_payload(self, payload: Callable[[], Awaitable]): assert self._trio_token is not None and self._submit_tasks is not None @@ -74,9 +78,13 @@ async def aclose(self): return # Trio only allows us an *synchronously blocking* call it from other threads. # Use an executor thread to make that *asynchronously* blocking for asyncio. - await self.asyncio_loop.run_in_executor( - None, - partial( - trio.from_thread.run, self._aclose_trio, trio_token=self._trio_token - ), - ) + try: + await self.asyncio_loop.run_in_executor( + None, + partial( + trio.from_thread.run, self._aclose_trio, trio_token=self._trio_token + ), + ) + except trio.RunFinishedError: + # trio already finished in its own thread + return From 8730f319e6dfa3c0b277f8b0903d9098bf59cd7a Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 3 Mar 2022 20:06:49 +0100 Subject: [PATCH 59/89] split failure test cases --- cobald_tests/daemon/test_service.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/cobald_tests/daemon/test_service.py b/cobald_tests/daemon/test_service.py index dcec960d..0d4ab591 100644 --- a/cobald_tests/daemon/test_service.py +++ b/cobald_tests/daemon/test_service.py @@ -33,6 +33,11 @@ def accept(payload: ServiceRunner, name=None): thread.join() +async def async_raise(what): + logging.info(f"raising {what}") + raise what + + class TestServiceRunner(object): def test_unique_reaper(self): """Assert that no two runners may fetch services""" @@ -125,16 +130,15 @@ async def co_pingpong(what=default): @pytest.mark.parametrize("flavour", (asyncio, trio)) def test_error_reporting(self, flavour): """Test that fatal errors do not pass silently""" - def async_raise(what): - raise what - # errors should fail the entire runtime runner = ServiceRunner(accept_delay=0.1) runner.adopt(async_raise, LookupError, flavour=flavour) with pytest.raises(RuntimeError): runner.accept() - # KeyboardInterrupt/^C is graceful shutdown + @pytest.mark.parametrize("flavour", (asyncio, trio)) + def test_interrupt(self, flavour): + """Test that KeyboardInterrupt/^C is graceful shutdown""" runner = ServiceRunner(accept_delay=0.1) - runner.adopt(async_raise, KeyboardInterrupt, flavour=flavour) + runner.adopt(async_raise, KeyboardInterrupt("test_interrupt"), flavour=flavour) runner.accept() From e62fd0be2b792f8d051d876c21ae2dd7304c06ca Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 3 Mar 2022 20:50:04 +0100 Subject: [PATCH 60/89] fixed race when service runner is shutdown before starting --- src/cobald/daemon/runners/service.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cobald/daemon/runners/service.py b/src/cobald/daemon/runners/service.py index bacfbf8a..41c2da98 100644 --- a/src/cobald/daemon/runners/service.py +++ b/src/cobald/daemon/runners/service.py @@ -126,6 +126,7 @@ def __init__(self, accept_delay: float = 1): self._meta_runner = MetaRunner() self._must_shutdown = False self._is_shutdown = threading.Event() + self._is_shutdown.set() self.running = threading.Event() self.accept_delay = accept_delay From 1e3c00808c65857f78dc965a9649ae429efbaaec Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 3 Mar 2022 21:41:02 +0100 Subject: [PATCH 61/89] cancelling trio task cancels trio --- src/cobald/daemon/runners/trio_runner.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index 14ac42f2..8577f20b 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -52,8 +52,10 @@ async def ready(self): await self._ready.wait() async def manage_payloads(self): - # this blocks one thread of the asyncio event loop - await self.asyncio_loop.run_in_executor(None, self._run_trio_blocking) + try: + await self.asyncio_loop.run_in_executor(None, self._run_trio_blocking) + except asyncio.CancelledError: + await self.aclose() def _run_trio_blocking(self): return trio.run(self._manage_payloads_trio) From 6f3a75bfda0f29644224fbb2e4963629614f042f Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 3 Mar 2022 21:41:36 +0100 Subject: [PATCH 62/89] tracked asyncio tasks no longer supress cancellation --- src/cobald/daemon/runners/asyncio_runner.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index a579abcf..b42b460f 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -51,6 +51,8 @@ def _setup_payload(self, payload: Callable[[], Awaitable]): async def _monitor_payload(self, payload: Callable[[], Awaitable]): try: result = await payload() + except asyncio.CancelledError: + raise except BaseException as e: failure = e else: From 313343cf7df0700ed55f58224212c97d0954a492 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 3 Mar 2022 21:44:50 +0100 Subject: [PATCH 63/89] removed duplicate handling of KI --- src/cobald/daemon/runners/meta_runner.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index ba816880..fee5cb55 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -76,11 +76,11 @@ def run(self): self._logger.exception("runner terminated: %s", err) raise RuntimeError("background task failed") from err finally: - self.stop() self._logger.info("stopped all runners") def stop(self): """Stop all runners""" + self._logger.debug("stop all runners") for runner in self._runners.values(): runner.stop() @@ -93,9 +93,13 @@ async def _manage_runners(self): # we only unqueue payloads *while* watching runners as payloads could # cause the runners to fail – we need to stop unqueueing then as well. await asyncio.gather(*runner_tasks, self._unqueue_payloads()) + except KeyboardInterrupt: + # KeyboardInterrupt in a runner task immediately kills the event loop. + # When we get resurrected, the exception has already been handled! + # Just clean up... + await self._aclose_runners(runner_tasks) except BaseException: - for runner in self._runners.values(): - await runner.aclose() + await self._aclose_runners(runner_tasks) raise finally: self.running.clear() @@ -122,3 +126,10 @@ async def _unqueue_payloads(self) -> None: await asyncio.sleep(0) queue.clear() self._runner_queues.clear() + + async def _aclose_runners(self, runner_tasks): + for runner in self._runners.values(): + await runner.aclose() + # wait until runners are closed + await asyncio.gather(*runner_tasks, return_exceptions=True) + self._runners.clear() From fc3bceb3113158bcf239c146bb43b32399272518 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 3 Mar 2022 21:56:48 +0100 Subject: [PATCH 64/89] added long-running background task to cancel tests --- cobald_tests/daemon/test_service.py | 1 + src/cobald/daemon/runners/asyncio_runner.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/cobald_tests/daemon/test_service.py b/cobald_tests/daemon/test_service.py index 0d4ab591..1a49aa76 100644 --- a/cobald_tests/daemon/test_service.py +++ b/cobald_tests/daemon/test_service.py @@ -140,5 +140,6 @@ def test_error_reporting(self, flavour): def test_interrupt(self, flavour): """Test that KeyboardInterrupt/^C is graceful shutdown""" runner = ServiceRunner(accept_delay=0.1) + runner.adopt(getattr(flavour, "sleep"), 5, flavour=flavour) runner.adopt(async_raise, KeyboardInterrupt("test_interrupt"), flavour=flavour) runner.accept() diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index b42b460f..db2a1bcb 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -79,4 +79,4 @@ async def aclose(self): self._tasks.discard(task) else: task.cancel() - await asyncio.sleep(0.5) + await asyncio.sleep(0.1) From d854a70cbf41ec95daef9f68d0eb84e9e28aa69b Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Fri, 4 Mar 2022 08:18:56 +0100 Subject: [PATCH 65/89] logging runner exceptions thoroughly --- src/cobald/daemon/runners/base_runner.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/cobald/daemon/runners/base_runner.py b/src/cobald/daemon/runners/base_runner.py index 48cc4e62..43ba9e88 100644 --- a/src/cobald/daemon/runners/base_runner.py +++ b/src/cobald/daemon/runners/base_runner.py @@ -57,7 +57,10 @@ async def run(self): self._stopped.clear() try: await self.manage_payloads() - except Exception: + except asyncio.CancelledError: + self._logger.info("runner cancelled: %s", self) + raise + except BaseException: self._logger.exception("runner aborted: %s", self) raise else: From d981a6a631d4869d5363eb0185c6e88fa764414c Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Fri, 4 Mar 2022 08:19:43 +0100 Subject: [PATCH 66/89] propagating cancellation out of trio runner --- src/cobald/daemon/runners/trio_runner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index 8577f20b..e06bece9 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -56,6 +56,7 @@ async def manage_payloads(self): await self.asyncio_loop.run_in_executor(None, self._run_trio_blocking) except asyncio.CancelledError: await self.aclose() + raise def _run_trio_blocking(self): return trio.run(self._manage_payloads_trio) From 956f0e4aee0e19e3dc7adcf17ddcdee2787c6405 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Fri, 4 Mar 2022 09:32:21 +0100 Subject: [PATCH 67/89] runner shutdown is shielded from cancellation --- src/cobald/daemon/runners/meta_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index fee5cb55..7c2ba8ac 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -97,9 +97,9 @@ async def _manage_runners(self): # KeyboardInterrupt in a runner task immediately kills the event loop. # When we get resurrected, the exception has already been handled! # Just clean up... - await self._aclose_runners(runner_tasks) + await asyncio.shield(self._aclose_runners(runner_tasks)) except BaseException: - await self._aclose_runners(runner_tasks) + await asyncio.shield(self._aclose_runners(runner_tasks)) raise finally: self.running.clear() From de0cf3dcd69540230f8d4e8b33b00b1abea65b6e Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Fri, 4 Mar 2022 09:33:13 +0100 Subject: [PATCH 68/89] accurately reproducing KI --- cobald_tests/daemon/test_service.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/cobald_tests/daemon/test_service.py b/cobald_tests/daemon/test_service.py index 1a49aa76..b0c3ef85 100644 --- a/cobald_tests/daemon/test_service.py +++ b/cobald_tests/daemon/test_service.py @@ -5,6 +5,8 @@ import asyncio import contextlib import logging +import signal +import os import pytest @@ -38,6 +40,11 @@ async def async_raise(what): raise what +async def async_raise_signal(what): + logging.info(f"signal {what}") + os.kill(os.getpid(), what) + + class TestServiceRunner(object): def test_unique_reaper(self): """Assert that no two runners may fetch services""" @@ -141,5 +148,6 @@ def test_interrupt(self, flavour): """Test that KeyboardInterrupt/^C is graceful shutdown""" runner = ServiceRunner(accept_delay=0.1) runner.adopt(getattr(flavour, "sleep"), 5, flavour=flavour) - runner.adopt(async_raise, KeyboardInterrupt("test_interrupt"), flavour=flavour) + # signal.SIGINT == KeyboardInterrupt + runner.adopt(async_raise_signal, signal.SIGINT, flavour=flavour) runner.accept() From 5ce0f07fb2082669a4882dd26ba56a88d8360c1a Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Fri, 4 Mar 2022 09:37:18 +0100 Subject: [PATCH 69/89] removed a stain from this world --- cobald_tests/daemon/test_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cobald_tests/daemon/test_service.py b/cobald_tests/daemon/test_service.py index b0c3ef85..97c28b7f 100644 --- a/cobald_tests/daemon/test_service.py +++ b/cobald_tests/daemon/test_service.py @@ -147,7 +147,7 @@ def test_error_reporting(self, flavour): def test_interrupt(self, flavour): """Test that KeyboardInterrupt/^C is graceful shutdown""" runner = ServiceRunner(accept_delay=0.1) - runner.adopt(getattr(flavour, "sleep"), 5, flavour=flavour) + runner.adopt(flavour.sleep, 5, flavour=flavour) # signal.SIGINT == KeyboardInterrupt runner.adopt(async_raise_signal, signal.SIGINT, flavour=flavour) runner.accept() From b8fd282fbcf9cb3dc32d1015a61a64ac9245b504 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Fri, 4 Mar 2022 09:42:09 +0100 Subject: [PATCH 70/89] expanded exceptions to log --- src/cobald/daemon/runners/service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cobald/daemon/runners/service.py b/src/cobald/daemon/runners/service.py index 41c2da98..67606b8e 100644 --- a/src/cobald/daemon/runners/service.py +++ b/src/cobald/daemon/runners/service.py @@ -185,7 +185,7 @@ async def _accept_services(self): self._adopt_services() await trio.sleep(delay) delay = min(delay + increase, max_delay) - except Exception: + except BaseException: self._logger.exception("%s aborted", self.__class__.__name__) raise else: From bcce674c23a4d167924ecdba9648d836a2b881e4 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Fri, 4 Mar 2022 09:43:48 +0100 Subject: [PATCH 71/89] removed unused special method --- src/cobald/daemon/runners/meta_runner.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index 7c2ba8ac..8df40703 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -38,9 +38,6 @@ def runners(self): ) return self._runners - def __bool__(self): - return any(bool(runner) for runner in self._runners.values()) - def register_payload(self, *payloads, flavour: ModuleType): """Queue one or more payloads for execution after its runner is started""" try: From f776e0209d1aa23d33ad610a2f2ce6ae3afa33f6 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Fri, 4 Mar 2022 10:18:33 +0100 Subject: [PATCH 72/89] removed incomplete compat layer to turn awaitables into coroutines --- src/cobald/daemon/runners/asyncio_runner.py | 15 ++------------- src/cobald/daemon/runners/trio_runner.py | 4 ++-- 2 files changed, 4 insertions(+), 15 deletions(-) diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index db2a1bcb..bbdbd3db 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -5,17 +5,6 @@ from ._compat import asyncio_current_task -def ensure_coroutine(awaitable: Awaitable) -> Coroutine: - """Ensure that ``awaitable`` is a coroutine and wrap it otherwise""" - if isinstance(awaitable, Coroutine): - return awaitable - - async def wrapper(): - return await awaitable - - return wrapper() - - class AsyncioRunner(BaseRunner): """ Runner for coroutines with :py:mod:`asyncio` @@ -38,9 +27,9 @@ def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): def register_payload(self, payload: Callable[[], Awaitable]): self.asyncio_loop.call_soon_threadsafe(self._setup_payload, payload) - def run_payload(self, payload: Callable[[], Awaitable]): + def run_payload(self, payload: Callable[[], Coroutine]): future = asyncio.run_coroutine_threadsafe( - ensure_coroutine(payload()), self.asyncio_loop + payload(), self.asyncio_loop ) return future.result() diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index e06bece9..e7adafdf 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -1,4 +1,4 @@ -from typing import Optional, Callable, Awaitable +from typing import Optional, Callable, Awaitable, Coroutine import asyncio from functools import partial @@ -44,7 +44,7 @@ def register_payload(self, payload: Callable[[], Awaitable]): self._logger.warning(f"discarding payload {payload} during shutdown") return - def run_payload(self, payload: Callable[[], Awaitable]): + def run_payload(self, payload: Callable[[], Coroutine]): assert self._trio_token is not None and self._submit_tasks is not None return trio.from_thread.run(payload, trio_token=self._trio_token) From 782c6c94ef64628cc0b767728e4c59dfdb1bffef Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Fri, 4 Mar 2022 10:23:27 +0100 Subject: [PATCH 73/89] the future is black --- src/cobald/daemon/runners/asyncio_runner.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index bbdbd3db..9d9d3471 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -28,9 +28,7 @@ def register_payload(self, payload: Callable[[], Awaitable]): self.asyncio_loop.call_soon_threadsafe(self._setup_payload, payload) def run_payload(self, payload: Callable[[], Coroutine]): - future = asyncio.run_coroutine_threadsafe( - payload(), self.asyncio_loop - ) + future = asyncio.run_coroutine_threadsafe(payload(), self.asyncio_loop) return future.result() def _setup_payload(self, payload: Callable[[], Awaitable]): From 3f9f7d03721f5aac6d60ba492e2a5f3c71748c2a Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Fri, 4 Mar 2022 10:32:20 +0100 Subject: [PATCH 74/89] added background task to failure test --- cobald_tests/daemon/test_service.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cobald_tests/daemon/test_service.py b/cobald_tests/daemon/test_service.py index 97c28b7f..d18220a8 100644 --- a/cobald_tests/daemon/test_service.py +++ b/cobald_tests/daemon/test_service.py @@ -139,6 +139,7 @@ def test_error_reporting(self, flavour): """Test that fatal errors do not pass silently""" # errors should fail the entire runtime runner = ServiceRunner(accept_delay=0.1) + runner.adopt(flavour.sleep, 5, flavour=flavour) runner.adopt(async_raise, LookupError, flavour=flavour) with pytest.raises(RuntimeError): runner.accept() From 422b8f8c159989368dfe1ea58f29ad74c25f3513 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Fri, 4 Mar 2022 11:01:51 +0100 Subject: [PATCH 75/89] handling cancellation as finishing --- src/cobald/daemon/runners/trio_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index e7adafdf..a5d3662d 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -40,7 +40,7 @@ def register_payload(self, payload: Callable[[], Awaitable]): trio.from_thread.run( self._submit_tasks.send, payload, trio_token=self._trio_token ) - except trio.RunFinishedError: + except (trio.RunFinishedError, trio.Cancelled): self._logger.warning(f"discarding payload {payload} during shutdown") return @@ -88,6 +88,6 @@ async def aclose(self): trio.from_thread.run, self._aclose_trio, trio_token=self._trio_token ), ) - except trio.RunFinishedError: + except (trio.RunFinishedError, trio.Cancelled): # trio already finished in its own thread return From e9e39eca8cc6e4e32bc74cf8f0734134a54b948b Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Fri, 4 Mar 2022 11:12:15 +0100 Subject: [PATCH 76/89] added descriptive names to test threads --- cobald_tests/daemon/test_service.py | 1 + cobald_tests/utility/concurrent/test_meta_runner.py | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cobald_tests/daemon/test_service.py b/cobald_tests/daemon/test_service.py index d18220a8..dc0c1e54 100644 --- a/cobald_tests/daemon/test_service.py +++ b/cobald_tests/daemon/test_service.py @@ -27,6 +27,7 @@ def accept(payload: ServiceRunner, name=None): ) thread.start() if not payload.running.wait(1): + payload.shutdown() raise RuntimeError("%s failed to start" % payload) try: yield diff --git a/cobald_tests/utility/concurrent/test_meta_runner.py b/cobald_tests/utility/concurrent/test_meta_runner.py index b58df80e..c317000a 100644 --- a/cobald_tests/utility/concurrent/test_meta_runner.py +++ b/cobald_tests/utility/concurrent/test_meta_runner.py @@ -20,6 +20,7 @@ def threaded_run(name=None): thread = threading.Thread(target=runner.run, name=name or str(runner), daemon=True) thread.start() if not runner.running.wait(1): + runner.stop() raise RuntimeError("%s failed to start" % runner) try: yield runner @@ -39,7 +40,7 @@ def with_return(): def with_raise(): raise KeyError("expected exception") - with threaded_run() as runner: + with threaded_run("test_run_subroutine") as runner: result = runner.run_payload(with_return, flavour=flavour) assert result == with_return() with pytest.raises(KeyError): @@ -55,7 +56,7 @@ async def with_return(): async def with_raise(): raise KeyError("expected exception") - with threaded_run() as runner: + with threaded_run("test_run_coroutine") as runner: result = runner.run_payload(with_return, flavour=flavour) assert result == trio.run(with_return) with pytest.raises(KeyError): From 37748ad469d606e27687e6fd96dfbb71a5e018c5 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Fri, 4 Mar 2022 11:36:58 +0100 Subject: [PATCH 77/89] asyncio runner also cleans up if tasks remain --- src/cobald/daemon/runners/asyncio_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index 9d9d3471..2ff61260 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -56,7 +56,7 @@ async def manage_payloads(self): raise failure async def aclose(self): - if self._stopped.is_set(): + if self._stopped.is_set() and not self._tasks: return # let the manage task wake up and exit await self._failure_queue.put(None) From 2b74a7e2d9dc70865a2ccca0d46dd847fc6839ab Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Fri, 4 Mar 2022 11:37:14 +0100 Subject: [PATCH 78/89] service runner is cancelled more quietly --- src/cobald/daemon/runners/service.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/cobald/daemon/runners/service.py b/src/cobald/daemon/runners/service.py index 67606b8e..1081cbc8 100644 --- a/src/cobald/daemon/runners/service.py +++ b/src/cobald/daemon/runners/service.py @@ -185,6 +185,8 @@ async def _accept_services(self): self._adopt_services() await trio.sleep(delay) delay = min(delay + increase, max_delay) + except trio.Cancelled: + self._logger.info("%s cancelled", self.__class__.__name__) except BaseException: self._logger.exception("%s aborted", self.__class__.__name__) raise From 2cb23fcb5d94915e46c63ae674ae17f7b0f63e44 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 22 Mar 2022 21:04:34 +0100 Subject: [PATCH 79/89] Apply suggestions from code review Co-authored-by: Eileen Kuehn --- src/cobald/daemon/runners/meta_runner.py | 8 ++++---- src/cobald/daemon/runners/service.py | 7 ++++--- src/cobald/daemon/runners/thread_runner.py | 8 ++++---- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index 8df40703..89cee190 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -57,8 +57,8 @@ def run_payload(self, payload, *, flavour: ModuleType): """ Execute one payload and return its output - This method will block until the payload is completed. To avoid deadlocks, - it is an error to call it during initialisation before the runners are started. + This method will block until the payload is completed. + It is an error to call it during initialisation before the runners are started. """ return self._runners[flavour].run_payload(payload) @@ -88,7 +88,7 @@ async def _manage_runners(self): try: # wait for all runners to either stop gracefully or propagate errors # we only unqueue payloads *while* watching runners as payloads could - # cause the runners to fail – we need to stop unqueueing then as well. + # cause the runners to fail – we need to stop unqueueing them as well. await asyncio.gather(*runner_tasks, self._unqueue_payloads()) except KeyboardInterrupt: # KeyboardInterrupt in a runner task immediately kills the event loop. @@ -116,7 +116,7 @@ async def _launch_runners(self) -> List[asyncio.Task]: async def _unqueue_payloads(self) -> None: """Register payloads once runners are started""" assert self._runners, "runners must be launched before unqueueing" - # runners are started, so re-registering payloads does not them queue again + # runners are started, so re-registering payloads does not queue them again for flavour, queue in self._runner_queues.items(): for payload in queue: self.register_payload(payload, flavour=flavour) diff --git a/src/cobald/daemon/runners/service.py b/src/cobald/daemon/runners/service.py index 1081cbc8..eeccd4d7 100644 --- a/src/cobald/daemon/runners/service.py +++ b/src/cobald/daemon/runners/service.py @@ -111,12 +111,13 @@ class ServiceRunner(object): """ Runner for coroutines, subroutines and services - The service runner provides safe concurrency by tracking concurrent tasks - to prevent silent failures. If any task fails with an exception or provides + The service runner prevents silent failures by tracking concurrent tasks + and therefore provides safer concurrency. + If any task fails with an exception or provides unexpected output values, this is registered as an error; the runner will gracefully shut down all tasks in this case. - In order to provide ``async`` concurrency, the runner also manages common + To provide ``async`` concurrency, the runner also manages common ``async`` event loops and tracks them for failures as well. As a result, ``async`` code should usually use the "current" event loop directly. """ diff --git a/src/cobald/daemon/runners/thread_runner.py b/src/cobald/daemon/runners/thread_runner.py index b0d10caa..87bc919e 100644 --- a/src/cobald/daemon/runners/thread_runner.py +++ b/src/cobald/daemon/runners/thread_runner.py @@ -8,7 +8,7 @@ class ThreadRunner(BaseRunner): """ Runner for subroutines with :py:mod:`threading` - All active payloads are *not* cancelled when the runner is closed. + Active payloads are *not* cancelled when the runner is closed. Only program termination forcefully cancels leftover payloads. """ @@ -28,9 +28,9 @@ def register_payload(self, payload): thread.start() def run_payload(self, payload): - # - run_payload has to block until payload is done - # instead of running payload in a thread and blocking this one, - # we just block this thread by running payload directly + # The method has to block until payload is done. + # Instead of running payload in a thread and blocking this one, + # this thread is blocked by running the payload directly. return payload() def _monitor_payload(self, payload): From 3f96852bfac3f11d6265d4e72f00de8f8909e731 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 22 Mar 2022 21:15:48 +0100 Subject: [PATCH 80/89] clarified docstrings --- src/cobald/daemon/runners/base_runner.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/cobald/daemon/runners/base_runner.py b/src/cobald/daemon/runners/base_runner.py index 43ba9e88..f38b23a3 100644 --- a/src/cobald/daemon/runners/base_runner.py +++ b/src/cobald/daemon/runners/base_runner.py @@ -30,7 +30,7 @@ def register_payload(self, payload): def run_payload(self, payload): """ - Register ``payload`` for direct execution in a threadsafe manner + Execute ``payload`` and return its result in a threadsafe manner This runs ``payload`` as soon as possible, blocking until completion. Should ``payload`` return or raise anything, it is propagated to the caller. @@ -45,10 +45,13 @@ async def ready(self): async def run(self): """ - Execute all current and future payloads in an `asyncio` task + Execute all current and future payloads in an `asyncio` coroutine - Blocks and executes payloads until :py:meth:`stop` is called. - It is an error for any orphaned payload to return or raise. + This method will continuously execute payloads sent to the runner. + It only returns when :py:meth:`stop` is called + or if any orphaned payload return or raise. + In the latter case, :py:exc:`~.OrphanedReturn` or the raised exception + is re-raised by this method. Implementations should override :py:meth:`~.manage_payloads` to customize their specific parts. From d657959f0568c13b11aeafae7ff358faea3347c0 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 22 Mar 2022 21:24:31 +0100 Subject: [PATCH 81/89] enforced and documented method overrides in subclasses --- src/cobald/daemon/runners/base_runner.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/cobald/daemon/runners/base_runner.py b/src/cobald/daemon/runners/base_runner.py index f38b23a3..50b264d5 100644 --- a/src/cobald/daemon/runners/base_runner.py +++ b/src/cobald/daemon/runners/base_runner.py @@ -1,12 +1,13 @@ +from typing import Any +from abc import abstractmethod, ABCMeta import asyncio import logging import threading -from typing import Any from ..debug import NameRepr -class BaseRunner(object): +class BaseRunner(metaclass=ABCMeta): """Concurrency backend on top of `asyncio`""" flavour = None # type: Any @@ -19,6 +20,7 @@ def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): self._stopped = threading.Event() self._stopped.set() + @abstractmethod def register_payload(self, payload): """ Register ``payload`` for background execution in a threadsafe manner @@ -28,6 +30,7 @@ def register_payload(self, payload): """ raise NotImplementedError + @abstractmethod def run_payload(self, payload): """ Execute ``payload`` and return its result in a threadsafe manner @@ -42,6 +45,9 @@ async def ready(self): assert ( not self._stopped.is_set() ), "runner must be .run before waiting until it is ready" + # Most runners are ready when instantiated, simply queueing payloads + # until they get a chance to run them. Only override this method when + # the runner has to do some `async` setup before being ready. async def run(self): """ @@ -71,11 +77,23 @@ async def run(self): finally: self._stopped.set() + @abstractmethod async def manage_payloads(self): + """ + Implementation of managing payloads when :py:meth:`~.run` + + This method must continuously execute payloads sent to the runner. + It may only return when :py:meth:`stop` is called + or if any orphaned payload return or raise. + In the latter case, :py:exc:`~.OrphanedReturn` or the raised exception + must re-raised by this method. + """ raise NotImplementedError + @abstractmethod async def aclose(self): """Shut down this runner""" + raise NotImplementedError def stop(self): """Stop execution of all current and future payloads and block until success""" From da0bfe5249ff65d4c132f33f230614bd7759cb6a Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 22 Mar 2022 22:04:08 +0100 Subject: [PATCH 82/89] documented internal running meaning --- src/cobald/daemon/runners/meta_runner.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index 89cee190..bc7812b0 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -115,7 +115,9 @@ async def _launch_runners(self) -> List[asyncio.Task]: async def _unqueue_payloads(self) -> None: """Register payloads once runners are started""" - assert self._runners, "runners must be launched before unqueueing" + # Unqueue when we are running so that payloads do not get requeued. + # This also provides checking that the queued flavours correspond to a runner. + assert self.running.is_set(), "runners must be launched before unqueueing" # runners are started, so re-registering payloads does not queue them again for flavour, queue in self._runner_queues.items(): for payload in queue: From 087c08903d61f3be6789810bbdab213f5cc6ec11 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 22 Mar 2022 22:04:25 +0100 Subject: [PATCH 83/89] directly applying queued payloads --- src/cobald/daemon/runners/meta_runner.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/cobald/daemon/runners/meta_runner.py b/src/cobald/daemon/runners/meta_runner.py index bc7812b0..7ebad5bc 100644 --- a/src/cobald/daemon/runners/meta_runner.py +++ b/src/cobald/daemon/runners/meta_runner.py @@ -120,9 +120,7 @@ async def _unqueue_payloads(self) -> None: assert self.running.is_set(), "runners must be launched before unqueueing" # runners are started, so re-registering payloads does not queue them again for flavour, queue in self._runner_queues.items(): - for payload in queue: - self.register_payload(payload, flavour=flavour) - await asyncio.sleep(0) + self.register_payload(*queue, flavour=flavour) queue.clear() self._runner_queues.clear() From 5e9be46fef4082304c618ace5dae7d1341aeac4e Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 22 Mar 2022 22:13:44 +0100 Subject: [PATCH 84/89] transmit failures via Future --- src/cobald/daemon/runners/asyncio_runner.py | 12 ++++++------ src/cobald/daemon/runners/thread_runner.py | 15 +++++++++------ 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index 2ff61260..10b94a49 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -22,7 +22,7 @@ class AsyncioRunner(BaseRunner): def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): super().__init__(asyncio_loop) self._tasks = set() - self._failure_queue = asyncio.Queue() + self._payload_failure = asyncio_loop.create_future() def register_payload(self, payload: Callable[[], Awaitable]): self.asyncio_loop.call_soon_threadsafe(self._setup_payload, payload) @@ -48,18 +48,18 @@ async def _monitor_payload(self, payload: Callable[[], Awaitable]): failure = OrphanedReturn(payload, result) finally: self._tasks.discard(asyncio_current_task()) - await self._failure_queue.put(failure) + if not self._payload_failure.done(): + self._payload_failure.set_exception(failure) async def manage_payloads(self): - failure = await self._failure_queue.get() - if failure is not None: - raise failure + await self._payload_failure async def aclose(self): if self._stopped.is_set() and not self._tasks: return # let the manage task wake up and exit - await self._failure_queue.put(None) + if not self._payload_failure.done(): + self._payload_failure.set_result(None) while self._tasks: for task in self._tasks.copy(): if task.done(): diff --git a/src/cobald/daemon/runners/thread_runner.py b/src/cobald/daemon/runners/thread_runner.py index 87bc919e..1cf3dc84 100644 --- a/src/cobald/daemon/runners/thread_runner.py +++ b/src/cobald/daemon/runners/thread_runner.py @@ -19,7 +19,7 @@ class ThreadRunner(BaseRunner): # are pushed to a queue from which the main task re-raises. def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): super().__init__(asyncio_loop) - self._failure_queue = asyncio.Queue() + self._payload_failure = asyncio_loop.create_future() def register_payload(self, payload): thread = threading.Thread( @@ -42,14 +42,17 @@ def _monitor_payload(self, payload): if result is None: return failure = OrphanedReturn(payload, result) - self.asyncio_loop.call_soon_threadsafe(self._failure_queue.put_nowait, failure) + self.asyncio_loop.call_soon_threadsafe(self._set_failure, failure) + + def _set_failure(self, failure: BaseException): + if not self._payload_failure.done(): + self._payload_failure.set_exception(failure) async def manage_payloads(self): - failure = await self._failure_queue.get() - if failure is not None: - raise failure + await self._payload_failure async def aclose(self): if self._stopped.is_set(): return - await self._failure_queue.put(None) + if not self._payload_failure.done(): + self._payload_failure.set_result(None) From c0faa08c9990926e23cd4f47fc2a9b8fe0d883f6 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 22 Mar 2022 22:38:53 +0100 Subject: [PATCH 85/89] pulled wrapper into class --- src/cobald/daemon/runners/trio_runner.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index a5d3662d..6f94e048 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -7,13 +7,6 @@ from .base_runner import BaseRunner, OrphanedReturn -async def raise_return(payload: Callable[[], Awaitable]) -> None: - """Wrapper to raise exception on unhandled return values""" - value = await payload() - if value is not None: - raise OrphanedReturn(payload, value) - - class TrioRunner(BaseRunner): """ Runner for coroutines with :py:mod:`trio` @@ -69,10 +62,16 @@ async def _manage_payloads_trio(self): self.asyncio_loop.call_soon_threadsafe(self._ready.set) async with trio.open_nursery() as nursery: async for task in receive_tasks: - nursery.start_soon(raise_return, task) + nursery.start_soon(self._monitor_payload, task) # shutting down: cancel the scope to cancel all payloads nursery.cancel_scope.cancel() + async def _monitor_payload(self, payload: Callable[[], Awaitable]): + """Wrapper for awaitables and to raise exception on unhandled return values""" + value = await payload() + if value is not None: + raise OrphanedReturn(payload, value) + async def _aclose_trio(self): await self._submit_tasks.aclose() From 825665bcf4bc27d9693d8fc945f1c73edec38c26 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 22 Mar 2022 22:48:30 +0100 Subject: [PATCH 86/89] testing thread service behaviour --- cobald_tests/daemon/test_service.py | 42 ++++++++++++++++++++++------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/cobald_tests/daemon/test_service.py b/cobald_tests/daemon/test_service.py index dc0c1e54..c9642f7d 100644 --- a/cobald_tests/daemon/test_service.py +++ b/cobald_tests/daemon/test_service.py @@ -36,16 +36,24 @@ def accept(payload: ServiceRunner, name=None): thread.join() -async def async_raise(what): +def sync_raise(what): logging.info(f"raising {what}") raise what -async def async_raise_signal(what): +async def async_raise(what): + sync_raise(what) + + +def sync_raise_signal(what): logging.info(f"signal {what}") os.kill(os.getpid(), what) +async def async_raise_signal(what): + sync_raise_signal(what) + + class TestServiceRunner(object): def test_unique_reaper(self): """Assert that no two runners may fetch services""" @@ -135,21 +143,35 @@ async def co_pingpong(what=default): else: assert len(reply_store) == 9 - @pytest.mark.parametrize("flavour", (asyncio, trio)) - def test_error_reporting(self, flavour): + @pytest.mark.parametrize( + "flavour, do_sleep, do_raise", + ( + (asyncio, asyncio.sleep, async_raise), + (trio, trio.sleep, async_raise), + (threading, time.sleep, sync_raise), + ), + ) + def test_error_reporting(self, flavour, do_sleep, do_raise): """Test that fatal errors do not pass silently""" # errors should fail the entire runtime runner = ServiceRunner(accept_delay=0.1) - runner.adopt(flavour.sleep, 5, flavour=flavour) - runner.adopt(async_raise, LookupError, flavour=flavour) + runner.adopt(do_sleep, 5, flavour=flavour) + runner.adopt(do_raise, LookupError, flavour=flavour) with pytest.raises(RuntimeError): runner.accept() - @pytest.mark.parametrize("flavour", (asyncio, trio)) - def test_interrupt(self, flavour): + @pytest.mark.parametrize( + "flavour, do_sleep, do_raise", + ( + (asyncio, asyncio.sleep, async_raise_signal), + (trio, trio.sleep, async_raise_signal), + (threading, time.sleep, sync_raise_signal), + ), + ) + def test_interrupt(self, flavour, do_sleep, do_raise): """Test that KeyboardInterrupt/^C is graceful shutdown""" runner = ServiceRunner(accept_delay=0.1) - runner.adopt(flavour.sleep, 5, flavour=flavour) + runner.adopt(do_sleep, 5, flavour=flavour) # signal.SIGINT == KeyboardInterrupt - runner.adopt(async_raise_signal, signal.SIGINT, flavour=flavour) + runner.adopt(do_raise, signal.SIGINT, flavour=flavour) runner.accept() From 2ac054b8a5fe961b7d5f0cbdbd23ba048dcd643b Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 22 Mar 2022 23:17:23 +0100 Subject: [PATCH 87/89] removed KI indirection in asyncio runner --- src/cobald/daemon/runners/asyncio_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cobald/daemon/runners/asyncio_runner.py b/src/cobald/daemon/runners/asyncio_runner.py index 10b94a49..a63e000b 100644 --- a/src/cobald/daemon/runners/asyncio_runner.py +++ b/src/cobald/daemon/runners/asyncio_runner.py @@ -38,7 +38,7 @@ def _setup_payload(self, payload: Callable[[], Awaitable]): async def _monitor_payload(self, payload: Callable[[], Awaitable]): try: result = await payload() - except asyncio.CancelledError: + except (asyncio.CancelledError, KeyboardInterrupt): raise except BaseException as e: failure = e From 6d7c46a27999da94e3b8719a0ef3dc261189fa04 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Tue, 22 Mar 2022 23:27:16 +0100 Subject: [PATCH 88/89] suppress trio cancellation during asyncio shutdown --- src/cobald/daemon/runners/trio_runner.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/cobald/daemon/runners/trio_runner.py b/src/cobald/daemon/runners/trio_runner.py index 6f94e048..b5d41587 100644 --- a/src/cobald/daemon/runners/trio_runner.py +++ b/src/cobald/daemon/runners/trio_runner.py @@ -73,7 +73,11 @@ async def _monitor_payload(self, payload: Callable[[], Awaitable]): raise OrphanedReturn(payload, value) async def _aclose_trio(self): - await self._submit_tasks.aclose() + # suppress trio cancellation to avoid raising an error in aclose + try: + await self._submit_tasks.aclose() + except trio.Cancelled: + pass async def aclose(self): if self._stopped.is_set(): From 68b83699e66f4e08d6a4046fd5f4f9513d8ade65 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 24 Mar 2022 21:30:17 +0100 Subject: [PATCH 89/89] Apply suggestions from review Co-authored-by: Eileen Kuehn --- src/cobald/daemon/runners/base_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cobald/daemon/runners/base_runner.py b/src/cobald/daemon/runners/base_runner.py index 50b264d5..825df75f 100644 --- a/src/cobald/daemon/runners/base_runner.py +++ b/src/cobald/daemon/runners/base_runner.py @@ -55,7 +55,7 @@ async def run(self): This method will continuously execute payloads sent to the runner. It only returns when :py:meth:`stop` is called - or if any orphaned payload return or raise. + or if any orphaned payload returns or raises. In the latter case, :py:exc:`~.OrphanedReturn` or the raised exception is re-raised by this method.