Skip to content

Commit

Permalink
Handle time-indexing for fill arrows
Browse files Browse the repository at this point in the history
Call into a reworked `Flume.get_index()` for both the slow and fast
chart and do time index clipping to last datum where necessary.
  • Loading branch information
goodboy committed Dec 10, 2022
1 parent 2c42a06 commit dd91225
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 28 deletions.
28 changes: 14 additions & 14 deletions piker/data/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@
from ._sampling import (
iter_ohlc_periods,
)
from .._profile import (
Profiler,
pg_profile_enabled,
)
# from .._profile import (
# Profiler,
# pg_profile_enabled,
# )

if TYPE_CHECKING:
from pyqtgraph import PlotItem
# from pyqtgraph import PlotItem
from .feed import Feed


Expand Down Expand Up @@ -235,18 +235,18 @@ def from_msg(cls, msg: dict) -> dict:
def get_index(
self,
time_s: float,
array: np.ndarray,

) -> int:
) -> int | float:
'''
Return array shm-buffer index for for epoch time.
'''
array = self.rt_shm.array
times = array['time']
mask = (times >= time_s)

if any(mask):
return array['index'][mask][0]

# just the latest index
return array['index'][-1]
first = np.searchsorted(
times,
time_s,
side='left',
)
imx = times.shape[0] - 1
return min(first, imx)
36 changes: 22 additions & 14 deletions piker/ui/order_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ def on_fill(

uuid: str,
price: float,
arrow_index: float,
time_s: float,

pointing: Optional[str] = None,

Expand All @@ -513,18 +513,26 @@ def on_fill(
'''
dialog = self.dialogs[uuid]
lines = dialog.lines
chart = self.chart

# XXX: seems to fail on certain types of races?
# assert len(lines) == 2
if lines:
flume: Flume = self.feed.flumes[self.chart.linked.symbol.fqsn]
flume: Flume = self.feed.flumes[chart.linked.symbol.fqsn]
_, _, ratio = flume.get_ds_info()
for i, chart in [
(arrow_index, self.chart),
(flume.izero_hist
+
round((arrow_index - flume.izero_rt)/ratio),
self.hist_chart)

for chart, shm in [
(self.chart, flume.rt_shm),
(self.hist_chart, flume.hist_shm),
]:
viz = chart.get_viz(chart.name)
index_field = viz.index_field
arr = shm.array
index = flume.get_index(time_s, arr)

if index_field == 'time':
i = arr['time'][index]

self.arrows.add(
chart.plotItem,
uuid,
Expand Down Expand Up @@ -933,6 +941,8 @@ async def process_trade_msg(
fmsg = pformat(msg)
log.debug(f'Received order msg:\n{fmsg}')
name = msg['name']
viz = mode.chart.get_viz(mode.chart.name)
index_field = viz.index_field

if name in (
'position',
Expand Down Expand Up @@ -1037,11 +1047,11 @@ async def process_trade_msg(
# should only be one "fill" for an alert
# add a triangle and remove the level line
req = Order(**req)
index = flume.get_index(time.time())
tm = time.time()
mode.on_fill(
oid,
price=req.price,
arrow_index=index,
time_s=tm,
)
mode.lines.remove_line(uuid=oid)
msg.req = req
Expand Down Expand Up @@ -1080,10 +1090,8 @@ async def process_trade_msg(
# a true backend one? This will require finagling
# with how each backend tracks/summarizes time
# stamps for the downstream API.
index = flume.get_index(
details['broker_time']
)

tm = details['broker_time']
index = flume.get_index(tm, index_field)
# TODO: some kinda progress system
mode.on_fill(
oid,
Expand Down

0 comments on commit dd91225

Please sign in to comment.