From 06a32c4f91aba9a8ee6ee76474215aa96e5cd1c1 Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Thu, 10 Dec 2020 11:01:30 +0100 Subject: [PATCH] Rework wait_for_ready logic --- jupyter_client/asynchronous/client.py | 11 +++++-- jupyter_client/blocking/client.py | 11 +++++-- jupyter_client/client.py | 5 ++-- jupyter_client/tests/signalkernel.py | 7 ----- jupyter_client/tests/test_kernelmanager.py | 35 +++++++++++++++------- 5 files changed, 45 insertions(+), 24 deletions(-) diff --git a/jupyter_client/asynchronous/client.py b/jupyter_client/asynchronous/client.py index c4ab1f179..1a21e3ac8 100644 --- a/jupyter_client/asynchronous/client.py +++ b/jupyter_client/asynchronous/client.py @@ -126,14 +126,21 @@ async def wait_for_ready(self, timeout=None): # Wait for kernel info reply on shell channel while True: + self.kernel_info() try: msg = await self.shell_channel.get_msg(timeout=1) except Empty: pass else: if msg['msg_type'] == 'kernel_info_reply': - self._handle_kernel_info_reply(msg) - break + # Checking that IOPub is connected. If it is not connected, start over. + try: + await self.iopub_channel.get_msg(timeout=0.2) + except Empty: + pass + else: + self._handle_kernel_info_reply(msg) + break if not await self.is_alive(): raise RuntimeError('Kernel died before replying to kernel_info') diff --git a/jupyter_client/blocking/client.py b/jupyter_client/blocking/client.py index 0779c225c..5f11b798a 100644 --- a/jupyter_client/blocking/client.py +++ b/jupyter_client/blocking/client.py @@ -91,14 +91,21 @@ def wait_for_ready(self, timeout=None): # Wait for kernel info reply on shell channel while True: + self.kernel_info() try: msg = self.shell_channel.get_msg(block=True, timeout=1) except Empty: pass else: if msg['msg_type'] == 'kernel_info_reply': - self._handle_kernel_info_reply(msg) - break + # Checking that IOPub is connected. If it is not connected, start over. + try: + self.iopub_channel.get_msg(block=True, timeout=0.2) + except Empty: + pass + else: + self._handle_kernel_info_reply(msg) + break if not self.is_alive(): raise RuntimeError('Kernel died before replying to kernel_info') diff --git a/jupyter_client/client.py b/jupyter_client/client.py index 347766b29..760ac5266 100644 --- a/jupyter_client/client.py +++ b/jupyter_client/client.py @@ -102,11 +102,10 @@ def start_channels(self, shell=True, iopub=True, stdin=True, hb=True, control=Tr :meth:`start_kernel`. If the channels have been stopped and you call this, :class:`RuntimeError` will be raised. """ - if shell: - self.shell_channel.start() - self.kernel_info() if iopub: self.iopub_channel.start() + if shell: + self.shell_channel.start() if stdin: self.stdin_channel.start() self.allow_stdin = True diff --git a/jupyter_client/tests/signalkernel.py b/jupyter_client/tests/signalkernel.py index f75d9b730..632b44e6a 100644 --- a/jupyter_client/tests/signalkernel.py +++ b/jupyter_client/tests/signalkernel.py @@ -54,13 +54,6 @@ def do_execute(self, code, silent, store_history=True, user_expressions=None, reply['traceback'] = ['no such command: %s' % code] return reply - def kernel_info_request(self, *args, **kwargs): - """Add delay to kernel_info_request - - triggers slow-response code in KernelClient.wait_for_ready - """ - return super().kernel_info_request(*args, **kwargs) - class SignalTestApp(IPKernelApp): kernel_class = SignalTestKernel diff --git a/jupyter_client/tests/test_kernelmanager.py b/jupyter_client/tests/test_kernelmanager.py index af7549edf..cc3f78feb 100644 --- a/jupyter_client/tests/test_kernelmanager.py +++ b/jupyter_client/tests/test_kernelmanager.py @@ -133,8 +133,11 @@ def test_signal_kernel_subprocesses(self, install_kernel, start_kernel): km, kc = start_kernel def execute(cmd): - kc.execute(cmd) - reply = kc.get_shell_msg(TIMEOUT) + request_id = kc.execute(cmd) + while True: + reply = kc.get_shell_msg(TIMEOUT) + if reply['parent_header']['msg_id'] == request_id: + break content = reply['content'] assert content['status'] == 'ok' return content @@ -172,8 +175,11 @@ def test_start_new_kernel(self, install_kernel, start_kernel): def _env_test_body(self, kc): def execute(cmd): - kc.execute(cmd) - reply = kc.get_shell_msg(TIMEOUT) + request_id = kc.execute(cmd) + while True: + reply = kc.get_shell_msg(TIMEOUT) + if reply['parent_header']['msg_id'] == request_id: + break content = reply['content'] assert content['status'] == 'ok' return content @@ -274,8 +280,11 @@ def _run_signaltest_lifecycle(self, config=None): kc = self._prepare_kernel(km, stdout=PIPE, stderr=PIPE) def execute(cmd): - kc.execute(cmd) - reply = kc.get_shell_msg(TIMEOUT) + request_id = kc.execute(cmd) + while True: + reply = kc.get_shell_msg(TIMEOUT) + if reply['parent_header']['msg_id'] == request_id: + break content = reply['content'] assert content['status'] == 'ok' return content @@ -344,8 +353,11 @@ async def test_signal_kernel_subprocesses(self, install_kernel, start_async_kern km, kc = start_async_kernel async def execute(cmd): - kc.execute(cmd) - reply = await kc.get_shell_msg(TIMEOUT) + request_id = kc.execute(cmd) + while True: + reply = await kc.get_shell_msg(TIMEOUT) + if reply['parent_header']['msg_id'] == request_id: + break content = reply['content'] assert content['status'] == 'ok' return content @@ -360,10 +372,13 @@ async def execute(cmd): assert reply['user_expressions']['poll'] == [None] * N # start a job on the kernel to be interrupted - kc.execute('sleep') + request_id = kc.execute('sleep') await asyncio.sleep(1) # ensure sleep message has been handled before we interrupt await km.interrupt_kernel() - reply = await kc.get_shell_msg(TIMEOUT) + while True: + reply = await kc.get_shell_msg(TIMEOUT) + if reply['parent_header']['msg_id'] == request_id: + break content = reply['content'] assert content['status'] == 'ok' assert content['user_expressions']['interrupted'] is True