diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index c1494ffbb..497c6a2f7 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -5,23 +5,26 @@ import asyncio import concurrent.futures -from datetime import datetime -from functools import partial +import inspect import itertools import logging -import inspect import os -from signal import signal, default_int_handler, SIGINT -import sys import socket +import sys import time import uuid import warnings +from datetime import datetime +from functools import partial +from signal import (SIGINT, SIGKILL, SIGTERM, Signals, default_int_handler, + signal) + try: import psutil except ImportError: psutil = None + try: # jupyter_client >= 5, use tz-aware now from jupyter_client.session import utcnow as now @@ -29,20 +32,17 @@ # jupyter_client < 5, use local now() now = datetime.now +import zmq +from IPython.core.error import StdinNotImplementedError +from jupyter_client.session import Session from tornado import ioloop from tornado.queues import Queue, QueueEmpty -import zmq +from traitlets import (Any, Bool, Dict, Float, Instance, Integer, List, Set, + Unicode, default, observe) +from traitlets.config.configurable import SingletonConfigurable from zmq.eventloop.zmqstream import ZMQStream -from traitlets.config.configurable import SingletonConfigurable -from IPython.core.error import StdinNotImplementedError from ipykernel.jsonutil import json_clean -from traitlets import ( - Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool, - observe, default -) - -from jupyter_client.session import Session from ._version import kernel_protocol_version @@ -796,13 +796,13 @@ async def comm_info_request(self, stream, ident, parent): reply_content, parent, ident) self.log.debug("%s", msg) - async def interrupt_request(self, stream, ident, parent): + def _send_interupt_children(self): + pid = os.getpid() pgid = os.getpgid(pid) if os.name == "nt": self.log.error("Interrupt message not supported on Windows") - else: # Prefer process-group over process if pgid and hasattr(os, "killpg"): @@ -816,6 +816,8 @@ async def interrupt_request(self, stream, ident, parent): except OSError: pass + async def interrupt_request(self, stream, ident, parent): + self._send_interupt_children() content = parent['content'] self.session.send(stream, 'interrupt_reply', content, parent, ident=ident) return @@ -830,7 +832,7 @@ async def shutdown_request(self, stream, ident, parent): content, parent ) - self._at_shutdown() + await self._at_shutdown() self.log.debug('Stopping control ioloop') control_io_loop = self.control_stream.io_loop @@ -1131,9 +1133,60 @@ def _input_request(self, prompt, ident, parent, password=False): raise EOFError return value - def _at_shutdown(self): + async def _progressively_terminate_all_children(self): + + pgid = os.getpgid(os.getpid()) + if not pgid: + self.log.warning(f"No Pgid ({pgid=}), not trying to stop subprocesses.") + return + if psutil is None: + # blindly send quickly sigterm/sigkill to processes if psutil not there. + self.log.warning( + f"Please install psutil for a cleaner subprocess shutdown." + ) + self._send_interupt_children() + try: + await asyncio.sleep(0.05) + self.log.debug("Sending SIGTERM to {pgid=}") + os.killpg(pgid, SIGTERM) + await asyncio.sleep(0.05) + self.log.debug("Sending SIGKILL to {pgid=}") + os.killpg(pgid, SIGKILL) + except Exception: + self.log.exception("Exception during subprocesses termination") + return + + sleeps = (0.01, 0.03, 0.1, 0.3, 1) + children = psutil.Process().children(recursive=True) + if not children: + self.log.debug("Kernel has no children.") + return + self.log.debug(f"Trying to interrupt then kill subprocesses : {children=}") + self._send_interupt_children() + for signum in (SIGTERM, SIGKILL): + self.log.debug( + f"Will try to send {signum} ({Signals(signum)}) to subprocesses :{children}" + ) + for delay in sleeps: + children = psutil.Process().children(recursive=True) + if not children: + self.log.debug("No more children, continuing shutdown routine.") + return + if pgid and hasattr(os, "killpg"): + try: + os.killpg(pgid, signum) + except OSError: + self.log.warning("OSError running killpg, not killing children") + return + self.log.debug( + f"Will sleep {delay}s before checking for children and retrying." + ) + await ascynio.sleep(delay) + + async def _at_shutdown(self): """Actions taken at shutdown by the kernel, called by python's atexit. """ + await self._progressively_terminate_all_children() if self._shutdown_message is not None: self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown')) self.log.debug("%s", self._shutdown_message)