From 43a97807fcdf04bd66968593bf954c4201c2b696 Mon Sep 17 00:00:00 2001 From: Kyle Kelley Date: Fri, 6 Oct 2017 09:15:06 -0700 Subject: [PATCH] buffer messages when websocket connection is interrupted (#2871) * provide some top level comments * implement buffering of messages on last dropped connection - buffer is per-kernel - session_key is stored because only a single session can resume the buffer and we can't be sure - on any new connection to a kernel, buffer is flushed. If session_key matches, it is replayed. Otherwise, it is discarded. - buffer is an unbounded list for now * restore actual zmq channels when resuming connection rather than establishing new connections fixes failure to resume shell channel * hookup restart callbacks in open instead of in `create_stream`, which is not called on reconnect * improve handling of restored connections in js - dismiss 'connection lost' dialog on reconnect - set busy status on reconnect (if not busy, idle will come soon after via kernel_ready) --- notebook/services/kernels/handlers.py | 62 +++++++---- notebook/services/kernels/kernelmanager.py | 101 +++++++++++++++++- .../static/notebook/js/notificationarea.js | 17 ++- 3 files changed, 158 insertions(+), 22 deletions(-) diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index d421077038..9477bdeb93 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -93,6 +93,9 @@ def post(self, kernel_id, action): class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): + '''There is one ZMQChannelsHandler per running kernel and it oversees all + the sessions. 
+ ''' # class-level registry of open sessions # allows checking for conflict on session-id, @@ -125,8 +128,6 @@ def create_stream(self): meth = getattr(km, 'connect_' + channel) self.channels[channel] = stream = meth(self.kernel_id, identity=identity) stream.channel = channel - km.add_restart_callback(self.kernel_id, self.on_kernel_restarted) - km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead') def request_kernel_info(self): """send a request for kernel_info""" @@ -252,23 +253,41 @@ def _register_session(self): self.log.warning("Replacing stale connection: %s", self.session_key) yield stale_handler.close() self._open_sessions[self.session_key] = self - + def open(self, kernel_id): super(ZMQChannelsHandler, self).open() - self.kernel_manager.notify_connect(kernel_id) - try: - self.create_stream() - except web.HTTPError as e: - self.log.error("Error opening stream: %s", e) - # WebSockets don't response to traditional error codes so we - # close the connection. - for channel, stream in self.channels.items(): - if not stream.closed(): - stream.close() - self.close() + km = self.kernel_manager + km.notify_connect(kernel_id) + + # on new connections, flush the message buffer + buffer_info = km.get_buffer(kernel_id, self.session_key) + if buffer_info and buffer_info['session_key'] == self.session_key: + self.log.info("Restoring connection for %s", self.session_key) + self.channels = buffer_info['channels'] + replay_buffer = buffer_info['buffer'] + if replay_buffer: + self.log.info("Replaying %s buffered messages", len(replay_buffer)) + for channel, msg_list in replay_buffer: + stream = self.channels[channel] + self._on_zmq_reply(stream, msg_list) else: - for channel, stream in self.channels.items(): - stream.on_recv_stream(self._on_zmq_reply) + try: + self.create_stream() + except web.HTTPError as e: + self.log.error("Error opening stream: %s", e) + # WebSockets don't response to traditional error codes so we + # close the connection. 
+ for channel, stream in self.channels.items(): + if not stream.closed(): + stream.close() + self.close() + return + + km.add_restart_callback(self.kernel_id, self.on_kernel_restarted) + km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead') + + for channel, stream in self.channels.items(): + stream.on_recv_stream(self._on_zmq_reply) def on_message(self, msg): if not self.channels: @@ -288,7 +307,7 @@ def on_message(self, msg): return stream = self.channels[channel] self.session.send(stream, msg) - + def _on_zmq_reply(self, stream, msg_list): idents, fed_msg_list = self.session.feed_identities(msg_list) msg = self.session.deserialize(fed_msg_list) @@ -301,7 +320,6 @@ def write_stderr(error_message): ) msg['channel'] = 'iopub' self.write_message(json.dumps(msg, default=date_default)) - channel = getattr(stream, 'channel', None) msg_type = msg['header']['msg_type'] @@ -408,6 +426,7 @@ def on_close(self): # unregister myself as an open session (only if it's really me) if self._open_sessions.get(self.session_key) is self: self._open_sessions.pop(self.session_key) + km = self.kernel_manager if self.kernel_id in km: km.notify_disconnect(self.kernel_id) @@ -417,6 +436,13 @@ def on_close(self): km.remove_restart_callback( self.kernel_id, self.on_restart_failed, 'dead', ) + + # start buffering instead of closing if this was the last connection + if km._kernel_connections[self.kernel_id] == 0: + km.start_buffering(self.kernel_id, self.session_key, self.channels) + self._close_future.set_result(None) + return + # This method can be called twice, once by self.kernel_died and once # from the WebSocket close event. If the WebSocket connection is # closed before the ZMQ streams are setup, they could be None. 
diff --git a/notebook/services/kernels/kernelmanager.py b/notebook/services/kernels/kernelmanager.py index 2453d6f05f..6f47ed6a71 100644 --- a/notebook/services/kernels/kernelmanager.py +++ b/notebook/services/kernels/kernelmanager.py @@ -7,6 +7,8 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. +from collections import defaultdict +from functools import partial import os from tornado import gen, web @@ -15,13 +17,13 @@ from jupyter_client.session import Session from jupyter_client.multikernelmanager import MultiKernelManager -from traitlets import Bool, Dict, List, Unicode, TraitError, Integer, default, validate +from traitlets import Any, Bool, Dict, List, Unicode, TraitError, Integer, default, validate from notebook.utils import to_os_path, exists from notebook._tz import utcnow, isoformat from ipython_genutils.py3compat import getcwd -from datetime import datetime, timedelta +from datetime import timedelta class MappingKernelManager(MultiKernelManager): @@ -81,6 +83,11 @@ def _update_root_dir(self, proposal): Only effective if cull_idle_timeout is not 0.""" ) + _kernel_buffers = Any() + @default('_kernel_buffers') + def _default_kernel_buffers(self): + return defaultdict(lambda: {'buffer': [], 'session_key': '', 'channels': {}}) + #------------------------------------------------------------------------- # Methods for managing kernels and sessions #------------------------------------------------------------------------- @@ -142,10 +149,97 @@ def start_kernel(self, kernel_id=None, path=None, **kwargs): # py2-compat raise gen.Return(kernel_id) + def start_buffering(self, kernel_id, session_key, channels): + """Start buffering messages for a kernel + + Parameters + ---------- + kernel_id : str + The id of the kernel to start buffering. + session_key: str + The session_key, if any, that should get the buffer. + If the session_key matches the current buffered session_key, + the buffer will be returned. 
+ channels: dict({'channel': ZMQStream}) + The zmq channels whose messages should be buffered. + """ + self.log.info("Starting buffering for %s", session_key) + self._check_kernel_id(kernel_id) + # clear previous buffering state + self.stop_buffering(kernel_id) + buffer_info = self._kernel_buffers[kernel_id] + # record the session key because only one session can buffer + buffer_info['session_key'] = session_key + # TODO: the buffer should likely be a memory bounded queue, we're starting with a list to keep it simple + buffer_info['buffer'] = [] + buffer_info['channels'] = channels + + # forward any future messages to the internal buffer + def buffer_msg(channel, msg_parts): + self.log.debug("Buffering msg on %s:%s", kernel_id, channel) + buffer_info['buffer'].append((channel, msg_parts)) + + for channel, stream in channels.items(): + stream.on_recv(partial(buffer_msg, channel)) + + + def get_buffer(self, kernel_id, session_key): + """Get the buffer for a given kernel + + Parameters + ---------- + kernel_id : str + The id of the kernel whose buffer to get. + session_key: str, optional + The session_key, if any, that should get the buffer. + If the session_key matches the current buffered session_key, + the buffer will be returned. + """ + self.log.debug("Getting buffer for %s", kernel_id) + if kernel_id not in self._kernel_buffers: + return + + buffer_info = self._kernel_buffers[kernel_id] + if buffer_info['session_key'] == session_key: + # remove buffer + self._kernel_buffers.pop(kernel_id) + # only return buffer_info if it's a match + return buffer_info + else: + self.stop_buffering(kernel_id) + + def stop_buffering(self, kernel_id): + """Stop buffering kernel messages + + Parameters + ---------- + kernel_id : str + The id of the kernel to stop buffering. 
+ """ + self.log.debug("Clearing buffer for %s", kernel_id) + self._check_kernel_id(kernel_id) + + if kernel_id not in self._kernel_buffers: + return + buffer_info = self._kernel_buffers.pop(kernel_id) + # close buffering streams + for stream in buffer_info['channels'].values(): + if not stream.closed(): + stream.on_recv(None) + stream.socket.close() + stream.close() + + msg_buffer = buffer_info['buffer'] + if msg_buffer: + self.log.info("Discarding %s buffered messages for %s", + len(msg_buffer), buffer_info['session_key']) + def shutdown_kernel(self, kernel_id, now=False): """Shutdown a kernel by kernel_id""" self._check_kernel_id(kernel_id) - self._kernels[kernel_id]._activity_stream.close() + kernel = self._kernels[kernel_id] + kernel._activity_stream.close() + self.stop_buffering(kernel_id) self._kernel_connections.pop(kernel_id, None) return super(MappingKernelManager, self).shutdown_kernel(kernel_id, now=now) @@ -256,6 +350,7 @@ def record_activity(msg_list): idents, fed_msg_list = session.feed_identities(msg_list) msg = session.deserialize(fed_msg_list) + msg_type = msg['header']['msg_type'] self.log.debug("activity on %s: %s", kernel_id, msg_type) if msg_type == 'status': diff --git a/notebook/static/notebook/js/notificationarea.js b/notebook/static/notebook/js/notificationarea.js index ed183db42b..2cebb44b6f 100644 --- a/notebook/static/notebook/js/notificationarea.js +++ b/notebook/static/notebook/js/notificationarea.js @@ -107,6 +107,11 @@ define([ this.events.on('kernel_connected.Kernel', function () { knw.info("Connected", 500); + // trigger busy in the status to clear broken-link state immediately + // a kernel_ready event will come when the kernel becomes responsive. + $kernel_ind_icon + .attr('class', 'kernel_busy_icon') + .attr('title', i18n.msg._('Kernel Connected')); }); this.events.on('kernel_restarting.Kernel', function () { @@ -161,7 +166,7 @@ define([ " The notebook will continue trying to reconnect. 
Check your" + " network connection or notebook server configuration."); - dialog.kernel_modal({ + var the_dialog = dialog.kernel_modal({ title: i18n.msg._("Connection failed"), body: msg, keyboard_manager: that.keyboard_manager, @@ -170,6 +175,16 @@ define([ "OK": {} } }); + + // hide the dialog on reconnect if it's still visible + var dismiss = function () { + the_dialog.modal('hide'); + } + that.events.on("kernel_connected.Kernel", dismiss); + the_dialog.on("hidden.bs.modal", function () { + // clear handler on dismiss + that.events.off("kernel_connected.Kernel", dismiss); + }); } });