diff --git a/aiohttp/streams.py b/aiohttp/streams.py index 3879ee6c6e9..55dbb2cd883 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -27,7 +27,9 @@ def __init__(self, limit=DEFAULT_LIMIT, loop=None): if loop is None: loop = asyncio.get_event_loop() self._loop = loop - self._buffer = bytearray() + self._buffer = collections.deque() + self._buffer_size = 0 + self._buffer_offset = 0 self._eof = False self._waiter = None self._eof_waiter = None @@ -86,7 +88,8 @@ def feed_data(self, data): if not data: return - self._buffer.extend(data) + self._buffer.append(data) + self._buffer_size += len(data) self.total_bytes += len(data) waiter = self._waiter @@ -110,22 +113,22 @@ def readline(self): if self._exception is not None: raise self._exception - line = bytearray() + line = [] + line_size = 0 not_enough = True while not_enough: while self._buffer and not_enough: - ichar = self._buffer.find(b'\n') - if ichar < 0: - line.extend(self._buffer) - self._buffer.clear() - else: - ichar += 1 - line.extend(self._buffer[:ichar]) - del self._buffer[:ichar] + offset = self._buffer_offset + ichar = self._buffer[0].find(b'\n', offset) + 1 + # Read from current offset to found b'\n' or to the end. + data = self._read_nowait(ichar - offset if ichar else 0) + line.append(data) + line_size += len(data) + if ichar: not_enough = False - if len(line) > self._limit: + if line_size > self._limit: raise ValueError('Line is too long') if self._eof: @@ -138,10 +141,7 @@ def readline(self): finally: self._waiter = None - if line: - return bytes(line) - else: - return EOF_MARKER + return b''.join(line) @asyncio.coroutine def read(self, n=-1): @@ -168,38 +168,23 @@ def read(self, n=-1): # This used to just loop creating a new waiter hoping to # collect everything in self._buffer, but that would # deadlock if the subprocess sends more than self.limit - # bytes. So just call self.read(self._limit) until EOF. + # bytes. So just call self.readany() until EOF. blocks = [] while True: - block = yield from self.read(self._limit) + block = yield from self.readany() if not block: break blocks.append(block) - data = b''.join(blocks) - if data: - return data - else: - return EOF_MARKER - else: - if not self._buffer and not self._eof: - self._waiter = self._create_waiter('read') - try: - yield from self._waiter - finally: - self._waiter = None + return b''.join(blocks) - if n < 0 or len(self._buffer) <= n: - data = bytes(self._buffer) - self._buffer.clear() - else: - # n > 0 and len(self._buffer) > n - data = bytes(self._buffer[:n]) - del self._buffer[:n] + if not self._buffer and not self._eof: + self._waiter = self._create_waiter('read') + try: + yield from self._waiter + finally: + self._waiter = None - if data: - return data - else: - return EOF_MARKER + return self._read_nowait(n) @asyncio.coroutine def readany(self): @@ -213,13 +198,7 @@ def readany(self): finally: self._waiter = None - data = bytes(self._buffer) - del self._buffer[:] - - if data: - return data - else: - return EOF_MARKER + return self._read_nowait() @asyncio.coroutine def readexactly(self, n): @@ -253,12 +232,28 @@ def read_nowait(self): raise RuntimeError( 'Called while some coroutine is waiting for incoming data.') + return self._read_nowait() + + def _read_nowait(self, n=None): if not self._buffer: return EOF_MARKER + + first_buffer = self._buffer[0] + offset = self._buffer_offset + if n and len(first_buffer) - offset > n: + data = first_buffer[offset:offset + n] + self._buffer_offset += n + + elif offset: + self._buffer.popleft() + data = first_buffer[offset:] + self._buffer_offset = 0 + else: - data = bytes(self._buffer) - del self._buffer[:] - return data + data = self._buffer.popleft() + + self._buffer_size -= len(data) + return data class EmptyStreamReader: @@ -397,9 +392,8 @@ def maybe_resume(func): def wrapper(self, *args, **kw): result = yield from func(self, *args, **kw) - size = len(self._buffer) if self._stream.paused: - if size < self._b_limit: + if self._buffer_size < self._b_limit: try: self._stream.transport.resume_reading() except (AttributeError, NotImplementedError): @@ -407,7 +401,7 @@ def wrapper(self, *args, **kw): else: self._stream.paused = False else: - if size > self._b_limit: + if self._buffer_size > self._b_limit: try: self._stream.transport.pause_reading() except (AttributeError, NotImplementedError): @@ -441,7 +435,7 @@ def feed_data(self, data, size=0): super().feed_data(data) if (not self._stream.paused and - not has_waiter and len(self._buffer) > self._b_limit): + not has_waiter and self._buffer_size > self._b_limit): try: self._stream.transport.pause_reading() except (AttributeError, NotImplementedError): diff --git a/tests/test_streams.py b/tests/test_streams.py index ca68b4243af..c83540cf936 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -63,21 +63,26 @@ def cb(): def test_wait_eof_eof(self): stream = self._make_one() stream.feed_eof() + wait_task = asyncio.Task(stream.wait_eof(), loop=self.loop) self.loop.run_until_complete(wait_task) self.assertTrue(stream.is_eof()) def test_feed_empty_data(self): stream = self._make_one() - stream.feed_data(b'') - self.assertEqual(b'', stream._buffer) + stream.feed_eof() + + data = self.loop.run_until_complete(stream.read()) + self.assertEqual(b'', data) def test_feed_nonempty_data(self): stream = self._make_one() - stream.feed_data(self.DATA) - self.assertEqual(self.DATA, stream._buffer) + stream.feed_eof() + + data = self.loop.run_until_complete(stream.read()) + self.assertEqual(self.DATA, data) def test_read_zero(self): # Read zero bytes. @@ -86,7 +91,10 @@ def test_read_zero(self): data = self.loop.run_until_complete(stream.read(0)) self.assertEqual(b'', data) - self.assertEqual(self.DATA, stream._buffer) + + stream.feed_eof() + data = self.loop.run_until_complete(stream.read()) + self.assertEqual(self.DATA, data) def test_read(self): # Read bytes. @@ -99,7 +107,10 @@ def cb(): data = self.loop.run_until_complete(read_task) self.assertEqual(self.DATA, data) - self.assertEqual(b'', stream._buffer) + + stream.feed_eof() + data = self.loop.run_until_complete(stream.read()) + self.assertEqual(b'', data) def test_read_line_breaks(self): # Read bytes without line breaks. @@ -108,9 +119,10 @@ def test_read_line_breaks(self): stream.feed_data(b'line2') data = self.loop.run_until_complete(stream.read(5)) - self.assertEqual(b'line1', data) - self.assertEqual(b'line2', stream._buffer) + + data = self.loop.run_until_complete(stream.read(5)) + self.assertEqual(b'line2', data) def test_read_eof(self): # Read bytes, stop at eof. @@ -123,7 +135,8 @@ def cb(): data = self.loop.run_until_complete(read_task) self.assertEqual(b'', data) - self.assertEqual(b'', stream._buffer) + + data = self.loop.run_until_complete(stream.read()) self.assertIs(data, streams.EOF_MARKER) @mock.patch('aiohttp.streams.internal_logger') @@ -152,9 +165,10 @@ def cb(): self.loop.call_soon(cb) data = self.loop.run_until_complete(read_task) - self.assertEqual(b'chunk1\nchunk2', data) - self.assertEqual(b'', stream._buffer) + + data = self.loop.run_until_complete(stream.read()) + self.assertEqual(b'', data) def test_read_exception(self): stream = self._make_one() @@ -182,7 +196,10 @@ def cb(): line = self.loop.run_until_complete(read_task) self.assertEqual(b'chunk1 chunk2 chunk3 \n', line) - self.assertEqual(b' chunk4', stream._buffer) + + stream.feed_eof() + data = self.loop.run_until_complete(stream.read()) + self.assertEqual(b' chunk4', data) def test_readline_limit_with_existing_data(self): # Read one line. The data is in StreamReader's buffer @@ -195,21 +212,9 @@ def test_readline_limit_with_existing_data(self): self.assertRaises( ValueError, self.loop.run_until_complete, stream.readline()) # The buffer should contain the remaining data after exception - self.assertEqual(b'line2\n', stream._buffer) - - stream = streams.StreamReader(limit=3, loop=self.loop) - stream.feed_data(b'li') - stream.feed_data(b'ne1') - stream.feed_data(b'li') - - self.assertRaises( - ValueError, self.loop.run_until_complete, stream.readline()) - # No b'\n' at the end. The 'limit' is set to 3. So before - # waiting for the new data in buffer, 'readline' will consume - # the entire buffer, and since the length of the consumed data - # is more than 3, it will raise a ValueError. The buffer is - # expected to be empty now. - self.assertEqual(b'', stream._buffer) + stream.feed_eof() + data = self.loop.run_until_complete(stream.read()) + self.assertEqual(b'line2\n', data) def test_readline_limit(self): # Read one line. StreamReaders are fed with data after @@ -226,9 +231,6 @@ def cb(): self.assertRaises( ValueError, self.loop.run_until_complete, stream.readline()) - # The buffer had just one line of data, and after raising - # a ValueError it should be empty. - self.assertEqual(b'', stream._buffer) stream = self._make_one(limit=7) @@ -241,7 +243,8 @@ def cb(): self.assertRaises( ValueError, self.loop.run_until_complete, stream.readline()) - self.assertEqual(b'chunk3\n', stream._buffer) + data = self.loop.run_until_complete(stream.read()) + self.assertEqual(b'chunk3\n', data) def test_readline_nolimit_nowait(self): # All needed data for the first 'readline' call will be @@ -251,9 +254,11 @@ def test_readline_nolimit_nowait(self): stream.feed_data(self.DATA[6:]) line = self.loop.run_until_complete(stream.readline()) - self.assertEqual(b'line1\n', line) - self.assertEqual(b'line2\nline3\n', stream._buffer) + + stream.feed_eof() + data = self.loop.run_until_complete(stream.read()) + self.assertEqual(b'line2\nline3\n', data) def test_readline_eof(self): stream = self._make_one() @@ -278,9 +283,11 @@ def test_readline_read_byte_count(self): self.loop.run_until_complete(stream.readline()) data = self.loop.run_until_complete(stream.read(7)) - self.assertEqual(b'line2\nl', data) - self.assertEqual(b'ine3\n', stream._buffer) + + stream.feed_eof() + data = self.loop.run_until_complete(stream.read()) + self.assertEqual(b'ine3\n', data) def test_readline_exception(self): stream = self._make_one() @@ -292,7 +299,6 @@ def test_readline_exception(self): stream.set_exception(ValueError()) self.assertRaises( ValueError, self.loop.run_until_complete, stream.readline()) - self.assertEqual(b'', stream._buffer) def test_readexactly_zero_or_less(self): # Read exact number of bytes (zero or less). @@ -301,11 +307,18 @@ def test_readexactly_zero_or_less(self): data = self.loop.run_until_complete(stream.readexactly(0)) self.assertEqual(b'', data) - self.assertEqual(self.DATA, stream._buffer) + stream.feed_eof() + data = self.loop.run_until_complete(stream.read()) + self.assertEqual(self.DATA, data) + + stream = self._make_one() + stream.feed_data(self.DATA) data = self.loop.run_until_complete(stream.readexactly(-1)) self.assertEqual(b'', data) - self.assertEqual(self.DATA, stream._buffer) + stream.feed_eof() + data = self.loop.run_until_complete(stream.read()) + self.assertEqual(self.DATA, data) def test_readexactly(self): # Read exact number of bytes. @@ -322,7 +335,10 @@ def cb(): data = self.loop.run_until_complete(read_task) self.assertEqual(self.DATA + self.DATA, data) - self.assertEqual(self.DATA, stream._buffer) + + stream.feed_eof() + data = self.loop.run_until_complete(stream.read()) + self.assertEqual(self.DATA, data) def test_readexactly_eof(self): # Read exact number of bytes (eof). @@ -341,7 +357,8 @@ def cb(): self.assertEqual(cm.exception.expected, n) self.assertEqual(str(cm.exception), '18 bytes read on a total of 36 expected bytes') - self.assertEqual(b'', stream._buffer) + data = self.loop.run_until_complete(stream.read()) + self.assertEqual(b'', data) def test_readexactly_exception(self): stream = self._make_one() @@ -397,9 +414,10 @@ def test_readany_eof(self): self.loop.call_soon(stream.feed_data, b'chunk1\n') data = self.loop.run_until_complete(read_task) - self.assertEqual(b'chunk1\n', data) - self.assertEqual(b'', stream._buffer) + stream.feed_eof() + data = self.loop.run_until_complete(stream.read()) + self.assertEqual(b'', data) def test_readany_empty_eof(self): stream = self._make_one() @@ -424,15 +442,15 @@ def test_readany_exception(self): def test_read_nowait(self): stream = self._make_one() - stream.feed_data(b'line1\n') - stream.feed_data(b'line2\n') + stream.feed_data(b'line1\nline2\n') self.assertEqual( stream.read_nowait(), b'line1\nline2\n') self.assertIs( stream.read_nowait(), streams.EOF_MARKER) - self.assertEqual( - bytes(stream._buffer), b'') + stream.feed_eof() + data = self.loop.run_until_complete(stream.read()) + self.assertEqual(b'', data) def test_read_nowait_exception(self): stream = self._make_one()