From 2b0b7fd859865ae5666a41a4a5c9f92e43c66a27 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Wed, 31 Mar 2021 12:43:09 -0400 Subject: [PATCH] Ensure changes on ReactiveData source are scheduled correctly (#2134) --- panel/reactive.py | 83 +++++++++++++++++++++++------------------ panel/widgets/tables.py | 24 +++++------- 2 files changed, 55 insertions(+), 52 deletions(-) diff --git a/panel/reactive.py b/panel/reactive.py index 1444074e00..8880b563c4 100644 --- a/panel/reactive.py +++ b/panel/reactive.py @@ -20,7 +20,7 @@ from .config import config from .io.callbacks import PeriodicCallback from .io.model import hold -from .io.notebook import push, push_on_root +from .io.notebook import push from .io.server import unlocked from .io.state import state from .util import edit_readonly, updating @@ -183,6 +183,19 @@ def _update_manual(self, *events): else: cb() + def _apply_update(self, events, msg, model, ref): + if ref not in state._views or ref in state._fake_roots: + return + viewable, root, doc, comm = state._views[ref] + if comm or not doc.session_context or state._unblocked(doc): + with unlocked(): + self._update_model(events, msg, root, model, doc, comm) + if comm and 'embedded' not in root.tags: + push(doc, comm) + else: + cb = partial(self._update_model, events, msg, root, model, doc, comm) + doc.add_next_tick_callback(cb) + def _update_model(self, events, msg, root, model, doc, comm): self._changing[root.ref['id']] = [ attr for attr, value in msg.items() @@ -222,17 +235,7 @@ def _param_change(self, *events): return for ref, (model, parent) in self._models.items(): - if ref not in state._views or ref in state._fake_roots: - continue - viewable, root, doc, comm = state._views[ref] - if comm or not doc.session_context or state._unblocked(doc): - with unlocked(): - self._update_model(events, msg, root, model, doc, comm) - if comm and 'embedded' not in root.tags: - push(doc, comm) - else: - cb = partial(self._update_model, events, msg, root, model, doc, comm) - doc.add_next_tick_callback(cb) + self._apply_update(events, msg, model, ref) def _process_events(self, events): with edit_readonly(state): @@ -644,31 +647,49 @@ def _manual_update(self, events, model, doc, root, parent, comm): elif hasattr(self, '_update_' + event.name): getattr(self, '_update_' + event.name)(model) + @updating def _update_cds(self, *events): - if self._updating: - return self._processed, self._data = self._get_data() + msg = {'data': self._data} for ref, (m, _) in self._models.items(): - m.source.data = self._data - push_on_root(ref) + self._apply_update(events, msg, m.source, ref) + @updating def _update_selected(self, *events, indices=None): indices = self.selection if indices is None else indices + msg = {'indices': indices} for ref, (m, _) in self._models.items(): - m.source.selected.indices = indices - push_on_root(ref) + self._apply_update(events, msg, m.source.selected, ref) @updating def _stream(self, stream, rollover=None): for ref, (m, _) in self._models.items(): - m.source.stream(stream, rollover) - push_on_root(ref) + if ref not in state._views or ref in state._fake_roots: + continue + viewable, root, doc, comm = state._views[ref] + if comm or not doc.session_context or state._unblocked(doc): + with unlocked(): + m.source.stream(stream, rollover) + if comm and 'embedded' not in root.tags: + push(doc, comm) + else: + cb = partial(m.source.stream, stream, rollover) + doc.add_next_tick_callback(cb) @updating def _patch(self, patch): for ref, (m, _) in self._models.items(): - m.source.patch(patch) - push_on_root(ref) + if ref not in state._views or ref in state._fake_roots: + continue + viewable, root, doc, comm = state._views[ref] + if comm or not doc.session_context or state._unblocked(doc): + with unlocked(): + m.source.patch(patch) + if comm and 'embedded' not in root.tags: + push(doc, comm) + else: + cb = partial(m.source.patch, patch) + doc.add_next_tick_callback(cb) def stream(self, stream_value, rollover=None, reset_index=True): """ @@ -748,11 +769,7 @@ def stream(self, stream_value, rollover=None, reset_index=True): self.param.trigger(self._data_params[0]) finally: self._updating = False - try: - self._updating = True - self._stream(stream_value, rollover) - finally: - self._updating = False + self._stream(stream_value, rollover) elif pd and isinstance(stream_value, pd.Series): if isinstance(self._processed, dict): self.stream({k: [v] for k, v in stream_value.to_dict().items()}, rollover) @@ -761,11 +778,7 @@ def stream(self, stream_value, rollover=None, reset_index=True): self._processed.loc[value_index_start] = stream_value with param.discard_events(self): self._update_data(self._processed) - self._updating = True - try: - self._stream(self._processed.iloc[-1:], rollover) - finally: - self._updating = False + self._stream(self._processed.iloc[-1:], rollover) elif isinstance(stream_value, dict): if isinstance(self._processed, dict): if not all(col in stream_value for col in self._data): @@ -775,11 +788,7 @@ def stream(self, stream_value, rollover=None, reset_index=True): if rollover is not None: combined = combined[-rollover:] self._update_column(col, combined) - self._updating = True - try: - self._stream(stream_value, rollover) - finally: - self._updating = False + self._stream(stream_value, rollover) else: try: stream_value = pd.DataFrame(stream_value) diff --git a/panel/widgets/tables.py b/panel/widgets/tables.py index 5d59c0ff6d..c334480884 100644 --- a/panel/widgets/tables.py +++ b/panel/widgets/tables.py @@ -15,7 +15,6 @@ ) from ..depends import param_value_if_widget -from ..io.notebook import push_on_root from ..models.tabulator import ( DataTabulator as _BkTabulator, TABULATOR_THEMES, THEME_URL ) @@ -827,9 +826,9 @@ def _get_style_data(self): def _update_style(self): styles = self._get_style_data() + msg = {'styles': styles} for ref, (m, _) in self._models.items(): - m.styles = styles - push_on_root(ref) + self._apply_update([], msg, m, ref) @updating def _stream(self, stream, rollover=None, follow=True): @@ -843,9 +842,8 @@ def _stream(self, stream, rollover=None, follow=True): self._update_style() def stream(self, stream_value, rollover=None, reset_index=True, follow=True): - for ref, (m, _) in self._models.items(): - m.follow = follow - push_on_root(ref) + for ref, (model, _) in self._models.items(): + self._apply_update([], {'follow': follow}, model, ref) if follow and self.pagination: length = self._length nrows = self.page_size @@ -885,9 +883,8 @@ def _update_max_page(self): nrows = self.page_size max_page = length//nrows + bool(length%nrows) self.param.page.bounds = (1, max_page) - for ref, (m, _) in self._models.items(): - m.max_page = max_page - push_on_root(ref) + for ref, (model, _) in self._models.items(): + self._apply_update([], {'max_page': max_page}, model, ref) def _update_selected(self, *events, indices=None): if self._updating: @@ -1052,12 +1049,9 @@ def download(self, filename='table.csv'): filename: str The filename to save the table as. """ - for ref, (m, _) in self._models.items(): - m.filename = m.filename - push_on_root(ref) - for ref, (m, _) in self._models.items(): - m.download = not m.download - push_on_root(ref) + for ref, (model, _) in self._models.items(): + self._apply_update([], {'filename': filename}, model, ref) + self._apply_update([], {'download': not model.download}, model, ref) def download_menu(self, text_kwargs={}, button_kwargs={}): """