Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make KernelManager subclass tests DRY #628

Merged
merged 1 commit into from
Mar 18, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 41 additions & 57 deletions jupyter_client/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,95 +79,79 @@ def reset_counts(self) -> None:
self.method_calls[record] = 0


class SyncKernelManagerSubclass(RecordCallMixin, KernelManager):

def km_recorder(f):
def wrapped(self, *args, **kwargs):
# record this call
self.record(f.__name__)
method = getattr(self._km_class, f.__name__)
# call the superclass method
r = method(self, *args, **kwargs)
# call anything defined in the actual class method
f(self, *args, **kwargs)
return r
return wrapped


class KernelManagerSubclass(RecordCallMixin):

@km_recorder
def start_kernel(self, **kw):
self.record('start_kernel')
return super().start_kernel(**kw)
""" Record call and defer to superclass """

@km_recorder
def shutdown_kernel(self, now=False, restart=False):
self.record('shutdown_kernel')
return super().shutdown_kernel(now=now, restart=restart)
""" Record call and defer to superclass """

@km_recorder
def restart_kernel(self, now=False, **kw):
self.record('restart_kernel')
return super().restart_kernel(now=now, **kw)
""" Record call and defer to superclass """

@km_recorder
def interrupt_kernel(self):
self.record('interrupt_kernel')
return super().interrupt_kernel()
""" Record call and defer to superclass """

@km_recorder
def request_shutdown(self, restart=False):
self.record('request_shutdown')
return super().request_shutdown(restart=restart)
""" Record call and defer to superclass """

@km_recorder
def finish_shutdown(self, waittime=None, pollinterval=0.1):
self.record('finish_shutdown')
return super().finish_shutdown(waittime=waittime, pollinterval=pollinterval)
""" Record call and defer to superclass """

@km_recorder
def _launch_kernel(self, kernel_cmd, **kw):
self.record('_launch_kernel')
return super()._launch_kernel(kernel_cmd, **kw)
""" Record call and defer to superclass """

@km_recorder
def _kill_kernel(self):
self.record('_kill_kernel')
return super()._kill_kernel()
""" Record call and defer to superclass """

@km_recorder
def cleanup_resources(self, restart=False):
self.record('cleanup_resources')
super().cleanup_resources(restart=restart)
""" Record call and defer to superclass """


class SyncKernelManagerSubclass(KernelManagerSubclass, KernelManager):
_km_class = KernelManager

class AsyncKernelManagerSubclass(RecordCallMixin, AsyncKernelManager):

class AsyncKernelManagerSubclass(KernelManagerSubclass, AsyncKernelManager):
"""Used to test subclass hierarchies to ensure methods are called when expected.
This class is also used to test deprecation "routes" that are determined by superclass'
detection of methods.
This class represents a current subclass that overrides "interesting" methods of AsyncKernelManager.
"""
_km_class = AsyncKernelManager
which_cleanup = "" # cleanup deprecation testing

async def start_kernel(self, **kw):
self.record('start_kernel')
return await super().start_kernel(**kw)

async def shutdown_kernel(self, now=False, restart=False):
self.record('shutdown_kernel')
return await super().shutdown_kernel(now=now, restart=restart)

async def restart_kernel(self, now=False, **kw):
self.record('restart_kernel')
return await super().restart_kernel(now=now, **kw)

async def interrupt_kernel(self):
self.record('interrupt_kernel')
return await super().interrupt_kernel()

def request_shutdown(self, restart=False):
self.record('request_shutdown')
return super().request_shutdown(restart=restart)

async def finish_shutdown(self, waittime=None, pollinterval=0.1):
self.record('finish_shutdown')
return await super().finish_shutdown(waittime=waittime, pollinterval=pollinterval)

async def _launch_kernel(self, kernel_cmd, **kw):
self.record('_launch_kernel')
return await super()._launch_kernel(kernel_cmd, **kw)

async def _kill_kernel(self):
self.record('_kill_kernel')
return await super()._kill_kernel()

@km_recorder
def cleanup(self, connection_file=True):
self.record('cleanup')
super().cleanup(connection_file=connection_file)
self.which_cleanup = 'cleanup'

@km_recorder
def cleanup_resources(self, restart=False):
self.record('cleanup_resources')
super().cleanup_resources(restart=restart)
self.which_cleanup = 'cleanup_resources'


Expand Down