Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix coroutine wrapper to await after exception (#7785) #7786

Conversation

Dreamsorcerer
Copy link
Member

@Dreamsorcerer Dreamsorcerer commented Nov 3, 2023

This PR fixes #7117 and similar issues. Short explanation - our coroutine wrapper does not properly handle the exception, which breaks coroutine handling. As you can see, any task expects results from throw -
https://github.com/python/cpython/blob/main/Lib/asyncio/tasks.py#L303 but it seems like in aiohttp it was acidently removed by this commit stalkerg@f04ecc2#diff-f334e752b4894ef951105572ab8b195aeb8db90eb6e48b1dfbd9a01da4c854f5L842

This is repro a case without aiohttp:

import ssl
import collections

class TestCoroWrapper(collections.abc.Coroutine):
    __slots__ = ("_coro", "_resp")
    def __init__(self, coro):
        self._coro = coro

    def __getattr__(self, attr):
        return getattr(self._coro, attr)

    def send(self, arg):
        return self._coro.send(arg)

    def throw(self, arg):
        self._coro.throw(arg)

    def close(self):
        return self._coro.close()

    def __await__(self):
        ret = self._coro.__await__()
        return ret

async def ssl_call(context):
    loop = asyncio.get_event_loop()
    return await loop.create_connection(
        lambda: asyncio.Protocol(),
        '2404:6800:4004:824::2004',
        443,
        ssl=context,
        family=socket.AddressFamily.AF_INET6,
        proto=6,
        flags=socket.AddressInfo.AI_NUMERICHOST | socket.AddressInfo.AI_NUMERICSERV,
        server_hostname='www.google.com',
        local_addr=None
    )

async def prepare_call():
    context = ssl.create_default_context()
    try:
        connection = await ssl_call(context)
    except ssl.SSLError as e:
        print(f"Got exception1: {e}")
        raise e

    return connection

async def my_task():
    try:
        await prepare_call()
    except Exception as e:
        print(f"Got exception2: {e}")

    await asyncio.sleep(1)
    raise Exception("test")

async def main():
    my_coro = TestCoroWrapper(my_task())
    print(f"is coro? {asyncio.iscoroutine(my_coro)}")
    task = asyncio.create_task(my_coro)
    await task

asyncio.run(main())

The TestCoroWrapper here is equivalent of
_BaseRequestContextManager. If you run such code like: SSL_CERT_FILE=/dev/null SSL_CERT_DIR=/dev/null python test.py you will get an error: await wasn't used with future.
The main idea here is that you are trying to await the sleep function after getting and catching an exception from the native (SSL) module.

Now I should explain why repro with aiohttp for some users return the same error:

import asyncio
import aiohttp

async def main():
    async with aiohttp.ClientSession() as session:
        try:
            response = await asyncio.ensure_future(session.get('https://www.google.com/'))
            print(await response.text())
        finally:
            response.release()

asyncio.run(main())

here it's happened because in TCPConnector._create_direct_connection we are getting all IPs for the given host and trying to connect one by one. If the first connection gets an error we will catch this error and try again for the next IP. If you have IPv6 you will have at least 2 IPs here (ipv6 and ipv4), and after the first error, you will try to connect to a second IP and get the same error.

Why it's problem only for asyncio.ensure_future? Because asyncio.ensure_future creates a task such a task starts processing coroutines and directly communicates with our coroutine wrapper witch not return a result for throw.


Co-authored-by: Sam Bull [email protected]
Co-authored-by: Sam Bull [email protected]
(cherry picked from commit a57dc31)

This PR fixes #7117 and similar issues. Short explanation - our
coroutine wrapper does not properly handle the exception, which breaks
coroutine handling. As you can see, any task expects results from
`throw` -
https://github.com/python/cpython/blob/main/Lib/asyncio/tasks.py#L303
but it seems like in aiohttp it was acidently removed by this commit
stalkerg@f04ecc2#diff-f334e752b4894ef951105572ab8b195aeb8db90eb6e48b1dfbd9a01da4c854f5L842

This is repro a case without aiohttp:
```python
import ssl
import collections

class TestCoroWrapper(collections.abc.Coroutine):
    __slots__ = ("_coro", "_resp")
    def __init__(self, coro):
        self._coro = coro

    def __getattr__(self, attr):
        return getattr(self._coro, attr)

    def send(self, arg):
        return self._coro.send(arg)

    def throw(self, arg):
        self._coro.throw(arg)

    def close(self):
        return self._coro.close()

    def __await__(self):
        ret = self._coro.__await__()
        return ret

async def ssl_call(context):
    loop = asyncio.get_event_loop()
    return await loop.create_connection(
        lambda: asyncio.Protocol(),
        '2404:6800:4004:824::2004',
        443,
        ssl=context,
        family=socket.AddressFamily.AF_INET6,
        proto=6,
        flags=socket.AddressInfo.AI_NUMERICHOST | socket.AddressInfo.AI_NUMERICSERV,
        server_hostname='www.google.com',
        local_addr=None
    )

async def prepare_call():
    context = ssl.create_default_context()
    try:
        connection = await ssl_call(context)
    except ssl.SSLError as e:
        print(f"Got exception1: {e}")
        raise e

    return connection

async def my_task():
    try:
        await prepare_call()
    except Exception as e:
        print(f"Got exception2: {e}")

    await asyncio.sleep(1)
    raise Exception("test")

async def main():
    my_coro = TestCoroWrapper(my_task())
    print(f"is coro? {asyncio.iscoroutine(my_coro)}")
    task = asyncio.create_task(my_coro)
    await task

asyncio.run(main())
```

The `TestCoroWrapper` here is equivalent of
`_BaseRequestContextManager`. If you run such code like:
`SSL_CERT_FILE=/dev/null SSL_CERT_DIR=/dev/null python test.py` you will
get an error: `await wasn't used with future`.
The main idea here is that you are trying to await the sleep function
after getting and catching an exception from the native (SSL) module.

Now I should explain why repro with aiohttp for some users return the
same error:
```python
import asyncio
import aiohttp

async def main():
    async with aiohttp.ClientSession() as session:
        try:
            response = await asyncio.ensure_future(session.get('https://www.google.com/'))
            print(await response.text())
        finally:
            response.release()

asyncio.run(main())
```

here it's happened because in `TCPConnector._create_direct_connection`
we are getting all IPs for the given host and trying to connect one by
one. If the first connection gets an error we will catch this error and
try again for the next IP. If you have IPv6 you will have at least 2 IPs
here (ipv6 and ipv4), and after the first error, you will try to connect
to a second IP and get the same error.

Why it's problem only for `asyncio.ensure_future`? Because
`asyncio.ensure_future` creates a `task` such a task starts processing
coroutines and directly communicates with our coroutine wrapper witch
not return a result for `throw`.

I will write changelog and etc after people validate this PR. But
honestly, I don't think I can write a unit test for a such case.

Regards,

- [ ] I think the code is well written
- [ ] Unit tests for the changes exist
- [ ] Documentation reflects the changes
- [ ] If you provide code modification, please add yourself to
`CONTRIBUTORS.txt`
  * The format is <Name> <Surname>.
  * Please keep alphabetical order, the file is sorted by names.
- [ ] Add a new news fragment into the `CHANGES` folder
  * name it `<issue_id>.<type>` for example (588.bugfix)
* if you don't have an `issue_id` change it to the pr id after creating
the pr
  * ensure type is one of the following:
    * `.feature`: Signifying a new feature.
    * `.bugfix`: Signifying a bug fix.
    * `.doc`: Signifying a documentation improvement.
    * `.removal`: Signifying a deprecation or removal of public API.
* `.misc`: A ticket has been closed, but it is not of interest to users.
* Make sure to use full sentences with correct case and punctuation, for
example: "Fix issue with non-ascii contents in doctest text files."

---------

Co-authored-by: Sam Bull <[email protected]>
Co-authored-by: Sam Bull <[email protected]>
(cherry picked from commit a57dc31)
@psf-chronographer psf-chronographer bot added the bot:chronographer:provided There is a change note present in this PR label Nov 3, 2023
@Dreamsorcerer Dreamsorcerer merged commit 798c25d into 3.9 Nov 3, 2023
@Dreamsorcerer Dreamsorcerer deleted the patchback/backports/3.9/a57dc3146839e2e1978b96db48e1de3af1a2bb50/pr-7785 branch November 3, 2023 14:38
xiangxli pushed a commit to xiangxli/aiohttp that referenced this pull request Dec 4, 2023
…ibs#7786)

This PR fixes aio-libs#7117 and similar issues. Short explanation - our
coroutine wrapper does not properly handle the exception, which breaks
coroutine handling. As you can see, any task expects results from
`throw` -
https://github.com/python/cpython/blob/main/Lib/asyncio/tasks.py#L303
but it seems like in aiohttp it was acidently removed by this commit
stalkerg@f04ecc2#diff-f334e752b4894ef951105572ab8b195aeb8db90eb6e48b1dfbd9a01da4c854f5L842

This is repro a case without aiohttp:
```python
import ssl
import collections

class TestCoroWrapper(collections.abc.Coroutine):
    __slots__ = ("_coro", "_resp")
    def __init__(self, coro):
        self._coro = coro

    def __getattr__(self, attr):
        return getattr(self._coro, attr)

    def send(self, arg):
        return self._coro.send(arg)

    def throw(self, arg):
        self._coro.throw(arg)

    def close(self):
        return self._coro.close()

    def __await__(self):
        ret = self._coro.__await__()
        return ret

async def ssl_call(context):
    loop = asyncio.get_event_loop()
    return await loop.create_connection(
        lambda: asyncio.Protocol(),
        '2404:6800:4004:824::2004',
        443,
        ssl=context,
        family=socket.AddressFamily.AF_INET6,
        proto=6,
        flags=socket.AddressInfo.AI_NUMERICHOST | socket.AddressInfo.AI_NUMERICSERV,
        server_hostname='www.google.com',
        local_addr=None
    )

async def prepare_call():
    context = ssl.create_default_context()
    try:
        connection = await ssl_call(context)
    except ssl.SSLError as e:
        print(f"Got exception1: {e}")
        raise e

    return connection

async def my_task():
    try:
        await prepare_call()
    except Exception as e:
        print(f"Got exception2: {e}")

    await asyncio.sleep(1)
    raise Exception("test")

async def main():
    my_coro = TestCoroWrapper(my_task())
    print(f"is coro? {asyncio.iscoroutine(my_coro)}")
    task = asyncio.create_task(my_coro)
    await task

asyncio.run(main())
```

The `TestCoroWrapper` here is equivalent of
`_BaseRequestContextManager`. If you run such code like:
`SSL_CERT_FILE=/dev/null SSL_CERT_DIR=/dev/null python test.py` you will
get an error: `await wasn't used with future`.
The main idea here is that you are trying to await the sleep function
after getting and catching an exception from the native (SSL) module.

Now I should explain why repro with aiohttp for some users return the
same error:
```python
import asyncio
import aiohttp

async def main():
    async with aiohttp.ClientSession() as session:
        try:
            response = await asyncio.ensure_future(session.get('https://www.google.com/'))
            print(await response.text())
        finally:
            response.release()

asyncio.run(main())
```

here it's happened because in `TCPConnector._create_direct_connection`
we are getting all IPs for the given host and trying to connect one by
one. If the first connection gets an error we will catch this error and
try again for the next IP. If you have IPv6 you will have at least 2 IPs
here (ipv6 and ipv4), and after the first error, you will try to connect
to a second IP and get the same error.

Why it's problem only for `asyncio.ensure_future`? Because
`asyncio.ensure_future` creates a `task` such a task starts processing
coroutines and directly communicates with our coroutine wrapper witch
not return a result for `throw`.

---------

Co-authored-by: Sam Bull <[email protected]>
Co-authored-by: Sam Bull <[email protected]>
(cherry picked from commit a57dc31)

Co-authored-by: Yury Zhuravlev <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bot:chronographer:provided There is a change note present in this PR
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants