Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mxmn from m4 #342

Merged
merged 16 commits into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions piker/fsp/_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ async def fsp_compute(
dict[str, np.ndarray], # multi-output case
np.ndarray, # single output case
]
history_output = await out_stream.__anext__()
history_output = await anext(out_stream)

func_name = func.__name__
profiler(f'{func_name} generated history')
Expand Down Expand Up @@ -374,7 +374,8 @@ async def resync(
'key': dst_shm_token,
'first': dst._first.value,
'last': dst._last.value,
}})
}
})
return tracker, index

def is_synced(
Expand Down
58 changes: 39 additions & 19 deletions piker/ui/_chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,25 +230,26 @@ async def load_symbol(
# - we'll probably want per-instrument/provider state here?
# change the order config form over to the new chart

# XXX: since the pp config is a singleton widget we have to
# also switch it over to the new chart's interal-layout
# self.linkedsplits.chart.qframe.hbox.removeWidget(self.pp_pane)
chart = linkedsplits.chart

# chart is already in memory so just focus it
linkedsplits.show()
linkedsplits.focus()
linkedsplits.graphics_cycle()
await trio.sleep(0)

# XXX: since the pp config is a singleton widget we have to
# also switch it over to the new chart's interal-layout
# self.linkedsplits.chart.qframe.hbox.removeWidget(self.pp_pane)
chart = linkedsplits.chart

# resume feeds *after* rendering chart view asap
chart.resume_all_feeds()
if chart:
chart.resume_all_feeds()

# TODO: we need a check to see if the chart
# last had the xlast in view, if so then shift so it's
# still in view, if the user was viewing history then
# do nothing yah?
chart.default_view()
# TODO: we need a check to see if the chart
# last had the xlast in view, if so then shift so it's
# still in view, if the user was viewing history then
# do nothing yah?
chart.default_view()

self.linkedsplits = linkedsplits
symbol = linkedsplits.symbol
Expand Down Expand Up @@ -760,9 +761,18 @@ def __init__(

self.pi_overlay: PlotItemOverlay = PlotItemOverlay(self.plotItem)

# indempotent startup flag for auto-yrange subsys
# to detect the "first time" y-domain graphics begin
# to be shown in the (main) graphics view.
self._on_screen: bool = False

def resume_all_feeds(self):
for feed in self._feeds.values():
self.linked.godwidget._root_n.start_soon(feed.resume)
try:
for feed in self._feeds.values():
self.linked.godwidget._root_n.start_soon(feed.resume)
except RuntimeError:
# TODO: cancel the qtractor runtime here?
raise

def pause_all_feeds(self):
for feed in self._feeds.values():
Expand Down Expand Up @@ -859,7 +869,8 @@ def marker_right_points(

def default_view(
self,
bars_from_y: int = 3000,
bars_from_y: int = 616,
do_ds: bool = True,

) -> None:
'''
Expand Down Expand Up @@ -920,8 +931,11 @@ def default_view(
max=end,
padding=0,
)
self.view.maybe_downsample_graphics()
view._set_yrange()

if do_ds:
self.view.maybe_downsample_graphics()
view._set_yrange()

try:
self.linked.graphics_cycle()
except IndexError:
Expand Down Expand Up @@ -1255,7 +1269,6 @@ def maxmin(
If ``bars_range`` is provided use that range.

'''
# print(f'Chart[{self.name}].maxmin()')
profiler = pg.debug.Profiler(
msg=f'`{str(self)}.maxmin(name={name})`: `{self.name}`',
disabled=not pg_profile_enabled(),
Expand Down Expand Up @@ -1287,11 +1300,18 @@ def maxmin(

key = round(lbar), round(rbar)
res = flow.maxmin(*key)
if res == (None, None):
log.error(

if (
res is None
):
log.warning(
f"{flow_key} no mxmn for bars_range => {key} !?"
)
res = 0, 0
if not self._on_screen:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a hitch to avoid blank views on first datums load..

probably should do something more elegant for this eventually?

self.default_view(do_ds=False)
self._on_screen = True

profiler(f'yrange mxmn: {key} -> {res}')
# print(f'{flow_key} yrange mxmn: {key} -> {res}')
return res
57 changes: 41 additions & 16 deletions piker/ui/_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,20 @@ def ds_m4(
assert frames >= (xrange / uppx)

# call into ``numba``
nb, i_win, y_out = _m4(
(
nb,
x_out,
y_out,
ymn,
ymx,
) = _m4(
x,
y,

frames,

# TODO: see func below..
# i_win,
# x_out,
# y_out,

# first index in x data to start at
Expand All @@ -243,10 +249,11 @@ def ds_m4(
# filter out any overshoot in the input allocation arrays by
# removing zero-ed tail entries which should start at a certain
# index.
i_win = i_win[i_win != 0]
y_out = y_out[:i_win.size]
x_out = x_out[x_out != 0]
y_out = y_out[:x_out.size]

return nb, i_win, y_out
# print(f'M4 output ymn, ymx: {ymn},{ymx}')
return nb, x_out, y_out, ymn, ymx


@jit(
Expand All @@ -260,8 +267,8 @@ def _m4(

frames: int,

# TODO: using this approach by having the ``.zeros()`` alloc lines
# below, in put python was causing segs faults and alloc crashes..
# TODO: using this approach, having the ``.zeros()`` alloc lines
# below in pure python, there were segs faults and alloc crashes..
# we might need to see how it behaves with shm arrays and consider
# allocating them once at startup?

Expand All @@ -274,14 +281,22 @@ def _m4(
x_start: int,
step: float,

) -> int:
# nbins = len(i_win)
# count = len(xs)
) -> tuple[
int,
np.ndarray,
np.ndarray,
float,
float,
]:
'''
Implementation of the m4 algorithm in ``numba``:
http://www.vldb.org/pvldb/vol7/p797-jugel.pdf

'''
# these are pre-allocated and mutated by ``numba``
# code in-place.
y_out = np.zeros((frames, 4), ys.dtype)
i_win = np.zeros(frames, xs.dtype)
x_out = np.zeros(frames, xs.dtype)

bincount = 0
x_left = x_start
Expand All @@ -295,24 +310,34 @@ def _m4(

# set all bins in the left-most entry to the starting left-most x value
# (aka a row broadcast).
i_win[bincount] = x_left
x_out[bincount] = x_left
# set all y-values to the first value passed in.
y_out[bincount] = ys[0]

# full input y-data mx and mn
mx: float = -np.inf
mn: float = np.inf

# compute OHLC style max / min values per window sized x-frame.
for i in range(len(xs)):

x = xs[i]
y = ys[i]

if x < x_left + step: # the current window "step" is [bin, bin+1)
y_out[bincount, 1] = min(y, y_out[bincount, 1])
y_out[bincount, 2] = max(y, y_out[bincount, 2])
ymn = y_out[bincount, 1] = min(y, y_out[bincount, 1])
ymx = y_out[bincount, 2] = max(y, y_out[bincount, 2])
y_out[bincount, 3] = y
mx = max(mx, ymx)
Copy link
Contributor Author

@goodboy goodboy Jun 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah so just to point to the crux of it all, there's really no point in not capturing these summary max/min values during each downsample cycle and this is now what we take an use in place of "manual" y-range sorting on the source data when possible.

mn = min(mn, ymn)

else:
# Find the next bin
while x >= x_left + step:
x_left += step

bincount += 1
i_win[bincount] = x_left
x_out[bincount] = x_left
y_out[bincount] = y

return bincount, i_win, y_out
return bincount, x_out, y_out, mn, mx
34 changes: 6 additions & 28 deletions piker/ui/_display.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ def chart_maxmin(
mn, mx = out

mx_vlm_in_view = 0

# TODO: we need to NOT call this to avoid a manual
# np.max/min trigger and especially on the vlm_chart
# flows which aren't shown.. like vlm?
if vlm_chart:
out = vlm_chart.maxmin()
if out:
Expand Down Expand Up @@ -222,33 +226,9 @@ async def graphics_update_loop(
tick_margin = 3 * tick_size

chart.show()
# view = chart.view
last_quote = time.time()
i_last = ohlcv.index

# async def iter_drain_quotes():
# # NOTE: all code below this loop is expected to be synchronous
# # and thus draw instructions are not picked up jntil the next
# # wait / iteration.
# async for quotes in stream:
# while True:
# try:
# moar = stream.receive_nowait()
# except trio.WouldBlock:
# yield quotes
# break
# else:
# for sym, quote in moar.items():
# ticks_frame = quote.get('ticks')
# if ticks_frame:
# quotes[sym].setdefault(
# 'ticks', []).extend(ticks_frame)
# print('pulled extra')

# yield quotes

# async for quotes in iter_drain_quotes():

ds = linked.display_state = DisplayState(**{
'quotes': {},
'linked': linked,
Expand Down Expand Up @@ -293,6 +273,7 @@ async def graphics_update_loop(

# chart isn't active/shown so skip render cycle and pause feed(s)
if chart.linked.isHidden():
print('skipping update')
chart.pause_all_feeds()
continue

Expand Down Expand Up @@ -416,10 +397,8 @@ def graphics_update_cycle(
)
or trigger_all
):
# TODO: we should track and compute whether the last
# pixel in a curve should show new data based on uppx
# and then iff update curves and shift?
chart.increment_view(steps=i_diff)
# chart.increment_view(steps=i_diff + round(append_diff - uppx))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left this is in because to me the logic makes sense but for whatever reason it doesn't seem to actually be correct based on testing.

Ideally we can get to where there's never an error margin on shifts when optimizing for uppx scaling.


if vlm_chart:
vlm_chart.increment_view(steps=i_diff)
Expand Down Expand Up @@ -477,7 +456,6 @@ def graphics_update_cycle(
):
chart.update_graphics_from_flow(
chart.name,
# do_append=uppx < update_uppx,
do_append=do_append,
)

Expand Down
Loading