Skip to content

Commit

Permalink
Better handle dynamic registry sampler broadcasts
Browse files Browse the repository at this point in the history
In situations where clients are (dynamically) subscribing *while*
broadcasts are starting to taking place we need to handle the
`set`-modified-during-iteration case. This scenario seems to be more
common during races on concurrent startup of multiple symbols. The
solution here is to use another set to take note of subscribers which
are successfully sent-to and then skipping them on re-try.

This also contains an attempt to exception-handle throttled stream
overruns caused by higher frequency feeds (like binance) pushing more
quotes then can be handled during (UI) client startup.
  • Loading branch information
goodboy committed Jan 30, 2023
1 parent 98a8979 commit 896e640
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 13 deletions.
46 changes: 33 additions & 13 deletions piker/data/_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,20 +253,30 @@ async def broadcast(
# f'consumers: {subs}'
)
borked: set[tractor.MsgStream] = set()
for stream in subs:
sent: set[tractor.MsgStream] = set()
while True:
try:
await stream.send({
'index': time_stamp or last_ts,
'period': period_s,
})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
log.error(
f'{stream._ctx.chan.uid} dropped connection'
)
borked.add(stream)
for stream in (subs - sent):
try:
await stream.send({
'index': time_stamp or last_ts,
'period': period_s,
})
sent.add(stream)

except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
log.error(
f'{stream._ctx.chan.uid} dropped connection'
)
borked.add(stream)
else:
break
except RuntimeError:
log.warning(f'Client subs {subs} changed while broadcasting')
continue

for stream in borked:
try:
Expand Down Expand Up @@ -848,6 +858,16 @@ async def uniform_rate_send(
# rate timing exactly lul
try:
await stream.send({sym: first_quote})
except tractor.RemoteActorError as rme:
if rme.type is not tractor._exceptions.StreamOverrun:
raise
ctx = stream._ctx
chan = ctx.chan
log.warning(
'Throttled quote-stream overrun!\n'
f'{sym}:{ctx.cid}@{chan.uid}'
)

except (
# NOTE: any of these can be raised by ``tractor``'s IPC
# transport-layer and we want to be highly resilient
Expand Down
3 changes: 3 additions & 0 deletions piker/data/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,9 @@ async def open_feed(
(brokermod, bfqsns),
) in zip(ctxs, providers.items()):

# NOTE: do it asap to avoid overruns during multi-feed setup?
ctx._backpressure = backpressure

for fqsn, flume_msg in flumes_msg_dict.items():
flume = Flume.from_msg(flume_msg)
assert flume.symbol.fqsn == fqsn
Expand Down

0 comments on commit 896e640

Please sign in to comment.