Skip to content

Commit

Permalink
[DRAFT] interrupt and sigterm before sigkill kernel.
Browse files Browse the repository at this point in the history
This implements the discussion in #18, to try to more progressively stop
the kernels.

1) this always sends and interrupt before any shutdown requests;
  goal being to stop any processing happening that may block any event
  loop.

2) sends the shutdown_requests (no changes here).

3) wait 50% of the wait time, and sends a "terminate" in quote as this
depends on your platform/os, and the type of kernel you have.
    - if subprocess.Popen calls `.terminate()` which is SIGTERM on unix; the same as
    `.kill()` on windows; if not a Popen instance send SIGTERM.

4) wait the other 50% and kills, (same as before).

This does this both on the sync client and async client.

TBD: write tests and docs; and test more properly;
  • Loading branch information
Carreau committed Feb 22, 2021
1 parent 5e6e7ab commit 93dd3ac
Showing 1 changed file with 86 additions and 6 deletions.
92 changes: 86 additions & 6 deletions jupyter_client/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,9 @@ def finish_shutdown(self, waittime=None, pollinterval=0.1):
"""
if waittime is None:
waittime = max(self.shutdown_wait_time, 0)
for i in range(int(waittime/pollinterval)):

# wait 50% of the shutdown timeout...
for i in range(int(waittime / 2 / pollinterval)):
if self.is_alive():
time.sleep(pollinterval)
else:
Expand All @@ -340,10 +342,23 @@ def finish_shutdown(self, waittime=None, pollinterval=0.1):
self.kernel = None
break
else:
# OK, we've waited long enough.
if self.has_kernel:
self.log.debug("Kernel is taking too long to finish, killing")
self._kill_kernel()
# if we've exited the loop normally (no break)
# send sigterm and wait the other 50%.
self._send_kernel_sigterm()
for i in range(int(waittime / 2 / pollinterval)):
if self.is_alive():
time.sleep(pollinterval)
else:
# If there's still a proc, wait and clear
if self.has_kernel:
self.kernel.wait()
self.kernel = None
break
else:
# OK, we've waited long enough.
if self.has_kernel:
self.log.debug("Kernel is taking too long to finish, killing")
self._kill_kernel()

def cleanup_resources(self, restart=False):
"""Clean up resources when the kernel is shut down"""
Expand Down Expand Up @@ -384,6 +399,8 @@ def shutdown_kernel(self, now=False, restart=False):
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()

self.interrupt_kernel()

if now:
self._kill_kernel()
else:
Expand Down Expand Up @@ -458,6 +475,31 @@ def has_kernel(self):
"""Has a kernel been started that we are managing."""
return self.kernel is not None

def _send_kernel_sigterm(self):
"""similar to _kill_kernel, but with sigterm (not sigkill), but do not block"""
if self.has_kernel:
# Signal the kernel to terminate (sends SIGTERM on Unix and
# if the kernel is a subprocess and we are on windows; this is
# equivalent to kill
try:
if hasattr(self.kernel, "terminate"):
self.kernel.terminate()
elif hasattr(signal, "SIGTERM"):
self.signal_kernel(signal.SIGTERM)
except OSError as e:
# In Windows, we will get an Access Denied error if the process
# has already terminated. Ignore it.
if sys.platform == "win32":
if e.winerror != 5:
raise
# On Unix, we may get an ESRCH error if the process has already
# terminated. Ignore it.
else:
from errno import ESRCH

if e.errno != ESRCH:
raise

def _kill_kernel(self):
"""Kill the running kernel.
Expand Down Expand Up @@ -584,7 +626,18 @@ async def finish_shutdown(self, waittime=None, pollinterval=0.1):
if waittime is None:
waittime = max(self.shutdown_wait_time, 0)
try:
await asyncio.wait_for(self._async_wait(pollinterval=pollinterval), timeout=waittime)
try:
await asyncio.wait_for(
self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
)
except asyncio.TimeoutError:
self.log.debug("Kernel is taking too long to finish, terminating")
await self._send_kernel_sigterm()

await asyncio.wait_for(
self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
)

except asyncio.TimeoutError:
self.log.debug("Kernel is taking too long to finish, killing")
await self._kill_kernel()
Expand Down Expand Up @@ -615,6 +668,8 @@ async def shutdown_kernel(self, now=False, restart=False):
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()

await self.interrupt_kernel()

if now:
await self._kill_kernel()
else:
Expand Down Expand Up @@ -673,6 +728,31 @@ async def restart_kernel(self, now=False, newports=False, **kw):
await self.start_kernel(**self._launch_args)
return None

async def _send_kernel_sigterm(self):
"""similar to _kill_kernel, but with sigterm (not sigkill), but do not block"""
if self.has_kernel:
# Signal the kernel to terminate (sends SIGTERM on Unix and
# if the kernel is a subprocess and we are on windows; this is
# equivalent to kill
try:
if hasattr(self.kernel, "terminate"):
self.kernel.terminate()
elif hasattr(signal, "SIGTERM"):
await self.signal_kernel(signal.SIGTERM)
except OSError as e:
# In Windows, we will get an Access Denied error if the process
# has already terminated. Ignore it.
if sys.platform == "win32":
if e.winerror != 5:
raise
# On Unix, we may get an ESRCH error if the process has already
# terminated. Ignore it.
else:
from errno import ESRCH

if e.errno != ESRCH:
raise

async def _kill_kernel(self):
"""Kill the running kernel.
Expand Down

0 comments on commit 93dd3ac

Please sign in to comment.