-
Notifications
You must be signed in to change notification settings - Fork 141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixed feed_data after feed_eof assertion errors on asyncio #752
Conversation
Test №1 process.py import time
import random
while True:
print(random.random(), flush=True)
time.sleep(0.01) test_subprocess.py import sys
from pathlib import Path
import anyio
from anyio.streams.buffered import BufferedByteReceiveStream
async def main():
_script_path = str(Path(__file__).parent / "process.py")
async with await anyio.open_process([sys.executable, _script_path]) as process:
stream = BufferedByteReceiveStream(process.stdout)
while True:
try:
output = await stream.receive_until(b"\n", 6)
print(output, flush=True)
except (anyio.ClosedResourceError, anyio.BrokenResourceError, anyio.EndOfStream):
break
if __name__ == "__main__":
anyio.run(main) WITHOUT fix -- starts producing these after some time
WITH fix These results are consistent between different runs of tests |
Test №2 Let's simulate exception in user's code, by raising IndexError in test_subprocess.py import sys
from pathlib import Path
from typing import Optional
from anyio.abc import Process
import random
import anyio
from anyio.streams.buffered import BufferedByteReceiveStream
class Generator:
def __init__(self):
self._proc: Optional[Process] = None
async def __aenter__(self):
self._proc = await anyio.open_process([
sys.executable,
str(Path(__file__).parent / "process.py")
])
self._stream = BufferedByteReceiveStream(self._proc.stdout)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._proc.aclose()
return False
def __aiter__(self):
return self
async def __anext__(self):
while True:
try:
output = await self._stream.receive_until(b"\n", 65535)
except (anyio.EndOfStream, anyio.IncompleteRead, anyio.ClosedResourceError):
raise StopAsyncIteration
# Simulate exception in user's code
if random.random() < 0.02:
raise IndexError
return output
async def main():
_script_path = str(Path(__file__).parent / "process.py")
async with Generator() as gen:
async for item in gen:
print(item, flush=True)
if __name__ == "__main__":
anyio.run(main) WITHOUT fix -- spam of WITH fix -- process just hangs and outputs nothing This is consistent in Windows and Linux |
I think, the second test is not linked to the It turns out, that (in the case, if error pops in user's code and not in subprocess) you need to explicitly terminate or kill process before calling If we substitute async def __aexit__(self, exc_type, exc_val, exc_tb):
self._proc.terminate()
await self._proc.aclose()
return False I've written tests for anyio (including with/without |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good!
Shutdown logic is always tricky, at least until someone very carefully implments it as a context manager 😅
@graingert correctly pointed out that if the stream is aclosed while another task is reading from it, it will hang, as there is no signal to stop waiting on the read. I'm struggling to write a proper test for this though. |
I've pushed an alternate solution, and an accompanying test to cover the case pointed out by @graingert. |
Grumbles about asyncio streams |
Changes
This removes the call to
self._stream.feed_eof()
, as it should not be done when the protocol is still sending data (and we can't prevent it from doing so). Instead, we set an internal flag and raiseClosedResourceError
if someone tries to read from the stream after it's closed.Fixes #490.
Checklist
If this is a user-facing code change, like a bugfix or a new feature, please ensure that
you've fulfilled the following conditions (where applicable):
tests/
) added which would fail without your patchdocs/
, in case of behavior changes or newfeatures)
docs/versionhistory.rst
).If this is a trivial change, like a typo fix or a code reformatting, then you can ignore
these instructions.
Updating the changelog
If there are no entries after the last release, use
**UNRELEASED**
as the version.If, say, your patch fixes issue #123, the entry should look like this:
* Fix big bad boo-boo in task groups (#123 <https://github.com/agronholm/anyio/issues/123>_; PR by @yourgithubaccount)
If there's no issue linked, just link to your pull request instead by updating the
changelog after you've created the PR.