Skip to content

Commit

Permalink
Experimental fix for #55
Browse files Browse the repository at this point in the history
  • Loading branch information
eandersson committed Dec 2, 2018
1 parent 0120963 commit 93f3314
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
1 change: 1 addition & 0 deletions amqpstorm/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def close(self, reply_code=200, reply_text=''):
if self._inbound:
del self._inbound[:]
self.set_state(self.CLOSED)
self._connection._cleanup_channel(self.channel_id)
LOGGER.debug('Channel #%d Closed', self.channel_id)

def check_for_errors(self):
Expand Down
27 changes: 24 additions & 3 deletions amqpstorm/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,20 @@ def write_frames(self, channel_id, frames_out):
self.heartbeat.register_write()
self._io.write_to_socket(data_out)

def _cleanup_channel(self, channel_id):
if channel_id not in self._channels:
return

channel = self._channels[channel_id]
if not channel.is_closed:
return

with self.lock:
try:
del self._channels[channel_id]
except KeyError:
LOGGER.error('Channel %i not in channels', channel_id)

def _close_remaining_channels(self):
"""Close any open channels.
Expand All @@ -246,6 +260,7 @@ def _close_remaining_channels(self):
continue
self._channels[channel_id].set_state(Channel.CLOSED)
self._channels[channel_id].close()
self._cleanup_channel(channel_id)

def _get_next_available_channel_id(self):
"""Returns the next available available channel id.
Expand All @@ -254,12 +269,18 @@ def _get_next_available_channel_id(self):
:rtype: int
"""
channel_id = len(self._channels) + 1
if channel_id == self.max_allowed_channels:
num_channels = len(self._channels) + 1
if num_channels == self.max_allowed_channels:
raise AMQPConnectionError(
'reached the maximum number of channels %d' %
self.max_allowed_channels)
return channel_id

for index in compatibility.RANGE(1, num_channels + 1):
if index in self._channels:
continue
return index

return num_channels

def _handle_amqp_frame(self, data_in):
"""Unmarshal a single AMQP frame and return the result.
Expand Down

0 comments on commit 93f3314

Please sign in to comment.