Skip to content

Commit

Permalink
Implement by-type tick-framing in throttler loop
Browse files Browse the repository at this point in the history
This has been an outstanding idea for a while and changes the framing
format of tick events into a `dict[str, list[dict]]` wherein for each
tick "type" (e.g. 'bid', 'ask', 'trade', 'asize', etc.) we create a FIFO
ordered `list` of events (data) and then pack this table into each
(throttled) send. This gives an additional implied downsample reduction
(in terms of iteration on the consumer side) from `N` tick-events to
a (max) `T` tick-types presuming the rx side only needs the latest tick
event.

Drop the `types: set` and adjust clearing event test to use the new
`ticks_by_type` map's keys.
  • Loading branch information
goodboy committed Nov 17, 2022
1 parent aa90348 commit 1027f9e
Showing 1 changed file with 56 additions and 19 deletions.
75 changes: 56 additions & 19 deletions piker/data/_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
"""
from __future__ import annotations
from collections import Counter
from collections import (
Counter,
defaultdict,
)
import time
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -405,10 +408,17 @@ async def sample_and_broadcast(
)


# A working tick-type-classes template: each key names a class (group)
# of tick events and maps to the set of tick `'type'` field values that
# belong to that class.
_tick_groups: dict[str, set[str]] = {
# "clearing" event types; `uniform_rate_send()` reads this group to
# test whether a clearing tick has been buffered yet.
'clears': {'trade', 'utrade', 'last'},
# bid-side types (presumably bid price and bid size -- confirm against
# the backend feeds that emit them).
'bids': {'bid', 'bsize'},
# ask-side types (presumably ask price and ask size -- confirm against
# the backend feeds that emit them).
'asks': {'ask', 'asize'},
}


# TODO: a less naive throttler, here's some snippets:
# token bucket by njs:
# https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9

async def uniform_rate_send(

rate: float,
Expand All @@ -428,7 +438,12 @@ async def uniform_rate_send(
diff = 0

task_status.started()
types: set[str] = set()
ticks_by_type: defaultdict[
str,
list[dict],
] = defaultdict(list)

clear_types = _tick_groups['clears']

while True:

Expand Down Expand Up @@ -457,25 +472,41 @@ async def uniform_rate_send(
# tick array/buffer.
ticks = last_quote.get('ticks')

# XXX: idea for frame type data structure we could
# use on the wire instead of a simple list?
# frames = {
# 'index': ['type_a', 'type_c', 'type_n', 'type_n'],

# 'type_a': [tick0, tick1, tick2, .., tickn],
# 'type_b': [tick0, tick1, tick2, .., tickn],
# 'type_c': [tick0, tick1, tick2, .., tickn],
# ...
# 'type_n': [tick0, tick1, tick2, .., tickn],
# }

# TODO: once we decide to get fancy really we should
# have a shared mem tick buffer that is just
# continually filled and the UI just reads from it
# at its display rate.
if ticks:
# TODO: do we need this any more or can we just
# expect the receiver to unwind the below
# `ticks_by_type: dict`?
# => unwinding would potentially require a
# `dict[str, set | list]` instead with an
# included `'types'` field which is an (ordered)
# set of tick type fields in the order which
# types arrived?
first_quote['ticks'].extend(ticks)
types.update(item['type'] for item in ticks)

# XXX: build a tick-by-type table of lists
# of tick messages. This allows for less
# iteration on the receiver side by allowing for
# a single "latest tick event" look up by
# indexing the last entry in each sub-list.
# tbt = {
# 'types': ['bid', 'asize', 'last', .. '<type_n>'],

# 'bid': [tick0, tick1, tick2, .., tickn],
# 'asize': [tick0, tick1, tick2, .., tickn],
# 'last': [tick0, tick1, tick2, .., tickn],
# ...
# '<type_n>': [tick0, tick1, tick2, .., tickn],
# }
for tick in ticks:
# append in arrival (FIFO) order so the receiver side
# can iterate in-order; the latest tick is the last entry.
ticks_by_type[tick['type']].append(tick)

first_quote['tbt'] = ticks_by_type

# send cycle isn't due yet so continue waiting
continue
Expand All @@ -496,7 +527,7 @@ async def uniform_rate_send(

with trio.move_on_after(1/60) as cs:
while (
not types.intersection({'trade', 'utrade', 'last'})
not set(ticks_by_type).intersection(clear_types)
):
try:
sym, last_quote = await quote_stream.receive()
Expand All @@ -506,7 +537,13 @@ async def uniform_rate_send(

ticks = last_quote.get('ticks')
first_quote['ticks'].extend(ticks)
types.update(item['type'] for item in ticks)
if ticks:
for tick in ticks:
# append in arrival (FIFO) order so the receiver side
# can iterate in-order; the latest tick is the last entry.
ticks_by_type[tick['type']].append(tick)

first_quote['tbt'] = ticks_by_type

# measured_rate = 1 / (time.time() - last_send)
# log.info(
Expand Down Expand Up @@ -537,4 +574,4 @@ async def uniform_rate_send(
first_quote = last_quote = None
diff = 0
last_send = time.time()
types.clear()
ticks_by_type.clear()

0 comments on commit 1027f9e

Please sign in to comment.