Skip to content

Commit

Permalink
Merge pull request #352 from minrk/threadsafe-shutdown
Browse files Browse the repository at this point in the history
tornado 5 fixes in ThreadedClient
  • Loading branch information
willingc authored Mar 9, 2018
2 parents d9832e0 + 64aca4d commit 4bfb5c4
Showing 1 changed file with 36 additions and 13 deletions.
49 changes: 36 additions & 13 deletions jupyter_client/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from __future__ import absolute_import
import atexit
import errno
from threading import Thread
import sys
from threading import Thread, Event
import time

# import ZMQError in top-level namespace, to avoid ugly attribute-error messages
Expand Down Expand Up @@ -41,9 +42,15 @@ def __init__(self, socket, session, loop):
self.socket = socket
self.session = session
self.ioloop = loop
evt = Event()

self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
self.stream.on_recv(self._handle_recv)
def setup_stream():
self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
self.stream.on_recv(self._handle_recv)
evt.set()

self.ioloop.add_callback(setup_stream)
evt.wait()

_is_alive = False
def is_alive(self):
Expand Down Expand Up @@ -142,11 +149,11 @@ class IOLoopThread(Thread):
"""Run a pyzmq ioloop in a thread to send and receive messages
"""
_exiting = False
ioloop = None

def __init__(self, loop):
def __init__(self):
super(IOLoopThread, self).__init__()
self.daemon = True
self.ioloop = loop or ioloop.IOLoop()

@staticmethod
@atexit.register
Expand All @@ -156,8 +163,26 @@ def _notice_exit():
if IOLoopThread is not None:
IOLoopThread._exiting = True

def start(self):
"""Start the IOLoop thread
Don't return until self.ioloop is defined,
which is created in the thread
"""
self._start_event = Event()
Thread.start(self)
self._start_event.wait()

def run(self):
"""Run my loop, ignoring EINTR events in the poller"""
if 'asyncio' in sys.modules:
# tornado may be using asyncio,
# ensure an eventloop exists for this thread
import asyncio
asyncio.set_event_loop(asyncio.new_event_loop())
self.ioloop = ioloop.IOLoop()
# signal that self.ioloop is defined
self._start_event.set()
while True:
try:
self.ioloop.start()
Expand All @@ -182,9 +207,10 @@ def stop(self):
:meth:`~threading.Thread.start` is called again.
"""
if self.ioloop is not None:
self.ioloop.stop()
self.ioloop.add_callback(self.ioloop.stop)
self.join()
self.close()
self.ioloop = None

def close(self):
if self.ioloop is not None:
Expand All @@ -198,22 +224,19 @@ class ThreadedKernelClient(KernelClient):
""" A KernelClient that provides thread-safe sockets with async callbacks on message replies.
"""

_ioloop = None
@property
def ioloop(self):
if self._ioloop is None:
self._ioloop = ioloop.IOLoop()
return self._ioloop
return self.ioloop_thread.ioloop

ioloop_thread = Instance(IOLoopThread, allow_none=True)

def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
self.ioloop_thread = IOLoopThread()
self.ioloop_thread.start()

if shell:
self.shell_channel._inspect = self._check_kernel_info_reply

self.ioloop_thread = IOLoopThread(self.ioloop)
self.ioloop_thread.start()

super(ThreadedKernelClient, self).start_channels(shell, iopub, stdin, hb)

def _check_kernel_info_reply(self, msg):
Expand Down

0 comments on commit 4bfb5c4

Please sign in to comment.