Skip to content

Commit

Permalink
Added rate_limit to Streamz pane (#769)
Browse files Browse the repository at this point in the history
* Added rate_limit to Streamz pane

* Updates to docs
  • Loading branch information
philippjfr authored Nov 12, 2019
1 parent bdc073c commit 35c8b50
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
11 changes: 6 additions & 5 deletions examples/reference/panes/Streamz.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"\n",
"* **``always_watch``** (boolean, default=False): Whether to watch the stream even when not displayed.\n",
"* **``object``** (streamz.Stream): The streamz.Stream object being watched\n",
"* **``rate_limit``** (floa, default=0.1): The minimum interval between events.\n",
"\n",
"___"
]
Expand Down Expand Up @@ -82,7 +83,7 @@
" y='y',\n",
" )\n",
"\n",
"altair_stream = df.cumsum().stream.sliding_window(100).map(line_plot)\n",
"altair_stream = df.cumsum().stream.sliding_window(50).map(line_plot)\n",
"\n",
"altair_pane = pn.pane.Streamz(altair_stream, height=350, always_watch=True)\n",
"\n",
Expand All @@ -105,10 +106,10 @@
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"for i in range(100):\n",
" time.sleep(0.2)\n",
" df.emit(pd.DataFrame({'y': [np.random.randn()]}, index=pd.DatetimeIndex([pd.datetime.now()])))"
"def emit():\n",
" df.emit(pd.DataFrame({'y': [np.random.randn()]}, index=pd.DatetimeIndex([pd.datetime.now()])))\n",
"\n",
"altair_pane.add_periodic_callback(emit, period=100)"
]
}
],
Expand Down
7 changes: 5 additions & 2 deletions panel/pane/streamz.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,24 @@ class Streamz(ReplacementPane):
always_watch = param.Boolean(default=False, doc="""
Whether to watch even when not displayed.""")

rate_limit = param.Number(default=0.1, bounds=(0, None), doc="""
The minimum interval between events.""")

def __init__(self, object=None, **params):
super(Streamz, self).__init__(object, **params)
self._stream = None
if self.always_watch:
self._setup_stream()

@param.depends('object', 'always_watch', watch=True)
@param.depends('always_watch', 'object', 'rate_limit', watch=True)
def _setup_stream(self):
if self.object is None or (self.always_watch and self._stream):
return
elif self._stream:
self._stream.destroy()
self._stream = None
if self._pane._models or self.always_watch:
self._stream = self.object.latest().rate_limit(0.5).gather()
self._stream = self.object.latest().rate_limit(self.rate_limit).gather()
self._stream.sink(self._update_pane)

def _get_model(self, doc, root=None, parent=None, comm=None):
Expand Down

0 comments on commit 35c8b50

Please sign in to comment.