Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] [Resource Leak] Gracefully Close ZMQ Context upon kernel shutdown #548

Merged
merged 14 commits into from
Jun 22, 2020
2 changes: 1 addition & 1 deletion jupyter_client/kernelapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def start(self):
self.setup_signals()
self.loop.start()
finally:
self.km.cleanup()
self.km.cleanup_resources()


main = KernelApp.launch_instance
32 changes: 28 additions & 4 deletions jupyter_client/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ class KernelManager(ConnectionFileMixin):
This version starts kernels with Popen.
"""

_created_context = Bool(False)

# The PyZMQ Context to use for communication with the kernel.
context = Instance(zmq.Context)
def _context_default(self):
self._created_context = True
return zmq.Context()

# the class to create with our `client` method
Expand Down Expand Up @@ -337,15 +340,24 @@ def finish_shutdown(self, waittime=None, pollinterval=0.1):
self.log.debug("Kernel is taking too long to finish, killing")
self._kill_kernel()

def cleanup(self, connection_file=True):
def cleanup_resources(self, restart=False):
"""Clean up resources when the kernel is shut down"""
if connection_file:
if not restart:
self.cleanup_connection_file()

self.cleanup_ipc_files()
self._close_control_socket()
self.session.parent = None

if self._created_context and not restart:
self.context.destroy(linger=100)

def cleanup(self, connection_file=True):
"""Clean up resources when the kernel is shut down"""
warnings.warn("Method cleanup(connection_file=True) is deprecated, use cleanup_resources(restart=False).",
FutureWarning)
self.cleanup_resources(restart=not connection_file)

def shutdown_kernel(self, now=False, restart=False):
"""Attempts to stop the kernel process cleanly.

Expand Down Expand Up @@ -376,7 +388,13 @@ def shutdown_kernel(self, now=False, restart=False):
# most 1s, checking every 0.1s.
self.finish_shutdown()

self.cleanup(connection_file=not restart)
from . import __version__
from distutils.version import LooseVersion

if LooseVersion(__version__) < LooseVersion('6.2'):
self.cleanup(connection_file=not restart)
else:
self.cleanup_resources(restart=restart)

def restart_kernel(self, now=False, newports=False, **kw):
"""Restarts a kernel with the arguments that were used to launch it.
Expand Down Expand Up @@ -591,7 +609,13 @@ async def shutdown_kernel(self, now=False, restart=False):
# most 1s, checking every 0.1s.
await self.finish_shutdown()

self.cleanup(connection_file=not restart)
from . import __version__
from distutils.version import LooseVersion

if LooseVersion(__version__) < LooseVersion('6.2'):
self.cleanup(connection_file=not restart)
else:
self.cleanup_resources(restart=restart)

async def restart_kernel(self, now=False, newports=False, **kw):
"""Restarts a kernel with the arguments that were used to launch it.
Expand Down
34 changes: 33 additions & 1 deletion jupyter_client/multikernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ def _create_kernel_manager_factory(self):
kernel_manager_ctor = import_item(self.kernel_manager_class)

def create_kernel_manager(*args, **kwargs):
if self.shared_context:
if self.context.closed:
# recreate context if closed
self.context = self._context_default()
kwargs.setdefault("context", self.context)
km = kernel_manager_ctor(*args, **kwargs)

if km.cache_ports:
Expand Down Expand Up @@ -104,10 +109,33 @@ def _find_available_port(self, ip):

return port

shared_context = Bool(
True,
config=True,
help="Share a single zmq.Context to talk to all my kernels",
)

_created_context = Bool(False)

context = Instance('zmq.Context')

@default("context")
def _context_default(self):
self._created_context = True
return zmq.Context()

def __del__(self):
if self._created_context and self.context and not self.context.closed:
if self.log:
self.log.debug("Destroying zmq context for %s", self)
self.context.destroy()
try:
super_del = super().__del__
except AttributeError:
pass
else:
super_del()

connection_dir = Unicode('')

_kernels = Dict()
Expand Down Expand Up @@ -201,6 +229,10 @@ def finish_shutdown(self, kernel_id, waittime=None, pollinterval=0.1):
def cleanup(self, kernel_id, connection_file=True):
"""Clean up a kernel's resources"""

@kernel_method
def cleanup_resources(self, kernel_id, restart=False):
"""Clean up a kernel's resources"""

def remove_kernel(self, kernel_id):
"""remove a kernel from our mapping.

Expand Down Expand Up @@ -508,5 +540,5 @@ async def shutdown_all(self, now=False):
self.request_shutdown(kid)
for kid in kids:
await self.finish_shutdown(kid)
self.cleanup(kid)
self.cleanup_resources(kid)
self.remove_kernel(kid)
30 changes: 30 additions & 0 deletions jupyter_client/tests/test_kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def _run_lifecycle(self, km):
km.interrupt_kernel()
self.assertTrue(isinstance(km, KernelManager))
km.shutdown_kernel(now=True)
self.assertTrue(km.context.closed)

def test_tcp_lifecycle(self):
km = self._get_tcp_km()
Expand Down Expand Up @@ -135,6 +136,7 @@ def test_start_new_kernel(self):

self.assertTrue(km.is_alive())
self.assertTrue(kc.is_alive())
self.assertFalse(km.context.closed)

def _env_test_body(self, kc):

Expand All @@ -157,6 +159,7 @@ def test_templated_kspec_env(self):

self.assertTrue(km.is_alive())
self.assertTrue(kc.is_alive())
self.assertFalse(km.context.closed)

self._env_test_body(kc)

Expand Down Expand Up @@ -190,9 +193,32 @@ def test_templated_extra_env(self):

self.assertTrue(km.is_alive())
self.assertTrue(kc.is_alive())
self.assertFalse(km.context.closed)

self._env_test_body(kc)

def test_cleanup_context(self):
km = KernelManager()
self.assertIsNotNone(km.context)

km.cleanup_resources(restart=False)

self.assertTrue(km.context.closed)

def test_no_cleanup_shared_context(self):
"""kernel manager does not terminate shared context"""
import zmq
ctx = zmq.Context()
km = KernelManager(context=ctx)
self.assertEquals(km.context, ctx)
self.assertIsNotNone(km.context)

km.cleanup_resources(restart=False)
self.assertFalse(km.context.closed)
self.assertFalse(ctx.closed)

ctx.term()


class TestParallel:

Expand Down Expand Up @@ -307,6 +333,7 @@ def execute(cmd):
execute('check')

km.shutdown_kernel()
assert km.context.closed


class TestAsyncKernelManager(AsyncTestCase):
Expand Down Expand Up @@ -351,6 +378,7 @@ async def _run_lifecycle(self, km):
self.assertTrue(isinstance(km, AsyncKernelManager))
await km.shutdown_kernel(now=True)
self.assertFalse(await km.is_alive())
self.assertTrue(km.context.closed)

@gen_test
async def test_tcp_lifecycle(self):
Expand Down Expand Up @@ -417,6 +445,7 @@ async def execute(cmd):
finally:
await km.shutdown_kernel(now=True)
kc.stop_channels()
self.assertTrue(km.context.closed)

@gen_test(timeout=10.0)
async def test_start_new_async_kernel(self):
Expand All @@ -431,3 +460,4 @@ async def test_start_new_async_kernel(self):
finally:
await km.shutdown_kernel(now=True)
kc.stop_channels()
self.assertTrue(km.context.closed)