Skip to content

Commit

Permalink
Ensure changes on ReactiveData source are scheduled correctly (#2134)
Browse files Browse the repository at this point in the history
  • Loading branch information
philippjfr authored Mar 31, 2021
1 parent 24022e4 commit c66df83
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 56 deletions.
91 changes: 50 additions & 41 deletions panel/reactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from .config import config
from .io.callbacks import PeriodicCallback
from .io.model import hold
from .io.notebook import push, push_on_root
from .io.notebook import push
from .io.server import unlocked
from .io.state import state
from .models.reactive_html import (
Expand Down Expand Up @@ -188,6 +188,19 @@ def _update_manual(self, *events):
else:
cb()

def _apply_update(self, events, msg, model, ref):
if ref not in state._views or ref in state._fake_roots:
return
viewable, root, doc, comm = state._views[ref]
if comm or not doc.session_context or state._unblocked(doc):
with unlocked():
self._update_model(events, msg, root, model, doc, comm)
if comm and 'embedded' not in root.tags:
push(doc, comm)
else:
cb = partial(self._update_model, events, msg, root, model, doc, comm)
doc.add_next_tick_callback(cb)

def _update_model(self, events, msg, root, model, doc, comm):
self._changing[root.ref['id']] = [
attr for attr, value in msg.items()
Expand Down Expand Up @@ -227,17 +240,7 @@ def _param_change(self, *events):
return

for ref, (model, parent) in self._models.items():
if ref not in state._views or ref in state._fake_roots:
continue
viewable, root, doc, comm = state._views[ref]
if comm or not doc.session_context or state._unblocked(doc):
with unlocked():
self._update_model(events, msg, root, model, doc, comm)
if comm and 'embedded' not in root.tags:
push(doc, comm)
else:
cb = partial(self._update_model, events, msg, root, model, doc, comm)
doc.add_next_tick_callback(cb)
self._apply_update(events, msg, model, ref)

def _process_events(self, events):
with edit_readonly(state):
Expand Down Expand Up @@ -651,31 +654,49 @@ def _manual_update(self, events, model, doc, root, parent, comm):
elif hasattr(self, '_update_' + event.name):
getattr(self, '_update_' + event.name)(model)

@updating
def _update_cds(self, *events):
if self._updating:
return
self._processed, self._data = self._get_data()
msg = {'data': self._data}
for ref, (m, _) in self._models.items():
m.source.data = self._data
push_on_root(ref)
self._apply_update(events, msg, m.source, ref)

@updating
def _update_selected(self, *events, indices=None):
indices = self.selection if indices is None else indices
msg = {'indices': indices}
for ref, (m, _) in self._models.items():
m.source.selected.indices = indices
push_on_root(ref)
self._apply_update(events, msg, m.source.selected, ref)

@updating
def _stream(self, stream, rollover=None):
for ref, (m, _) in self._models.items():
m.source.stream(stream, rollover)
push_on_root(ref)
if ref not in state._views or ref in state._fake_roots:
continue
viewable, root, doc, comm = state._views[ref]
if comm or not doc.session_context or state._unblocked(doc):
with unlocked():
m.source.stream(stream, rollover)
if comm and 'embedded' not in root.tags:
push(doc, comm)
else:
cb = partial(m.source.stream, stream, rollover)
doc.add_next_tick_callback(cb)

@updating
def _patch(self, patch):
for ref, (m, _) in self._models.items():
m.source.patch(patch)
push_on_root(ref)
if ref not in state._views or ref in state._fake_roots:
continue
viewable, root, doc, comm = state._views[ref]
if comm or not doc.session_context or state._unblocked(doc):
with unlocked():
m.source.patch(patch)
if comm and 'embedded' not in root.tags:
push(doc, comm)
else:
cb = partial(m.source.patch, patch)
doc.add_next_tick_callback(cb)

def stream(self, stream_value, rollover=None, reset_index=True):
"""
Expand Down Expand Up @@ -755,11 +776,7 @@ def stream(self, stream_value, rollover=None, reset_index=True):
self.param.trigger(self._data_params[0])
finally:
self._updating = False
try:
self._updating = True
self._stream(stream_value, rollover)
finally:
self._updating = False
self._stream(stream_value, rollover)
elif pd and isinstance(stream_value, pd.Series):
if isinstance(self._processed, dict):
self.stream({k: [v] for k, v in stream_value.to_dict().items()}, rollover)
Expand All @@ -768,11 +785,7 @@ def stream(self, stream_value, rollover=None, reset_index=True):
self._processed.loc[value_index_start] = stream_value
with param.discard_events(self):
self._update_data(self._processed)
self._updating = True
try:
self._stream(self._processed.iloc[-1:], rollover)
finally:
self._updating = False
self._stream(self._processed.iloc[-1:], rollover)
elif isinstance(stream_value, dict):
if isinstance(self._processed, dict):
if not all(col in stream_value for col in self._data):
Expand All @@ -782,11 +795,7 @@ def stream(self, stream_value, rollover=None, reset_index=True):
if rollover is not None:
combined = combined[-rollover:]
self._update_column(col, combined)
self._updating = True
try:
self._stream(stream_value, rollover)
finally:
self._updating = False
self._stream(stream_value, rollover)
else:
try:
stream_value = pd.DataFrame(stream_value)
Expand Down Expand Up @@ -1330,7 +1339,7 @@ def _update_model(self, events, msg, root, model, doc, comm):
for prop, v in list(msg.items()):
if prop in child_params:
new_children[prop] = prop
elif prop in Reactive.param:
elif prop in list(Reactive.param)+['events']:
model_msg[prop] = v
elif prop in self.param and (self.param[prop].precedence or 0) < 0:
continue
Expand Down Expand Up @@ -1372,6 +1381,6 @@ def on_event(self, node, event, callback):
f"nodes include: {self._parser.nodes}.")
self._event_callbacks[node][event].append(callback)
events = self._get_events()
for ref, (m, _) in self._models.items():
m.events = events
push_on_root(ref)
for ref, (model, _) in self._models.items():
print(model)
self._apply_update([], {'events': events}, model, ref)
24 changes: 9 additions & 15 deletions panel/widgets/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
)

from ..depends import param_value_if_widget
from ..io.notebook import push_on_root
from ..models.tabulator import (
DataTabulator as _BkTabulator, TABULATOR_THEMES, THEME_URL
)
Expand Down Expand Up @@ -827,9 +826,9 @@ def _get_style_data(self):

def _update_style(self):
styles = self._get_style_data()
msg = {'styles': styles}
for ref, (m, _) in self._models.items():
m.styles = styles
push_on_root(ref)
self._apply_update([], msg, m, ref)

@updating
def _stream(self, stream, rollover=None, follow=True):
Expand All @@ -843,9 +842,8 @@ def _stream(self, stream, rollover=None, follow=True):
self._update_style()

def stream(self, stream_value, rollover=None, reset_index=True, follow=True):
for ref, (m, _) in self._models.items():
m.follow = follow
push_on_root(ref)
for ref, (model, _) in self._models.items():
self._apply_update([], {'follow': follow}, model, ref)
if follow and self.pagination:
length = self._length
nrows = self.page_size
Expand Down Expand Up @@ -885,9 +883,8 @@ def _update_max_page(self):
nrows = self.page_size
max_page = length//nrows + bool(length%nrows)
self.param.page.bounds = (1, max_page)
for ref, (m, _) in self._models.items():
m.max_page = max_page
push_on_root(ref)
for ref, (model, _) in self._models.items():
self._apply_update([], {'max_page': max_page}, model, ref)

def _update_selected(self, *events, indices=None):
if self._updating:
Expand Down Expand Up @@ -1052,12 +1049,9 @@ def download(self, filename='table.csv'):
filename: str
The filename to save the table as.
"""
for ref, (m, _) in self._models.items():
m.filename = m.filename
push_on_root(ref)
for ref, (m, _) in self._models.items():
m.download = not m.download
push_on_root(ref)
for ref, (model, _) in self._models.items():
self._apply_update([], {'filename': filename}, model, ref)
self._apply_update([], {'download': not model.download}, model, ref)

def download_menu(self, text_kwargs={}, button_kwargs={}):
"""
Expand Down

0 comments on commit c66df83

Please sign in to comment.