Skip to content

Commit

Permalink
Try having brokerd eps defined in .brokers._daemon
Browse files Browse the repository at this point in the history
Since it's a bit weird having service specific implementation details
inside the general service `._daemon` mod, and since i'd mentioned
trying this re-org; let's do it B)

Requires enabling the new mod in both `pikerd` and `brokerd` and
obviously a bit more runtime-loading of the service modules in the
`brokerd` service eps to avoid import cycles.

Also moved `_setup_persistent_brokerd()` into the new mod since the
naming would place it there even though the implementation really
wouldn't (longer run) since we want to split up `.data.feed` layer
backend-invoked eps into a separate actor eventually from the "actual"
`brokerd` which will be the actor running **only** the trade control eps
(eg. trades_dialogue()` and friends).
  • Loading branch information
goodboy committed May 9, 2023
1 parent 0900d51 commit 81ea1c8
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 136 deletions.
2 changes: 1 addition & 1 deletion piker/accounting/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def broker_init(
# enabled.append('piker.data.feed')

# non-blocking setup of brokerd service nursery
from ..data import _setup_persistent_brokerd
from ..brokers import _setup_persistent_brokerd

return (
start_actor_kwargs, # to `ActorNursery.start_actor()`
Expand Down
169 changes: 169 additions & 0 deletions piker/brokers/_daemon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

'''
Broker-daemon-actor "endpoint-hooks": the service task entry points for
``brokerd``.
'''
from contextlib import (
asynccontextmanager as acm,
)

import tractor
import trio

from . import _util
from . import get_brokermod

# `brokerd` enabled modules
# TODO: move this def to the `.data` subpkg..
# NOTE: keeping this list as small as possible is part of our caps-sec
# model and should be treated with utmost care!
_data_mods = [
'piker.brokers.core',
'piker.brokers.data',
'piker.brokers._daemon',
'piker.data',
'piker.data.feed',
'piker.data._sampling'
]


# TODO: we should rename the daemon to datad prolly once we split up
# broker vs. data tasks into separate actors?
@tractor.context
async def _setup_persistent_brokerd(
ctx: tractor.Context,
brokername: str,
loglevel: str | None = None,

) -> None:
'''
Allocate a actor-wide service nursery in ``brokerd``
such that feeds can be run in the background persistently by
the broker backend as needed.
'''
log = _util.get_console_log(
loglevel or tractor.current_actor().loglevel,
name=f'{_util.subsys}.{brokername}',
)
# set global for this actor to this new process-wide instance B)
_util.log = log

from piker.data.feed import (
_bus,
get_feed_bus,
)
global _bus
assert not _bus

async with trio.open_nursery() as service_nursery:
# assign a nursery to the feeds bus for spawning
# background tasks from clients
get_feed_bus(brokername, service_nursery)

# unblock caller
await ctx.started()

# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()


async def spawn_brokerd(

brokername: str,
loglevel: str | None = None,

**tractor_kwargs,

) -> bool:

from piker.service import Services
from piker.service._util import log # use service mngr log

log.info(f'Spawning {brokername} broker daemon')

brokermod = get_brokermod(brokername)
dname = f'brokerd.{brokername}'

extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
tractor_kwargs.update(extra_tractor_kwargs)

# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
modpath = brokermod.__name__
broker_enable = [modpath]
for submodname in getattr(
brokermod,
'__enable_modules__',
[],
):
subpath = f'{modpath}.{submodname}'
broker_enable.append(subpath)

portal = await Services.actor_n.start_actor(
dname,
enable_modules=_data_mods + broker_enable,
loglevel=loglevel,
debug_mode=Services.debug_mode,
**tractor_kwargs
)

# non-blocking setup of brokerd service nursery
await Services.start_service_task(
dname,
portal,

# signature of target root-task endpoint
_setup_persistent_brokerd,
brokername=brokername,
loglevel=loglevel,
)
return True


@acm
async def maybe_spawn_brokerd(

brokername: str,
loglevel: str | None = None,

**pikerd_kwargs,

) -> tractor.Portal:
'''
Helper to spawn a brokerd service *from* a client
who wishes to use the sub-actor-daemon.
'''
from piker.service import maybe_spawn_daemon

async with maybe_spawn_daemon(

f'brokerd.{brokername}',
service_task_target=spawn_brokerd,
spawn_args={
'brokername': brokername,
},
loglevel=loglevel,

**pikerd_kwargs,

) as portal:
yield portal
1 change: 1 addition & 0 deletions piker/brokers/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
)
subsys: str = 'piker.brokers'

# NOTE: level should be reset by any actor that is spawned
log = get_logger(subsys)

get_console_log = partial(
Expand Down
37 changes: 0 additions & 37 deletions piker/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,40 +50,3 @@
'open_shm_array',
'get_shm_token',
]


@tractor.context
async def _setup_persistent_brokerd(
ctx: tractor.Context,
brokername: str,
loglevel: str | None = None,

) -> None:
'''
Allocate a actor-wide service nursery in ``brokerd``
such that feeds can be run in the background persistently by
the broker backend as needed.
'''
get_console_log(
loglevel or tractor.current_actor().loglevel,
)

from .feed import (
_bus,
get_feed_bus,
)
global _bus
assert not _bus

async with trio.open_nursery() as service_nursery:
# assign a nursery to the feeds bus for spawning
# background tasks from clients
get_feed_bus(brokername, service_nursery)

# unblock caller
await ctx.started()

# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()
7 changes: 4 additions & 3 deletions piker/service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"""
from __future__ import annotations

from ._util import log
from ._mngr import Services
from ._registry import ( # noqa
_tractor_kwargs,
Expand All @@ -33,8 +32,6 @@
)
from ._daemon import ( # noqa
maybe_spawn_daemon,
spawn_brokerd,
maybe_spawn_brokerd,
spawn_emsd,
maybe_open_emsd,
)
Expand All @@ -44,6 +41,10 @@
open_pikerd,
get_tractor_runtime_kwargs,
)
from ..brokers._daemon import (
spawn_brokerd,
maybe_spawn_brokerd,
)


__all__ = [
Expand Down
3 changes: 3 additions & 0 deletions piker/service/_actor_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,11 @@ async def open_piker_runtime(
_root_modules = [
__name__,
'piker.service._daemon',
'piker.brokers._daemon',

'piker.clearing._ems',
'piker.clearing._client',

'piker.data._sampling',
]

Expand Down
94 changes: 0 additions & 94 deletions piker/service/_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,12 @@
from ._util import (
log, # sub-sys logger
)
from ..brokers import get_brokermod
from ._mngr import (
Services,
)
from ._actor_runtime import maybe_open_pikerd
from ._registry import find_service

# `brokerd` enabled modules
# TODO: move this def to the `.data` subpkg..
# NOTE: keeping this list as small as possible is part of our caps-sec
# model and should be treated with utmost care!
_data_mods = [
'piker.brokers.core',
'piker.brokers.data',
'piker.data',
'piker.data.feed',
'piker.data._sampling'
]


@acm
async def maybe_spawn_daemon(
Expand Down Expand Up @@ -145,87 +132,6 @@ async def maybe_spawn_daemon(
await portal.cancel_actor()


async def spawn_brokerd(

brokername: str,
loglevel: str | None = None,

**tractor_kwargs,

) -> bool:

log.info(f'Spawning {brokername} broker daemon')

brokermod = get_brokermod(brokername)
dname = f'brokerd.{brokername}'

extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
tractor_kwargs.update(extra_tractor_kwargs)

# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
modpath = brokermod.__name__
broker_enable = [modpath]
for submodname in getattr(
brokermod,
'__enable_modules__',
[],
):
subpath = f'{modpath}.{submodname}'
broker_enable.append(subpath)

portal = await Services.actor_n.start_actor(
dname,
enable_modules=_data_mods + broker_enable,
loglevel=loglevel,
debug_mode=Services.debug_mode,
**tractor_kwargs
)

# non-blocking setup of brokerd service nursery
from ..data import _setup_persistent_brokerd

await Services.start_service_task(
dname,
portal,

# signature of target root-task endpoint
_setup_persistent_brokerd,
brokername=brokername,
loglevel=loglevel,
)
return True


@acm
async def maybe_spawn_brokerd(

brokername: str,
loglevel: str | None = None,

**pikerd_kwargs,

) -> tractor.Portal:
'''
Helper to spawn a brokerd service *from* a client
who wishes to use the sub-actor-daemon.
'''
async with maybe_spawn_daemon(

f'brokerd.{brokername}',
service_task_target=spawn_brokerd,
spawn_args={
'brokername': brokername,
},
loglevel=loglevel,

**pikerd_kwargs,

) as portal:
yield portal


async def spawn_emsd(

loglevel: str | None = None,
Expand Down
2 changes: 1 addition & 1 deletion piker/service/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import docker
from ._ahab import DockerContainer

from . import log # sub-sys logger
from ._util import log # sub-sys logger
from ._util import (
get_console_log,
)
Expand Down

0 comments on commit 81ea1c8

Please sign in to comment.