diff --git a/jupyter_client/client.py b/jupyter_client/client.py index 763af85a7..8bdd75893 100644 --- a/jupyter_client/client.py +++ b/jupyter_client/client.py @@ -51,7 +51,7 @@ class KernelClient(ConnectionFileMixin): # The PyZMQ Context to use for communication with the kernel. context = Instance(zmq.Context) def _context_default(self): - return zmq.Context.instance() + return zmq.Context() # The classes to use for the various channels shell_channel_class = Type(ChannelABC) diff --git a/jupyter_client/tests/test_kernelmanager.py b/jupyter_client/tests/test_kernelmanager.py index a23b33fa6..63ef56394 100644 --- a/jupyter_client/tests/test_kernelmanager.py +++ b/jupyter_client/tests/test_kernelmanager.py @@ -11,6 +11,9 @@ from subprocess import PIPE import sys import time +import threading +import multiprocessing as mp +import pytest from unittest import TestCase from traitlets.config.loader import Config @@ -28,7 +31,7 @@ def setUp(self): def tearDown(self): self.env_patch.stop() - + def _install_test_kernel(self): kernel_dir = pjoin(paths.jupyter_data_dir(), 'kernels', 'signaltest') os.makedirs(kernel_dir) @@ -127,3 +130,118 @@ def test_start_new_kernel(self): self.assertTrue(km.is_alive()) self.assertTrue(kc.is_alive()) + +@pytest.mark.parallel +class TestParallel: + + @pytest.fixture(autouse=True) + def env(self): + env_patch = test_env() + env_patch.start() + yield + env_patch.stop() + + @pytest.fixture(params=['tcp', 'ipc']) + def transport(self, request): + return request.param + + @pytest.fixture + def config(self, transport): + c = Config() + c.transport = transport + if transport == 'ipc': + c.ip = 'test' + return c + + def _install_test_kernel(self): + kernel_dir = pjoin(paths.jupyter_data_dir(), 'kernels', 'signaltest') + os.makedirs(kernel_dir) + with open(pjoin(kernel_dir, 'kernel.json'), 'w') as f: + f.write(json.dumps({ + 'argv': [sys.executable, + '-m', 'jupyter_client.tests.signalkernel', + '-f', '{connection_file}'], + 'display_name': "Signal Test Kernel", + })) + + def test_start_sequence_kernels(self, config): + """Ensure that a sequence of kernel startups doesn't break anything.""" + + self._install_test_kernel() + self._run_signaltest_lifecycle(config) + self._run_signaltest_lifecycle(config) + self._run_signaltest_lifecycle(config) + + def test_start_parallel_thread_kernels(self, config): + self._install_test_kernel() + self._run_signaltest_lifecycle(config) + + thread = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,)) + thread2 = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,)) + try: + thread.start() + thread2.start() + finally: + thread.join() + thread2.join() + + def test_start_parallel_process_kernels(self, config): + self._install_test_kernel() + + self._run_signaltest_lifecycle(config) + thread = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,)) + proc = mp.Process(target=self._run_signaltest_lifecycle, args=(config,)) + try: + thread.start() + proc.start() + finally: + thread.join() + proc.join() + + assert proc.exitcode == 0 + + def test_start_sequence_process_kernels(self, config): + self._install_test_kernel() + self._run_signaltest_lifecycle(config) + proc = mp.Process(target=self._run_signaltest_lifecycle, args=(config,)) + try: + proc.start() + finally: + proc.join() + + assert proc.exitcode == 0 + + def _prepare_kernel(self, km, startup_timeout=TIMEOUT, **kwargs): + km.start_kernel(**kwargs) + kc = km.client() + kc.start_channels() + try: + kc.wait_for_ready(timeout=startup_timeout) + except RuntimeError: + kc.stop_channels() + km.shutdown_kernel() + raise + + return kc + + def _run_signaltest_lifecycle(self, config=None): + km = KernelManager(config=config, kernel_name='signaltest') + kc = self._prepare_kernel(km, stdout=PIPE, stderr=PIPE) + + def execute(cmd): + kc.execute(cmd) + reply = kc.get_shell_msg(TIMEOUT) + content = reply['content'] + assert content['status'] == 'ok' + return content + + execute("start") + assert km.is_alive() + execute('check') + assert km.is_alive() + + km.restart_kernel(now=True) + assert km.is_alive() + execute('check') + + km.shutdown_kernel() \ No newline at end of file