Commit 69dd2d4

Merge pull request #496 from homm/optimize-streams
Streams optimization
asvetlov committed Sep 19, 2015
2 parents 7c43151 + 4cdbbbf commit 69dd2d4
Showing 2 changed files with 113 additions and 101 deletions.
104 changes: 49 additions & 55 deletions aiohttp/streams.py
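The change replaces the reader's single bytearray buffer with a collections.deque of byte chunks kept exactly as they were fed, plus a running byte count (_buffer_size) and a read offset into the first chunk (_buffer_offset). With a bytearray, consuming a prefix means `del buffer[:n]`, which copies the whole remaining tail, so draining a large buffer in small reads costs quadratic time; with chunks, feeding is an O(1) append and most reads pop a whole chunk without copying. A minimal standalone sketch of the scheme, under a hypothetical name (ChunkBuffer is not part of aiohttp; waiters, EOF and exception handling are omitted):

    # Hypothetical sketch, not aiohttp code: feed_data/read_nowait mirror the
    # commit's buffering scheme but omit waiters, EOF and exceptions.
    import collections

    class ChunkBuffer:
        def __init__(self):
            self._buffer = collections.deque()  # chunks exactly as fed
            self._buffer_size = 0               # total buffered bytes
            self._buffer_offset = 0             # consumed prefix of first chunk

        def feed_data(self, data):
            self._buffer.append(data)           # O(1), nothing is copied
            self._buffer_size += len(data)

        def read_nowait(self, n=None):
            if not self._buffer:
                return b''
            first = self._buffer[0]
            offset = self._buffer_offset
            if n and len(first) - offset > n:
                # Request fits in the first chunk: slice, advance the offset.
                data = first[offset:offset + n]
                self._buffer_offset += n
            elif offset:
                # First chunk partially consumed: return its tail.
                self._buffer.popleft()
                data = first[offset:]
                self._buffer_offset = 0
            else:
                # Untouched first chunk: hand it out whole, copy-free.
                data = self._buffer.popleft()
            self._buffer_size -= len(data)
            return data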
@@ -27,7 +27,9 @@ def __init__(self, limit=DEFAULT_LIMIT, loop=None):
         if loop is None:
             loop = asyncio.get_event_loop()
         self._loop = loop
-        self._buffer = bytearray()
+        self._buffer = collections.deque()
+        self._buffer_size = 0
+        self._buffer_offset = 0
         self._eof = False
         self._waiter = None
         self._eof_waiter = None
@@ -86,7 +88,8 @@ def feed_data(self, data):
         if not data:
             return
 
-        self._buffer.extend(data)
+        self._buffer.append(data)
+        self._buffer_size += len(data)
         self.total_bytes += len(data)
 
         waiter = self._waiter
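feed_data() now appends the incoming chunk object as-is instead of copying every byte into the contiguous buffer, and keeps the running byte total up to date. An illustrative cost comparison (not aiohttp code):

    import collections

    buf = bytearray()
    buf.extend(b'x' * 65536)         # old: copies all 64 KiB into the buffer

    chunks = collections.deque()
    chunks.append(b'x' * 65536)      # new: stores a reference, copies nothing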
@@ -110,22 +113,22 @@ def readline(self):
         if self._exception is not None:
             raise self._exception
 
-        line = bytearray()
+        line = []
+        line_size = 0
         not_enough = True
 
         while not_enough:
             while self._buffer and not_enough:
-                ichar = self._buffer.find(b'\n')
-                if ichar < 0:
-                    line.extend(self._buffer)
-                    self._buffer.clear()
-                else:
-                    ichar += 1
-                    line.extend(self._buffer[:ichar])
-                    del self._buffer[:ichar]
-                    not_enough = False
+                offset = self._buffer_offset
+                ichar = self._buffer[0].find(b'\n', offset) + 1
+                # Read from current offset to found b'\n' or to the end.
+                data = self._read_nowait(ichar - offset if ichar else 0)
+                line.append(data)
+                line_size += len(data)
+                if ichar:
+                    not_enough = False
 
-                if len(line) > self._limit:
+                if line_size > self._limit:
                     raise ValueError('Line is too long')
 
             if self._eof:
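The rewritten readline() searches for b'\n' only in the first chunk, starting at the current offset. `ichar` is one past the newline, so it is 0 (falsy) when no newline was found; `_read_nowait` then returns either exactly the bytes through the newline or the whole remaining chunk. Pieces accumulate in a list and are joined once when the line is returned, instead of being copied into a growing bytearray. The index convention, illustrated:

    chunk, offset = b'abc\ndef', 0
    ichar = chunk.find(b'\n', offset) + 1    # 4: one past b'\n'; 0 if absent
    take = ichar - offset if ichar else 0    # to _read_nowait, 0 means "whole chunk"
    assert chunk[offset:offset + take] == b'abc\n'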
@@ -138,10 +141,7 @@ def readline(self):
                 finally:
                     self._waiter = None
 
-        if line:
-            return bytes(line)
-        else:
-            return EOF_MARKER
+        return b''.join(line)
 
     @asyncio.coroutine
     def read(self, n=-1):
@@ -168,38 +168,23 @@ def read(self, n=-1):
             # This used to just loop creating a new waiter hoping to
             # collect everything in self._buffer, but that would
             # deadlock if the subprocess sends more than self.limit
-            # bytes. So just call self.read(self._limit) until EOF.
+            # bytes. So just call self.readany() until EOF.
             blocks = []
             while True:
-                block = yield from self.read(self._limit)
+                block = yield from self.readany()
                 if not block:
                     break
                 blocks.append(block)
-            data = b''.join(blocks)
-            if data:
-                return data
-            else:
-                return EOF_MARKER
-        else:
-            if not self._buffer and not self._eof:
-                self._waiter = self._create_waiter('read')
-                try:
-                    yield from self._waiter
-                finally:
-                    self._waiter = None
+            return b''.join(blocks)
 
-        if n < 0 or len(self._buffer) <= n:
-            data = bytes(self._buffer)
-            self._buffer.clear()
-        else:
-            # n > 0 and len(self._buffer) > n
-            data = bytes(self._buffer[:n])
-            del self._buffer[:n]
+        if not self._buffer and not self._eof:
+            self._waiter = self._create_waiter('read')
+            try:
+                yield from self._waiter
+            finally:
+                self._waiter = None
 
-        if data:
-            return data
-        else:
-            return EOF_MARKER
+        return self._read_nowait(n)
 
     @asyncio.coroutine
     def readany(self):
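For read(-1) the waiter bookkeeping collapses into repeated readany() calls whose blocks are joined once; read(n) defers the actual buffer surgery to _read_nowait(n). Joining a list of blocks once is linear in the total size, whereas growing an immutable bytes object re-copies on every step (illustrative only):

    blocks = []
    for _ in range(3):
        blocks.append(b'block')
    assert b''.join(blocks) == b'blockblockblock'   # one O(total) pass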
@@ -213,13 +198,7 @@ def readany(self):
             finally:
                 self._waiter = None
 
-        data = bytes(self._buffer)
-        del self._buffer[:]
-
-        if data:
-            return data
-        else:
-            return EOF_MARKER
+        return self._read_nowait()
 
     @asyncio.coroutine
     def readexactly(self, n):
@@ -253,12 +232,28 @@ def read_nowait(self):
             raise RuntimeError(
                 'Called while some coroutine is waiting for incoming data.')
 
+        return self._read_nowait()
+
+    def _read_nowait(self, n=None):
         if not self._buffer:
             return EOF_MARKER
 
-        data = bytes(self._buffer)
-        del self._buffer[:]
-        return data
+        first_buffer = self._buffer[0]
+        offset = self._buffer_offset
+        if n and len(first_buffer) - offset > n:
+            data = first_buffer[offset:offset + n]
+            self._buffer_offset += n
+
+        elif offset:
+            self._buffer.popleft()
+            data = first_buffer[offset:]
+            self._buffer_offset = 0
+
+        else:
+            data = self._buffer.popleft()
+
+        self._buffer_size -= len(data)
+        return data
 
 
 class EmptyStreamReader:
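The new `_read_nowait` helper covers three cases: the request fits inside the first chunk (slice it and advance `_buffer_offset`); the first chunk was already partially consumed (return its tail and reset the offset); or the first chunk is untouched (pop and return it whole, copy-free). Note that a short read is possible: when `n` spans several chunks, only data from the first chunk is returned, which stream callers like read() accept. Behaviour under the hypothetical ChunkBuffer sketch above:

    buf = ChunkBuffer()
    buf.feed_data(b'hello')
    buf.feed_data(b'world')
    assert buf.read_nowait(2) == b'he'      # slice inside the first chunk
    assert buf.read_nowait(9) == b'llo'     # short read: tail of the first chunk
    assert buf.read_nowait() == b'world'    # whole second chunk, no copy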
@@ -397,17 +392,16 @@ def maybe_resume(func):
     def wrapper(self, *args, **kw):
         result = yield from func(self, *args, **kw)
 
-        size = len(self._buffer)
         if self._stream.paused:
-            if size < self._b_limit:
+            if self._buffer_size < self._b_limit:
                 try:
                     self._stream.transport.resume_reading()
                 except (AttributeError, NotImplementedError):
                     pass
                 else:
                     self._stream.paused = False
         else:
-            if size > self._b_limit:
+            if self._buffer_size > self._b_limit:
                 try:
                     self._stream.transport.pause_reading()
                 except (AttributeError, NotImplementedError):
@@ -441,7 +435,7 @@ def feed_data(self, data, size=0):
         super().feed_data(data)
 
         if (not self._stream.paused and
-                not has_waiter and len(self._buffer) > self._b_limit):
+                not has_waiter and self._buffer_size > self._b_limit):
             try:
                 self._stream.transport.pause_reading()
             except (AttributeError, NotImplementedError):
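In the flow-control wrappers, the pause/resume thresholds now compare against the maintained `_buffer_size`. This is not just an optimization: with the buffer now a deque, `len(self._buffer)` would count chunks rather than bytes, so the running byte total is needed for correctness and keeps the check O(1):

    import collections
    chunks = collections.deque([b'aaaa', b'bb'])
    assert len(chunks) == 2                 # chunks, not bytes
    assert sum(map(len, chunks)) == 6       # what _buffer_size tracks incrementally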