diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index a5df96cca..20067f826 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -253,20 +253,30 @@ async def broadcast( # f'consumers: {subs}' ) borked: set[tractor.MsgStream] = set() - for stream in subs: + sent: set[tractor.MsgStream] = set() + while True: try: - await stream.send({ - 'index': time_stamp or last_ts, - 'period': period_s, - }) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - log.error( - f'{stream._ctx.chan.uid} dropped connection' - ) - borked.add(stream) + for stream in (subs - sent): + try: + await stream.send({ + 'index': time_stamp or last_ts, + 'period': period_s, + }) + sent.add(stream) + + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + log.error( + f'{stream._ctx.chan.uid} dropped connection' + ) + borked.add(stream) + else: + break + except RuntimeError: + log.warning(f'Client subs {subs} changed while broadcasting') + continue for stream in borked: try: @@ -848,6 +858,16 @@ async def uniform_rate_send( # rate timing exactly lul try: await stream.send({sym: first_quote}) + except tractor.RemoteActorError as rme: + if rme.type is not tractor._exceptions.StreamOverrun: + raise + ctx = stream._ctx + chan = ctx.chan + log.warning( + 'Throttled quote-stream overrun!\n' + f'{sym}:{ctx.cid}@{chan.uid}' + ) + except ( # NOTE: any of these can be raised by ``tractor``'s IPC # transport-layer and we want to be highly resilient diff --git a/piker/data/feed.py b/piker/data/feed.py index 7f628c49b..156913aa6 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -1589,6 +1589,9 @@ async def open_feed( (brokermod, bfqsns), ) in zip(ctxs, providers.items()): + # NOTE: do it asap to avoid overruns during multi-feed setup? + ctx._backpressure = backpressure + for fqsn, flume_msg in flumes_msg_dict.items(): flume = Flume.from_msg(flume_msg) assert flume.symbol.fqsn == fqsn