Skip to content

Commit

Permalink
buffer messages when websocket connection is interrupted (#2871)
Browse files Browse the repository at this point in the history
* provide some top level comments

* implement buffering of messages on last dropped connection

- buffer is per-kernel
- session_key is stored because only a single session can resume the buffer and we can't be sure
- on any new connection to a kernel, buffer is flushed.
  If session_key matches, it is replayed.
  Otherwise, it is discarded.
- buffer is an unbounded list for now

* restore actual zmq channels when resuming connection

rather than establishing new connections

fixes failure to resume shell channel

* hookup restart callbacks in open

instead of in `create_stream`, which is not called on reconnect

* improve handling of restored connections in js

- dismiss 'connection lost' dialog on reconnect
- set busy status on reconnect (if not busy, idle will come soon after via kernel_ready)
  • Loading branch information
rgbkrk authored and gnestor committed Oct 6, 2017
1 parent 9b4660f commit 43a9780
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 22 deletions.
62 changes: 44 additions & 18 deletions notebook/services/kernels/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ def post(self, kernel_id, action):


class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
'''There is one ZMQChannelsHandler per running kernel and it oversees all
the sessions.
'''

# class-level registry of open sessions
# allows checking for conflict on session-id,
Expand Down Expand Up @@ -125,8 +128,6 @@ def create_stream(self):
meth = getattr(km, 'connect_' + channel)
self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
stream.channel = channel
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')

def request_kernel_info(self):
"""send a request for kernel_info"""
Expand Down Expand Up @@ -252,23 +253,41 @@ def _register_session(self):
self.log.warning("Replacing stale connection: %s", self.session_key)
yield stale_handler.close()
self._open_sessions[self.session_key] = self

def open(self, kernel_id):
super(ZMQChannelsHandler, self).open()
self.kernel_manager.notify_connect(kernel_id)
try:
self.create_stream()
except web.HTTPError as e:
self.log.error("Error opening stream: %s", e)
# WebSockets don't response to traditional error codes so we
# close the connection.
for channel, stream in self.channels.items():
if not stream.closed():
stream.close()
self.close()
km = self.kernel_manager
km.notify_connect(kernel_id)

# on new connections, flush the message buffer
buffer_info = km.get_buffer(kernel_id, self.session_key)
if buffer_info and buffer_info['session_key'] == self.session_key:
self.log.info("Restoring connection for %s", self.session_key)
self.channels = buffer_info['channels']
replay_buffer = buffer_info['buffer']
if replay_buffer:
self.log.info("Replaying %s buffered messages", len(replay_buffer))
for channel, msg_list in replay_buffer:
stream = self.channels[channel]
self._on_zmq_reply(stream, msg_list)
else:
for channel, stream in self.channels.items():
stream.on_recv_stream(self._on_zmq_reply)
try:
self.create_stream()
except web.HTTPError as e:
self.log.error("Error opening stream: %s", e)
# WebSockets don't response to traditional error codes so we
# close the connection.
for channel, stream in self.channels.items():
if not stream.closed():
stream.close()
self.close()
return

km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')

for channel, stream in self.channels.items():
stream.on_recv_stream(self._on_zmq_reply)

def on_message(self, msg):
if not self.channels:
Expand All @@ -288,7 +307,7 @@ def on_message(self, msg):
return
stream = self.channels[channel]
self.session.send(stream, msg)

def _on_zmq_reply(self, stream, msg_list):
idents, fed_msg_list = self.session.feed_identities(msg_list)
msg = self.session.deserialize(fed_msg_list)
Expand All @@ -301,7 +320,6 @@ def write_stderr(error_message):
)
msg['channel'] = 'iopub'
self.write_message(json.dumps(msg, default=date_default))

channel = getattr(stream, 'channel', None)
msg_type = msg['header']['msg_type']

Expand Down Expand Up @@ -408,6 +426,7 @@ def on_close(self):
# unregister myself as an open session (only if it's really me)
if self._open_sessions.get(self.session_key) is self:
self._open_sessions.pop(self.session_key)

km = self.kernel_manager
if self.kernel_id in km:
km.notify_disconnect(self.kernel_id)
Expand All @@ -417,6 +436,13 @@ def on_close(self):
km.remove_restart_callback(
self.kernel_id, self.on_restart_failed, 'dead',
)

# start buffering instead of closing if this was the last connection
if km._kernel_connections[self.kernel_id] == 0:
km.start_buffering(self.kernel_id, self.session_key, self.channels)
self._close_future.set_result(None)
return

# This method can be called twice, once by self.kernel_died and once
# from the WebSocket close event. If the WebSocket connection is
# closed before the ZMQ streams are setup, they could be None.
Expand Down
101 changes: 98 additions & 3 deletions notebook/services/kernels/kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

from collections import defaultdict
from functools import partial
import os

from tornado import gen, web
Expand All @@ -15,13 +17,13 @@

from jupyter_client.session import Session
from jupyter_client.multikernelmanager import MultiKernelManager
from traitlets import Bool, Dict, List, Unicode, TraitError, Integer, default, validate
from traitlets import Any, Bool, Dict, List, Unicode, TraitError, Integer, default, validate

from notebook.utils import to_os_path, exists
from notebook._tz import utcnow, isoformat
from ipython_genutils.py3compat import getcwd

from datetime import datetime, timedelta
from datetime import timedelta


class MappingKernelManager(MultiKernelManager):
Expand Down Expand Up @@ -81,6 +83,11 @@ def _update_root_dir(self, proposal):
Only effective if cull_idle_timeout is not 0."""
)

_kernel_buffers = Any()
@default('_kernel_buffers')
def _default_kernel_buffers(self):
return defaultdict(lambda: {'buffer': [], 'session_key': '', 'channels': {}})

#-------------------------------------------------------------------------
# Methods for managing kernels and sessions
#-------------------------------------------------------------------------
Expand Down Expand Up @@ -142,10 +149,97 @@ def start_kernel(self, kernel_id=None, path=None, **kwargs):
# py2-compat
raise gen.Return(kernel_id)

def start_buffering(self, kernel_id, session_key, channels):
"""Start buffering messages for a kernel
Parameters
----------
kernel_id : str
The id of the kernel to stop buffering.
session_key: str
The session_key, if any, that should get the buffer.
If the session_key matches the current buffered session_key,
the buffer will be returned.
channels: dict({'channel': ZMQStream})
The zmq channels whose messages should be buffered.
"""
self.log.info("Starting buffering for %s", session_key)
self._check_kernel_id(kernel_id)
# clear previous buffering state
self.stop_buffering(kernel_id)
buffer_info = self._kernel_buffers[kernel_id]
# record the session key because only one session can buffer
buffer_info['session_key'] = session_key
# TODO: the buffer should likely be a memory bounded queue, we're starting with a list to keep it simple
buffer_info['buffer'] = []
buffer_info['channels'] = channels

# forward any future messages to the internal buffer
def buffer_msg(channel, msg_parts):
self.log.debug("Buffering msg on %s:%s", kernel_id, channel)
buffer_info['buffer'].append((channel, msg_parts))

for channel, stream in channels.items():
stream.on_recv(partial(buffer_msg, channel))


def get_buffer(self, kernel_id, session_key):
"""Get the buffer for a given kernel
Parameters
----------
kernel_id : str
The id of the kernel to stop buffering.
session_key: str, optional
The session_key, if any, that should get the buffer.
If the session_key matches the current buffered session_key,
the buffer will be returned.
"""
self.log.debug("Getting buffer for %s", kernel_id)
if kernel_id not in self._kernel_buffers:
return

buffer_info = self._kernel_buffers[kernel_id]
if buffer_info['session_key'] == session_key:
# remove buffer
self._kernel_buffers.pop(kernel_id)
# only return buffer_info if it's a match
return buffer_info
else:
self.stop_buffering(kernel_id)

def stop_buffering(self, kernel_id):
"""Stop buffering kernel messages
Parameters
----------
kernel_id : str
The id of the kernel to stop buffering.
"""
self.log.debug("Clearing buffer for %s", kernel_id)
self._check_kernel_id(kernel_id)

if kernel_id not in self._kernel_buffers:
return
buffer_info = self._kernel_buffers.pop(kernel_id)
# close buffering streams
for stream in buffer_info['channels'].values():
if not stream.closed():
stream.on_recv(None)
stream.socket.close()
stream.close()

msg_buffer = buffer_info['buffer']
if msg_buffer:
self.log.info("Discarding %s buffered messages for %s",
len(msg_buffer), buffer_info['session_key'])

def shutdown_kernel(self, kernel_id, now=False):
"""Shutdown a kernel by kernel_id"""
self._check_kernel_id(kernel_id)
self._kernels[kernel_id]._activity_stream.close()
kernel = self._kernels[kernel_id]
kernel._activity_stream.close()
self.stop_buffering(kernel_id)
self._kernel_connections.pop(kernel_id, None)
return super(MappingKernelManager, self).shutdown_kernel(kernel_id, now=now)

Expand Down Expand Up @@ -256,6 +350,7 @@ def record_activity(msg_list):

idents, fed_msg_list = session.feed_identities(msg_list)
msg = session.deserialize(fed_msg_list)

msg_type = msg['header']['msg_type']
self.log.debug("activity on %s: %s", kernel_id, msg_type)
if msg_type == 'status':
Expand Down
17 changes: 16 additions & 1 deletion notebook/static/notebook/js/notificationarea.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ define([

this.events.on('kernel_connected.Kernel', function () {
knw.info("Connected", 500);
// trigger busy in the status to clear broken-link state immediately
// a kernel_ready event will come when the kernel becomes responsive.
$kernel_ind_icon
.attr('class', 'kernel_busy_icon')
.attr('title', i18n.msg._('Kernel Connected'));
});

this.events.on('kernel_restarting.Kernel', function () {
Expand Down Expand Up @@ -161,7 +166,7 @@ define([
" The notebook will continue trying to reconnect. Check your" +
" network connection or notebook server configuration.");

dialog.kernel_modal({
var the_dialog = dialog.kernel_modal({
title: i18n.msg._("Connection failed"),
body: msg,
keyboard_manager: that.keyboard_manager,
Expand All @@ -170,6 +175,16 @@ define([
"OK": {}
}
});

// hide the dialog on reconnect if it's still visible
var dismiss = function () {
the_dialog.modal('hide');
}
that.events.on("kernel_connected.Kernel", dismiss);
the_dialog.on("hidden.bs.modal", function () {
// clear handler on dismiss
that.events.off("kernel_connected.Kernel", dismiss);
});
}
});

Expand Down

0 comments on commit 43a9780

Please sign in to comment.