Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feed fixes #167

Merged
merged 7 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 39 additions & 14 deletions piker/_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
class Services(BaseModel):
actor_n: tractor._trionics.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag

class Config:
arbitrary_types_allowed = True
Expand All @@ -53,10 +54,16 @@ class Config:

@asynccontextmanager
async def open_pikerd(
start_method: str = 'trio',
loglevel: Optional[str] = None,
**kwargs,

# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,

) -> Optional[tractor._portal.Portal]:
"""Start a root piker daemon who's lifetime extends indefinitely
"""
Start a root piker daemon who's lifetime extends indefinitely
until cancelled.

A root actor nursery is created which can be used to create and keep
Expand All @@ -71,18 +78,23 @@ async def open_pikerd(
# passed through to ``open_root_actor``
name=_root_dname,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,

# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
# enable_modules=[__name__],
enable_modules=_root_modules,

) as _, tractor.open_nursery() as actor_nursery:
async with trio.open_nursery() as service_nursery:

# assign globally for future daemon/task creation
_services = Services(
actor_n=actor_nursery,
service_n=service_nursery
service_n=service_nursery,
debug_mode=debug_mode,
)

yield _services
Expand All @@ -93,6 +105,10 @@ async def maybe_open_runtime(
loglevel: Optional[str] = None,
**kwargs,
) -> None:
"""
Start the ``tractor`` runtime (a root actor) if none exists.

"""
if not tractor.current_actor(err_on_no_runtime=False):
async with tractor.open_root_actor(loglevel=loglevel, **kwargs):
yield
Expand Down Expand Up @@ -123,8 +139,7 @@ async def maybe_open_pikerd(

# presume pikerd role
async with open_pikerd(
loglevel,
**kwargs,
loglevel=loglevel,
) as _:
# in the case where we're starting up the
# tractor-piker runtime stack in **this** process
Expand All @@ -137,14 +152,17 @@ async def maybe_open_pikerd(
'piker.brokers.core',
'piker.brokers.data',
'piker.data',
'piker.data.feed',
'piker.data._sampling'
]


async def spawn_brokerd(
brokername,

brokername: str,
loglevel: Optional[str] = None,
**tractor_kwargs
**tractor_kwargs,

) -> tractor._portal.Portal:

from .data import _setup_persistent_brokerd
Expand All @@ -164,6 +182,7 @@ async def spawn_brokerd(
dname,
enable_modules=_data_mods + [brokermod.__name__],
loglevel=loglevel,
debug_mode=_services.debug_mode,
**tractor_kwargs
)

Expand All @@ -187,14 +206,14 @@ async def spawn_brokerd(

@asynccontextmanager
async def maybe_spawn_brokerd(

brokername: str,
loglevel: Optional[str] = None,
**kwargs,

# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = True,
) -> tractor._portal.Portal:
"""If no ``brokerd.{brokername}`` daemon-actor can be found,
"""
If no ``brokerd.{brokername}`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.

"""
Expand All @@ -213,7 +232,8 @@ async def maybe_spawn_brokerd(
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
loglevel=loglevel
loglevel=loglevel,
**kwargs,
) as pikerd_portal:

if pikerd_portal is None:
Expand All @@ -226,19 +246,23 @@ async def maybe_spawn_brokerd(
spawn_brokerd,
brokername=brokername,
loglevel=loglevel,
debug_mode=debug_mode,
)

async with tractor.wait_for_actor(dname) as portal:
yield portal


async def spawn_emsd(
brokername,

brokername: str,
loglevel: Optional[str] = None,
**extra_tractor_kwargs

) -> tractor._portal.Portal:
"""
Start the clearing engine under ``pikerd``.

"""
log.info('Spawning emsd')

# TODO: raise exception when _services == None?
Expand All @@ -251,6 +275,7 @@ async def spawn_emsd(
'piker.clearing._client',
],
loglevel=loglevel,
debug_mode=_services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
return 'emsd'
2 changes: 1 addition & 1 deletion piker/brokers/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from ..cli import cli
from .. import watchlists as wl
from ..log import get_console_log, colorize_json, get_logger
from ..data import maybe_spawn_brokerd
from .._daemon import maybe_spawn_brokerd
from ..brokers import core, get_brokermod, data

log = get_logger('cli')
Expand Down
21 changes: 15 additions & 6 deletions piker/clearing/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,9 @@ def get_orders(


# TODO: make this a ``tractor.msg.pub``
async def send_order_cmds():
"""Order streaming task: deliver orders transmitted from UI
async def send_order_cmds(symbol_key: str):
"""
Order streaming task: deliver orders transmitted from UI
to downstream consumers.

This is run in the UI actor (usually the one running Qt but could be
Expand All @@ -160,10 +161,18 @@ async def send_order_cmds():
book._ready_to_receive.set()

async for cmd in orders_stream:

# send msg over IPC / wire
log.info(f'Send order cmd:\n{pformat(cmd)}')
yield cmd
print(cmd)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the major hack of note 😂

We need proper broadcast channels as mentioned in goodboy/tractor#204 to avoid this re-insertion blasphemy.

if cmd['symbol'] == symbol_key:

# send msg over IPC / wire
log.info(f'Send order cmd:\n{pformat(cmd)}')
yield cmd
else:
# XXX BRUTAL HACKZORZES !!!
# re-insert for another consumer
# we need broadcast channelz...asap
# https://github.com/goodboy/tractor/issues/204
book._to_ems.send_nowait(cmd)


@asynccontextmanager
Expand Down
7 changes: 5 additions & 2 deletions piker/clearing/_ems.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ async def execute_triggers(
tuple(execs.items())
):

if (ttype not in tf) or (not pred(price)):
if not pred or (ttype not in tf) or (not pred(price)):
# majority of iterations will be non-matches
continue

Expand Down Expand Up @@ -675,7 +675,10 @@ async def _emsd_main(
# acting as an EMS client and will submit orders) to
# receive requests pushed over a tractor stream
# using (for now) an async generator.
order_stream = await portal.run(send_order_cmds)
order_stream = await portal.run(
send_order_cmds,
symbol_key=symbol,
)

# start inbound order request processing
await process_order_cmds(
Expand Down
16 changes: 13 additions & 3 deletions piker/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,25 @@
@click.command()
@click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--pdb', is_flag=True, help='Enable tractor debug mode')
@click.option('--host', '-h', default='127.0.0.1', help='Host address to bind')
def pikerd(loglevel, host, tl):
def pikerd(loglevel, host, tl, pdb):
"""Spawn the piker broker-daemon.
"""
from .._daemon import _data_mods, open_pikerd
get_console_log(loglevel)
log = get_console_log(loglevel)

if pdb:
log.warning((
"\n"
"!!! You have enabled daemon DEBUG mode !!!\n"
"If a daemon crashes it will likely block"
" the service until resumed from console!\n"
"\n"
))

async def main():
async with open_pikerd(loglevel):
async with open_pikerd(loglevel=loglevel, debug_mode=pdb):
await trio.sleep_forever()

trio.run(main)
Expand Down
Loading