Updates to the kernel messaging spec #388

Merged
merged 10 commits into from
Jul 27, 2019
52 changes: 37 additions & 15 deletions docs/messaging.rst
@@ -122,9 +122,28 @@ A message is defined by the following four-dictionary structure::
'buffers': list,
}

.. note::

The ``session`` id in a message header identifies a unique entity with state,
such as a kernel process or client process.

A client session id, in message headers from a client, should be unique among
all clients connected to a kernel. When a client reconnects to a kernel, it
should use the same client session id in its message headers. When a client
restarts, it should generate a new client session id.

A kernel session id, in message headers from a kernel, should identify a
particular kernel process. If a kernel is restarted, the kernel session id
should be regenerated.

The session id in a message header can be used to identify the sending entity.
For example, if a client disconnects and reconnects to a kernel, and messages
from the kernel have a different kernel session id than prior to the disconnect,
the client should assume that the kernel was restarted.
Member

👍
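
To illustrate the restart-detection rule from the note above, here is a minimal client-side sketch. It is not part of jupyter_client: `KernelSessionTracker` and `observe` are hypothetical names, and messages are assumed to be plain dicts whose `header` carries a `session` key.

```python
class KernelSessionTracker:
    """Remember the kernel session id seen in kernel messages (hypothetical helper)."""

    def __init__(self):
        self.kernel_session = None

    def observe(self, msg):
        """Record the session id of a message sent by the kernel.

        Returns True when the id differs from the one seen previously,
        which the note says should be treated as a kernel restart.
        """
        session = msg["header"]["session"]
        restarted = (
            self.kernel_session is not None and session != self.kernel_session
        )
        self.kernel_session = session
        return restarted


tracker = KernelSessionTracker()
first = tracker.observe({"header": {"session": "kernel-a"}})    # nothing to compare yet
same = tracker.observe({"header": {"session": "kernel-a"}})     # same kernel process
changed = tracker.observe({"header": {"session": "kernel-b"}})  # id changed: restart
```

A client that reconnects would keep the tracker across the disconnect, so the first message after reconnecting is compared against the id seen before.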


.. versionchanged:: 5.0

``version`` key added to the header.
``version`` key added to the header.

.. versionchanged:: 5.1

@@ -139,8 +158,9 @@ Compatibility
=============

Kernels must implement the :ref:`execute <execute>` and :ref:`kernel info
<msging_kernel_info>` messages in order to be usable. All other message types
are optional, although we recommend implementing :ref:`completion
<msging_kernel_info>` messages, along with the associated busy and idle
:ref:`status` messages. All other message types are
optional, although we recommend implementing :ref:`completion
<msging_completion>` if possible. Kernels do not need to send any reply for
messages they don't handle, and frontends should provide sensible behaviour if
no reply arrives (except for the required execution and kernel info messages).
@@ -934,8 +954,7 @@ multiple cases:

The client sends a shutdown request to the kernel, and once it receives the
reply message (which is otherwise empty), it can assume that the kernel has
completed shutdown safely. The request can be sent on either the `control` or
`shell` channels.
completed shutdown safely. The request is sent on the `control` channel.
Member

👍 It is really important that this is specified here. +1000


Upon their own shutdown, client applications will typically execute a last
minute sanity check and forcefully terminate any kernel that is still alive, to
@@ -959,6 +978,12 @@ Message type: ``shutdown_reply``::
socket, they simply send a forceful process termination signal, since a dead
process is unlikely to respond in any useful way to messages.

.. versionchanged:: 5.4

Sending a ``shutdown_request`` message on the ``shell`` channel is deprecated.
Member

What's the strategy for deprecating the shutdown_request?

We also deprecated payloads, yet we still use them and have no current replacement.

Member Author

Are you basically asking what deprecation means in this context? Good question. @minrk, thoughts? Does this mean that if a kernel implements protocol 5.4, it doesn't need to listen to shutdown_request messages on the shell channel?

Member Author

Actually, that doesn't make much sense to me. How about this: we're just warning people now, and a kernel can officially ignore shutdown requests on the shell channel in spec 6.0 or later.

Member

Works for me!

Is most of the intent behind this that the shutdown API from the server is used? Was shutdown request put in place to help ensure that background resources got cleaned up appropriately (like with a cluster, deleting all the executors / workers?)

Member Author

IIRC, the kernel manager in the notebook server first sends a shutdown request to the kernel to let it clean up resources and exit by itself. If it doesn't receive a reply in a certain amount of time, then it forcefully shuts down the kernel with a signal.

This protocol definition is at the kernel level, not the kernel manager level. At this kernel message level, if I remember what @minrk said, sending the shutdown_request on either the shell or control channel is an artifact of history, and probably shouldn't have been done that way.

Member

The notebook server uses shutdown_request on the control channel as a first, polite "please shutdown" before sending signals. This is deprecating the ~unused functionality of sending these messages on the shell channel. Does nteract send shutdown_request on either of these channels?

@jasongrout I noticed that KernelClient.shutdown is sending it on the shell channel. This is a rarely, if ever, used method that I think would only be used by frontends that don't own the kernel (e.g. qtconsole --existing), so that should be updated.

Member

I am +1 on disallowing shutdown and interrupt on shell.

I don't know of any kernels that do it anyway, since our clients use the control channel for that.

Making this explicit is really important IMO.
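
The sequence described in this thread (a polite `shutdown_request` on the control channel, then a signal if no reply arrives in time) can be sketched roughly as below. This is only an illustration under stated assumptions, not the notebook server's actual code: `shutdown_kernel` and the three callables passed to it are hypothetical stand-ins for sending the request, checking for the `shutdown_reply`, and killing the process.

```python
import time


def shutdown_kernel(send_shutdown_request, reply_received, kill,
                    wait_time=5.0, poll_interval=0.1):
    """Ask politely on the control channel, then force termination on timeout.

    All three callables are hypothetical stand-ins:
      send_shutdown_request() sends shutdown_request on the control channel,
      reply_received() returns True once shutdown_reply has arrived,
      kill() sends a forceful termination signal to the kernel process.
    Returns "clean" if the reply arrived in time, otherwise "killed".
    """
    send_shutdown_request()
    deadline = time.monotonic() + wait_time
    while time.monotonic() < deadline:
        if reply_received():
            return "clean"
        time.sleep(poll_interval)
    kill()
    return "killed"


# Exercise both paths with fake callables that only record what happened.
events = []
result_clean = shutdown_kernel(
    lambda: events.append("request"), lambda: True, lambda: events.append("kill")
)
result_killed = shutdown_kernel(
    lambda: events.append("request"), lambda: False, lambda: events.append("kill"),
    wait_time=0.05, poll_interval=0.01,
)
```

The timeout values are arbitrary; a real kernel manager would choose its own wait time.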




.. _msging_interrupt:

Kernel interrupt
@@ -1191,6 +1216,8 @@ Message type: ``error``::

``pyerr`` renamed to ``error``

.. _status:

Kernel status
-------------

@@ -1201,8 +1228,7 @@ Message type: ``status``::
content = {
# When the kernel starts to handle a message, it will enter the 'busy'
# state and when it finishes, it will enter the 'idle' state.
# The kernel will publish state 'starting' exactly once at process startup.
execution_state : ('busy', 'idle', 'starting')
execution_state : ('busy', 'idle', other optional states)
}

When a kernel receives a request and begins processing it,
@@ -1213,6 +1239,10 @@ it shall publish a status message with ``execution_state: 'idle'``.
Thus, the outputs associated with a given execution shall generally arrive
between the busy and idle status messages associated with a given request.

A kernel may send optional status messages with execution states other than
`busy` or `idle`. For example, a kernel may send a status message with a
`starting` execution state exactly once at process startup.

Member Author

@minrk, do we know of other kernels publishing other status states? We made the 'starting' status optional because there might be a race condition preventing the client from ever seeing a starting message. However, it is nice if we have a universe of only three status states, rather than opening it up to any status the kernel might want to send.

As for the kernel status states that the notebook spoofs, we already agreed that those should come on a different channel, a notebook channel, so I think those status states shouldn't count here as optional states sent from a kernel.

Member Author

To be clear, I think I'm +1 on reverting this "optional status states" change, and just going back to mandating a 'starting' status, and having a defined universe of 'starting', 'busy', and 'idle' states sent from the kernel.

Member

I don't have an opinion on this.
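
For context on why `busy` and `idle` matter to clients regardless of how the optional states are settled: a frontend can use them to bracket the outputs of one request. The sketch below assumes messages are plain dicts with `header`, `parent_header` and `content` keys; `outputs_for_request` and `make_msg` are hypothetical helpers, not jupyter_client API.

```python
def outputs_for_request(iopub_msgs, msg_id):
    """Collect IOPub messages for one request, bracketed by busy/idle status."""
    outputs = []
    busy_seen = False
    for msg in iopub_msgs:
        if msg["parent_header"].get("msg_id") != msg_id:
            continue  # belongs to another request, or has no parent (e.g. 'starting')
        if msg["header"]["msg_type"] == "status":
            state = msg["content"]["execution_state"]
            if state == "busy":
                busy_seen = True
            elif state == "idle" and busy_seen:
                break  # the request has been fully handled
            continue  # any other state is ignored here
        if busy_seen:
            outputs.append(msg)
    return outputs


def make_msg(msg_type, content, parent=None):
    """Build a minimal fake message for the demonstration below."""
    parent_header = {"msg_id": parent} if parent else {}
    return {"header": {"msg_type": msg_type},
            "parent_header": parent_header,
            "content": content}


stream = [
    make_msg("status", {"execution_state": "starting"}),
    make_msg("status", {"execution_state": "busy"}, "req-1"),
    make_msg("stream", {"text": "hi"}, "req-1"),
    make_msg("stream", {"text": "other"}, "req-2"),
    make_msg("status", {"execution_state": "idle"}, "req-1"),
    make_msg("stream", {"text": "late"}, "req-1"),
]
texts = [m["content"]["text"] for m in outputs_for_request(stream, "req-1")]
```

Note that the `late` message is dropped here; the caveat about asynchronous output in the spec covers exactly that case.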

.. note::

**A caveat for asynchronous output**
@@ -1229,14 +1259,6 @@ between the busy and idle status messages associated with a given request.
Busy and idle messages should be sent before/after handling every request,
not just execution.

.. note::

Extra status messages are added between the notebook webserver and websocket clients
that are not sent by the kernel. These are:

- restarting (kernel has died, but will be automatically restarted)
- dead (kernel has died, restarting has failed)

Clear output
------------

17 changes: 12 additions & 5 deletions jupyter_client/blocking/client.py
@@ -36,15 +36,15 @@
TimeoutError = RuntimeError


def reqrep(meth):
def reqrep(meth, channel='shell'):
def wrapped(self, *args, **kwargs):
reply = kwargs.pop('reply', False)
timeout = kwargs.pop('timeout', None)
msg_id = meth(self, *args, **kwargs)
if not reply:
return msg_id

return self._recv_reply(msg_id, timeout=timeout)
return self._recv_reply(msg_id, timeout=timeout, channel=channel)

if not meth.__doc__:
# python -OO removes docstrings,
@@ -135,17 +135,21 @@ def wait_for_ready(self, timeout=None):
iopub_channel_class = Type(ZMQSocketChannel)
stdin_channel_class = Type(ZMQSocketChannel)
hb_channel_class = Type(HBChannel)
control_channel_class = Type(ZMQSocketChannel)


def _recv_reply(self, msg_id, timeout=None):
def _recv_reply(self, msg_id, timeout=None, channel='shell'):
"""Receive and return the reply for a given request"""
if timeout is not None:
deadline = monotonic() + timeout
while True:
if timeout is not None:
timeout = max(0, deadline - monotonic())
try:
reply = self.get_shell_msg(timeout=timeout)
if channel == 'control':
reply = self.get_control_msg(timeout=timeout)
else:
reply = self.get_shell_msg(timeout=timeout)
except Empty:
raise TimeoutError("Timeout waiting for reply")
if reply['parent_header'].get('msg_id') != msg_id:
@@ -154,13 +158,16 @@ def _recv_reply(self, msg_id, timeout=None):
return reply


# replies come on the shell channel
execute = reqrep(KernelClient.execute)
history = reqrep(KernelClient.history)
complete = reqrep(KernelClient.complete)
inspect = reqrep(KernelClient.inspect)
kernel_info = reqrep(KernelClient.kernel_info)
comm_info = reqrep(KernelClient.comm_info)
shutdown = reqrep(KernelClient.shutdown)

# replies come on the control channel
shutdown = reqrep(KernelClient.shutdown, channel='control')


def _stdin_hook_default(self, msg):
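
The effect of the new `channel` argument to `reqrep` in the blocking client can be seen in isolation with a stand-in class. This is a simplified sketch, not the code from this PR: `FakeClient` and its `_send_*` methods are hypothetical, and the timeout handling and docstring copying of the real wrapper are left out.

```python
def reqrep(meth, channel="shell"):
    """Wrap a send method so it can optionally wait for the reply on `channel`."""
    def wrapped(self, *args, **kwargs):
        reply = kwargs.pop("reply", False)
        msg_id = meth(self, *args, **kwargs)
        if not reply:
            return msg_id
        return self._recv_reply(msg_id, channel=channel)
    return wrapped


class FakeClient:
    """Hypothetical stand-in for a client; records which channel was polled."""

    def __init__(self):
        self.polled = []

    def _send_execute(self, code):
        return "msg-execute"

    def _send_shutdown(self, restart=False):
        return "msg-shutdown"

    def _recv_reply(self, msg_id, channel="shell"):
        self.polled.append(channel)
        return {"parent_header": {"msg_id": msg_id}}

    # replies come on the shell channel
    execute = reqrep(_send_execute)
    # replies come on the control channel
    shutdown = reqrep(_send_shutdown, channel="control")


client = FakeClient()
client.execute("1 + 1", reply=True)
reply = client.shutdown(reply=True)
```

Without `reply=True` the wrapped methods return only the message id and never poll a channel, which matches the early return in the wrapper.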
67 changes: 45 additions & 22 deletions jupyter_client/client.py
@@ -35,12 +35,13 @@ def validate_string_dict(dct):
class KernelClient(ConnectionFileMixin):
"""Communicates with a single kernel on any host via zmq channels.

There are four channels associated with each kernel:
There are five channels associated with each kernel:

* shell: for request/reply calls to the kernel.
* iopub: for the kernel to publish results to frontends.
* hb: for monitoring the kernel's heartbeat.
* stdin: for frontends to reply to raw_input calls in the kernel.
* control: for kernel management calls to the kernel.

The messages that can be sent on these channels are exposed as methods of the
client (KernelClient.execute, complete, history, etc.). These methods only
@@ -58,12 +59,14 @@ def _context_default(self):
iopub_channel_class = Type(ChannelABC)
stdin_channel_class = Type(ChannelABC)
hb_channel_class = Type(HBChannelABC)
control_channel_class = Type(ChannelABC)

# Protected traits
_shell_channel = Any()
_iopub_channel = Any()
_stdin_channel = Any()
_hb_channel = Any()
_control_channel = Any()

# flag for whether execute requests should be allowed to call raw_input:
allow_stdin = True
@@ -84,11 +87,15 @@ def get_stdin_msg(self, *args, **kwargs):
"""Get a message from the stdin channel"""
return self.stdin_channel.get_msg(*args, **kwargs)

def get_control_msg(self, *args, **kwargs):
"""Get a message from the control channel"""
return self.control_channel.get_msg(*args, **kwargs)

#--------------------------------------------------------------------------
# Channel management methods
#--------------------------------------------------------------------------

def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
def start_channels(self, shell=True, iopub=True, stdin=True, hb=True, control=True):
"""Starts the channels for this kernel.

This will create the channels if they do not exist and then start
@@ -109,6 +116,8 @@ def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
self.allow_stdin = False
if hb:
self.hb_channel.start()
if control:
self.control_channel.start()

def stop_channels(self):
"""Stops all the running channels for this kernel.
@@ -123,12 +132,15 @@ def stop_channels(self):
self.stdin_channel.stop()
if self.hb_channel.is_alive():
self.hb_channel.stop()
if self.control_channel.is_alive():
self.control_channel.stop()

@property
def channels_running(self):
"""Are any of the channels created and running?"""
return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
self.stdin_channel.is_alive() or self.hb_channel.is_alive())
self.stdin_channel.is_alive() or self.hb_channel.is_alive() or
self.control_channel.is_alive())

ioloop = None # Overridden in subclasses that use pyzmq event loop

@@ -179,6 +191,18 @@ def hb_channel(self):
)
return self._hb_channel

@property
def control_channel(self):
"""Get the control channel object for this kernel."""
if self._control_channel is None:
url = self._make_url('control')
self.log.debug("connecting control channel to %s", url)
socket = self.connect_control(identity=self.session.bsession)
self._control_channel = self.control_channel_class(
socket, self.session, self.ioloop
)
return self._control_channel

def is_alive(self):
"""Is the kernel process still running?"""
from .manager import KernelManager
@@ -383,8 +407,24 @@ def _handle_kernel_info_reply(self, msg):
if adapt_version != major_protocol_version:
self.session.adapt_version = adapt_version

def is_complete(self, code):
"""Ask the kernel whether some code is complete and ready to execute."""
msg = self.session.msg('is_complete_request', {'code': code})
self.shell_channel.send(msg)
return msg['header']['msg_id']

def input(self, string):
"""Send a string of raw input to the kernel.

This should only be called in response to the kernel sending an
``input_request`` message on the stdin channel.
"""
content = dict(value=string)
msg = self.session.msg('input_reply', content)
self.stdin_channel.send(msg)

def shutdown(self, restart=False):
"""Request an immediate kernel shutdown.
"""Request an immediate kernel shutdown on the control channel.

Upon receipt of the (empty) reply, client code can safely assume that
the kernel has shut down and it's safe to forcefully terminate it if
@@ -401,24 +441,7 @@ def shutdown(self, restart=False):
# Send quit message to kernel. Once we implement kernel-side setattr,
# this should probably be done that way, but for now this will do.
msg = self.session.msg('shutdown_request', {'restart':restart})
self.shell_channel.send(msg)
return msg['header']['msg_id']

def is_complete(self, code):
"""Ask the kernel whether some code is complete and ready to execute."""
msg = self.session.msg('is_complete_request', {'code': code})
self.shell_channel.send(msg)
self.control_channel.send(msg)
return msg['header']['msg_id']

def input(self, string):
"""Send a string of raw input to the kernel.

This should only be called in response to the kernel sending an
``input_request`` message on the stdin channel.
"""
content = dict(value=string)
msg = self.session.msg('input_reply', content)
self.stdin_channel.send(msg)


KernelClientABC.register(KernelClient)
10 changes: 9 additions & 1 deletion jupyter_client/clientabc.py
@@ -47,12 +47,16 @@ def hb_channel_class(self):
def stdin_channel_class(self):
pass

@abc.abstractproperty
def control_channel_class(self):
pass

#--------------------------------------------------------------------------
# Channel management methods
#--------------------------------------------------------------------------

@abc.abstractmethod
def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
def start_channels(self, shell=True, iopub=True, stdin=True, hb=True, control=True):
pass

@abc.abstractmethod
@@ -78,3 +82,7 @@ def stdin_channel(self):
@abc.abstractproperty
def hb_channel(self):
pass

@abc.abstractproperty
def control_channel(self):
pass
10 changes: 5 additions & 5 deletions jupyter_client/connect.py
@@ -226,7 +226,7 @@ def find_connection_file(filename='kernel-*.json', path=None, profile=None):
def tunnel_to_kernel(connection_info, sshserver, sshkey=None):
"""tunnel connections to a kernel via ssh

This will open four SSH tunnels from localhost on this machine to the
This will open five SSH tunnels from localhost on this machine to the
ports associated with the kernel. They can be either direct
localhost-localhost tunnels, or if an intermediate server is necessary,
the kernel must be listening on a public IP.
@@ -246,8 +246,8 @@ def tunnel_to_kernel(connection_info, sshserver, sshkey=None):
Returns
-------

(shell, iopub, stdin, hb) : ints
The four ports on localhost that have been forwarded to the kernel.
(shell, iopub, stdin, hb, control) : ints
The five ports on localhost that have been forwarded to the kernel.
"""
from .ssh import tunnel
if isinstance(connection_info, string_types):
@@ -257,8 +257,8 @@ def tunnel_to_kernel(connection_info, sshserver, sshkey=None):

cf = connection_info

lports = tunnel.select_random_ports(4)
rports = cf['shell_port'], cf['iopub_port'], cf['stdin_port'], cf['hb_port']
lports = tunnel.select_random_ports(5)
rports = cf['shell_port'], cf['iopub_port'], cf['stdin_port'], cf['hb_port'], cf['control_port']

remote_ip = cf['ip']
