Skip to content

Commit

Permalink
Merge pull request #388 from jasongrout/update
Browse files Browse the repository at this point in the history
Updates to the kernel messaging spec
  • Loading branch information
SylvainCorlay authored Jul 27, 2019
2 parents 4738e6a + 5f0a21b commit 4def2a2
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 51 deletions.
45 changes: 32 additions & 13 deletions docs/messaging.rst
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,28 @@ A message is defined by the following four-dictionary structure::
'buffers': list,
}

.. note::

The ``session`` id in a message header identifies a unique entity with state,
such as a kernel process or client process.

A client session id, in message headers from a client, should be unique among
all clients connected to a kernel. When a client reconnects to a kernel, it
should use the same client session id in its message headers. When a client
restarts, it should generate a new client session id.

A kernel session id, in message headers from a kernel, should identify a
particular kernel process. If a kernel is restarted, the kernel session id
should be regenerated.

The session id in a message header can be used to identify the sending entity.
For example, if a client disconnects and reconnects to a kernel, and messages
from the kernel have a different kernel session id than prior to the disconnect,
the client should assume that the kernel was restarted.

.. versionchanged:: 5.0

``version`` key added to the header.
``version`` key added to the header.

.. versionchanged:: 5.1

Expand All @@ -139,8 +158,9 @@ Compatibility
=============

Kernels must implement the :ref:`execute <execute>` and :ref:`kernel info
<msging_kernel_info>` messages in order to be usable. All other message types
are optional, although we recommend implementing :ref:`completion
<msging_kernel_info>` messages, along with the associated busy and idle
:ref:`status` messages. All other message types are
optional, although we recommend implementing :ref:`completion
<msging_completion>` if possible. Kernels do not need to send any reply for
messages they don't handle, and frontends should provide sensible behaviour if
no reply arrives (except for the required execution and kernel info messages).
Expand Down Expand Up @@ -940,8 +960,7 @@ multiple cases:

The client sends a shutdown request to the kernel, and once it receives the
reply message (which is otherwise empty), it can assume that the kernel has
completed shutdown safely. The request can be sent on either the `control` or
`shell` channels.
completed shutdown safely. The request is sent on the `control` channel.

Upon their own shutdown, client applications will typically execute a last
minute sanity check and forcefully terminate any kernel that is still alive, to
Expand All @@ -965,6 +984,12 @@ Message type: ``shutdown_reply``::
socket, they simply send a forceful process termination signal, since a dead
process is unlikely to respond in any useful way to messages.

.. versionchanged:: 5.4

Sending a ``shutdown_request`` message on the ``shell`` channel is deprecated.



.. _msging_interrupt:

Kernel interrupt
Expand Down Expand Up @@ -1197,6 +1222,8 @@ Message type: ``error``::

``pyerr`` renamed to ``error``

.. _status:

Kernel status
-------------

Expand Down Expand Up @@ -1235,14 +1262,6 @@ between the busy and idle status messages associated with a given request.
Busy and idle messages should be sent before/after handling every request,
not just execution.

.. note::

Extra status messages are added between the notebook webserver and websocket clients
that are not sent by the kernel. These are:

- restarting (kernel has died, but will be automatically restarted)
- dead (kernel has died, restarting has failed)

Clear output
------------

Expand Down
17 changes: 12 additions & 5 deletions jupyter_client/blocking/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@
TimeoutError = RuntimeError


def reqrep(meth):
def reqrep(meth, channel='shell'):
def wrapped(self, *args, **kwargs):
reply = kwargs.pop('reply', False)
timeout = kwargs.pop('timeout', None)
msg_id = meth(self, *args, **kwargs)
if not reply:
return msg_id

return self._recv_reply(msg_id, timeout=timeout)
return self._recv_reply(msg_id, timeout=timeout, channel=channel)

if not meth.__doc__:
# python -OO removes docstrings,
Expand Down Expand Up @@ -135,17 +135,21 @@ def wait_for_ready(self, timeout=None):
iopub_channel_class = Type(ZMQSocketChannel)
stdin_channel_class = Type(ZMQSocketChannel)
hb_channel_class = Type(HBChannel)
control_channel_class = Type(ZMQSocketChannel)


def _recv_reply(self, msg_id, timeout=None):
def _recv_reply(self, msg_id, timeout=None, channel='shell'):
"""Receive and return the reply for a given request"""
if timeout is not None:
deadline = monotonic() + timeout
while True:
if timeout is not None:
timeout = max(0, deadline - monotonic())
try:
reply = self.get_shell_msg(timeout=timeout)
if channel == 'control':
reply = self.get_control_msg(timeout=timeout)
else:
reply = self.get_shell_msg(timeout=timeout)
except Empty:
raise TimeoutError("Timeout waiting for reply")
if reply['parent_header'].get('msg_id') != msg_id:
Expand All @@ -154,13 +158,16 @@ def _recv_reply(self, msg_id, timeout=None):
return reply


# replies come on the shell channel
execute = reqrep(KernelClient.execute)
history = reqrep(KernelClient.history)
complete = reqrep(KernelClient.complete)
inspect = reqrep(KernelClient.inspect)
kernel_info = reqrep(KernelClient.kernel_info)
comm_info = reqrep(KernelClient.comm_info)
shutdown = reqrep(KernelClient.shutdown)

# replies come on the control channel
shutdown = reqrep(KernelClient.shutdown, channel='control')


def _stdin_hook_default(self, msg):
Expand Down
67 changes: 45 additions & 22 deletions jupyter_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ def validate_string_dict(dct):
class KernelClient(ConnectionFileMixin):
"""Communicates with a single kernel on any host via zmq channels.
There are four channels associated with each kernel:
There are five channels associated with each kernel:
* shell: for request/reply calls to the kernel.
* iopub: for the kernel to publish results to frontends.
* hb: for monitoring the kernel's heartbeat.
* stdin: for frontends to reply to raw_input calls in the kernel.
* control: for kernel management calls to the kernel.
The messages that can be sent on these channels are exposed as methods of the
client (KernelClient.execute, complete, history, etc.). These methods only
Expand All @@ -58,12 +59,14 @@ def _context_default(self):
iopub_channel_class = Type(ChannelABC)
stdin_channel_class = Type(ChannelABC)
hb_channel_class = Type(HBChannelABC)
control_channel_class = Type(ChannelABC)

# Protected traits
_shell_channel = Any()
_iopub_channel = Any()
_stdin_channel = Any()
_hb_channel = Any()
_control_channel = Any()

# flag for whether execute requests should be allowed to call raw_input:
allow_stdin = True
Expand All @@ -84,11 +87,15 @@ def get_stdin_msg(self, *args, **kwargs):
"""Get a message from the stdin channel"""
return self.stdin_channel.get_msg(*args, **kwargs)

def get_control_msg(self, *args, **kwargs):
"""Get a message from the control channel"""
return self.control_channel.get_msg(*args, **kwargs)

#--------------------------------------------------------------------------
# Channel management methods
#--------------------------------------------------------------------------

def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
def start_channels(self, shell=True, iopub=True, stdin=True, hb=True, control=True):
"""Starts the channels for this kernel.
This will create the channels if they do not exist and then start
Expand All @@ -109,6 +116,8 @@ def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
self.allow_stdin = False
if hb:
self.hb_channel.start()
if control:
self.control_channel.start()

def stop_channels(self):
"""Stops all the running channels for this kernel.
Expand All @@ -123,12 +132,15 @@ def stop_channels(self):
self.stdin_channel.stop()
if self.hb_channel.is_alive():
self.hb_channel.stop()
if self.control_channel.is_alive():
self.control_channel.stop()

@property
def channels_running(self):
"""Are any of the channels created and running?"""
return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
self.stdin_channel.is_alive() or self.hb_channel.is_alive())
self.stdin_channel.is_alive() or self.hb_channel.is_alive() or
self.control_channel.is_alive())

ioloop = None # Overridden in subclasses that use pyzmq event loop

Expand Down Expand Up @@ -179,6 +191,18 @@ def hb_channel(self):
)
return self._hb_channel

@property
def control_channel(self):
"""Get the control channel object for this kernel."""
if self._control_channel is None:
url = self._make_url('control')
self.log.debug("connecting control channel to %s", url)
socket = self.connect_control(identity=self.session.bsession)
self._control_channel = self.control_channel_class(
socket, self.session, self.ioloop
)
return self._control_channel

def is_alive(self):
"""Is the kernel process still running?"""
from .manager import KernelManager
Expand Down Expand Up @@ -383,8 +407,24 @@ def _handle_kernel_info_reply(self, msg):
if adapt_version != major_protocol_version:
self.session.adapt_version = adapt_version

def is_complete(self, code):
"""Ask the kernel whether some code is complete and ready to execute."""
msg = self.session.msg('is_complete_request', {'code': code})
self.shell_channel.send(msg)
return msg['header']['msg_id']

def input(self, string):
"""Send a string of raw input to the kernel.
This should only be called in response to the kernel sending an
``input_request`` message on the stdin channel.
"""
content = dict(value=string)
msg = self.session.msg('input_reply', content)
self.stdin_channel.send(msg)

def shutdown(self, restart=False):
"""Request an immediate kernel shutdown.
"""Request an immediate kernel shutdown on the control channel.
Upon receipt of the (empty) reply, client code can safely assume that
the kernel has shut down and it's safe to forcefully terminate it if
Expand All @@ -401,24 +441,7 @@ def shutdown(self, restart=False):
# Send quit message to kernel. Once we implement kernel-side setattr,
# this should probably be done that way, but for now this will do.
msg = self.session.msg('shutdown_request', {'restart':restart})
self.shell_channel.send(msg)
return msg['header']['msg_id']

def is_complete(self, code):
"""Ask the kernel whether some code is complete and ready to execute."""
msg = self.session.msg('is_complete_request', {'code': code})
self.shell_channel.send(msg)
self.control_channel.send(msg)
return msg['header']['msg_id']

def input(self, string):
"""Send a string of raw input to the kernel.
This should only be called in response to the kernel sending an
``input_request`` message on the stdin channel.
"""
content = dict(value=string)
msg = self.session.msg('input_reply', content)
self.stdin_channel.send(msg)


KernelClientABC.register(KernelClient)
10 changes: 9 additions & 1 deletion jupyter_client/clientabc.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,16 @@ def hb_channel_class(self):
def stdin_channel_class(self):
pass

@abc.abstractproperty
def control_channel_class(self):
pass

#--------------------------------------------------------------------------
# Channel management methods
#--------------------------------------------------------------------------

@abc.abstractmethod
def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
def start_channels(self, shell=True, iopub=True, stdin=True, hb=True, control=True):
pass

@abc.abstractmethod
Expand All @@ -78,3 +82,7 @@ def stdin_channel(self):
@abc.abstractproperty
def hb_channel(self):
pass

@abc.abstractproperty
def control_channel(self):
pass
10 changes: 5 additions & 5 deletions jupyter_client/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def find_connection_file(filename='kernel-*.json', path=None, profile=None):
def tunnel_to_kernel(connection_info, sshserver, sshkey=None):
"""tunnel connections to a kernel via ssh
This will open four SSH tunnels from localhost on this machine to the
This will open five SSH tunnels from localhost on this machine to the
ports associated with the kernel. They can be either direct
localhost-localhost tunnels, or if an intermediate server is necessary,
the kernel must be listening on a public IP.
Expand All @@ -246,8 +246,8 @@ def tunnel_to_kernel(connection_info, sshserver, sshkey=None):
Returns
-------
(shell, iopub, stdin, hb) : ints
The four ports on localhost that have been forwarded to the kernel.
(shell, iopub, stdin, hb, control) : ints
The five ports on localhost that have been forwarded to the kernel.
"""
from .ssh import tunnel
if isinstance(connection_info, string_types):
Expand All @@ -257,8 +257,8 @@ def tunnel_to_kernel(connection_info, sshserver, sshkey=None):

cf = connection_info

lports = tunnel.select_random_ports(4)
rports = cf['shell_port'], cf['iopub_port'], cf['stdin_port'], cf['hb_port']
lports = tunnel.select_random_ports(5)
rports = cf['shell_port'], cf['iopub_port'], cf['stdin_port'], cf['hb_port'], cf['control_port']

remote_ip = cf['ip']

Expand Down
Loading

0 comments on commit 4def2a2

Please sign in to comment.