-
Notifications
You must be signed in to change notification settings - Fork 142
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AttributeError: 'MemoryObjectItemReceiver' object has no attribute 'item' #754
Comments
Would you be able to create a minimal workable example that demonstrates this issue? I'm not convinced that AnyIO is the real culprit here. |
This might be helpful @agronholm fastapi/fastapi#11652 ... I"m getting the same behaviour
|
There is one place in |
This was fixed by #767. Let me know if it wasn't. |
@agronholm tested using commit |
@agronholm Can we have a new release (4.4.1) with this bug fix? |
The next release will be v4.5.0 which is only pending my finalization of a fix for #695, and the review and merging of a number of outstanding PRs. |
I've got same issue (but from different place):
seems this line also should be fixed in the same way: try:
return receiver.item
except AttributeError:
raise EndOfStream |
same error, and likely triggered due to unrelated errors on our end that trip this (fastapi route that did not wait on canceled tasks, yields, etc) |
But it was fixed already? Are you experiencing this with the latest release too? |
Yes, 4.8.0 |
Can you provide a MWE? |
Or at least a traceback? Even that might help. |
The scenario is a bit interesting, you can see some variants we're playing with here to make work:
Internally we see at least one of of the requests succeed (we force both to fire for testing purposes by inserting a delay to make first slow), but the ultimate fastapi response is not the 'first winner', but an error, and we are seeing
triggered by @app.get("/test_slow_request_fallback")
async def test_slow_request_fallback(request: Request) -> Dict:
async def call_task(force_slow: bool = False) -> Dict[str, Any]:
if force_slow:
await asyncio.sleep(10)
return await async_langchain_task()
async def call_with_retry() -> Dict[str, Any]:
first_attempt = asyncio.create_task(call_task(force_slow=True))
try:
return await asyncio.wait_for(first_attempt, timeout=g_env.DT_LLM_REDISPATCH_AFTER_WAIT_S)
except asyncio.TimeoutError:
second_attempt = asyncio.create_task(call_task())
done, pending = await asyncio.wait(
[first_attempt, second_attempt],
return_when=asyncio.FIRST_COMPLETED
)
async def cancel_pending():
for task in pending:
try:
task.cancel()
#await task
#await asyncio.wrap_future(task, return_exceptions=True)
except asyncio.CancelledError:
pass
except Exception as ex:
telemetry.exception_with('Canceling pending tasks failed, continue anyways', ex)
for task in done:
try:
if task.cancelled():
telemetry.info('Done subtask cancelled, returning anyways, type={}', type(task))
#continue
out = task.result()
telemetry.info('Subtask succeeded, type={}:\n{}\n', type(out), out)
#await cancel_pending()
return out
except Exception as ex:
telemetry.exception_with(f'Done subtask failed: {str(task)}', ex)
continue
#await cancel_pending()
raise RuntimeError("All tasks failed.")
try:
response = await call_with_retry()
return {"status": "OK"}
except Exception as e:
await log_exn(request, 'test failed', e)
return JSONResponse(
{"detail": f"Test failed: {str(e)}"}, status_code=500
) |
Related: Also currently examining |
So unlike with the original issue, this doesn't occur in the Why are you using old-style asyncio tasks instead of task groups by the way? I also see you catching and not reraising Can we have a MWE? I feel there's too much logic here to unpack. |
I'm not sure there's an AnyIO bug here. As you can see, the AttributeError is handled two lines after the exception is raised. If there's a problem on our side, it's about the |
A bit more minimal The expectation is:
@app.get("/test_async")
async def test_async(request: Request) -> Dict:
t0_s = datetime.now().timestamp()
async def call_qa(force_slow: bool = False) -> Dict[str, Any]:
if force_slow:
await asyncio.sleep(10)
async def work() -> Dict[str, Any]:
await asyncio.sleep(1)
return {'ok': True}
return await work()
async def call_with_retry() -> Dict[str, Any]:
first_attempt = asyncio.create_task(call_qa(force_slow=True))
try:
return await asyncio.wait_for(first_attempt, timeout=0.5)
except asyncio.TimeoutError:
telemetry.info('Starting retry')
second_attempt = asyncio.create_task(call_qa())
done, pending = await asyncio.wait(
[first_attempt, second_attempt],
return_when=asyncio.FIRST_COMPLETED
)
for task in done:
try:
if task.cancelled():
telemetry.info('Done subtask cancelled, returning anyways, type={}', type(task))
out = task.result()
telemetry.info('Subtask succeeded, type={}:\n{}\n', type(out), out)
return out
except Exception as ex:
telemetry.exception_with(f'Done subtask failed: {str(task)}', ex)
continue
raise RuntimeError("All tasks failed.")
try:
response = await call_with_retry()
telemetry.info('helper call response:\n{}\n', response)
assert response['ok']
now_s = datetime.now().timestamp()
assert now_s - t0_s <= 3
telemetry.info('done')
return {"status": "OK"}
except Exception as e:
return JSONResponse(
{"detail": f"test failed: {str(e)}"}, status_code=500
)
|
Ok, so do you have any evidence that there is an AnyIO bug involved? |
TBD, adding timing info shows the resumption at 0.5s is when the exn throws, so when the timedout triggers, but no longer anyio in the stack:
|
I'm guessing the issue above is partly or entirely user error: first_attempt = asyncio.create_task(call_qa(force_slow=True))
try:
return await asyncio.wait_for(first_attempt, timeout=0.5)
except asyncio.TimeoutError:
telemetry.info('Starting retry')
second_attempt = asyncio.create_task(call_qa())
done, pending = await asyncio.wait(
[first_attempt, second_attempt],
return_when=asyncio.FIRST_COMPLETED
)
for task in done:
try:
if task.cancelled():
telemetry.info('Done subtask cancelled, returning anyways, type={}', type(task))
out = task.result()
telemetry.info('Subtask succeeded, type={}:\n{}\n', type(out), out)
return out
except Exception as ex:
telemetry.exception_with(f'Done subtask failed: {str(task)}', ex)
continue
raise RuntimeError("All tasks failed.") The This was the wrong pattern for a |
Ok, may I then suggest that you investigate, and if this turns out to be an AnyIO problem (which I don't believe it is), report it as a new issue here? |
Yes, I intentionally posted as a continuation of this issue to help w SEO and not a new issue because of this murkiness |
Things to check first
I have searched the existing issues and didn't find my bug already reported there
I have checked that my bug is still present in the latest release
AnyIO version
4.4.0
Python version
3.11.9
What happened?
After bumping anyio from 4.3.0 to 4.4.0, one of our tests that depends on
starlette.testclient.TestClient
began to fail.Here is the ASGI lifespan code under test:
And here is the test itself:
Finally, here is the newly-encountered exception:
I added some
print
statements aroundanyio/from_thread.py
to see what is failing internally:How can we reproduce the bug?
Create a starlette/fastapi application with a test or otherwise that uses
starlette.testclient.TestClient
; in the ASGI lifespan async context manager, create/yield/cancel a task like so:Pin anyio to 4.3.0. The test should pass.
Bump anyio to 4.4.0. The test should now error.
The text was updated successfully, but these errors were encountered: