Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interrupt and sigterm before sigkill kernel. #620

Merged
merged 9 commits into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
Changes in Jupyter Client
=========================

dev
===

- The shutdown request sequence has been modified to be more graceful: it is
now preceded by an interrupt, and a ``SIGTERM`` is also sent before forcibly
killing the kernel. :ghpull:`620`

6.1.11
======
- Move jedi pinning to test requirements (:ghpull:`599`)
Expand Down
118 changes: 111 additions & 7 deletions jupyter_client/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import time
import warnings

from enum import Enum

import zmq

from ipython_genutils.importstring import import_item
Expand All @@ -29,13 +31,23 @@
KernelManagerABC
)

class _ShutdownStatus(Enum):
    """Most forceful step the kernel shutdown sequence has reached so far.

    A kernel manager starts at ``Unset`` and records each escalation step in
    its ``_shutdown_status`` attribute: first the shutdown request, then
    SIGTERM, and finally SIGKILL. The tests read the attribute back to check
    which step actually ended the kernel.
    """

    Unset = None
    ShutdownRequest = "ShutdownRequest"
    SigtermRequest = "SigtermRequest"
    SigKillRequest = "SigKillRequest"
Carreau marked this conversation as resolved.
Show resolved Hide resolved


class KernelManager(ConnectionFileMixin):
"""Manages a single kernel in a subprocess on this host.

This version starts kernels with Popen.
"""

def __init__(self, *args, **kwargs):
    """Initialise the manager and mark the shutdown sequence as not started."""
    super().__init__(*args, **kwargs)
    # No shutdown step has been attempted yet; finish_shutdown() updates this
    # as it escalates.
    self._shutdown_status = _ShutdownStatus.Unset

_created_context = Bool(False)

# The PyZMQ Context to use for communication with the kernel.
Expand Down Expand Up @@ -71,7 +83,13 @@ def _kernel_spec_manager_changed(self, change):
# Total grace period for a shutdown; the help text spells out how the period
# is split between the escalation steps.
shutdown_wait_time = Float(
    5.0,
    config=True,
    help=(
        "Time to wait for a kernel to terminate before killing it, "
        "in seconds. When a shutdown request is initiated, the kernel "
        "will be immediately sent an interrupt (SIGINT), followed "
        "by a shutdown_request message; after 1/2 of `shutdown_wait_time` "
        "it will be sent a terminate (SIGTERM) request, and finally at "
        "the end of `shutdown_wait_time` it will be killed (SIGKILL). "
        "Terminate and kill may be equivalent on Windows."
    ),
)

kernel_name = Unicode(kernelspec.NATIVE_KERNEL_NAME)

Expand Down Expand Up @@ -330,7 +348,10 @@ def finish_shutdown(self, waittime=None, pollinterval=0.1):
"""
if waittime is None:
waittime = max(self.shutdown_wait_time, 0)
for i in range(int(waittime/pollinterval)):
self._shutdown_status = _ShutdownStatus.ShutdownRequest

# wait 50% of the shutdown timeout...
for i in range(int(waittime / 2 / pollinterval)):
if self.is_alive():
time.sleep(pollinterval)
else:
Expand All @@ -340,10 +361,25 @@ def finish_shutdown(self, waittime=None, pollinterval=0.1):
self.kernel = None
break
else:
# OK, we've waited long enough.
if self.has_kernel:
self.log.debug("Kernel is taking too long to finish, killing")
self._kill_kernel()
# if we've exited the loop normally (no break)
# send sigterm and wait the other 50%.
self._shutdown_status = _ShutdownStatus.SigtermRequest
self._send_kernel_sigterm()
for i in range(int(waittime / 2 / pollinterval)):
if self.is_alive():
time.sleep(pollinterval)
else:
# If there's still a proc, wait and clear
if self.has_kernel:
self.kernel.wait()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just double checking that we can't get stuck here... I think we can't since the kernel isn't alive but I don't recall why we wait in that case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, that's the same logic as line 360; I could make it a local function and call it in both locations to be clearer.

self.kernel = None
break
else:
# OK, we've waited long enough.
if self.has_kernel:
self.log.debug("Kernel is taking too long to finish, killing")
self._shutdown_status = _ShutdownStatus.SigKillRequest
self._kill_kernel()

def cleanup_resources(self, restart=False):
"""Clean up resources when the kernel is shut down"""
Expand Down Expand Up @@ -384,6 +420,8 @@ def shutdown_kernel(self, now=False, restart=False):
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()

self.interrupt_kernel()

if now:
self._kill_kernel()
else:
Expand Down Expand Up @@ -458,6 +496,31 @@ def has_kernel(self):
"""Has a kernel been started that we are managing."""
return self.kernel is not None

def _send_kernel_sigterm(self):
    """Ask the kernel to terminate, without waiting for it to exit.

    Similar to ``_kill_kernel``, but sends SIGTERM instead of SIGKILL and
    does not block. Does nothing when no kernel is being managed.

    An ``OSError`` raised while signalling is swallowed only when it
    indicates that the process has already terminated; any other
    ``OSError`` propagates to the caller.
    """
    if not self.has_kernel:
        return

    try:
        # Prefer the kernel object's own terminate(): it sends SIGTERM on
        # Unix, and when the kernel is a subprocess on Windows it is
        # equivalent to kill.
        if hasattr(self.kernel, "terminate"):
            self.kernel.terminate()
        elif hasattr(signal, "SIGTERM"):
            self.signal_kernel(signal.SIGTERM)
    except OSError as e:
        if sys.platform == "win32":
            # On Windows we get an Access Denied error (winerror 5) if the
            # process has already terminated. Ignore it.
            if e.winerror != 5:
                raise
        else:
            # On Unix we may get an ESRCH error if the process has already
            # terminated. Ignore it.
            from errno import ESRCH

            if e.errno != ESRCH:
                raise

def _kill_kernel(self):
"""Kill the running kernel.

Expand Down Expand Up @@ -584,9 +647,23 @@ async def finish_shutdown(self, waittime=None, pollinterval=0.1):
if waittime is None:
waittime = max(self.shutdown_wait_time, 0)
try:
await asyncio.wait_for(self._async_wait(pollinterval=pollinterval), timeout=waittime)
try:
self._shutdown_status = _ShutdownStatus.ShutdownRequest
await asyncio.wait_for(
self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
)
except asyncio.TimeoutError:
self.log.debug("Kernel is taking too long to finish, terminating")
Carreau marked this conversation as resolved.
Show resolved Hide resolved
self._shutdown_status = _ShutdownStatus.SigtermRequest
await self._send_kernel_sigterm()

await asyncio.wait_for(
self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
)
Comment on lines +670 to +672
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this spin-wait needs to be associated with the except block - otherwise, it will occur again for the successful shutdown-request case (which will immediately exit, but probably not the intent).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's with the one just below, let me try to change the layout to be clearer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've split the two try/except to make the intention clearer, is that better ?


except asyncio.TimeoutError:
self.log.debug("Kernel is taking too long to finish, killing")
self._shutdown_status = _ShutdownStatus.SigKillRequest
await self._kill_kernel()
else:
# Process is no longer alive, wait and clear
Expand Down Expand Up @@ -615,6 +692,8 @@ async def shutdown_kernel(self, now=False, restart=False):
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()

await self.interrupt_kernel()

if now:
await self._kill_kernel()
else:
Expand Down Expand Up @@ -673,6 +752,31 @@ async def restart_kernel(self, now=False, newports=False, **kw):
await self.start_kernel(**self._launch_args)
return None

async def _send_kernel_sigterm(self):
    """Ask the kernel to terminate, without waiting for it to exit.

    Similar to ``_kill_kernel``, but sends SIGTERM instead of SIGKILL and
    does not block. Does nothing when no kernel is being managed.

    An ``OSError`` raised while signalling is swallowed only when it
    indicates that the process has already terminated; any other
    ``OSError`` propagates to the caller.
    """
    if not self.has_kernel:
        return

    try:
        # Prefer the kernel object's own terminate(): it sends SIGTERM on
        # Unix, and when the kernel is a subprocess on Windows it is
        # equivalent to kill.
        if hasattr(self.kernel, "terminate"):
            self.kernel.terminate()
        elif hasattr(signal, "SIGTERM"):
            await self.signal_kernel(signal.SIGTERM)
    except OSError as e:
        if sys.platform == "win32":
            # On Windows we get an Access Denied error (winerror 5) if the
            # process has already terminated. Ignore it.
            if e.winerror != 5:
                raise
        else:
            # On Unix we may get an ESRCH error if the process has already
            # terminated. Ignore it.
            from errno import ESRCH

            if e.errno != ESRCH:
                raise

async def _kill_kernel(self):
"""Kill the running kernel.

Expand Down
12 changes: 12 additions & 0 deletions jupyter_client/tests/signalkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
from ipykernel.kernelbase import Kernel
from ipykernel.kernelapp import IPKernelApp

from tornado.web import gen

import signal


class SignalTestKernel(Kernel):
"""Kernel for testing subprocess signaling"""
Expand All @@ -25,6 +29,14 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)
self.children = []

if os.environ.get("NO_SIGTERM_REPLY", None) == "1":
signal.signal(signal.SIGTERM, signal.SIG_IGN)

@gen.coroutine
def shutdown_request(self, stream, ident, parent):
    """Forward a shutdown request to the base kernel unless told to ignore it.

    When the ``NO_SHUTDOWN_REPLY`` environment variable is ``"1"`` the
    request is dropped and the parent handler is never called, so the
    shutdown message has no effect on this kernel.
    """
    if os.environ.get("NO_SHUTDOWN_REPLY") != "1":
        # NOTE(review): maybe_future presumably lets this work whether the
        # parent handler returns a plain value or a future -- confirm
        # against the installed ipykernel.
        yield gen.maybe_future(super().shutdown_request(stream, ident, parent))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not super familiar with gen from tornado. Is there a risk of issues here depending on the wrapping event loop?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe so, that's the exact way ipykernel uses coroutines:

 @gen.coroutine
    def shutdown_request(self, stream, ident, parent):
        content = yield gen.maybe_future(self.do_shutdown(parent['content']['restart']))
        ...

So if this breaks; IPykernel would be broken, and if it works with ipykernel, I see no reason this wouldn't.

I would prefer async def and await, but that unfortunately does not mix.


def do_execute(self, code, silent, store_history=True, user_expressions=None,
allow_stdin=False):
code = code.strip()
Expand Down
94 changes: 84 additions & 10 deletions jupyter_client/tests/test_kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from subprocess import PIPE

from ..manager import start_new_kernel, start_new_async_kernel
from ..manager import _ShutdownStatus
from .utils import test_env, skip_win32, AsyncKernelManagerSubclass, AsyncKernelManagerWithCleanup

pjoin = os.path.join
Expand Down Expand Up @@ -52,18 +53,42 @@ def config(transport):
return c


def _install_kernel(name="signaltest", extra_env=None):
    """Write a kernelspec for the signal test kernel into the Jupyter data dir.

    Parameters
    ----------
    name : str
        Name of the kernelspec directory created under ``kernels``.
    extra_env : dict, optional
        Extra environment variables merged into the spec's ``env`` section;
        a key given here replaces the default of the same name.
    """
    kernel_dir = pjoin(paths.jupyter_data_dir(), "kernels", name)
    # Installing the same spec twice must not fail: the directory may already
    # exist, and the spec file is simply rewritten below.
    os.makedirs(kernel_dir, exist_ok=True)
    spec = {
        "argv": [
            sys.executable,
            "-m",
            "jupyter_client.tests.signalkernel",
            "-f",
            "{connection_file}",
        ],
        "display_name": "Signal Test Kernel",
        "env": {"TEST_VARS": "${TEST_VARS}:test_var_2", **(extra_env or {})},
    }
    with open(pjoin(kernel_dir, "kernel.json"), "w") as f:
        json.dump(spec, f)


@pytest.fixture
def install_kernel():
    """Fixture that installs the default ``signaltest`` kernelspec."""
    # Delegate to the shared helper instead of repeating the spec inline;
    # creating the kernelspec directory both here and in the helper would
    # make the helper's makedirs call fail.
    return _install_kernel()


def install_kernel_dont_shutdown():
    """Install a test kernelspec that sets ``NO_SHUTDOWN_REPLY=1``."""
    # Return the helper's result, as the sibling installers do.
    return _install_kernel("signaltest-no-shutdown", {"NO_SHUTDOWN_REPLY": "1"})


def install_kernel_dont_terminate():
    """Install a test kernelspec that sets both ``NO_SHUTDOWN_REPLY=1`` and
    ``NO_SIGTERM_REPLY=1``."""
    stubborn_env = {"NO_SHUTDOWN_REPLY": "1", "NO_SIGTERM_REPLY": "1"}
    return _install_kernel("signaltest-no-terminate", stubborn_env)


@pytest.fixture
Expand Down Expand Up @@ -105,6 +130,55 @@ async def start_async_kernel():
assert km.context.closed


@pytest.mark.skipif(
    sys.platform == "win32", reason="Windows doesn't support signals"
)
class TestKernelManagerShutDownGracefully:
    """Check which shutdown step is recorded for kernels of varying stubbornness."""

    # Each case: (kernelspec name, installer to call first, shutdown status
    # expected on the manager once shutdown_kernel() has returned).
    parameters = (
        "name, install, expected",
        [
            ("signaltest", _install_kernel, _ShutdownStatus.ShutdownRequest),
            (
                "signaltest-no-shutdown",
                install_kernel_dont_shutdown,
                _ShutdownStatus.SigtermRequest,
            ),
            (
                "signaltest-no-terminate",
                install_kernel_dont_terminate,
                _ShutdownStatus.SigKillRequest,
            ),
        ],
    )

    @pytest.mark.parametrize(*parameters)
    def test_signal_kernel_subprocesses(self, name, install, expected):
        """Blocking manager: shutdown ends at the expected escalation step."""
        install()
        manager, client = start_new_kernel(kernel_name=name)
        assert manager._shutdown_status == _ShutdownStatus.Unset
        assert manager.is_alive()
        client.stop_channels()
        manager.shutdown_kernel()

        assert manager._shutdown_status == expected

    @pytest.mark.parametrize(*parameters)
    async def test_async_signal_kernel_subprocesses(self, name, install, expected):
        """Async manager: shutdown ends at the expected escalation step."""
        install()
        manager, client = await start_new_async_kernel(kernel_name=name)
        assert manager._shutdown_status == _ShutdownStatus.Unset
        assert await manager.is_alive()
        client.stop_channels()
        await manager.shutdown_kernel()

        assert manager._shutdown_status == expected


class TestKernelManager:

def test_lifecycle(self, km):
Expand Down