diff --git a/piker/accounting/cli.py b/piker/accounting/cli.py index 16712c8cf..7e68ce6f9 100644 --- a/piker/accounting/cli.py +++ b/piker/accounting/cli.py @@ -79,7 +79,7 @@ def broker_init( # enabled.append('piker.data.feed') # non-blocking setup of brokerd service nursery - from ..data import _setup_persistent_brokerd + from ..brokers import _setup_persistent_brokerd return ( start_actor_kwargs, # to `ActorNursery.start_actor()` diff --git a/piker/brokers/_daemon.py b/piker/brokers/_daemon.py new file mode 100644 index 000000000..8a81b1d67 --- /dev/null +++ b/piker/brokers/_daemon.py @@ -0,0 +1,169 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +''' +Broker-daemon-actor "endpoint-hooks": the service task entry points for +``brokerd``. + +''' +from contextlib import ( + asynccontextmanager as acm, +) + +import tractor +import trio + +from . import _util +from . import get_brokermod + +# `brokerd` enabled modules +# TODO: move this def to the `.data` subpkg.. +# NOTE: keeping this list as small as possible is part of our caps-sec +# model and should be treated with utmost care! +_data_mods = [ + 'piker.brokers.core', + 'piker.brokers.data', + 'piker.brokers._daemon', + 'piker.data', + 'piker.data.feed', + 'piker.data._sampling' +] + + +# TODO: we should rename the daemon to datad prolly once we split up +# broker vs. data tasks into separate actors? +@tractor.context +async def _setup_persistent_brokerd( + ctx: tractor.Context, + brokername: str, + loglevel: str | None = None, + +) -> None: + ''' + Allocate a actor-wide service nursery in ``brokerd`` + such that feeds can be run in the background persistently by + the broker backend as needed. + + ''' + log = _util.get_console_log( + loglevel or tractor.current_actor().loglevel, + name=f'{_util.subsys}.{brokername}', + ) + # set global for this actor to this new process-wide instance B) + _util.log = log + + from piker.data.feed import ( + _bus, + get_feed_bus, + ) + global _bus + assert not _bus + + async with trio.open_nursery() as service_nursery: + # assign a nursery to the feeds bus for spawning + # background tasks from clients + get_feed_bus(brokername, service_nursery) + + # unblock caller + await ctx.started() + + # we pin this task to keep the feeds manager active until the + # parent actor decides to tear it down + await trio.sleep_forever() + + +async def spawn_brokerd( + + brokername: str, + loglevel: str | None = None, + + **tractor_kwargs, + +) -> bool: + + from piker.service import Services + from piker.service._util import log # use service mngr log + + log.info(f'Spawning {brokername} broker daemon') + + brokermod = get_brokermod(brokername) + dname = f'brokerd.{brokername}' + + extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) + tractor_kwargs.update(extra_tractor_kwargs) + + # ask `pikerd` to spawn a new sub-actor and manage it under its + # actor nursery + modpath = brokermod.__name__ + broker_enable = [modpath] + for submodname in getattr( + brokermod, + '__enable_modules__', + [], + ): + subpath = f'{modpath}.{submodname}' + broker_enable.append(subpath) + + portal = await Services.actor_n.start_actor( + dname, + enable_modules=_data_mods + broker_enable, + loglevel=loglevel, + debug_mode=Services.debug_mode, + **tractor_kwargs + ) + + # non-blocking setup of brokerd service nursery + await Services.start_service_task( + dname, + portal, + + # signature of target root-task endpoint + _setup_persistent_brokerd, + brokername=brokername, + loglevel=loglevel, + ) + return True + + +@acm +async def maybe_spawn_brokerd( + + brokername: str, + loglevel: str | None = None, + + **pikerd_kwargs, + +) -> tractor.Portal: + ''' + Helper to spawn a brokerd service *from* a client + who wishes to use the sub-actor-daemon. + + ''' + from piker.service import maybe_spawn_daemon + + async with maybe_spawn_daemon( + + f'brokerd.{brokername}', + service_task_target=spawn_brokerd, + spawn_args={ + 'brokername': brokername, + }, + loglevel=loglevel, + + **pikerd_kwargs, + + ) as portal: + yield portal diff --git a/piker/brokers/_util.py b/piker/brokers/_util.py index ba1231565..7e7a3ec7d 100644 --- a/piker/brokers/_util.py +++ b/piker/brokers/_util.py @@ -31,6 +31,7 @@ ) subsys: str = 'piker.brokers' +# NOTE: level should be reset by any actor that is spawned log = get_logger(subsys) get_console_log = partial( diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 37da54b08..ba6af4caf 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -50,40 +50,3 @@ 'open_shm_array', 'get_shm_token', ] - - -@tractor.context -async def _setup_persistent_brokerd( - ctx: tractor.Context, - brokername: str, - loglevel: str | None = None, - -) -> None: - ''' - Allocate a actor-wide service nursery in ``brokerd`` - such that feeds can be run in the background persistently by - the broker backend as needed. - - ''' - get_console_log( - loglevel or tractor.current_actor().loglevel, - ) - - from .feed import ( - _bus, - get_feed_bus, - ) - global _bus - assert not _bus - - async with trio.open_nursery() as service_nursery: - # assign a nursery to the feeds bus for spawning - # background tasks from clients - get_feed_bus(brokername, service_nursery) - - # unblock caller - await ctx.started() - - # we pin this task to keep the feeds manager active until the - # parent actor decides to tear it down - await trio.sleep_forever() diff --git a/piker/service/__init__.py b/piker/service/__init__.py index a885bc399..e6a17da09 100644 --- a/piker/service/__init__.py +++ b/piker/service/__init__.py @@ -20,7 +20,6 @@ """ from __future__ import annotations -from ._util import log from ._mngr import Services from ._registry import ( # noqa _tractor_kwargs, @@ -33,8 +32,6 @@ ) from ._daemon import ( # noqa maybe_spawn_daemon, - spawn_brokerd, - maybe_spawn_brokerd, spawn_emsd, maybe_open_emsd, ) @@ -44,6 +41,10 @@ open_pikerd, get_tractor_runtime_kwargs, ) +from ..brokers._daemon import ( + spawn_brokerd, + maybe_spawn_brokerd, +) __all__ = [ diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py index ea7399fa8..ec14dbf9a 100644 --- a/piker/service/_actor_runtime.py +++ b/piker/service/_actor_runtime.py @@ -133,8 +133,11 @@ async def open_piker_runtime( _root_modules = [ __name__, 'piker.service._daemon', + 'piker.brokers._daemon', + 'piker.clearing._ems', 'piker.clearing._client', + 'piker.data._sampling', ] diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py index ba1a467a6..df94a9927 100644 --- a/piker/service/_daemon.py +++ b/piker/service/_daemon.py @@ -32,25 +32,12 @@ from ._util import ( log, # sub-sys logger ) -from ..brokers import get_brokermod from ._mngr import ( Services, ) from ._actor_runtime import maybe_open_pikerd from ._registry import find_service -# `brokerd` enabled modules -# TODO: move this def to the `.data` subpkg.. -# NOTE: keeping this list as small as possible is part of our caps-sec -# model and should be treated with utmost care! -_data_mods = [ - 'piker.brokers.core', - 'piker.brokers.data', - 'piker.data', - 'piker.data.feed', - 'piker.data._sampling' -] - @acm async def maybe_spawn_daemon( @@ -145,87 +132,6 @@ async def maybe_spawn_daemon( await portal.cancel_actor() -async def spawn_brokerd( - - brokername: str, - loglevel: str | None = None, - - **tractor_kwargs, - -) -> bool: - - log.info(f'Spawning {brokername} broker daemon') - - brokermod = get_brokermod(brokername) - dname = f'brokerd.{brokername}' - - extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) - tractor_kwargs.update(extra_tractor_kwargs) - - # ask `pikerd` to spawn a new sub-actor and manage it under its - # actor nursery - modpath = brokermod.__name__ - broker_enable = [modpath] - for submodname in getattr( - brokermod, - '__enable_modules__', - [], - ): - subpath = f'{modpath}.{submodname}' - broker_enable.append(subpath) - - portal = await Services.actor_n.start_actor( - dname, - enable_modules=_data_mods + broker_enable, - loglevel=loglevel, - debug_mode=Services.debug_mode, - **tractor_kwargs - ) - - # non-blocking setup of brokerd service nursery - from ..data import _setup_persistent_brokerd - - await Services.start_service_task( - dname, - portal, - - # signature of target root-task endpoint - _setup_persistent_brokerd, - brokername=brokername, - loglevel=loglevel, - ) - return True - - -@acm -async def maybe_spawn_brokerd( - - brokername: str, - loglevel: str | None = None, - - **pikerd_kwargs, - -) -> tractor.Portal: - ''' - Helper to spawn a brokerd service *from* a client - who wishes to use the sub-actor-daemon. - - ''' - async with maybe_spawn_daemon( - - f'brokerd.{brokername}', - service_task_target=spawn_brokerd, - spawn_args={ - 'brokername': brokername, - }, - loglevel=loglevel, - - **pikerd_kwargs, - - ) as portal: - yield portal - - async def spawn_emsd( loglevel: str | None = None, diff --git a/piker/service/elastic.py b/piker/service/elastic.py index 71097dcb0..6714a9ecb 100644 --- a/piker/service/elastic.py +++ b/piker/service/elastic.py @@ -26,7 +26,7 @@ import docker from ._ahab import DockerContainer -from . import log # sub-sys logger +from ._util import log # sub-sys logger from ._util import ( get_console_log, )