Skip to content

Commit

Permalink
protocol: put a lock around protocol.drain()
Browse files Browse the repository at this point in the history
As of CPython 3.6.1, drain() is not coroutine-safe so put a lock around it
to make sure it only ever gets called from a single coroutine at a time.

http://bugs.python.org/issue29930
python-websockets/websockets#16
  • Loading branch information
RemiCardona committed Apr 13, 2017
1 parent 99a0bf7 commit 6ea7683
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
6 changes: 3 additions & 3 deletions aioamqp/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def _write_frame(self, frame, request, check_open=True, drain=True):
raise exceptions.ChannelClosed()
frame.write_frame(request)
if drain:
yield from self.protocol._stream_writer.drain()
yield from self.protocol._drain()

@asyncio.coroutine
def _write_frame_awaiting_response(self, waiter_id, frame, request, no_wait, check_open=True, drain=True):
Expand Down Expand Up @@ -508,7 +508,7 @@ def basic_publish(self, payload, exchange_name, routing_key, properties=None, ma
encoder.payload.write(chunk)
yield from self._write_frame(content_frame, encoder, drain=False)

yield from self.protocol._stream_writer.drain()
yield from self.protocol._drain()

@asyncio.coroutine
def basic_qos(self, prefetch_size=0, prefetch_count=0, connection_global=None):
Expand Down Expand Up @@ -826,7 +826,7 @@ def publish(self, payload, exchange_name, routing_key, properties=None, mandator
encoder.payload.write(chunk)
yield from self._write_frame(content_frame, encoder, drain=False)

yield from self.protocol._stream_writer.drain()
yield from self.protocol._drain()

if self.publisher_confirms:
yield from fut
Expand Down
11 changes: 10 additions & 1 deletion aioamqp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def __init__(self, *args, **kwargs):
self.server_channel_max = None
self.channels_ids_ceil = 0
self.channels_ids_free = set()
self._drain_lock = asyncio.Lock(loop=self._loop)

def connection_made(self, transport):
super().connection_made(transport)
Expand Down Expand Up @@ -133,11 +134,19 @@ def ensure_open(self):
assert self.state == CONNECTING
raise exceptions.AioamqpException("connection isn't established yet.")

@asyncio.coroutine
def _drain(self):
with (yield from self._drain_lock):
# drain() cannot be called concurrently by multiple coroutines:
# http://bugs.python.org/issue29930. Remove this lock when no
# version of Python where this bugs exists is supported anymore.
yield from self._stream_writer.drain()

@asyncio.coroutine
def _write_frame(self, frame, request, drain=True):
frame.write_frame(request)
if drain:
yield from self._stream_writer.drain()
yield from self._drain()

@asyncio.coroutine
def close(self, no_wait=False, timeout=None):
Expand Down

0 comments on commit 6ea7683

Please sign in to comment.