Skip to content

Commit

Permalink
Implemented basic client/server negotiation #52 (#53)
Browse files Browse the repository at this point in the history
* Implemented basic client/server negotation
* Raise Connection Error when hitting the max channel count.
* Renamed MAX_FRAME to MAX_FRAME_SIZE.
  • Loading branch information
eandersson authored Apr 19, 2018
1 parent e9eaccd commit 065e708
Show file tree
Hide file tree
Showing 20 changed files with 197 additions and 106 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
Changelog
=========

Version 2.4.1
-------------
- Added client/server negotiation to better determine the maximum supported channels and frame size [#52] - Thanks gastlich.
- We now raise an exception if the maximum allowed channel count is ever reached.

Version 2.4.0
-------------
- basic.consume now allows for multiple callbacks [#48].
Expand Down
65 changes: 10 additions & 55 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ Introduction
============
AMQPStorm is a library designed to be consistent, stable and thread-safe.

- 100% Unit-test Coverage!
- 100% Test Coverage!
- Supports Python 2.7 and Python 3.3+.
- Fully tested against Python Implementations; CPython, PyPy and Pyston.
- When using a SSL connection, TLSv1 or higher is required.

Documentation
=============
Expand All @@ -21,6 +20,11 @@ Additional documentation is available on `amqpstorm.io <https://www.amqpstorm.io
Changelog
=========

Version 2.4.1
-------------
- Added client/server negotiation to better determine the maximum supported channels and maximum allowed frame size [#52] - Thanks gastlich.
- We now raise an exception if the maximum allowed channel count is reached.

Version 2.4.0
-------------
- basic.consume now allows for multiple callbacks [#48].
Expand All @@ -34,67 +38,18 @@ Version 2.3.0
- Fixed delivery_tag being set to None by default [#47] - tkram01.
- Exposed requests verify and certs flags to Management Api [#40] - Thanks Julien Carpentier.

Version 2.2.2
-------------
- Fixed potential Heartbeat deadlock when forcefully killing process - Thanks Charles Pierre.

Version 2.2.1
-------------
- Fixed potential Channel leak [#36] - Thanks Adam Mills.
- Fixed threading losing select module during python shutdown [#37] - Thanks Adam Mills.

Version 2.2.0
-------------
- Connection.close should now be more responsive.
- Channels are now reset when re-opening an existing connection.
- Re-wrote large portions of the Test suit.

Version 2.1.4
-------------
- Added parameter to override auto-decode on incoming Messages - Thanks Travis Griggs.
- Fixed a rare bug that could cause the consumer to get stuck if the connection unexpectedly dies - Thanks Connor Wolf.

Version 2.1.3
-------------
- Fixed a potential recursion error in Connection.close.

Version 2.1.1
-------------
- Reduced default TCP Timeout from 30s to 10s.
- Connection Open/Close timeout is now three times the value of TCP Timeout.
- Connection will now wait for a response from the remote server before closing.

Version 2.1.0
-------------
- [Experimental] Added support for the RabbitMQ Management Api.
- Documentation https://www.amqpstorm.io/#management-api-documentation
- Examples https://github.com/eandersson/amqpstorm/tree/master/examples/management_api

- Connection/Channel function check_for_errors now behave more consistently.

Version 2.0.0
-------------
- Messages are now delivered as Message objects by default.
- to_tuple and to_dict are now set to False by default.

This is a breaking change that affects the following function:

- channel.process_data_events
- channel.start_consuming
- channel.basic.get

Credits
=======
Special thanks to gmr (Gavin M. Roy) for creating pamqp, and in addition amqpstorm is heavily influenced by his pika and rabbitpy libraries.

.. |Version| image:: https://badge.fury.io/py/amqpstorm.svg?
:target: http://badge.fury.io/py/amqpstorm
:target: http://badge.fury.io/py/amqpstorm

.. |CodeClimate| image:: https://codeclimate.com/github/eandersson/amqpstorm/badges/gpa.svg
:target: https://codeclimate.com/github/eandersson/amqpstorm
:target: https://codeclimate.com/github/eandersson/amqpstorm

.. |Travis| image:: https://travis-ci.org/eandersson/amqpstorm.svg
:target: https://travis-ci.org/eandersson/amqpstorm
:target: https://travis-ci.org/eandersson/amqpstorm

.. |Coverage| image:: https://codecov.io/gh/eandersson/amqpstorm/branch/master/graph/badge.svg
:target: https://codecov.io/gh/eandersson/amqpstorm
:target: https://codecov.io/gh/eandersson/amqpstorm
2 changes: 1 addition & 1 deletion amqpstorm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""AMQPStorm."""
__version__ = '2.4.0' # noqa
__version__ = '2.4.1' # noqa
__author__ = 'eandersson' # noqa

import logging
Expand Down
4 changes: 2 additions & 2 deletions amqpstorm/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

AUTH_MECHANISM = 'PLAIN'
IDLE_WAIT = 0.01
FRAME_MAX = 131072
MAX_CHANNELS = 65535
LOCALE = locale.getdefaultlocale()[0] or 'en_US'
MAX_FRAME_SIZE = 131072
MAX_CHANNELS = 65535


class Stateful(object):
Expand Down
17 changes: 10 additions & 7 deletions amqpstorm/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from pamqp import specification

from amqpstorm import compatibility
from amqpstorm.base import FRAME_MAX
from amqpstorm.base import Handler
from amqpstorm.base import MAX_FRAME_SIZE
from amqpstorm.exception import AMQPChannelError
from amqpstorm.exception import AMQPInvalidArgument
from amqpstorm.message import Message
Expand All @@ -19,7 +19,11 @@

class Basic(Handler):
"""RabbitMQ Basic Operations."""
__slots__ = []
__slots__ = ['_max_frame_size']

def __init__(self, channel, max_frame_size=None):
super(Basic, self).__init__(channel)
self._max_frame_size = max_frame_size or MAX_FRAME_SIZE

def qos(self, prefetch_count=0, prefetch_size=0, global_=False):
"""Specify quality of service.
Expand Down Expand Up @@ -387,8 +391,7 @@ def _publish_confirm(self, frames_out):
return True
return False

@staticmethod
def _create_content_body(body):
def _create_content_body(self, body):
"""Split body based on the maximum frame size.
This function is based on code from Rabbitpy.
Expand All @@ -398,10 +401,10 @@ def _create_content_body(body):
:rtype: collections.Iterable
"""
frames = int(math.ceil(len(body) / float(FRAME_MAX)))
frames = int(math.ceil(len(body) / float(self._max_frame_size)))
for offset in compatibility.RANGE(0, frames):
start_frame = FRAME_MAX * offset
end_frame = start_frame + FRAME_MAX
start_frame = self._max_frame_size * offset
end_frame = start_frame + self._max_frame_size
body_len = len(body)
if end_frame > body_len:
end_frame = body_len
Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, channel_id, connection, rpc_timeout):
self._confirming_deliveries = False
self._connection = connection
self._inbound = []
self._basic = Basic(self)
self._basic = Basic(self, connection.max_frame_size)
self._exchange = Exchange(self)
self._tx = Tx(self)
self._queue = Queue(self)
Expand Down
36 changes: 31 additions & 5 deletions amqpstorm/channel0.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

from amqpstorm import __version__
from amqpstorm.base import AUTH_MECHANISM
from amqpstorm.base import FRAME_MAX
from amqpstorm.base import LOCALE
from amqpstorm.base import MAX_CHANNELS
from amqpstorm.base import MAX_FRAME_SIZE
from amqpstorm.base import Stateful
from amqpstorm.compatibility import try_utf8_decode
from amqpstorm.exception import AMQPConnectionError
Expand All @@ -24,6 +24,8 @@ class Channel0(object):
def __init__(self, connection):
super(Channel0, self).__init__()
self.is_blocked = False
self.max_allowed_channels = MAX_CHANNELS
self.max_frame_size = MAX_FRAME_SIZE
self.server_properties = {}
self._connection = connection
self._heartbeat = connection.parameters['heartbeat']
Expand Down Expand Up @@ -52,7 +54,7 @@ def on_frame(self, frame_in):
self.server_properties = frame_in.server_properties
self._send_start_ok(frame_in)
elif frame_in.name == 'Connection.Tune':
self._send_tune_ok()
self._send_tune_ok(frame_in)
self._send_open_connection()
else:
LOGGER.error('[Channel0] Unhandled Frame: %s', frame_in.name)
Expand Down Expand Up @@ -108,6 +110,18 @@ def _blocked_connection(self, frame_in):
try_utf8_decode(frame_in.reason)
)

def _negotiate(self, server_value, client_value):
"""Negotiate the highest supported value. Fall back on the
client side value if zero.
:param int server_value: Server Side value
:param int client_value: Client Side value
:rtype: int
:return:
"""
return min(server_value, client_value) or client_value

def _unblocked_connection(self):
"""Connection is Unblocked.
Expand All @@ -128,6 +142,7 @@ def _send_start_ok(self, frame_in):
"""Send Start OK frame.
:param specification.Connection.Start frame_in: Amqp frame.
:return:
"""
if 'PLAIN' not in try_utf8_decode(frame_in.mechanisms):
Expand All @@ -146,14 +161,25 @@ def _send_start_ok(self, frame_in):
)
self._write_frame(start_ok_frame)

def _send_tune_ok(self):
def _send_tune_ok(self, frame_in):
"""Send Tune OK frame.
:param specification.Connection.Tune frame_in: Tune frame.
:return:
"""
self.max_allowed_channels = self._negotiate(frame_in.channel_max,
MAX_CHANNELS)
self.max_frame_size = self._negotiate(frame_in.frame_max,
MAX_FRAME_SIZE)
LOGGER.debug(
'Negotiated max frame size %d, max channels %d',
self.max_frame_size, self.max_allowed_channels
)

tune_ok_frame = specification.Connection.TuneOk(
channel_max=MAX_CHANNELS,
frame_max=FRAME_MAX,
channel_max=self.max_allowed_channels,
frame_max=self.max_frame_size,
heartbeat=self._heartbeat)
self._write_frame(tune_ok_frame)

Expand Down
32 changes: 31 additions & 1 deletion amqpstorm/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,22 @@ def is_blocked(self):
"""
return self._channel0.is_blocked

@property
def max_allowed_channels(self):
"""Returns the maximum allowed channels for the connection.
:rtype: int
"""
return self._channel0.max_allowed_channels

@property
def max_frame_size(self):
"""Returns the maximum allowed frame size for the connection.
:rtype: int
"""
return self._channel0.max_frame_size

@property
def server_properties(self):
"""Returns the RabbitMQ Server Properties.
Expand Down Expand Up @@ -131,7 +147,7 @@ def channel(self, rpc_timeout=60, lazy=False):
raise AMQPConnectionError('socket/connection closed')

with self.lock:
channel_id = len(self._channels) + 1
channel_id = self._get_next_available_channel_id()
channel = Channel(channel_id, self, rpc_timeout)
self._channels[channel_id] = channel
if not lazy:
Expand Down Expand Up @@ -231,6 +247,20 @@ def _close_remaining_channels(self):
self._channels[channel_id].set_state(Channel.CLOSED)
self._channels[channel_id].close()

def _get_next_available_channel_id(self):
"""Returns the next available available channel id.
:raises AMQPConnectionError: Raises if there is no available channel.
:rtype: int
"""
channel_id = len(self._channels) + 1
if channel_id == self.max_allowed_channels:
raise AMQPConnectionError(
'reached the maximum number of channels %d' %
self.max_allowed_channels)
return channel_id

def _handle_amqp_frame(self, data_in):
"""Unmarshal a single AMQP frame and return the result.
Expand Down
6 changes: 3 additions & 3 deletions amqpstorm/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from time import sleep

from amqpstorm import compatibility
from amqpstorm.base import FRAME_MAX
from amqpstorm.base import IDLE_WAIT
from amqpstorm.base import MAX_FRAME_SIZE
from amqpstorm.compatibility import ssl
from amqpstorm.exception import AMQPConnectionError

Expand Down Expand Up @@ -253,7 +253,7 @@ def _read_from_socket(self):
:rtype: bytes
"""
if self.use_ssl:
data_in = self.socket.read(FRAME_MAX)
data_in = self.socket.read(MAX_FRAME_SIZE)
else:
data_in = self.socket.recv(FRAME_MAX)
data_in = self.socket.recv(MAX_FRAME_SIZE)
return data_in
3 changes: 2 additions & 1 deletion amqpstorm/tests/unit/basic/basic_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import string
import sys

from mock import Mock
from pamqp import specification
from pamqp.body import ContentBody
from pamqp.header import ContentHeader
Expand Down Expand Up @@ -384,7 +385,7 @@ def test_basic_content_not_in_properties(self):

def test_basic_consume_add_tag(self):
tag = 'travis-ci'
channel = Channel(0, None, 1)
channel = Channel(0, Mock(name='Connection'), 1)
basic = Basic(channel)

self.assertEqual(basic._consume_add_and_get_tag({'consumer_tag': tag}),
Expand Down
5 changes: 3 additions & 2 deletions amqpstorm/tests/unit/channel/channel_exception_tests.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import mock
from mock import Mock
from pamqp import specification

import amqpstorm
Expand All @@ -14,7 +15,7 @@

class ChannelExceptionTests(TestFramework):
def test_chanel_invalid_close_parameter(self):
channel = Channel(0, None, 360)
channel = Channel(0, Mock(name='Connection'), 360)

self.assertRaisesRegexp(
AMQPInvalidArgument,
Expand All @@ -28,7 +29,7 @@ def test_chanel_invalid_close_parameter(self):
)

def test_chanel_callback_not_set(self):
channel = Channel(0, None, 360)
channel = Channel(0, Mock(name='Connection'), 360)

self.assertRaisesRegexp(
AMQPChannelError,
Expand Down
Loading

0 comments on commit 065e708

Please sign in to comment.