Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mxmn from m4 #342

Merged
merged 16 commits into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions piker/fsp/_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ async def fsp_compute(
dict[str, np.ndarray], # multi-output case
np.ndarray, # single output case
]
history_output = await out_stream.__anext__()
history_output = await anext(out_stream)

func_name = func.__name__
profiler(f'{func_name} generated history')
Expand Down Expand Up @@ -374,7 +374,8 @@ async def resync(
'key': dst_shm_token,
'first': dst._first.value,
'last': dst._last.value,
}})
}
})
return tracker, index

def is_synced(
Expand Down
38 changes: 24 additions & 14 deletions piker/ui/_chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,25 +230,26 @@ async def load_symbol(
# - we'll probably want per-instrument/provider state here?
# change the order config form over to the new chart

# XXX: since the pp config is a singleton widget we have to
# also switch it over to the new chart's interal-layout
# self.linkedsplits.chart.qframe.hbox.removeWidget(self.pp_pane)
chart = linkedsplits.chart

# chart is already in memory so just focus it
linkedsplits.show()
linkedsplits.focus()
linkedsplits.graphics_cycle()
await trio.sleep(0)

# XXX: since the pp config is a singleton widget we have to
# also switch it over to the new chart's interal-layout
# self.linkedsplits.chart.qframe.hbox.removeWidget(self.pp_pane)
chart = linkedsplits.chart

# resume feeds *after* rendering chart view asap
chart.resume_all_feeds()
if chart:
chart.resume_all_feeds()

# TODO: we need a check to see if the chart
# last had the xlast in view, if so then shift so it's
# still in view, if the user was viewing history then
# do nothing yah?
chart.default_view()
# TODO: we need a check to see if the chart
# last had the xlast in view, if so then shift so it's
# still in view, if the user was viewing history then
# do nothing yah?
chart.default_view()

self.linkedsplits = linkedsplits
symbol = linkedsplits.symbol
Expand Down Expand Up @@ -761,8 +762,12 @@ def __init__(
self.pi_overlay: PlotItemOverlay = PlotItemOverlay(self.plotItem)

def resume_all_feeds(self):
for feed in self._feeds.values():
self.linked.godwidget._root_n.start_soon(feed.resume)
try:
for feed in self._feeds.values():
self.linked.godwidget._root_n.start_soon(feed.resume)
except RuntimeError:
# TODO: cancel the qtractor runtime here?
raise

def pause_all_feeds(self):
for feed in self._feeds.values():
Expand Down Expand Up @@ -1287,11 +1292,16 @@ def maxmin(

key = round(lbar), round(rbar)
res = flow.maxmin(*key)
if res == (None, None):

if (
res is None or
res == (None, None)
):
log.error(
f"{flow_key} no mxmn for bars_range => {key} !?"
)
res = 0, 0

profiler(f'yrange mxmn: {key} -> {res}')
# print(f'{flow_key} yrange mxmn: {key} -> {res}')
return res
57 changes: 41 additions & 16 deletions piker/ui/_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,20 @@ def ds_m4(
assert frames >= (xrange / uppx)

# call into ``numba``
nb, i_win, y_out = _m4(
(
nb,
x_out,
y_out,
ymn,
ymx,
) = _m4(
x,
y,

frames,

# TODO: see func below..
# i_win,
# x_out,
# y_out,

# first index in x data to start at
Expand All @@ -243,10 +249,11 @@ def ds_m4(
# filter out any overshoot in the input allocation arrays by
# removing zero-ed tail entries which should start at a certain
# index.
i_win = i_win[i_win != 0]
y_out = y_out[:i_win.size]
x_out = x_out[x_out != 0]
y_out = y_out[:x_out.size]

return nb, i_win, y_out
# print(f'M4 output ymn, ymx: {ymn},{ymx}')
return nb, x_out, y_out, ymn, ymx


@jit(
Expand All @@ -260,8 +267,8 @@ def _m4(

frames: int,

# TODO: using this approach by having the ``.zeros()`` alloc lines
# below, in put python was causing segs faults and alloc crashes..
# TODO: using this approach, having the ``.zeros()`` alloc lines
# below in pure python, there were segs faults and alloc crashes..
# we might need to see how it behaves with shm arrays and consider
# allocating them once at startup?

Expand All @@ -274,14 +281,22 @@ def _m4(
x_start: int,
step: float,

) -> int:
# nbins = len(i_win)
# count = len(xs)
) -> tuple[
int,
np.ndarray,
np.ndarray,
float,
float,
]:
'''
Implementation of the m4 algorithm in ``numba``:
http://www.vldb.org/pvldb/vol7/p797-jugel.pdf

'''
# these are pre-allocated and mutated by ``numba``
# code in-place.
y_out = np.zeros((frames, 4), ys.dtype)
i_win = np.zeros(frames, xs.dtype)
x_out = np.zeros(frames, xs.dtype)

bincount = 0
x_left = x_start
Expand All @@ -295,24 +310,34 @@ def _m4(

# set all bins in the left-most entry to the starting left-most x value
# (aka a row broadcast).
i_win[bincount] = x_left
x_out[bincount] = x_left
# set all y-values to the first value passed in.
y_out[bincount] = ys[0]

# full input y-data mx and mn
mx: float = -np.inf
mn: float = np.inf

# compute OHLC style max / min values per window sized x-frame.
for i in range(len(xs)):

x = xs[i]
y = ys[i]

if x < x_left + step: # the current window "step" is [bin, bin+1)
y_out[bincount, 1] = min(y, y_out[bincount, 1])
y_out[bincount, 2] = max(y, y_out[bincount, 2])
ymn = y_out[bincount, 1] = min(y, y_out[bincount, 1])
ymx = y_out[bincount, 2] = max(y, y_out[bincount, 2])
y_out[bincount, 3] = y
mx = max(mx, ymx)
Copy link
Contributor Author

@goodboy goodboy Jun 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, so just to point to the crux of it all: there's really no point in not capturing these summary max/min values during each downsample cycle, and this is now what we take and use in place of "manual" y-range sorting on the source data when possible.

mn = min(mn, ymn)

else:
# Find the next bin
while x >= x_left + step:
x_left += step

bincount += 1
i_win[bincount] = x_left
x_out[bincount] = x_left
y_out[bincount] = y

return bincount, i_win, y_out
return bincount, x_out, y_out, mn, mx
8 changes: 5 additions & 3 deletions piker/ui/_display.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ def chart_maxmin(
mn, mx = out

mx_vlm_in_view = 0

# TODO: we need to NOT call this to avoid a manual
# np.max/min trigger and especially on the vlm_chart
# flows which aren't shown.. like vlm?
if vlm_chart:
out = vlm_chart.maxmin()
if out:
Expand Down Expand Up @@ -416,10 +420,8 @@ def graphics_update_cycle(
)
or trigger_all
):
# TODO: we should track and compute whether the last
# pixel in a curve should show new data based on uppx
# and then iff update curves and shift?
chart.increment_view(steps=i_diff)
# chart.increment_view(steps=i_diff + round(append_diff - uppx))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left this in because to me the logic makes sense, but for whatever reason it doesn't seem to actually be correct based on testing.

Ideally we can get to where there's never an error margin on shifts when optimizing for uppx scaling.


if vlm_chart:
vlm_chart.increment_view(steps=i_diff)
Expand Down
24 changes: 22 additions & 2 deletions piker/ui/_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ class Flow(msgspec.Struct): # , frozen=True):
name: str
plot: pg.PlotItem
graphics: Union[Curve, BarItems]
yrange: tuple[float, float] = None

# in some cases a flow may want to change its
# graphical "type" or, "form" when downsampling,
Expand Down Expand Up @@ -416,6 +417,10 @@ def maxmin(
if not slice_view.size:
mxmn = None

elif self.yrange:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the bit where we choose to use the m4 yrange output if set instead of the normal "manual" sorting calls from prior.

mxmn = self.yrange
# print(f'{self.name} M4 maxmin: {mxmn}')

else:
if self.is_ohlc:
ylow = np.min(slice_view['low'])
Expand All @@ -427,6 +432,7 @@ def maxmin(
yhigh = np.max(view)

mxmn = ylow, yhigh
# print(f'{self.name} MANUAL maxmin: {mxmin}')

if mxmn is not None:
# cache new mxmn result
Expand Down Expand Up @@ -628,10 +634,13 @@ def update_graphics(
# source data so we clear our path data in prep
# to generate a new one from original source data.
new_sample_rate = True
showing_src_data = True
should_ds = False
should_redraw = True

showing_src_data = True
# reset yrange to be computed from source data
self.yrange = None

# MAIN RENDER LOGIC:
# - determine in view data and redraw on range change
# - determine downsampling ops if needed
Expand All @@ -657,13 +666,20 @@ def update_graphics(

**rkwargs,
)
if showing_src_data:
# print(f"{self.name} SHOWING SOURCE")
# reset yrange to be computed from source data
self.yrange = None

if not out:
log.warning(f'{self.name} failed to render!?')
return graphics

path, data, reset = out

# if self.yrange:
# print(f'flow {self.name} yrange from m4: {self.yrange}')

# XXX: SUPER UGGGHHH... without this we get stale cache
# graphics that don't update until you downsampler again..
if reset:
Expand Down Expand Up @@ -1058,6 +1074,7 @@ def render(
# xy-path data transform: convert source data to a format
# able to be passed to a `QPainterPath` rendering routine.
if not len(hist):
# XXX: this might be why the profiler only has exits?
return

x_out, y_out, connect = self.format_xy(
Expand Down Expand Up @@ -1144,11 +1161,14 @@ def render(

elif should_ds and uppx > 1:

x_out, y_out = xy_downsample(
x_out, y_out, ymn, ymx = xy_downsample(
x_out,
y_out,
uppx,
)
self.flow.yrange = ymn, ymx
# print(f'{self.flow.name} post ds: ymn, ymx: {ymn},{ymx}')

reset = True
profiler(f'FULL PATH downsample redraw={should_ds}')
self._in_ds = True
Expand Down
26 changes: 18 additions & 8 deletions piker/ui/_fsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,20 +639,25 @@ def multi_maxmin(
names: list[str],

) -> tuple[float, float]:
'''
Flows "group" maxmin loop; assumes all named flows
are in the same co-domain and thus can be sorted
as one set.

Iterates all the named flows and calls the chart
api to find their range values and return.

TODO: really we should probably have a more built-in API
for this?

'''
mx = 0
for name in names:

mxmn = chart.maxmin(name=name)
if mxmn:
ymax = mxmn[1]
if ymax > mx:
mx = ymax
ymn, ymx = chart.maxmin(name=name)
mx = max(mx, ymx)

return 0, mx

chart.view.maxmin = partial(multi_maxmin, names=['volume'])

# TODO: fix the x-axis label issue where if you put
# the axis on the left it's totally not lined up...
# show volume units value on LHS (for dinkus)
Expand Down Expand Up @@ -776,12 +781,16 @@ def chart_curves(

) -> None:
for name in names:

render = False
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh this can be dropped..


if 'dark' in name:
color = dark_vlm_color
elif 'rate' in name:
color = vlm_color
else:
color = 'bracket'
render = True

curve, _ = chart.draw_curve(
name=name,
Expand All @@ -799,6 +808,7 @@ def chart_curves(
# since only a placeholder of `None` is entered in
# ``.draw_curve()``.
flow = chart._flows[name]
# flow.render = render
assert flow.plot is pi

chart_curves(
Expand Down
1 change: 1 addition & 0 deletions piker/ui/_interaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,7 @@ def maybe_downsample_graphics(
# XXX: super important to be aware of this.
# or not flow.graphics.isVisible()
):
# print(f'skipping {flow.name}')
continue

# pass in no array which will read and render from the last
Expand Down
Loading