Skip to content

Commit

Permalink
BUG: Kill subprocesses on shutdown.
Browse files Browse the repository at this point in the history
Fixes #jupyter/jupyter_client#104

This should make sure we properly cull all subprocesses at shutdown.
It does change one of the private methods from sync to async in order to
not use time.sleep or threads, so this may affect subclasses, though I
doubt it.

It's also not completely clear to me whether this works on Windows, as
SIGINT, I believe, is not a thing there.

Regardless as this affects things like dask, and others that are mostly
on unix, it should be an improvement.

It does the following, stopping as soon as it does not find any more
children of the current process.

 - Send sigint to everything
 - Immediately send sigterm in a loop with an exponential backoff from
   0.01 to 1 second, roughly multiplying the delay until the next send
   by 3 each time.
 - Switch to sending sigkill with same backoff.

There is no delay after sigint, as this is just a courtesy.
The backoff delays are not configurable. I can imagine that on slow
systems it may make sense…
  • Loading branch information
Carreau committed Feb 17, 2022
1 parent 221dca6 commit adbdb77
Showing 1 changed file with 71 additions and 18 deletions.
89 changes: 71 additions & 18 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,44 @@

import asyncio
import concurrent.futures
from datetime import datetime
from functools import partial
import inspect
import itertools
import logging
import inspect
import os
from signal import signal, default_int_handler, SIGINT
import sys
import socket
import sys
import time
import uuid
import warnings
from datetime import datetime
from functools import partial
from signal import (SIGINT, SIGKILL, SIGTERM, Signals, default_int_handler,
signal)

try:
import psutil
except ImportError:
psutil = None


try:
# jupyter_client >= 5, use tz-aware now
from jupyter_client.session import utcnow as now
except ImportError:
# jupyter_client < 5, use local now()
now = datetime.now

import zmq
from IPython.core.error import StdinNotImplementedError
from jupyter_client.session import Session
from tornado import ioloop
from tornado.queues import Queue, QueueEmpty
import zmq
from traitlets import (Any, Bool, Dict, Float, Instance, Integer, List, Set,
Unicode, default, observe)
from traitlets.config.configurable import SingletonConfigurable
from zmq.eventloop.zmqstream import ZMQStream

from traitlets.config.configurable import SingletonConfigurable
from IPython.core.error import StdinNotImplementedError
from ipykernel.jsonutil import json_clean
from traitlets import (
Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool,
observe, default
)

from jupyter_client.session import Session

from ._version import kernel_protocol_version

Expand Down Expand Up @@ -796,13 +796,13 @@ async def comm_info_request(self, stream, ident, parent):
reply_content, parent, ident)
self.log.debug("%s", msg)

async def interrupt_request(self, stream, ident, parent):
def _send_interupt_children(self):

pid = os.getpid()
pgid = os.getpgid(pid)

if os.name == "nt":
self.log.error("Interrupt message not supported on Windows")

else:
# Prefer process-group over process
if pgid and hasattr(os, "killpg"):
Expand All @@ -816,6 +816,8 @@ async def interrupt_request(self, stream, ident, parent):
except OSError:
pass

async def interrupt_request(self, stream, ident, parent):
    """Handle an ``interrupt_request`` message.

    Asks ``_send_interupt_children`` to interrupt the kernel's children,
    then answers on ``stream`` with an ``interrupt_reply`` message whose
    content is the content of the request itself.

    Parameters
    ----------
    stream
        Stream the reply is sent on.
    ident
        Routing identity forwarded to ``session.send``.
    parent
        The request message; its ``'content'`` entry is echoed back.
    """
    self._send_interupt_children()
    # The reply carries the request's own content back to the requester.
    reply_content = parent['content']
    self.session.send(
        stream,
        'interrupt_reply',
        reply_content,
        parent,
        ident=ident,
    )
    return None
Expand All @@ -830,7 +832,7 @@ async def shutdown_request(self, stream, ident, parent):
content, parent
)

self._at_shutdown()
await self._at_shutdown()

self.log.debug('Stopping control ioloop')
control_io_loop = self.control_stream.io_loop
Expand Down Expand Up @@ -1131,9 +1133,60 @@ def _input_request(self, prompt, ident, parent, password=False):
raise EOFError
return value

def _at_shutdown(self):
async def _progressively_terminate_all_children(self):
    """Stop the kernel's subprocesses, escalating up to SIGKILL.

    Signals are delivered to the kernel's whole process group with
    ``os.killpg``.  An interrupt is first requested through
    ``_send_interupt_children``; ``SIGTERM`` and then ``SIGKILL`` follow.

    When ``psutil`` is available, the list of child processes is
    re-checked before every signal and the coroutine returns as soon as
    no child is left.  Each signal is retried after increasing delays
    (0.01s up to 1s).  Without ``psutil`` the children cannot be
    enumerated here, so ``SIGTERM`` and ``SIGKILL`` are each sent once,
    0.05s apart.

    Returns ``None``.  In the ``psutil`` path an ``OSError`` from
    ``os.killpg`` is logged and ends the procedure; in the fallback path
    any exception is logged instead of propagating.
    """

    pgid = os.getpgid(os.getpid())
    if not pgid:
        self.log.warning(f"No Pgid ({pgid=}), not trying to stop subprocesses.")
        return
    # NOTE(review): pgid is this process's own group, so every killpg below
    # is delivered to the kernel process as well -- confirm that is intended.
    if psutil is None:
        # blindly send quickly sigterm/sigkill to processes if psutil not there.
        self.log.warning(
            "Please install psutil for a cleaner subprocess shutdown."
        )
        self._send_interupt_children()
        try:
            await asyncio.sleep(0.05)
            self.log.debug(f"Sending SIGTERM to {pgid=}")
            os.killpg(pgid, SIGTERM)
            await asyncio.sleep(0.05)
            self.log.debug(f"Sending SIGKILL to {pgid=}")
            os.killpg(pgid, SIGKILL)
        except Exception:
            # Best effort only: shutdown must continue even if signalling fails.
            self.log.exception("Exception during subprocesses termination")
        return

    # Delay before each retry; roughly tripled every time.
    sleeps = (0.01, 0.03, 0.1, 0.3, 1)
    children = psutil.Process().children(recursive=True)
    if not children:
        self.log.debug("Kernel has no children.")
        return
    self.log.debug(f"Trying to interrupt then kill subprocesses : {children=}")
    # No delay after the interrupt: the escalation below starts immediately.
    self._send_interupt_children()
    for signum in (SIGTERM, SIGKILL):
        self.log.debug(
            f"Will try to send {signum} ({Signals(signum)}) to subprocesses :{children}"
        )
        for delay in sleeps:
            # Re-list the children each round so we stop as soon as they exit.
            children = psutil.Process().children(recursive=True)
            if not children:
                self.log.debug("No more children, continuing shutdown routine.")
                return
            if pgid and hasattr(os, "killpg"):
                try:
                    os.killpg(pgid, signum)
                except OSError:
                    self.log.warning("OSError running killpg, not killing children")
                    return
            self.log.debug(
                f"Will sleep {delay}s before checking for children and retrying."
            )
            await asyncio.sleep(delay)

async def _at_shutdown(self):
"""Actions taken at shutdown by the kernel, called by python's atexit.
"""
await self._progressively_terminate_all_children()
if self._shutdown_message is not None:
self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
self.log.debug("%s", self._shutdown_message)
Expand Down

0 comments on commit adbdb77

Please sign in to comment.