Skip to content

Commit

Permalink
Merge pull request #172 from minrk/api
Browse files Browse the repository at this point in the history
PyZMQ API improvements

* on_recv_stream
* scrub mistaken use of POLLERR
* scrub unused prefix
* ctx.sockopts
* get_includes() fixes
* Message -> Frame
* _unicode -> _string

closes #178
  • Loading branch information
minrk committed Feb 19, 2012
2 parents baf9c17 + 38d832f commit cadd6d9
Show file tree
Hide file tree
Showing 23 changed files with 461 additions and 237 deletions.
45 changes: 45 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,51 @@ Changes in PyZMQ
This is a coarse summary of changes in pyzmq versions. For a real changelog, consult the
`git log <https://github.com/zeromq/pyzmq/commits>`_

2.1dev
======

Some effort has gone into refining the pyzmq API in this release to make it a model for
other language bindings. This is principally made in a few renames of objects and methods,
all of which leave the old name for backwards compatibility.

Name Changes
------------

* The :class:`~.Message` class has been renamed to :class:`~.Frame`, to better match other
zmq bindings. The old Message name remains for backwards-compatibility. Wherever pyzmq
docs say "Message", they should refer to a complete zmq atom of communication (one or
more Frames, connected by ZMQ_SNDMORE). Please report any remaining instances of
Message== MessagePart with an Issue (or better yet a Pull Request).

* All ``foo_unicode`` methods are now called ``foo_string`` (``_unicode`` remains for
backwards compatibility). This is not only for cross-language consistency, but it makes
more sense in Python 3, where native strings are unicode, and the ``_unicode`` suffix
was wedded too much to Python 2.

Other Changes and Removals
--------------------------

* ``prefix`` removed as an unused keyword argument from :meth:`~.Socket.send_multipart`.
* ZMQStream :meth:`~.ZMQStream.send` default has been changed to `copy=True`, so it matches
Socket :meth:`~.Socket.send`.
* ZMQStream :meth:`~.ZMQStream.on_err` is deprecated, because it never did anything.

New Stuff
---------

* :class:`~.Context` objects can now set default options when they create a socket. These
are set and accessed as attributes to the context. Socket options that do not apply to a
socket (e.g. SUBSCRIBE on non-SUB sockets) will simply be ignored.

* :meth:`~.ZMQStream.on_recv_stream` has been added, which adds the stream itself as a
second argument to the callback, making it easier to use a single callback on multiple
streams.

* A :attr:`~Frame.more` boolean attribute has been added to the :class:`~.Frame` (née
Message) class, so that frames can be identified as terminal without extra queires of
:attr:`~.Socket.rcvmore`.


2.1.11
======

Expand Down
37 changes: 31 additions & 6 deletions docs/source/eventloop.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ native sockets. We have included a small part of Tornado (specifically its
:mod:`.ioloop`), and adapted its :class:`IOStream` class into :class:`.ZMQStream` for
handling poll events on ØMQ sockets. A ZMQStream object works much like a Socket object,
but instead of calling :meth:`~.Socket.recv` directly, you register a callback with
:meth:`~.ZMQStream.on_recv`. callbacks can also be registered for send and error events
with :meth:`~.ZMQStream.on_send` and :meth:`~.ZMQStream.on_err` respectively.
:meth:`~.ZMQStream.on_recv`. callbacks can also be registered for send events
with :meth:`~.ZMQStream.on_send`.


:func:`install()`
Expand Down Expand Up @@ -77,17 +77,42 @@ even if its length is 1. You can easily use this to build things like an echo so

s = ctx.socket(zmq.REP)
s.bind('tcp://localhost:12345')
loop = ioloop.IOLoop.instance()
stream = ZMQStream(s, loop)
stream = ZMQStream(s)
def echo(msg):
stream.send_multipart(msg)
stream.on_recv(echo)
loop.start()
ioloop.IOLoop.instance().start()

on_recv can also take a `copy` flag, just like :meth:`.Socket.recv`. If `copy=False`, then
callbacks registered with on_recv will receive tracked :class:`.Message` objects instead of
callbacks registered with on_recv will receive tracked :class:`.Frame` objects instead of
bytes.

:meth:`on_recv_stream`
----------------------

:meth:`.ZMQStream.on_recv_stream` is just like on_recv above, but the callback will be
passed both the message and the stream, rather than just the message. This is meant to make
it easier to use a single callback with multiple streams.

.. sourcecode:: python

s1 = ctx.socket(zmq.REP)
s1.bind('tcp://localhost:12345')
stream1 = ZMQStream(s1)

s2 = ctx.socket(zmq.REP)
s2.bind('tcp://localhost:54321')
stream2 = ZMQStream(s2)

def echo(msg, stream):
stream.send_multipart(msg)

stream1.on_recv_stream(echo)
stream2.on_recv_stream(echo)

ioloop.IOLoop.instance().start()


:meth:`flush`
-------------

Expand Down
38 changes: 29 additions & 9 deletions docs/source/morethanbindings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ approach is not recommended.


Socket Options as Attributes
****************************
----------------------------

.. versionadded:: 2.1.9

Expand All @@ -67,6 +67,26 @@ behave just as you would expect:
s.fd
# 16


Default Options on the Context
******************************

.. versionadded:: 2.1.12

Just like setting socket options as attributes on Sockets, you can do the same on Contexts.
This affects the default options of any *new* sockets created after the assignment.

.. sourcecode:: python

ctx = zmq.Context()
ctx.linger = 0
rep = ctx.socket(zmq.REP)
req = ctx.socket(zmq.REQ)

Socket options that do not apply to a socket (e.g. SUBSCRIBE on non-SUB sockets) will
simply be ignored.


Core Extensions
---------------

Expand All @@ -83,7 +103,7 @@ object over the wire after serializing with :mod:`json` and :mod:`pickle` respec
and any object sent via those methods can be reconstructed with the
:meth:`~.Socket.recv_json` and :meth:`~.Socket.recv_pyobj` methods. Unicode strings are
other objects that are not unambiguously sendable over the wire, so we include
:meth:`~.Socket.send_unicode` and :meth:`~.Socket.recv_unicode` that simply send bytes
:meth:`~.Socket.send_string` and :meth:`~.Socket.recv_string` that simply send bytes
after encoding the message ('utf-8' is the default).

.. seealso::
Expand Down Expand Up @@ -112,25 +132,25 @@ builtin :py:class:`~Queue.Queue` object), instantiating a MessageTracker takes a
amount of time (10s of µs), so in situations instantiating many small messages, this can
actually dominate performance. As a result, tracking is optional, via the ``track`` flag,
which is optionally passed, always defaulting to ``False``, in each of the three places
where a Message is instantiated: The :class:`.Message` constructor, and non-copying sends
and receives.
where a Frame object (the pyzmq object for wrapping a segment of a message) is
instantiated: The :class:`.Frame` constructor, and non-copying sends and receives.

A MessageTracker is very simple, and has just one method and one attribute. The property
:attr:`MessageTracker.done` will be ``True`` when the Message(s) being tracked are no
:attr:`MessageTracker.done` will be ``True`` when the Frame(s) being tracked are no
longer in use by ØMQ, and :meth:`.MessageTracker.wait` will block, waiting for the
Message(s) to be released.
Frame(s) to be released.

.. Note::

A message cannot be tracked after it has been instantiated without tracking. If a
Message is to even have the *option* of tracking, it must be constructed with
A Frame cannot be tracked after it has been instantiated without tracking. If a
Frame is to even have the *option* of tracking, it must be constructed with
``track=True``.


Extensions
----------

So far, PyZMQ includes three extensions to core ØMQ that we found basic enough to be
So far, PyZMQ includes four extensions to core ØMQ that we found basic enough to be
included in PyZMQ itself:

* :ref:`zmq.log <logging>` : Logging handlers for hooking Python logging up to the
Expand Down
23 changes: 16 additions & 7 deletions docs/source/unicode.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ PyZMQ and Unicode
=================

PyZMQ is built with an eye towards an easy transition to Python 3, and part of
that is dealing with unicode objects. This is an overview of some of what we
that is dealing with unicode strings. This is an overview of some of what we
found, and what it means for PyZMQ.

First, Unicode in Python 2 and 3
Expand Down Expand Up @@ -93,7 +93,7 @@ bytes, then we are potentially using up enormous amounts of excess memory
unnecessarily, due to copying and larger memory footprint of unicode strings.

Still, we recognize the fact that users will quite frequently have unicode
strings that they want to send, so we have added ``socket.<method>_unicode()``
strings that they want to send, so we have added ``socket.<method>_string()``
wrappers. These methods simply wrap their bytes counterpart by encoding
to/decoding from bytes around them, and they all take an `encoding` keyword
argument that defaults to utf-8. Since encoding and decoding are necessary to
Expand All @@ -104,7 +104,16 @@ actions with these wrappers.
strictly setters and there is not corresponding getter method. As a result, we
feel that we can safely coerce unicode objects to bytes (always to utf-8) in
these methods.


.. note::

For cross-language symmetry (including Python 3), the ``_unicode`` methods
are now ``_string``. Many languages have a notion of native strings, and
the use of ``_unicode`` was wedded too closely to the name of such objects
in Python 2. For the time being, anywhere you see ``_string``, ``_unicode``
also works, and is the only option in pyzmq ≤ 2.1.11.


The Methods
-----------

Expand Down Expand Up @@ -138,14 +147,14 @@ Overview of the relevant methods:
`unicode(message)` decodes `message.buffer` with utf-8
.. py:function:: socket.send_unicode(self, unicode s, flags=0, encoding='utf-8')
.. py:function:: socket.send_string(self, unicode s, flags=0, encoding='utf-8')
takes a ``unicode`` string `s`, and sends the ``bytes``
after encoding without an extra copy, via:
`socket.send(s.encode(encoding), flags, copy=False)`
.. py:function:: socket.recv_unicode(self, flags=0, encoding='utf-8')
.. py:function:: socket.recv_string(self, flags=0, encoding='utf-8')
always returns ``unicode`` string
Expand All @@ -163,14 +172,14 @@ Overview of the relevant methods:
returns ``bytes`` (or ``int``), never ``unicode``
.. py:function:: socket.setsockopt_unicode(self, opt, unicode optval, encoding='utf-8')
.. py:function:: socket.setsockopt_string(self, opt, unicode optval, encoding='utf-8')
accepts ``unicode`` string for `optval`
encodes `optval` with `encoding` before passing the ``bytes`` to
`setsockopt`
.. py:function:: socket.getsockopt_unicode(self, opt, encoding='utf-8')
.. py:function:: socket.getsockopt_string(self, opt, encoding='utf-8')
always returns ``unicode`` string, after decoding with `encoding`
Expand Down
2 changes: 1 addition & 1 deletion zmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def get_includes():
from os.path import join, dirname, abspath, pardir
base = dirname(__file__)
parent = abspath(join(base, pardir))
return [ parent ] + [ join(base, subdir) for subdir in ('utils',) ]
return [ parent ] + [ join(parent, base, subdir) for subdir in ('utils',) ]


__all__ = ['get_includes'] + core.__all__
Expand Down
3 changes: 3 additions & 0 deletions zmq/core/context.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,7 @@ cdef class Context:
# helpers for events on _sockets in Socket.__cinit__()/close()
cdef inline void _add_socket(self, void* handle)
cdef inline void _remove_socket(self, void* handle)

cdef public dict sockopts # dict to store default sockopts
cdef dict _attrs # dict needed for *non-sockopt* get/setattr in subclasses

64 changes: 61 additions & 3 deletions zmq/core/context.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ from libc.stdlib cimport free, malloc, realloc
from libzmq cimport *

from error import ZMQError
from zmq.core import constants
from constants import *

#-----------------------------------------------------------------------------
Expand Down Expand Up @@ -64,6 +65,8 @@ cdef class Context:
if self._sockets == NULL:
raise MemoryError("Could not allocate _sockets array")

self.sockopts = {}
self._attrs = {}

def __del__(self):
"""deleting a Context should terminate it, without trying non-threadsafe destroy"""
Expand Down Expand Up @@ -192,14 +195,69 @@ cdef class Context:
----------
socket_type : int
The socket type, which can be any of the 0MQ socket types:
REQ, REP, PUB, SUB, PAIR, XREQ, DEALER, XREP, ROUTER, PULL, PUSH, XSUB, XPUB.
REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, XSUB, XPUB.
"""
# import here to prevent circular import
from zmq.core.socket import Socket
if self.closed:
raise ZMQError(ENOTSUP)
return Socket(self, socket_type)

s = Socket(self, socket_type)
for opt, value in self.sockopts.iteritems():
try:
s.setsockopt(opt, value)
except ZMQError:
# ignore ZMQErrors, which are likely for socket options
# that do not apply to a particular socket type, e.g.
# SUBSCRIBE for non-SUB sockets.
pass
return s

def __setattr__(self, key, value):
"""set default sockopts as attributes"""
try:
opt = getattr(constants, key.upper())
except AttributeError:
# allow subclasses to have extended attributes
if self.__class__.__module__ != 'zmq.core.context':
self._attrs[key] = value
else:
raise AttributeError("No such socket option: %s" % key.upper())
else:
self.sockopts[opt] = value

def __getattr__(self, key):
"""get default sockopts as attributes"""
if key in self._attrs:
# `key` is subclass extended attribute
return self._attrs[key]
key = key.upper()
try:
opt = getattr(constants, key)
except AttributeError:
raise AttributeError("no such socket option: %s" % key)
else:
if opt not in self.sockopts:
raise AttributeError(key)
else:
return self.sockopts[opt]

def __delattr__(self, key):
"""delete default sockopts as attributes"""
if key in self._attrs:
# `key` is subclass extended attribute
del self._attrs[key]
return
key = key.upper()
try:
opt = getattr(constants, key)
except AttributeError:
raise AttributeError("no such socket option: %s" % key)
else:
if opt not in self.sockopts:
raise AttributeError(key)
else:
del self.sockopts[opt]

@property
def _handle(self):
return <Py_ssize_t> self.handle
Expand Down
7 changes: 4 additions & 3 deletions zmq/core/message.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ cdef class MessageTracker(object):
cdef set peers # Other Message or MessageTracker objects.


cdef class Message:
"""A Message class for non-copy send/recvs."""
cdef class Frame:
"""A Message Frame class for non-copy send/recvs."""

cdef zmq_msg_t zmq_msg
cdef object _data # The actual message data as a Python object.
Expand All @@ -46,8 +46,9 @@ cdef class Message:
cdef bint _failed_init # Flag to handle failed zmq_msg_init
cdef public object tracker_event # Event for use with zmq_free_fn.
cdef public object tracker # MessageTracker object.
cdef public bint more # whether RCVMORE was set

cdef Message fast_copy(self) # Create shallow copy of Message object.
cdef Frame fast_copy(self) # Create shallow copy of Message object.
cdef object _getbuffer(self) # Construct self._buffer.

cdef inline object copy_zmq_msg_bytes(zmq_msg_t *zmq_msg)
Loading

0 comments on commit cadd6d9

Please sign in to comment.