diff --git a/docs/changelog.rst b/docs/changelog.rst index d658c53f6..fb8ce84f9 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -4,6 +4,13 @@ Changes in Jupyter Client ========================= +dev +=== + +- Shutdown request sequence has been modified to be more graceful: it is now + preceded by an interrupt, and a ``SIGTERM`` is also sent before forcibly + killing the kernel. :ghpull:`620` + 6.1.11 ====== - Move jedi pinning to test requirements (:ghpull:`599`) diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index f3c7ff7c1..434f79a34 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -12,6 +12,8 @@ import time import warnings +from enum import Enum + import zmq from ipython_genutils.importstring import import_item @@ -29,6 +31,19 @@ KernelManagerABC ) +class _ShutdownStatus(Enum): + """ + + This is so far used only for testing, in order to track the internal state of + the shutdown logic and to verify which path is taken for which + misbehavior. + + """ + Unset = None + ShutdownRequest = "ShutdownRequest" + SigtermRequest = "SigtermRequest" + SigkillRequest = "SigkillRequest" + class KernelManager(ConnectionFileMixin): """Manages a single kernel in a subprocess on this host. @@ -36,6 +51,10 @@ class KernelManager(ConnectionFileMixin): This version starts kernels with Popen. """ + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._shutdown_status = _ShutdownStatus.Unset + _created_context = Bool(False) # The PyZMQ Context to use for communication with the kernel. @@ -71,7 +90,13 @@ def _kernel_spec_manager_changed(self, change): shutdown_wait_time = Float( 5.0, config=True, help="Time to wait for a kernel to terminate before killing it, " - "in seconds.") + "in seconds. 
When a shutdown request is initiated, the kernel " + "will immediately be sent an interrupt (SIGINT), followed " + "by a shutdown_request message. After 1/2 of `shutdown_wait_time` " + "it will be sent a terminate (SIGTERM) request, and finally at " + "the end of `shutdown_wait_time` it will be killed (SIGKILL). Terminate " + "and kill may be equivalent on Windows.", + ) kernel_name = Unicode(kernelspec.NATIVE_KERNEL_NAME) @@ -333,7 +358,17 @@ def finish_shutdown(self, waittime=None, pollinterval=0.1): """ if waittime is None: waittime = max(self.shutdown_wait_time, 0) - for i in range(int(waittime/pollinterval)): + self._shutdown_status = _ShutdownStatus.ShutdownRequest + + def poll_or_sleep_to_kernel_gone(): + """ + Poll until the kernel is not responding, + then wait on the subprocess until the process is gone. + + After this function returns, either: + - the kernel is still responding; or + - the subprocess has been culled. + """ if self.is_alive(): time.sleep(pollinterval) else: @@ -341,12 +376,27 @@ def finish_shutdown(self, waittime=None, pollinterval=0.1): if self.has_kernel: self.kernel.wait() self.kernel = None + return True + + # wait 50% of the shutdown timeout... + for i in range(int(waittime / 2 / pollinterval)): + if poll_or_sleep_to_kernel_gone(): break else: - # OK, we've waited long enough. - if self.has_kernel: - self.log.debug("Kernel is taking too long to finish, killing") - self._kill_kernel() + # if we've exited the loop normally (no break) + # send sigterm and wait the other 50%. + self.log.debug("Kernel is taking too long to finish, terminating") + self._shutdown_status = _ShutdownStatus.SigtermRequest + self._send_kernel_sigterm() + for i in range(int(waittime / 2 / pollinterval)): + if poll_or_sleep_to_kernel_gone(): + break + else: + # OK, we've waited long enough. 
+ if self.has_kernel: + self.log.debug("Kernel is taking too long to finish, killing") + self._shutdown_status = _ShutdownStatus.SigkillRequest + self._kill_kernel() def cleanup_resources(self, restart=False): """Clean up resources when the kernel is shut down""" @@ -388,6 +438,8 @@ def shutdown_kernel(self, now=False, restart=False): # Stop monitoring for restarting while we shutdown. self.stop_restarter() + self.interrupt_kernel() + if now: self._kill_kernel() else: @@ -462,6 +514,36 @@ def has_kernel(self): """Has a kernel been started that we are managing.""" return self.kernel is not None + def _send_kernel_sigterm(self): + """Similar to _kill_kernel, but sends SIGTERM (not SIGKILL) and does not block.""" + if self.has_kernel: + # Signal the kernel to terminate (sends SIGTERM on Unix; if the + # kernel is a subprocess and we are on Windows, this is + # equivalent to kill). + try: + if hasattr(self.kernel, "terminate"): + self.kernel.terminate() + elif hasattr(signal, "SIGTERM"): + self.signal_kernel(signal.SIGTERM) + else: + self.log.debug( + "Cannot set term signal to kernel, no" + " `.terminate()` method and no values for SIGTERM" + ) + except OSError as e: + # In Windows, we will get an Access Denied error if the process + # has already terminated. Ignore it. + if sys.platform == "win32": + if e.winerror != 5: + raise + # On Unix, we may get an ESRCH error if the process has already + # terminated. Ignore it. + else: + from errno import ESRCH + + if e.errno != ESRCH: + raise + def _kill_kernel(self): """Kill the running kernel. 
@@ -587,10 +669,23 @@ async def finish_shutdown(self, waittime=None, pollinterval=0.1): """ if waittime is None: waittime = max(self.shutdown_wait_time, 0) + self._shutdown_status = _ShutdownStatus.ShutdownRequest try: - await asyncio.wait_for(self._async_wait(pollinterval=pollinterval), timeout=waittime) + await asyncio.wait_for( + self._async_wait(pollinterval=pollinterval), timeout=waittime / 2 + ) + except asyncio.TimeoutError: + self.log.debug("Kernel is taking too long to finish, terminating") + self._shutdown_status = _ShutdownStatus.SigtermRequest + await self._send_kernel_sigterm() + + try: + await asyncio.wait_for( + self._async_wait(pollinterval=pollinterval), timeout=waittime / 2 + ) except asyncio.TimeoutError: self.log.debug("Kernel is taking too long to finish, killing") + self._shutdown_status = _ShutdownStatus.SigkillRequest await self._kill_kernel() else: # Process is no longer alive, wait and clear @@ -620,6 +715,8 @@ async def shutdown_kernel(self, now=False, restart=False): # Stop monitoring for restarting while we shutdown. self.stop_restarter() + await self.interrupt_kernel() + if now: await self._kill_kernel() else: @@ -678,6 +775,36 @@ async def restart_kernel(self, now=False, newports=False, **kw): await self.start_kernel(**self._launch_args) return None + async def _send_kernel_sigterm(self): + """Similar to _kill_kernel, but sends SIGTERM (not SIGKILL) and does not block.""" + if self.has_kernel: + # Signal the kernel to terminate (sends SIGTERM on Unix; if the + # kernel is a subprocess and we are on Windows, this is + # equivalent to kill). + try: + if hasattr(self.kernel, "terminate"): + self.kernel.terminate() + elif hasattr(signal, "SIGTERM"): + await self.signal_kernel(signal.SIGTERM) + else: + self.log.debug( + "Cannot set term signal to kernel, no" + " `.terminate()` method and no values for SIGTERM" + ) + except OSError as e: + # In Windows, we will get an Access Denied error if the process + # has already terminated. Ignore it. 
+ if sys.platform == "win32": + if e.winerror != 5: + raise + # On Unix, we may get an ESRCH error if the process has already + # terminated. Ignore it. + else: + from errno import ESRCH + + if e.errno != ESRCH: + raise + async def _kill_kernel(self): """Kill the running kernel. diff --git a/jupyter_client/tests/signalkernel.py b/jupyter_client/tests/signalkernel.py index 632b44e6a..93e290b7b 100644 --- a/jupyter_client/tests/signalkernel.py +++ b/jupyter_client/tests/signalkernel.py @@ -13,6 +13,10 @@ from ipykernel.kernelbase import Kernel from ipykernel.kernelapp import IPKernelApp +from tornado.web import gen + +import signal + class SignalTestKernel(Kernel): """Kernel for testing subprocess signaling""" @@ -25,6 +29,14 @@ def __init__(self, **kwargs): super().__init__(**kwargs) self.children = [] + if os.environ.get("NO_SIGTERM_REPLY", None) == "1": + signal.signal(signal.SIGTERM, signal.SIG_IGN) + + @gen.coroutine + def shutdown_request(self, stream, ident, parent): + if os.environ.get("NO_SHUTDOWN_REPLY") != "1": + yield gen.maybe_future(super().shutdown_request(stream, ident, parent)) + def do_execute(self, code, silent, store_history=True, user_expressions=None, allow_stdin=False): code = code.strip() diff --git a/jupyter_client/tests/test_kernelmanager.py b/jupyter_client/tests/test_kernelmanager.py index 9393c06f1..7cb3876eb 100644 --- a/jupyter_client/tests/test_kernelmanager.py +++ b/jupyter_client/tests/test_kernelmanager.py @@ -21,6 +21,7 @@ from subprocess import PIPE from ..manager import start_new_kernel, start_new_async_kernel +from ..manager import _ShutdownStatus from .utils import test_env, skip_win32, AsyncKernelManagerSubclass, AsyncKernelManagerWithCleanup pjoin = os.path.join @@ -51,18 +52,42 @@ def config(transport): return c +def _install_kernel(name="signaltest", extra_env=None): + if extra_env is None: + extra_env = dict() + kernel_dir = pjoin(paths.jupyter_data_dir(), "kernels", name) + os.makedirs(kernel_dir) + with 
open(pjoin(kernel_dir, "kernel.json"), "w") as f: + f.write( + json.dumps( + { + "argv": [ + sys.executable, + "-m", + "jupyter_client.tests.signalkernel", + "-f", + "{connection_file}", + ], + "display_name": "Signal Test Kernel", + "env": {"TEST_VARS": "${TEST_VARS}:test_var_2", **extra_env}, + } + ) + ) + + @pytest.fixture def install_kernel(): - kernel_dir = pjoin(paths.jupyter_data_dir(), 'kernels', 'signaltest') - os.makedirs(kernel_dir) - with open(pjoin(kernel_dir, 'kernel.json'), 'w') as f: - f.write(json.dumps({ - 'argv': [sys.executable, - '-m', 'jupyter_client.tests.signalkernel', - '-f', '{connection_file}'], - 'display_name': "Signal Test Kernel", - 'env': {'TEST_VARS': '${TEST_VARS}:test_var_2'}, - })) + return _install_kernel() + + +def install_kernel_dont_shutdown(): + _install_kernel("signaltest-no-shutdown", {"NO_SHUTDOWN_REPLY": "1"}) + + +def install_kernel_dont_terminate(): + return _install_kernel( + "signaltest-no-terminate", {"NO_SHUTDOWN_REPLY": "1", "NO_SIGTERM_REPLY": "1"} + ) @pytest.fixture @@ -104,6 +129,55 @@ async def start_async_kernel(): assert km.context.closed +class TestKernelManagerShutDownGracefully: + parameters = ( + "name, install, expected", + [ + ("signaltest", _install_kernel, _ShutdownStatus.ShutdownRequest), + ( + "signaltest-no-shutdown", + install_kernel_dont_shutdown, + _ShutdownStatus.SigtermRequest, + ), + ( + "signaltest-no-terminate", + install_kernel_dont_terminate, + _ShutdownStatus.SigkillRequest, + ), + ], + ) + + @pytest.mark.skipif( + sys.platform == "win32", reason="Windows doesn't support signals" + ) + @pytest.mark.parametrize(*parameters) + def test_signal_kernel_subprocesses(self, name, install, expected): + install() + km, kc = start_new_kernel(kernel_name=name) + assert km._shutdown_status == _ShutdownStatus.Unset + assert km.is_alive() + # kc.execute("1") + kc.stop_channels() + km.shutdown_kernel() + + assert km._shutdown_status == expected + + @pytest.mark.skipif( + sys.platform == "win32", 
reason="Windows doesn't support signals" + ) + @pytest.mark.parametrize(*parameters) + async def test_async_signal_kernel_subprocesses(self, name, install, expected): + install() + km, kc = await start_new_async_kernel(kernel_name=name) + assert km._shutdown_status == _ShutdownStatus.Unset + assert await km.is_alive() + # kc.execute("1") + kc.stop_channels() + await km.shutdown_kernel() + + assert km._shutdown_status == expected + + class TestKernelManager: def test_lifecycle(self, km):