Skip to content

Commit

Permalink
Backport PR jupyter#437: Avoid kernel failures with multiple processes
Browse files Browse the repository at this point in the history
  • Loading branch information
MSeal authored and alexrudy committed May 14, 2019
1 parent 077dc8d commit f1dba27
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 2 deletions.
2 changes: 1 addition & 1 deletion jupyter_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class KernelClient(ConnectionFileMixin):
# The PyZMQ Context to use for communication with the kernel.
context = Instance(zmq.Context)
def _context_default(self):
return zmq.Context.instance()
return zmq.Context()

# The classes to use for the various channels
shell_channel_class = Type(ChannelABC)
Expand Down
120 changes: 119 additions & 1 deletion jupyter_client/tests/test_kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
from subprocess import PIPE
import sys
import time
import threading
import multiprocessing as mp
import pytest
from unittest import TestCase

from traitlets.config.loader import Config
Expand All @@ -28,7 +31,7 @@ def setUp(self):

def tearDown(self):
self.env_patch.stop()

def _install_test_kernel(self):
kernel_dir = pjoin(paths.jupyter_data_dir(), 'kernels', 'signaltest')
os.makedirs(kernel_dir)
Expand Down Expand Up @@ -127,3 +130,118 @@ def test_start_new_kernel(self):

self.assertTrue(km.is_alive())
self.assertTrue(kc.is_alive())

@pytest.mark.parallel
class TestParallel:

@pytest.fixture(autouse=True)
def env(self):
env_patch = test_env()
env_patch.start()
yield
env_patch.stop()

@pytest.fixture(params=['tcp', 'ipc'])
def transport(self, request):
return request.param

@pytest.fixture
def config(self, transport):
c = Config()
c.transport = transport
if transport == 'ipc':
c.ip = 'test'
return c

def _install_test_kernel(self):
kernel_dir = pjoin(paths.jupyter_data_dir(), 'kernels', 'signaltest')
os.makedirs(kernel_dir)
with open(pjoin(kernel_dir, 'kernel.json'), 'w') as f:
f.write(json.dumps({
'argv': [sys.executable,
'-m', 'jupyter_client.tests.signalkernel',
'-f', '{connection_file}'],
'display_name': "Signal Test Kernel",
}))

def test_start_sequence_kernels(self, config):
"""Ensure that a sequence of kernel startups doesn't break anything."""

self._install_test_kernel()
self._run_signaltest_lifecycle(config)
self._run_signaltest_lifecycle(config)
self._run_signaltest_lifecycle(config)

def test_start_parallel_thread_kernels(self, config):
self._install_test_kernel()
self._run_signaltest_lifecycle(config)

thread = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,))
thread2 = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,))
try:
thread.start()
thread2.start()
finally:
thread.join()
thread2.join()

def test_start_parallel_process_kernels(self, config):
self._install_test_kernel()

self._run_signaltest_lifecycle(config)
thread = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,))
proc = mp.Process(target=self._run_signaltest_lifecycle, args=(config,))
try:
thread.start()
proc.start()
finally:
thread.join()
proc.join()

assert proc.exitcode == 0

def test_start_sequence_process_kernels(self, config):
self._install_test_kernel()
self._run_signaltest_lifecycle(config)
proc = mp.Process(target=self._run_signaltest_lifecycle, args=(config,))
try:
proc.start()
finally:
proc.join()

assert proc.exitcode == 0

def _prepare_kernel(self, km, startup_timeout=TIMEOUT, **kwargs):
km.start_kernel(**kwargs)
kc = km.client()
kc.start_channels()
try:
kc.wait_for_ready(timeout=startup_timeout)
except RuntimeError:
kc.stop_channels()
km.shutdown_kernel()
raise

return kc

def _run_signaltest_lifecycle(self, config=None):
km = KernelManager(config=config, kernel_name='signaltest')
kc = self._prepare_kernel(km, stdout=PIPE, stderr=PIPE)

def execute(cmd):
kc.execute(cmd)
reply = kc.get_shell_msg(TIMEOUT)
content = reply['content']
assert content['status'] == 'ok'
return content

execute("start")
assert km.is_alive()
execute('check')
assert km.is_alive()

km.restart_kernel(now=True)
assert km.is_alive()
execute('check')

km.shutdown_kernel()

0 comments on commit f1dba27

Please sign in to comment.