Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix buffered message replay #4110

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions notebook/services/kernels/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def initialize(self):
self._kernel_info_future = Future()
self._close_future = Future()
self.session_key = ''
self.buffer_key = ''

# Rate limiting code
self._iopub_window_msg_count = 0
Expand All @@ -222,6 +223,7 @@ def pre_get(self):
# servers never respond to websocket connection requests.
kernel = self.kernel_manager.get_kernel(self.kernel_id)
self.session.key = kernel.session.key
self.buffer_key = cast_unicode(kernel.session.key, "utf-8") # Use kernel's session key for buffer replay id
future = self.request_kernel_info()

def give_up():
Expand Down Expand Up @@ -261,9 +263,9 @@ def open(self, kernel_id):
km.notify_connect(kernel_id)

# on new connections, flush the message buffer
buffer_info = km.get_buffer(kernel_id, self.session_key)
if buffer_info and buffer_info['session_key'] == self.session_key:
self.log.info("Restoring connection for %s", self.session_key)
buffer_info = km.get_buffer(kernel_id, self.buffer_key)
if buffer_info and buffer_info['session_key'] == self.buffer_key:
self.log.info("Restoring connection for kernel_id %s", self.kernel_id)
self.channels = buffer_info['channels']
replay_buffer = buffer_info['buffer']
if replay_buffer:
Expand Down Expand Up @@ -440,7 +442,7 @@ def on_close(self):

# start buffering instead of closing if this was the last connection
if km._kernel_connections[self.kernel_id] == 0:
km.start_buffering(self.kernel_id, self.session_key, self.channels)
km.start_buffering(self.kernel_id, self.buffer_key, self.channels)
self._close_future.set_result(None)
return

Expand Down
10 changes: 5 additions & 5 deletions notebook/services/kernels/kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def start_buffering(self, kernel_id, session_key, channels):
stream.close()
return

self.log.info("Starting buffering for %s", session_key)
self.log.info("Starting buffering for kernel_id %s with session_key %s", kernel_id, session_key)
self._check_kernel_id(kernel_id)
# clear previous buffering state
self.stop_buffering(kernel_id)
Expand Down Expand Up @@ -239,7 +239,7 @@ def get_buffer(self, kernel_id, session_key):
If the session_key matches the current buffered session_key,
the buffer will be returned.
"""
self.log.debug("Getting buffer for %s", kernel_id)
self.log.debug("Getting buffer for kernel_id %s with session_key %s", kernel_id, session_key)
if kernel_id not in self._kernel_buffers:
return

Expand All @@ -260,7 +260,7 @@ def stop_buffering(self, kernel_id):
kernel_id : str
The id of the kernel to stop buffering.
"""
self.log.debug("Clearing buffer for %s", kernel_id)
self.log.debug("Clearing buffer for kernel_id %s", kernel_id)
self._check_kernel_id(kernel_id)

if kernel_id not in self._kernel_buffers:
Expand All @@ -274,8 +274,8 @@ def stop_buffering(self, kernel_id):

msg_buffer = buffer_info['buffer']
if msg_buffer:
self.log.info("Discarding %s buffered messages for %s",
len(msg_buffer), buffer_info['session_key'])
self.log.info("Discarding %s buffered messages for kernel_id %s with session_key %s",
len(msg_buffer), kernel_id, buffer_info['session_key'])

def shutdown_kernel(self, kernel_id, now=False):
"""Shutdown a kernel by kernel_id"""
Expand Down