Skip to content

Commit

Permalink
Merge pull request #620 from Carreau/sigterm
Browse files Browse the repository at this point in the history
Interrupt and sigterm before sigkill kernel.
  • Loading branch information
Carreau authored Mar 8, 2021
2 parents 987f3e1 + 60aa969 commit 7df16d5
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 17 deletions.
7 changes: 7 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
Changes in Jupyter Client
=========================

dev
===

- Shutdown request sequence has been modified to be more graceful, it now is
preceded by interrupt, and will also send a ``SIGTERM`` before forcibly
killing the kernel. :ghpull:`620`

6.1.11
======
- Move jedi pinning to test requirements (:ghpull:`599`)
Expand Down
141 changes: 134 additions & 7 deletions jupyter_client/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import time
import warnings

from enum import Enum

import zmq

from ipython_genutils.importstring import import_item
Expand All @@ -29,13 +31,30 @@
KernelManagerABC
)

class _ShutdownStatus(Enum):
"""
This is so far used only for testing in order to track the internal state of
the shutdown logic, and verifying which path is taken for which
missbehavior.
"""
Unset = None
ShutdownRequest = "ShutdownRequest"
SigtermRequest = "SigtermRequest"
SigkillRequest = "SigkillRequest"


class KernelManager(ConnectionFileMixin):
"""Manages a single kernel in a subprocess on this host.
This version starts kernels with Popen.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._shutdown_status = _ShutdownStatus.Unset

_created_context = Bool(False)

# The PyZMQ Context to use for communication with the kernel.
Expand Down Expand Up @@ -71,7 +90,13 @@ def _kernel_spec_manager_changed(self, change):
shutdown_wait_time = Float(
5.0, config=True,
help="Time to wait for a kernel to terminate before killing it, "
"in seconds.")
"in seconds. When a shutdown request is initiated, the kernel "
"will be immediately send and interrupt (SIGINT), followed"
"by a shutdown_request message, after 1/2 of `shutdown_wait_time`"
"it will be sent a terminate (SIGTERM) request, and finally at "
"the end of `shutdown_wait_time` will be killed (SIGKILL). terminate "
"and kill may be equivalent on windows.",
)

kernel_name = Unicode(kernelspec.NATIVE_KERNEL_NAME)

Expand Down Expand Up @@ -333,20 +358,45 @@ def finish_shutdown(self, waittime=None, pollinterval=0.1):
"""
if waittime is None:
waittime = max(self.shutdown_wait_time, 0)
for i in range(int(waittime/pollinterval)):
self._shutdown_status = _ShutdownStatus.ShutdownRequest

def poll_or_sleep_to_kernel_gone():
"""
Poll until the kernel is not responding,
then wait (the subprocess), until process gone.
After this function the kernel is either:
- still responding; or
- subprocess has been culled.
"""
if self.is_alive():
time.sleep(pollinterval)
else:
# If there's still a proc, wait and clear
if self.has_kernel:
self.kernel.wait()
self.kernel = None
return True

# wait 50% of the shutdown timeout...
for i in range(int(waittime / 2 / pollinterval)):
if poll_or_sleep_to_kernel_gone():
break
else:
# OK, we've waited long enough.
if self.has_kernel:
self.log.debug("Kernel is taking too long to finish, killing")
self._kill_kernel()
# if we've exited the loop normally (no break)
# send sigterm and wait the other 50%.
self.log.debug("Kernel is taking too long to finish, terminating")
self._shutdown_status = _ShutdownStatus.SigtermRequest
self._send_kernel_sigterm()
for i in range(int(waittime / 2 / pollinterval)):
if poll_or_sleep_to_kernel_gone():
break
else:
# OK, we've waited long enough.
if self.has_kernel:
self.log.debug("Kernel is taking too long to finish, killing")
self._shutdown_status = _ShutdownStatus.SigkillRequest
self._kill_kernel()

def cleanup_resources(self, restart=False):
"""Clean up resources when the kernel is shut down"""
Expand Down Expand Up @@ -388,6 +438,8 @@ def shutdown_kernel(self, now=False, restart=False):
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()

self.interrupt_kernel()

if now:
self._kill_kernel()
else:
Expand Down Expand Up @@ -462,6 +514,36 @@ def has_kernel(self):
"""Has a kernel been started that we are managing."""
return self.kernel is not None

def _send_kernel_sigterm(self):
"""similar to _kill_kernel, but with sigterm (not sigkill), but do not block"""
if self.has_kernel:
# Signal the kernel to terminate (sends SIGTERM on Unix and
# if the kernel is a subprocess and we are on windows; this is
# equivalent to kill
try:
if hasattr(self.kernel, "terminate"):
self.kernel.terminate()
elif hasattr(signal, "SIGTERM"):
self.signal_kernel(signal.SIGTERM)
else:
self.log.debug(
"Cannot set term signal to kernel, no"
" `.terminate()` method and no values for SIGTERM"
)
except OSError as e:
# In Windows, we will get an Access Denied error if the process
# has already terminated. Ignore it.
if sys.platform == "win32":
if e.winerror != 5:
raise
# On Unix, we may get an ESRCH error if the process has already
# terminated. Ignore it.
else:
from errno import ESRCH

if e.errno != ESRCH:
raise

def _kill_kernel(self):
"""Kill the running kernel.
Expand Down Expand Up @@ -587,10 +669,23 @@ async def finish_shutdown(self, waittime=None, pollinterval=0.1):
"""
if waittime is None:
waittime = max(self.shutdown_wait_time, 0)
self._shutdown_status = _ShutdownStatus.ShutdownRequest
try:
await asyncio.wait_for(self._async_wait(pollinterval=pollinterval), timeout=waittime)
await asyncio.wait_for(
self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
)
except asyncio.TimeoutError:
self.log.debug("Kernel is taking too long to finish, terminating")
self._shutdown_status = _ShutdownStatus.SigtermRequest
await self._send_kernel_sigterm()

try:
await asyncio.wait_for(
self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
)
except asyncio.TimeoutError:
self.log.debug("Kernel is taking too long to finish, killing")
self._shutdown_status = _ShutdownStatus.SigkillRequest
await self._kill_kernel()
else:
# Process is no longer alive, wait and clear
Expand Down Expand Up @@ -620,6 +715,8 @@ async def shutdown_kernel(self, now=False, restart=False):
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()

await self.interrupt_kernel()

if now:
await self._kill_kernel()
else:
Expand Down Expand Up @@ -678,6 +775,36 @@ async def restart_kernel(self, now=False, newports=False, **kw):
await self.start_kernel(**self._launch_args)
return None

async def _send_kernel_sigterm(self):
"""similar to _kill_kernel, but with sigterm (not sigkill), but do not block"""
if self.has_kernel:
# Signal the kernel to terminate (sends SIGTERM on Unix and
# if the kernel is a subprocess and we are on windows; this is
# equivalent to kill
try:
if hasattr(self.kernel, "terminate"):
self.kernel.terminate()
elif hasattr(signal, "SIGTERM"):
await self.signal_kernel(signal.SIGTERM)
else:
self.log.debug(
"Cannot set term signal to kernel, no"
" `.terminate()` method and no values for SIGTERM"
)
except OSError as e:
# In Windows, we will get an Access Denied error if the process
# has already terminated. Ignore it.
if sys.platform == "win32":
if e.winerror != 5:
raise
# On Unix, we may get an ESRCH error if the process has already
# terminated. Ignore it.
else:
from errno import ESRCH

if e.errno != ESRCH:
raise

async def _kill_kernel(self):
"""Kill the running kernel.
Expand Down
12 changes: 12 additions & 0 deletions jupyter_client/tests/signalkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
from ipykernel.kernelbase import Kernel
from ipykernel.kernelapp import IPKernelApp

from tornado.web import gen

import signal


class SignalTestKernel(Kernel):
"""Kernel for testing subprocess signaling"""
Expand All @@ -25,6 +29,14 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)
self.children = []

if os.environ.get("NO_SIGTERM_REPLY", None) == "1":
signal.signal(signal.SIGTERM, signal.SIG_IGN)

@gen.coroutine
def shutdown_request(self, stream, ident, parent):
if os.environ.get("NO_SHUTDOWN_REPLY") != "1":
yield gen.maybe_future(super().shutdown_request(stream, ident, parent))

def do_execute(self, code, silent, store_history=True, user_expressions=None,
allow_stdin=False):
code = code.strip()
Expand Down
94 changes: 84 additions & 10 deletions jupyter_client/tests/test_kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from subprocess import PIPE

from ..manager import start_new_kernel, start_new_async_kernel
from ..manager import _ShutdownStatus
from .utils import test_env, skip_win32, AsyncKernelManagerSubclass, AsyncKernelManagerWithCleanup

pjoin = os.path.join
Expand Down Expand Up @@ -51,18 +52,42 @@ def config(transport):
return c


def _install_kernel(name="signaltest", extra_env=None):
if extra_env is None:
extra_env = dict()
kernel_dir = pjoin(paths.jupyter_data_dir(), "kernels", name)
os.makedirs(kernel_dir)
with open(pjoin(kernel_dir, "kernel.json"), "w") as f:
f.write(
json.dumps(
{
"argv": [
sys.executable,
"-m",
"jupyter_client.tests.signalkernel",
"-f",
"{connection_file}",
],
"display_name": "Signal Test Kernel",
"env": {"TEST_VARS": "${TEST_VARS}:test_var_2", **extra_env},
}
)
)


@pytest.fixture
def install_kernel():
kernel_dir = pjoin(paths.jupyter_data_dir(), 'kernels', 'signaltest')
os.makedirs(kernel_dir)
with open(pjoin(kernel_dir, 'kernel.json'), 'w') as f:
f.write(json.dumps({
'argv': [sys.executable,
'-m', 'jupyter_client.tests.signalkernel',
'-f', '{connection_file}'],
'display_name': "Signal Test Kernel",
'env': {'TEST_VARS': '${TEST_VARS}:test_var_2'},
}))
return _install_kernel()


def install_kernel_dont_shutdown():
_install_kernel("signaltest-no-shutdown", {"NO_SHUTDOWN_REPLY": "1"})


def install_kernel_dont_terminate():
return _install_kernel(
"signaltest-no-terminate", {"NO_SHUTDOWN_REPLY": "1", "NO_SIGTERM_REPLY": "1"}
)


@pytest.fixture
Expand Down Expand Up @@ -104,6 +129,55 @@ async def start_async_kernel():
assert km.context.closed


class TestKernelManagerShutDownGracefully:
parameters = (
"name, install, expected",
[
("signaltest", _install_kernel, _ShutdownStatus.ShutdownRequest),
(
"signaltest-no-shutdown",
install_kernel_dont_shutdown,
_ShutdownStatus.SigtermRequest,
),
(
"signaltest-no-terminate",
install_kernel_dont_terminate,
_ShutdownStatus.SigkillRequest,
),
],
)

@pytest.mark.skipif(
sys.platform == "win32", reason="Windows doesn't support signals"
)
@pytest.mark.parametrize(*parameters)
def test_signal_kernel_subprocesses(self, name, install, expected):
install()
km, kc = start_new_kernel(kernel_name=name)
assert km._shutdown_status == _ShutdownStatus.Unset
assert km.is_alive()
# kc.execute("1")
kc.stop_channels()
km.shutdown_kernel()

assert km._shutdown_status == expected

@pytest.mark.skipif(
sys.platform == "win32", reason="Windows doesn't support signals"
)
@pytest.mark.parametrize(*parameters)
async def test_async_signal_kernel_subprocesses(self, name, install, expected):
install()
km, kc = await start_new_async_kernel(kernel_name=name)
assert km._shutdown_status == _ShutdownStatus.Unset
assert await km.is_alive()
# kc.execute("1")
kc.stop_channels()
await km.shutdown_kernel()

assert km._shutdown_status == expected


class TestKernelManager:

def test_lifecycle(self, km):
Expand Down

0 comments on commit 7df16d5

Please sign in to comment.