Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interrupt and sigterm before sigkill kernel. #620

Merged
merged 9 commits into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
Changes in Jupyter Client
=========================

dev
===

- Shutdown request sequence has been modified to be more graceful, it now is
preceded by interrupt, and will also send a ``SIGTERM`` before forcibly
killing the kernel. :ghpull:`620`

6.1.11
======
- Move jedi pinning to test requirements (:ghpull:`599`)
Expand Down
141 changes: 134 additions & 7 deletions jupyter_client/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import time
import warnings

from enum import Enum

import zmq

from ipython_genutils.importstring import import_item
Expand All @@ -29,13 +31,30 @@
KernelManagerABC
)

class _ShutdownStatus(Enum):
"""

This is so far used only for testing in order to track the internal state of
the shutdown logic, and verifying which path is taken for which
missbehavior.

"""
Unset = None
ShutdownRequest = "ShutdownRequest"
SigtermRequest = "SigtermRequest"
SigkillRequest = "SigkillRequest"


class KernelManager(ConnectionFileMixin):
"""Manages a single kernel in a subprocess on this host.

This version starts kernels with Popen.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._shutdown_status = _ShutdownStatus.Unset

_created_context = Bool(False)

# The PyZMQ Context to use for communication with the kernel.
Expand Down Expand Up @@ -71,7 +90,13 @@ def _kernel_spec_manager_changed(self, change):
shutdown_wait_time = Float(
5.0, config=True,
help="Time to wait for a kernel to terminate before killing it, "
"in seconds.")
"in seconds. When a shutdown request is initiated, the kernel "
"will be immediately send and interrupt (SIGINT), followed"
"by a shutdown_request message, after 1/2 of `shutdown_wait_time`"
"it will be sent a terminate (SIGTERM) request, and finally at "
"the end of `shutdown_wait_time` will be killed (SIGKILL). terminate "
"and kill may be equivalent on windows.",
)

kernel_name = Unicode(kernelspec.NATIVE_KERNEL_NAME)

Expand Down Expand Up @@ -330,20 +355,45 @@ def finish_shutdown(self, waittime=None, pollinterval=0.1):
"""
if waittime is None:
waittime = max(self.shutdown_wait_time, 0)
for i in range(int(waittime/pollinterval)):
self._shutdown_status = _ShutdownStatus.ShutdownRequest

def poll_or_sleep_to_kernel_gone():
"""
Poll until the kernel is not responding,
then wait (the subprocess), until process gone.

After this function the kernel is either:
- still responding; or
- subprocess has been culled.
"""
if self.is_alive():
time.sleep(pollinterval)
else:
# If there's still a proc, wait and clear
if self.has_kernel:
self.kernel.wait()
self.kernel = None
return True

# wait 50% of the shutdown timeout...
for i in range(int(waittime / 2 / pollinterval)):
if poll_or_sleep_to_kernel_gone():
break
else:
# OK, we've waited long enough.
if self.has_kernel:
self.log.debug("Kernel is taking too long to finish, killing")
self._kill_kernel()
# if we've exited the loop normally (no break)
# send sigterm and wait the other 50%.
self.log.debug("Kernel is taking too long to finish, terminating")
self._shutdown_status = _ShutdownStatus.SigtermRequest
self._send_kernel_sigterm()
for i in range(int(waittime / 2 / pollinterval)):
if poll_or_sleep_to_kernel_gone():
break
else:
# OK, we've waited long enough.
if self.has_kernel:
self.log.debug("Kernel is taking too long to finish, killing")
self._shutdown_status = _ShutdownStatus.SigkillRequest
self._kill_kernel()

def cleanup_resources(self, restart=False):
"""Clean up resources when the kernel is shut down"""
Expand Down Expand Up @@ -384,6 +434,8 @@ def shutdown_kernel(self, now=False, restart=False):
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()

self.interrupt_kernel()

if now:
self._kill_kernel()
else:
Expand Down Expand Up @@ -458,6 +510,36 @@ def has_kernel(self):
"""Has a kernel been started that we are managing."""
return self.kernel is not None

def _send_kernel_sigterm(self):
"""similar to _kill_kernel, but with sigterm (not sigkill), but do not block"""
if self.has_kernel:
# Signal the kernel to terminate (sends SIGTERM on Unix and
# if the kernel is a subprocess and we are on windows; this is
# equivalent to kill
try:
if hasattr(self.kernel, "terminate"):
self.kernel.terminate()
elif hasattr(signal, "SIGTERM"):
self.signal_kernel(signal.SIGTERM)
Carreau marked this conversation as resolved.
Show resolved Hide resolved
else:
self.log.debug(
"Cannot set term signal to kernel, no"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see we prefer SIGKILL over kill() and assume that kill() exists. Should we do the same with SIGTERM and terminate()? (I suspect this is due to Windows not having SIGTERM?)

Actually, given that Popen is well-documented to use SIGKILL and SIGTERM for kill() and terminate(), respectively, and there don't appear to be python version compatibility issues, I think we'd be fine just calling the desired methods directly. This would then forgo the need to log anything and remove the conditional attribute checks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Diggning in:

I don't think that's compatibility issue, signal_kernel signal the process group, not the kernel alone.
Sigterm can be handled by a subprocess, not sigkill.

Signal_kernel send the signal to the process group (when possible) instead of only the single process, so if the kernel has children you don't want to send sigterm to the (grand)children, put leave the child process a chance to clean up it children, in case there is some cleanup logic.

Thus terminate() is prefered, and if you can't you sigterm the process group.

Kill is the opposite, as the kernel has no chance to terminate children by catching sigkill, you want to tell them to die as well. See #314

I guess in a perfect world you would

  • 1 shutdown request
  • 2 wait
  • 3 term child
  • 4 wait
    • 4b if don't respond term grand children
  • 5 wait
    • 5b kill everybody.

we just skip 4,4b, and replace 3 by "term child, if you can't term everybody".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point on the process group portion of things. I had conflated signal_kernel() with send_signal().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, i I had to dig deep in the code and history to figure that one :-)

" `.terminate()` method and no values for SIGTERM"
)
except OSError as e:
# In Windows, we will get an Access Denied error if the process
# has already terminated. Ignore it.
if sys.platform == "win32":
if e.winerror != 5:
raise
# On Unix, we may get an ESRCH error if the process has already
# terminated. Ignore it.
else:
from errno import ESRCH

if e.errno != ESRCH:
raise

def _kill_kernel(self):
"""Kill the running kernel.

Expand Down Expand Up @@ -583,10 +665,23 @@ async def finish_shutdown(self, waittime=None, pollinterval=0.1):
"""
if waittime is None:
waittime = max(self.shutdown_wait_time, 0)
self._shutdown_status = _ShutdownStatus.ShutdownRequest
try:
await asyncio.wait_for(self._async_wait(pollinterval=pollinterval), timeout=waittime)
await asyncio.wait_for(
self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
)
Comment on lines +670 to +672
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this spin-wait needs to be associated with the except block - otherwise, it will occur again for the successful shutdown-request case (which will immediately exit, but probably not the intent).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's with the one just below, let me try to change the layout to be clearer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've split the two try/except to make the intention clearer, is that better ?

except asyncio.TimeoutError:
self.log.debug("Kernel is taking too long to finish, terminating")
self._shutdown_status = _ShutdownStatus.SigtermRequest
await self._send_kernel_sigterm()

try:
await asyncio.wait_for(
self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
)
except asyncio.TimeoutError:
self.log.debug("Kernel is taking too long to finish, killing")
self._shutdown_status = _ShutdownStatus.SigkillRequest
await self._kill_kernel()
else:
# Process is no longer alive, wait and clear
Expand Down Expand Up @@ -615,6 +710,8 @@ async def shutdown_kernel(self, now=False, restart=False):
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()

await self.interrupt_kernel()

if now:
await self._kill_kernel()
else:
Expand Down Expand Up @@ -673,6 +770,36 @@ async def restart_kernel(self, now=False, newports=False, **kw):
await self.start_kernel(**self._launch_args)
return None

async def _send_kernel_sigterm(self):
"""similar to _kill_kernel, but with sigterm (not sigkill), but do not block"""
if self.has_kernel:
# Signal the kernel to terminate (sends SIGTERM on Unix and
# if the kernel is a subprocess and we are on windows; this is
# equivalent to kill
try:
if hasattr(self.kernel, "terminate"):
self.kernel.terminate()
elif hasattr(signal, "SIGTERM"):
await self.signal_kernel(signal.SIGTERM)
Carreau marked this conversation as resolved.
Show resolved Hide resolved
else:
self.log.debug(
"Cannot set term signal to kernel, no"
" `.terminate()` method and no values for SIGTERM"
)
except OSError as e:
# In Windows, we will get an Access Denied error if the process
# has already terminated. Ignore it.
if sys.platform == "win32":
if e.winerror != 5:
raise
# On Unix, we may get an ESRCH error if the process has already
# terminated. Ignore it.
else:
from errno import ESRCH

if e.errno != ESRCH:
raise

async def _kill_kernel(self):
"""Kill the running kernel.

Expand Down
12 changes: 12 additions & 0 deletions jupyter_client/tests/signalkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
from ipykernel.kernelbase import Kernel
from ipykernel.kernelapp import IPKernelApp

from tornado.web import gen

import signal


class SignalTestKernel(Kernel):
"""Kernel for testing subprocess signaling"""
Expand All @@ -25,6 +29,14 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)
self.children = []

if os.environ.get("NO_SIGTERM_REPLY", None) == "1":
signal.signal(signal.SIGTERM, signal.SIG_IGN)

@gen.coroutine
def shutdown_request(self, stream, ident, parent):
if os.environ.get("NO_SHUTDOWN_REPLY") != "1":
yield gen.maybe_future(super().shutdown_request(stream, ident, parent))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not super familiar with gen from tornado. Is there a risk of issues here depending on the wrapping event loop?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe so, that's the exact way ipykernel uses coroutines:

 @gen.coroutine
    def shutdown_request(self, stream, ident, parent):
        content = yield gen.maybe_future(self.do_shutdown(parent['content']['restart']))
        ...

So if this breaks; IPykernel would be broken, and if it works with ipykernel, I see no reason this wouldn't.

I would prefer async def and await, but that unfortunately does not mix.


def do_execute(self, code, silent, store_history=True, user_expressions=None,
allow_stdin=False):
code = code.strip()
Expand Down
94 changes: 84 additions & 10 deletions jupyter_client/tests/test_kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from subprocess import PIPE

from ..manager import start_new_kernel, start_new_async_kernel
from ..manager import _ShutdownStatus
from .utils import test_env, skip_win32, AsyncKernelManagerSubclass, AsyncKernelManagerWithCleanup

pjoin = os.path.join
Expand Down Expand Up @@ -52,18 +53,42 @@ def config(transport):
return c


def _install_kernel(name="signaltest", extra_env=None):
if extra_env is None:
extra_env = dict()
kernel_dir = pjoin(paths.jupyter_data_dir(), "kernels", name)
os.makedirs(kernel_dir)
with open(pjoin(kernel_dir, "kernel.json"), "w") as f:
f.write(
json.dumps(
{
"argv": [
sys.executable,
"-m",
"jupyter_client.tests.signalkernel",
"-f",
"{connection_file}",
],
"display_name": "Signal Test Kernel",
"env": {"TEST_VARS": "${TEST_VARS}:test_var_2", **extra_env},
}
)
)


@pytest.fixture
def install_kernel():
kernel_dir = pjoin(paths.jupyter_data_dir(), 'kernels', 'signaltest')
os.makedirs(kernel_dir)
with open(pjoin(kernel_dir, 'kernel.json'), 'w') as f:
f.write(json.dumps({
'argv': [sys.executable,
'-m', 'jupyter_client.tests.signalkernel',
'-f', '{connection_file}'],
'display_name': "Signal Test Kernel",
'env': {'TEST_VARS': '${TEST_VARS}:test_var_2'},
}))
return _install_kernel()


def install_kernel_dont_shutdown():
_install_kernel("signaltest-no-shutdown", {"NO_SHUTDOWN_REPLY": "1"})


def install_kernel_dont_terminate():
return _install_kernel(
"signaltest-no-terminate", {"NO_SHUTDOWN_REPLY": "1", "NO_SIGTERM_REPLY": "1"}
)


@pytest.fixture
Expand Down Expand Up @@ -105,6 +130,55 @@ async def start_async_kernel():
assert km.context.closed


class TestKernelManagerShutDownGracefully:
parameters = (
"name, install, expected",
[
("signaltest", _install_kernel, _ShutdownStatus.ShutdownRequest),
(
"signaltest-no-shutdown",
install_kernel_dont_shutdown,
_ShutdownStatus.SigtermRequest,
),
(
"signaltest-no-terminate",
install_kernel_dont_terminate,
_ShutdownStatus.SigkillRequest,
),
],
)

@pytest.mark.skipif(
sys.platform == "win32", reason="Windows doesn't support signals"
)
@pytest.mark.parametrize(*parameters)
def test_signal_kernel_subprocesses(self, name, install, expected):
install()
km, kc = start_new_kernel(kernel_name=name)
assert km._shutdown_status == _ShutdownStatus.Unset
assert km.is_alive()
# kc.execute("1")
kc.stop_channels()
km.shutdown_kernel()

assert km._shutdown_status == expected

@pytest.mark.skipif(
sys.platform == "win32", reason="Windows doesn't support signals"
)
@pytest.mark.parametrize(*parameters)
async def test_async_signal_kernel_subprocesses(self, name, install, expected):
install()
km, kc = await start_new_async_kernel(kernel_name=name)
assert km._shutdown_status == _ShutdownStatus.Unset
assert await km.is_alive()
# kc.execute("1")
kc.stop_channels()
await km.shutdown_kernel()

assert km._shutdown_status == expected


class TestKernelManager:

def test_lifecycle(self, km):
Expand Down