Skip to content

Commit

Permalink
Merge pull request #342 from pikers/mxmn_from_m4
Browse files Browse the repository at this point in the history
Mxmn from m4
  • Loading branch information
goodboy authored Jun 28, 2022
2 parents 5a3b465 + be7afda commit b2d5892
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 106 deletions.
5 changes: 3 additions & 2 deletions piker/fsp/_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ async def fsp_compute(
dict[str, np.ndarray], # multi-output case
np.ndarray, # single output case
]
history_output = await out_stream.__anext__()
history_output = await anext(out_stream)

func_name = func.__name__
profiler(f'{func_name} generated history')
Expand Down Expand Up @@ -374,7 +374,8 @@ async def resync(
'key': dst_shm_token,
'first': dst._first.value,
'last': dst._last.value,
}})
}
})
return tracker, index

def is_synced(
Expand Down
58 changes: 39 additions & 19 deletions piker/ui/_chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,25 +230,26 @@ async def load_symbol(
# - we'll probably want per-instrument/provider state here?
# change the order config form over to the new chart

# XXX: since the pp config is a singleton widget we have to
# also switch it over to the new chart's interal-layout
# self.linkedsplits.chart.qframe.hbox.removeWidget(self.pp_pane)
chart = linkedsplits.chart

# chart is already in memory so just focus it
linkedsplits.show()
linkedsplits.focus()
linkedsplits.graphics_cycle()
await trio.sleep(0)

# XXX: since the pp config is a singleton widget we have to
# also switch it over to the new chart's interal-layout
# self.linkedsplits.chart.qframe.hbox.removeWidget(self.pp_pane)
chart = linkedsplits.chart

# resume feeds *after* rendering chart view asap
chart.resume_all_feeds()
if chart:
chart.resume_all_feeds()

# TODO: we need a check to see if the chart
# last had the xlast in view, if so then shift so it's
# still in view, if the user was viewing history then
# do nothing yah?
chart.default_view()
# TODO: we need a check to see if the chart
# last had the xlast in view, if so then shift so it's
# still in view, if the user was viewing history then
# do nothing yah?
chart.default_view()

self.linkedsplits = linkedsplits
symbol = linkedsplits.symbol
Expand Down Expand Up @@ -760,9 +761,18 @@ def __init__(

self.pi_overlay: PlotItemOverlay = PlotItemOverlay(self.plotItem)

# indempotent startup flag for auto-yrange subsys
# to detect the "first time" y-domain graphics begin
# to be shown in the (main) graphics view.
self._on_screen: bool = False

def resume_all_feeds(self):
for feed in self._feeds.values():
self.linked.godwidget._root_n.start_soon(feed.resume)
try:
for feed in self._feeds.values():
self.linked.godwidget._root_n.start_soon(feed.resume)
except RuntimeError:
# TODO: cancel the qtractor runtime here?
raise

def pause_all_feeds(self):
for feed in self._feeds.values():
Expand Down Expand Up @@ -859,7 +869,8 @@ def marker_right_points(

def default_view(
self,
bars_from_y: int = 3000,
bars_from_y: int = 616,
do_ds: bool = True,

) -> None:
'''
Expand Down Expand Up @@ -920,8 +931,11 @@ def default_view(
max=end,
padding=0,
)
self.view.maybe_downsample_graphics()
view._set_yrange()

if do_ds:
self.view.maybe_downsample_graphics()
view._set_yrange()

try:
self.linked.graphics_cycle()
except IndexError:
Expand Down Expand Up @@ -1255,7 +1269,6 @@ def maxmin(
If ``bars_range`` is provided use that range.
'''
# print(f'Chart[{self.name}].maxmin()')
profiler = pg.debug.Profiler(
msg=f'`{str(self)}.maxmin(name={name})`: `{self.name}`',
disabled=not pg_profile_enabled(),
Expand Down Expand Up @@ -1287,11 +1300,18 @@ def maxmin(

key = round(lbar), round(rbar)
res = flow.maxmin(*key)
if res == (None, None):
log.error(

if (
res is None
):
log.warning(
f"{flow_key} no mxmn for bars_range => {key} !?"
)
res = 0, 0
if not self._on_screen:
self.default_view(do_ds=False)
self._on_screen = True

profiler(f'yrange mxmn: {key} -> {res}')
# print(f'{flow_key} yrange mxmn: {key} -> {res}')
return res
57 changes: 41 additions & 16 deletions piker/ui/_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,20 @@ def ds_m4(
assert frames >= (xrange / uppx)

# call into ``numba``
nb, i_win, y_out = _m4(
(
nb,
x_out,
y_out,
ymn,
ymx,
) = _m4(
x,
y,

frames,

# TODO: see func below..
# i_win,
# x_out,
# y_out,

# first index in x data to start at
Expand All @@ -243,10 +249,11 @@ def ds_m4(
# filter out any overshoot in the input allocation arrays by
# removing zero-ed tail entries which should start at a certain
# index.
i_win = i_win[i_win != 0]
y_out = y_out[:i_win.size]
x_out = x_out[x_out != 0]
y_out = y_out[:x_out.size]

return nb, i_win, y_out
# print(f'M4 output ymn, ymx: {ymn},{ymx}')
return nb, x_out, y_out, ymn, ymx


@jit(
Expand All @@ -260,8 +267,8 @@ def _m4(

frames: int,

# TODO: using this approach by having the ``.zeros()`` alloc lines
# below, in put python was causing segs faults and alloc crashes..
# TODO: using this approach, having the ``.zeros()`` alloc lines
# below in pure python, there were segs faults and alloc crashes..
# we might need to see how it behaves with shm arrays and consider
# allocating them once at startup?

Expand All @@ -274,14 +281,22 @@ def _m4(
x_start: int,
step: float,

) -> int:
# nbins = len(i_win)
# count = len(xs)
) -> tuple[
int,
np.ndarray,
np.ndarray,
float,
float,
]:
'''
Implementation of the m4 algorithm in ``numba``:
http://www.vldb.org/pvldb/vol7/p797-jugel.pdf
'''
# these are pre-allocated and mutated by ``numba``
# code in-place.
y_out = np.zeros((frames, 4), ys.dtype)
i_win = np.zeros(frames, xs.dtype)
x_out = np.zeros(frames, xs.dtype)

bincount = 0
x_left = x_start
Expand All @@ -295,24 +310,34 @@ def _m4(

# set all bins in the left-most entry to the starting left-most x value
# (aka a row broadcast).
i_win[bincount] = x_left
x_out[bincount] = x_left
# set all y-values to the first value passed in.
y_out[bincount] = ys[0]

# full input y-data mx and mn
mx: float = -np.inf
mn: float = np.inf

# compute OHLC style max / min values per window sized x-frame.
for i in range(len(xs)):

x = xs[i]
y = ys[i]

if x < x_left + step: # the current window "step" is [bin, bin+1)
y_out[bincount, 1] = min(y, y_out[bincount, 1])
y_out[bincount, 2] = max(y, y_out[bincount, 2])
ymn = y_out[bincount, 1] = min(y, y_out[bincount, 1])
ymx = y_out[bincount, 2] = max(y, y_out[bincount, 2])
y_out[bincount, 3] = y
mx = max(mx, ymx)
mn = min(mn, ymn)

else:
# Find the next bin
while x >= x_left + step:
x_left += step

bincount += 1
i_win[bincount] = x_left
x_out[bincount] = x_left
y_out[bincount] = y

return bincount, i_win, y_out
return bincount, x_out, y_out, mn, mx
34 changes: 6 additions & 28 deletions piker/ui/_display.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ def chart_maxmin(
mn, mx = out

mx_vlm_in_view = 0

# TODO: we need to NOT call this to avoid a manual
# np.max/min trigger and especially on the vlm_chart
# flows which aren't shown.. like vlm?
if vlm_chart:
out = vlm_chart.maxmin()
if out:
Expand Down Expand Up @@ -222,33 +226,9 @@ async def graphics_update_loop(
tick_margin = 3 * tick_size

chart.show()
# view = chart.view
last_quote = time.time()
i_last = ohlcv.index

# async def iter_drain_quotes():
# # NOTE: all code below this loop is expected to be synchronous
# # and thus draw instructions are not picked up jntil the next
# # wait / iteration.
# async for quotes in stream:
# while True:
# try:
# moar = stream.receive_nowait()
# except trio.WouldBlock:
# yield quotes
# break
# else:
# for sym, quote in moar.items():
# ticks_frame = quote.get('ticks')
# if ticks_frame:
# quotes[sym].setdefault(
# 'ticks', []).extend(ticks_frame)
# print('pulled extra')

# yield quotes

# async for quotes in iter_drain_quotes():

ds = linked.display_state = DisplayState(**{
'quotes': {},
'linked': linked,
Expand Down Expand Up @@ -293,6 +273,7 @@ async def graphics_update_loop(

# chart isn't active/shown so skip render cycle and pause feed(s)
if chart.linked.isHidden():
print('skipping update')
chart.pause_all_feeds()
continue

Expand Down Expand Up @@ -416,10 +397,8 @@ def graphics_update_cycle(
)
or trigger_all
):
# TODO: we should track and compute whether the last
# pixel in a curve should show new data based on uppx
# and then iff update curves and shift?
chart.increment_view(steps=i_diff)
# chart.increment_view(steps=i_diff + round(append_diff - uppx))

if vlm_chart:
vlm_chart.increment_view(steps=i_diff)
Expand Down Expand Up @@ -477,7 +456,6 @@ def graphics_update_cycle(
):
chart.update_graphics_from_flow(
chart.name,
# do_append=uppx < update_uppx,
do_append=do_append,
)

Expand Down
Loading

0 comments on commit b2d5892

Please sign in to comment.