get_object is broken in case of concurrent execution. #17

Closed
oselivanov opened this issue Dec 21, 2015 · 11 comments

@oselivanov

I tried to build the simplest possible web server that talks to S3 and found an issue.

Run the following example:

#!/usr/bin/env python3
from os import environ
from time import time

import aiobotocore
from aiohttp import web
from asyncio import get_event_loop

bucket = 'my-bucket'
key = 'my-1mb-key'

session = aiobotocore.get_session()
client = session.create_client(
    's3', region_name='us-east-1',
    aws_secret_access_key=environ['AWS_SECRET_ACCESS_KEY'],
    aws_access_key_id=environ['AWS_ACCESS_KEY_ID'])

async def get_content(key):
    obj = await client.get_object(Bucket=bucket, Key=key)
    body = await obj['Body'].read()
    return body

async def index(request):
    print('got request')
    t = time()
    results = await get_content(key)
    content = str(len(results)).encode() + b' ' + str(time() - t).encode()
    print('request done')
    return web.Response(body=content)

app = web.Application()
app.router.add_route('GET', '/', index)

loop = get_event_loop()
handler = app.make_handler()
f = loop.create_server(handler, '0.0.0.0', 8080)
srv = loop.run_until_complete(f)
print('serving on', srv.sockets[0].getsockname())
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    loop.run_until_complete(handler.finish_connections(1.0))
    srv.close()
    loop.run_until_complete(srv.wait_closed())
    loop.run_until_complete(app.finish())
loop.close()

Then run:

    siege -c 4 -t 600s 'http://localhost:8080/'

You'll get the following output:

** SIEGE 3.1.0
** Preparing 4 concurrent users for battle.
The server is now under siege...
HTTP/1.1 200   3.76 secs:      17 bytes ==> GET  /
HTTP/1.1 200   3.76 secs:      17 bytes ==> GET  /
HTTP/1.1 200   4.19 secs:      17 bytes ==> GET  /
HTTP/1.1 500   2.35 secs:     170 bytes ==> GET  /
HTTP/1.1 200   2.39 secs:      18 bytes ==> GET  /
HTTP/1.1 500   2.42 secs:     170 bytes ==> GET  /
HTTP/1.1 500   2.69 secs:     170 bytes ==> GET  /
HTTP/1.1 500   9.37 secs:     170 bytes ==> GET  /
HTTP/1.1 500   1.79 secs:     170 bytes ==> GET  /
HTTP/1.1 200   4.79 secs:      17 bytes ==> GET  /
HTTP/1.1 200   2.83 secs:      17 bytes ==> GET  /
HTTP/1.1 500   2.58 secs:     170 bytes ==> GET  /
HTTP/1.1 500   2.12 secs:     170 bytes ==> GET  /
HTTP/1.1 500   2.54 secs:     170 bytes ==> GET  /
HTTP/1.1 200   5.29 secs:      17 bytes ==> GET  /
HTTP/1.1 200   7.09 secs:      17 bytes ==> GET  /
HTTP/1.1 500   5.86 secs:     170 bytes ==> GET  /
HTTP/1.1 500   5.43 secs:     170 bytes ==> GET  /
HTTP/1.1 500   2.38 secs:     170 bytes ==> GET  /
HTTP/1.1 200   5.51 secs:      16 bytes ==> GET  /

In logs:

Error handling request
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp/server.py", line 262, in start
    yield from self.handle_request(message, payload)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp/web.py", line 87, in handle_request
    resp = yield from handler(request)
  File "./py35aio.py", line 25, in index
    results = await get_content(key)
  File "./py35aio.py", line 19, in get_content
    body = await obj['Body'].read()
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp/streams.py", line 472, in wrapper
    result = yield from func(self, *args, **kw)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp/streams.py", line 527, in read
    return (yield from super().read(n))
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp/streams.py", line 247, in read
    block = yield from self.readany()
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp/streams.py", line 472, in wrapper
    result = yield from func(self, *args, **kw)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp/streams.py", line 535, in readany
    return (yield from super().readany())
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp/streams.py", line 270, in readany
    yield from self._waiter
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 385, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 288, in _wakeup
    value = future.result()
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
aiohttp.errors.ServerDisconnectedError
Error handling request
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp/server.py", line 262, in start
    yield from self.handle_request(message, payload)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp/web.py", line 87, in handle_request
    resp = yield from handler(request)
  File "./py35aio.py", line 25, in index
    results = await get_content(key)
  File "./py35aio.py", line 19, in get_content
    body = await obj['Body'].read()
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp/streams.py", line 472, in wrapper
    result = yield from func(self, *args, **kw)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp/streams.py", line 527, in read
    return (yield from super().read(n))
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp/streams.py", line 247, in read
    block = yield from self.readany()
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp/streams.py", line 472, in wrapper
    result = yield from func(self, *args, **kw)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp/streams.py", line 535, in readany
    return (yield from super().readany())
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp/streams.py", line 270, in readany
    yield from self._waiter
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 385, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 288, in _wakeup
    value = future.result()
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
aiohttp.errors.ServerDisconnectedError

It seems that when multiple requests run concurrently, obj['Body'] can end up broken:

    obj = await client.get_object(Bucket=bucket, Key=key)
    body = await obj['Body'].read()

Even if you move this block

session = aiobotocore.get_session()
client = session.create_client(
    's3', region_name='us-east-1',
    aws_secret_access_key=environ['AWS_SECRET_ACCESS_KEY'],
    aws_access_key_id=environ['AWS_ACCESS_KEY_ID'])

inside the get_content function, the behavior is the same.

@jettify
Member

jettify commented Dec 21, 2015

Interesting, I will take a look.


@jettify
Member

jettify commented Dec 21, 2015

Maybe the problem is related to this issue: aio-libs/aiohttp#631?

@jettify
Member

jettify commented Dec 21, 2015

Could you try the same thing with a presigned URL to confirm?

@oselivanov
Author

I have no time to test it these days; maybe later. But the assumption that AWS breaks under 2 concurrent requests per second sounds odd.
I only know that when we use vanilla boto in production, in a thread pool with a lot of concurrent requests, it works just fine.

@oselivanov
Author

OK, I hacked on some aiobotocore and botocore code and confirmed the cause of the issue. aiobotocore is currently broken by design for streaming requests (probably all requests larger than N KB).

It works like this:

  1. The user sends a request and gets back a response and a StreamReader.
  2. The user sends a second request. The StreamReader of the first request is no longer valid, since the second request uses the same connection.
  3. The user tries to read data from the first StreamReader, and it breaks.

Possible solutions are:

  1. Remove lazy load.
  2. Implement connection pool.

I'd do both, in that order: the first as a temporary fix for the issue, the second as the proper long-term solution.
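
The sequence described above can be sketched with a toy model. This is only an illustration under stated assumptions, not aiobotocore or aiohttp code: `SharedConnection`, `LazyBody`, `lazy_fetch` and `eager_fetch` are hypothetical names, and the lock merely stands in for "the connection is not reused until the body has been read".

```python
import asyncio


class SharedConnection:
    """Toy stand-in for one reusable HTTP connection (hypothetical)."""

    def __init__(self):
        self.generation = 0  # bumped every time the connection is reused
        self.payload = b""

    async def request(self, payload):
        # A new request discards whatever the previous one left unread.
        self.generation += 1
        self.payload = payload
        return LazyBody(self, self.generation)


class LazyBody:
    """Body that is read on demand, like a streaming response."""

    def __init__(self, conn, generation):
        self.conn = conn
        self.generation = generation

    async def read(self):
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        if self.conn.generation != self.generation:
            raise ConnectionError("connection reused before body was read")
        return self.conn.payload


async def lazy_fetch(conn, payload):
    body = await conn.request(payload)
    await asyncio.sleep(0)  # another request can start here
    return await body.read()


async def eager_fetch(conn, payload, lock):
    # "Remove lazy load": read the whole body before the connection
    # can be handed to anyone else.
    async with lock:
        body = await conn.request(payload)
        return await body.read()


async def demo():
    conn = SharedConnection()
    lazy = await asyncio.gather(
        lazy_fetch(conn, b"first"),
        lazy_fetch(conn, b"second"),
        return_exceptions=True,
    )
    lock = asyncio.Lock()
    eager = await asyncio.gather(
        eager_fetch(conn, b"first", lock),
        eager_fetch(conn, b"second", lock),
    )
    return lazy, eager


lazy_results, eager_results = asyncio.run(demo())
```

In this model the first lazy fetch fails because the second request takes over the connection before the first body is read, while both eager fetches succeed. A real connection pool would avoid the serialization that the lock imposes here.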

@oselivanov
Author

Solution 1: #18

@jettify
Member

jettify commented Dec 30, 2015

Interesting. aiobotocore does use a pool, but an unbounded one: aiohttp.ClientSession implements a connection pool using aiohttp.TCPConnector.

@jettify
Member

jettify commented Dec 30, 2015

So for each request a connection is acquired from the pool and released once resp.read() is called. From this point of view, your PR just releases the connection sooner.

@oselivanov
Author

I didn't dig that deep and unfortunately don't know the internals of botocore, so I cannot judge.

@jettify
Member

jettify commented Feb 1, 2016

Should be fixed now by #19. I tested manually with your script and it looks like it is working. The issue was super tricky: the garbage collector collected the response, and as a result FlowControlStreamReader stopped working correctly.

Thank you for finding and helping to fix this issue!
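
A rough sketch of how a collected response can break a stream that outlives it. This is a toy model only: `Connection`, `Stream`, `Response` and both helper functions are hypothetical and do not reproduce aiohttp's or aiobotocore's actual classes. It assumes the response closes its connection when it is finalized.

```python
import gc


class Connection:
    """Toy connection holding the data a stream will read (hypothetical)."""

    def __init__(self, data):
        self.data = data
        self.closed = False

    def close(self):
        self.closed = True


class Stream:
    """Reader that only works while the connection stays open."""

    def __init__(self, conn):
        self._conn = conn

    def read(self):
        if self._conn.closed:
            raise ConnectionError("connection closed before body was read")
        return self._conn.data


class Response:
    def __init__(self, conn):
        self._conn = conn
        self.content = Stream(conn)

    def __del__(self):
        # Assumed cleanup on collection: closes the connection even though
        # a Stream handed out earlier may still need it.
        self._conn.close()


def get_body_only(data):
    resp = Response(Connection(data))
    return resp.content  # resp becomes unreachable once we return


def get_body_and_response(data):
    resp = Response(Connection(data))
    return resp.content, resp  # caller keeps the response alive


orphan_stream = get_body_only(b"payload")
gc.collect()  # make sure the dropped response has been finalized
try:
    orphan_stream.read()
    orphan_failed = False
except ConnectionError:
    orphan_failed = True

kept_stream, kept_response = get_body_and_response(b"payload")
gc.collect()
kept_data = kept_stream.read()
```

In this model the stream returned without its response fails on read, while the one whose response is kept alive reads normally, which is why holding a strong reference to the response for as long as the body is in use avoids this kind of failure.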

@jettify jettify closed this as completed Feb 1, 2016
@oselivanov
Author

Thank you!
