Skip to content

Commit

Permalink
Backport PR #352: tornado 5 fixes in ThreadedClient
Browse files Browse the repository at this point in the history
The main issue was failing to schedule `ioloop.stop` in the ioloop thread, which is required when tornado is running on asyncio in order to wake the thread. The result of not doing this is hanging forever when trying to exit, e.g. in QtConsole.

There is further cleanup of threadsafety issues with respect to asyncio and tornado objects.

closes jupyter/qtconsole#275

cc  ccordoba12

Signed-off-by: Min RK <[email protected]>
  • Loading branch information
willingc authored and minrk committed Mar 11, 2018
1 parent e6e6b03 commit 652fd43
Showing 1 changed file with 36 additions and 13 deletions.
49 changes: 36 additions & 13 deletions jupyter_client/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from __future__ import absolute_import
import atexit
import errno
from threading import Thread
import sys
from threading import Thread, Event
import time

# import ZMQError in top-level namespace, to avoid ugly attribute-error messages
Expand Down Expand Up @@ -41,9 +42,15 @@ def __init__(self, socket, session, loop):
self.socket = socket
self.session = session
self.ioloop = loop
evt = Event()

self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
self.stream.on_recv(self._handle_recv)
def setup_stream():
self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
self.stream.on_recv(self._handle_recv)
evt.set()

self.ioloop.add_callback(setup_stream)
evt.wait()

_is_alive = False
def is_alive(self):
Expand Down Expand Up @@ -142,11 +149,11 @@ class IOLoopThread(Thread):
"""Run a pyzmq ioloop in a thread to send and receive messages
"""
_exiting = False
ioloop = None

def __init__(self, loop):
def __init__(self):
super(IOLoopThread, self).__init__()
self.daemon = True
self.ioloop = loop or ioloop.IOLoop()

@staticmethod
@atexit.register
Expand All @@ -156,8 +163,26 @@ def _notice_exit():
if IOLoopThread is not None:
IOLoopThread._exiting = True

def start(self):
"""Start the IOLoop thread
Don't return until self.ioloop is defined,
which is created in the thread
"""
self._start_event = Event()
Thread.start(self)
self._start_event.wait()

def run(self):
"""Run my loop, ignoring EINTR events in the poller"""
if 'asyncio' in sys.modules:
# tornado may be using asyncio,
# ensure an eventloop exists for this thread
import asyncio
asyncio.set_event_loop(asyncio.new_event_loop())
self.ioloop = ioloop.IOLoop()
# signal that self.ioloop is defined
self._start_event.set()
while True:
try:
self.ioloop.start()
Expand All @@ -182,9 +207,10 @@ def stop(self):
:meth:`~threading.Thread.start` is called again.
"""
if self.ioloop is not None:
self.ioloop.stop()
self.ioloop.add_callback(self.ioloop.stop)
self.join()
self.close()
self.ioloop = None

def close(self):
if self.ioloop is not None:
Expand All @@ -198,22 +224,19 @@ class ThreadedKernelClient(KernelClient):
""" A KernelClient that provides thread-safe sockets with async callbacks on message replies.
"""

_ioloop = None
@property
def ioloop(self):
if self._ioloop is None:
self._ioloop = ioloop.IOLoop()
return self._ioloop
return self.ioloop_thread.ioloop

ioloop_thread = Instance(IOLoopThread, allow_none=True)

def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
self.ioloop_thread = IOLoopThread()
self.ioloop_thread.start()

if shell:
self.shell_channel._inspect = self._check_kernel_info_reply

self.ioloop_thread = IOLoopThread(self.ioloop)
self.ioloop_thread.start()

super(ThreadedKernelClient, self).start_channels(shell, iopub, stdin, hb)

def _check_kernel_info_reply(self, msg):
Expand Down

0 comments on commit 652fd43

Please sign in to comment.