AttributeError: 'MemoryObjectItemReceiver' object has no attribute 'item' #754

Closed
2 tasks done
mecampbellsoup opened this issue Jul 1, 2024 · 23 comments
Labels
bug Something isn't working

Comments

@mecampbellsoup

Things to check first

  • I have searched the existing issues and didn't find my bug already reported there

  • I have checked that my bug is still present in the latest release

AnyIO version

4.4.0

Python version

3.11.9

What happened?

After bumping anyio from 4.3.0 to 4.4.0, one of our tests that depends on starlette.testclient.TestClient began to fail.

Here is the ASGI lifespan code under test:

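import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI

# database, task_queue and logger are defined elsewhere in the application.


@asynccontextmanager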
async def lifespan(app: FastAPI):
    # Startup
    async with database, task_queue.open_async():
        # TODO only run this worker if there is no separate, dedicated worker process
        worker = asyncio.create_task(
            task_queue.run_worker_async(install_signal_handlers=False)
        )

        yield

        worker.cancel()
        try:
            await asyncio.wait_for(worker, timeout=10)
        except TimeoutError:
            logger.error("Ungraceful shutdown")
        except asyncio.CancelledError:
            logger.error("Graceful shutdown")
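A side note on the shutdown branch above: after worker.cancel(), awaiting the worker (directly or through asyncio.wait_for()) re-raises the worker's CancelledError in the awaiting coroutine once the worker has finished cancelling, so the except asyncio.CancelledError branch is the normal path. That is consistent with the "Graceful shutdown" line (logged at ERROR level) in the captured output further down. The standard-library sketch below illustrates this; fake_worker and shutdown_worker are hypothetical stand-ins for task_queue.run_worker_async() and the lifespan's shutdown code, not the application's real code.

```python
import asyncio


async def fake_worker() -> None:
    # Hypothetical stand-in for task_queue.run_worker_async():
    # waits forever until it is cancelled.
    await asyncio.Event().wait()


async def shutdown_worker(worker: "asyncio.Task[None]", timeout: float = 10) -> str:
    # Mirrors the shutdown half of the lifespan: request cancellation, then wait.
    worker.cancel()
    try:
        await asyncio.wait_for(worker, timeout=timeout)
    except asyncio.TimeoutError:
        return "ungraceful"
    except asyncio.CancelledError:
        # The worker's CancelledError is re-raised here when it is awaited.
        # This coroutine itself was not cancelled, so handling it is safe here.
        return "graceful"
    return "finished"


async def main() -> str:
    worker = asyncio.create_task(fake_worker())
    await asyncio.sleep(0)  # give the worker a chance to start
    return await shutdown_worker(worker)


print(asyncio.run(main()))  # graceful
```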

And here is the test itself:

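from unittest.mock import AsyncMock, patch

from fastapi import FastAPI
from starlette.testclient import TestClient

# initialize_app comes from the application under test.


@patch("cloud_console.server.app.verify_clients")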
@patch("cloud_console.server.app.database")
async def test_initialize_app_success(
    mock_database: AsyncMock, mock_verify_clients: AsyncMock
):
    mock_database.__aenter__ = AsyncMock()
    mock_database.__aexit__ = AsyncMock()
    app: FastAPI = initialize_app()

    with TestClient(app):
        pass

    mock_verify_clients.assert_awaited_once()
    mock_database.__aenter__.assert_awaited_once()
    mock_database.__aexit__.assert_awaited_once()

Finally, here is the newly encountered exception:

===================================================================================================================== FAILURES ======================================================================================================================
____________________________________________________________________________________________________________ test_initialize_app_success ____________________________________________________________________________________________________________

mock_database = <MagicMock name='database' id='130423892726736'>, mock_verify_clients = <AsyncMock name='verify_clients' id='130423892725968'>

    @patch("cloud_console.server.app.verify_clients")
    @patch("cloud_console.server.app.database")
    async def test_initialize_app_success(
        mock_database: AsyncMock, mock_verify_clients: AsyncMock
    ):
        mock_database.__aenter__ = AsyncMock()
        mock_database.__aexit__ = AsyncMock()
        app: FastAPI = initialize_app()

>       with TestClient(app):

tests/server/test_app.py:20:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.venv/lib/python3.11/site-packages/starlette/testclient.py:798: in __exit__
    self.exit_stack.close()
../../../.pyenv/versions/3.11.9/lib/python3.11/contextlib.py:609: in close
    self.__exit__(None, None, None)
../../../.pyenv/versions/3.11.9/lib/python3.11/contextlib.py:601: in __exit__
    raise exc_details[1]
../../../.pyenv/versions/3.11.9/lib/python3.11/contextlib.py:586: in __exit__
    if cb(*exc_details):
../../../.pyenv/versions/3.11.9/lib/python3.11/contextlib.py:469: in _exit_wrapper
    callback(*args, **kwds)
.venv/lib/python3.11/site-packages/starlette/testclient.py:791: in wait_shutdown
    portal.call(self.wait_shutdown)
.venv/lib/python3.11/site-packages/anyio/from_thread.py:287: in call
    return cast(T_Retval, self.start_task_soon(func, *args).result())
../../../.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py:456: in result
    return self.__get_result()
../../../.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py:401: in __get_result
    raise self._exception
.venv/lib/python3.11/site-packages/anyio/from_thread.py:218: in _call_func
    retval = await retval_or_awaitable
.venv/lib/python3.11/site-packages/starlette/testclient.py:833: in wait_shutdown
    message = await receive()
.venv/lib/python3.11/site-packages/starlette/testclient.py:828: in receive
    self.task.result()
../../../.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py:449: in result
    return self.__get_result()
../../../.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py:401: in __get_result
    raise self._exception
.venv/lib/python3.11/site-packages/anyio/from_thread.py:218: in _call_func
    retval = await retval_or_awaitable
.venv/lib/python3.11/site-packages/starlette/testclient.py:803: in lifespan
    await self.app(scope, self.stream_receive.receive, self.stream_send.send)
.venv/lib/python3.11/site-packages/fastapi/applications.py:1054: in __call__
    await super().__call__(scope, receive, send)
.venv/lib/python3.11/site-packages/sentry_sdk/integrations/starlette.py:374: in _sentry_patched_asgi_app
    return await middleware(scope, receive, send)
.venv/lib/python3.11/site-packages/sentry_sdk/integrations/asgi.py:152: in _run_asgi3
    return await self._run_app(scope, receive, send, asgi_version=3)
.venv/lib/python3.11/site-packages/sentry_sdk/integrations/asgi.py:167: in _run_app
    raise exc from None
.venv/lib/python3.11/site-packages/sentry_sdk/integrations/asgi.py:163: in _run_app
    return await self.app(scope, receive, send)
.venv/lib/python3.11/site-packages/starlette/applications.py:123: in __call__
    await self.middleware_stack(scope, receive, send)
.venv/lib/python3.11/site-packages/sentry_sdk/integrations/starlette.py:169: in _create_span_call
    return await old_call(app, scope, new_receive, new_send, **kwargs)
.venv/lib/python3.11/site-packages/starlette/middleware/errors.py:151: in __call__
    await self.app(scope, receive, send)
.venv/lib/python3.11/site-packages/sentry_sdk/integrations/starlette.py:268: in _sentry_exceptionmiddleware_call
    await old_call(self, scope, receive, send)
.venv/lib/python3.11/site-packages/sentry_sdk/integrations/starlette.py:169: in _create_span_call
    return await old_call(app, scope, new_receive, new_send, **kwargs)
.venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py:51: in __call__
    await self.app(scope, receive, send)
.venv/lib/python3.11/site-packages/starlette/routing.py:756: in __call__
    await self.middleware_stack(scope, receive, send)
.venv/lib/python3.11/site-packages/starlette/routing.py:765: in app
    await self.lifespan(scope, receive, send)
.venv/lib/python3.11/site-packages/starlette/routing.py:750: in lifespan
    await send({"type": "lifespan.shutdown.complete"})
.venv/lib/python3.11/site-packages/sentry_sdk/integrations/starlette.py:159: in _sentry_send
    description=getattr(send, "__qualname__", str(send)),
../../../.pyenv/versions/3.11.9/lib/python3.11/dataclasses.py:240: in wrapper
    result = user_function(self)
<string>:3: in __repr__
    ???
../../../.pyenv/versions/3.11.9/lib/python3.11/dataclasses.py:240: in wrapper
    result = user_function(self)
<string>:3: in __repr__
    ???
../../../.pyenv/versions/3.11.9/lib/python3.11/dataclasses.py:240: in wrapper
    result = user_function(self)
<string>:3: in __repr__
    ???
../../../.pyenv/versions/3.11.9/lib/python3.11/dataclasses.py:240: in wrapper
    result = user_function(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = MemoryObjectItemReceiver(task_info=AsyncIOTaskInfo(id=130423912790720, name='anyio.from_thread.BlockingPortal._call_func'), item=None)

>   ???
E   AttributeError: 'MemoryObjectItemReceiver' object has no attribute 'item'

<string>:3: AttributeError
--------------------------------------------------------------------------------------------------------------- Captured stderr call ----------------------------------------------------------------------------------------------------------------
ERROR:cloud_console.server.app:Graceful shutdown
----------------------------------------------------------------------------------------------------------------- Captured log call -----------------------------------------------------------------------------------------------------------------
ERROR    cloud_console.server.app:app.py:42 Graceful shutdown
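One way to read this traceback (an interpretation; later in the thread the maintainer notes that the original error occurred in __repr__()): Sentry's _sentry_send calls getattr(send, "__qualname__", str(send)), and Python evaluates the str(send) default eagerly, even when send does have a __qualname__. Stringifying a bound method calls repr() on the object it is bound to, and the nested dataclass-generated __repr__ calls end at a receiver whose item field has not been assigned yet. The standard-library sketch below reproduces that failure mode with hypothetical classes (ItemReceiver, StreamState, SendStream); they are not AnyIO's actual definitions.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class ItemReceiver:
    # Hypothetical receiver record. `item` has init=False and no default,
    # so the attribute does not exist until something assigns it.
    task_name: str
    item: Any = field(init=False)


@dataclass
class StreamState:
    waiting_receivers: List[ItemReceiver] = field(default_factory=list)


@dataclass
class SendStream:
    state: StreamState

    async def send(self, message: Any) -> None:
        ...  # body is irrelevant for this demonstration


def span_description(send: Any) -> str:
    # Same shape as the Sentry call in the traceback: the default argument
    # str(send) is evaluated before getattr() even runs.
    return getattr(send, "__qualname__", str(send))


def span_description_lazy(send: Any) -> str:
    # Hypothetical alternative that only stringifies when actually needed.
    qualname = getattr(send, "__qualname__", None)
    return qualname if qualname is not None else str(send)


receiver = ItemReceiver(task_name="worker")
stream = SendStream(StreamState([receiver]))

print(span_description_lazy(stream.send))  # SendStream.send
try:
    span_description(stream.send)
except AttributeError as exc:
    print(exc)  # 'ItemReceiver' object has no attribute 'item'
```

Once the receiver's item is assigned, the same eager call succeeds, which fits an error that only appears while a receiver is still waiting.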

I added some print statements around anyio/from_thread.py to see what is failing internally:

----------------------------------------------------------------------------------------------------------------------------- Captured stdout call -----------------------------------------------------------------------------------------------------------------------------
retval_or_awaitable <class 'coroutine'> <coroutine object TestClient.lifespan at 0x70a707fcb840>
scope {'type': 'lifespan', 'state': {}, 'app': <fastapi.applications.FastAPI object at 0x70a7080e7d10>, 'router': <fastapi.routing.APIRouter object at 0x70a707f68590>} <class 'dict'>
receive <function _enable_span_for_middleware.<locals>._create_span_call.<locals>._sentry_receive at 0x70a707e4d260> <class 'function'>
send <function _enable_span_for_middleware.<locals>._create_span_call.<locals>._sentry_send at 0x70a707e4d760> <class 'function'>
retval_or_awaitable <class 'coroutine'> <coroutine object TestClient.wait_startup at 0x70a707f8b140>
ret <class 'procrastinate.utils.AwaitableContext'> <procrastinate.utils.AwaitableContext object at 0x70a707e5a990>
yielding...
retval <class 'NoneType'> None
retval_or_awaitable <class 'coroutine'> <coroutine object TestClient.wait_shutdown at 0x70a707fa7c40>
done yielding.
retval_or_awaitable <class 'coroutine'> <coroutine object BlockingPortal.stop at 0x70a707f039f0>
retval <class 'NoneType'> None

How can we reproduce the bug?

Create a Starlette/FastAPI application with a test (or other code) that uses starlette.testclient.TestClient. In the ASGI lifespan async context manager, create a task, yield, and then cancel the task, like so:

        worker = asyncio.create_task(
            task_queue.run_worker_async(install_signal_handlers=False)
        )

        yield

        worker.cancel()

Pin anyio to 4.3.0. The test should pass.
Bump anyio to 4.4.0. The test should now error.

@mecampbellsoup mecampbellsoup added the bug Something isn't working label Jul 1, 2024
@agronholm
Owner

Would you be able to create a minimal workable example that demonstrates this issue? I'm not convinced that AnyIO is the real culprit here.

@lordoffreaks

This might be helpful, @agronholm: fastapi/fastapi#11652 ... I'm getting the same behaviour.

Pin anyio to 4.3.0. The test should pass.
Bump anyio to 4.4.0. The test should now error.

@agronholm
Owner

There is one place in MemoryObjectReceiveStream where such an AttributeError occurs, but it is caught and an EndOfStream is raised in its place. So it doesn't make sense that the AttributeError would be uncaught, as is implied in this bug report.

@agronholm
Owner

This was fixed by #767. Let me know if it wasn't.

@lordoffreaks

@agronholm Tested using commit f6750c802e862941fd356bae28753c441aea2ab5 and it works perfectly, thanks a lot for the work!!!

@williamjamir

@agronholm Can we have a new release (4.4.1) with this bug fix?

@agronholm
Owner

The next release will be v4.5.0 which is only pending my finalization of a fix for #695, and the review and merging of a number of outstanding PRs.

@gorkunov

gorkunov commented Dec 17, 2024

I've got the same issue (but from a different place):

AttributeError: 'MemoryObjectItemReceiver' object has no attribute 'item'

return receiver.item

It seems this line should also be fixed in the same way:

try:
    return receiver.item
except AttributeError:
    raise EndOfStream

@lmeyerov

Same error, likely triggered by unrelated errors on our end that trip this (a FastAPI route that did not wait on canceled tasks, yields, etc.).

@agronholm
Copy link
Owner

But it was fixed already? Are you experiencing this with the latest release too?

@lmeyerov

Yes, 4.8.0

@agronholm
Owner

Can you provide a MWE?

@agronholm
Owner

Or at least a traceback? Even that might help.

@lmeyerov

lmeyerov commented Jan 12, 2025

The scenario is a bit interesting; you can see some variants we're experimenting with here to make it work:

  • Goal: we call an async FastAPI route, and if a step takes too long, the route retries that slow step and takes the first winner
  • Annoyance: LangChain's async await chain.arun is involved, and LangChain's asyncio support is shaky

Internally we see at least one of the requests succeed (for testing, we force both to fire by inserting a delay that makes the first one slow), but the final FastAPI response is not the 'first winner' but an error, and AnyIO appears in the exception traces. It's unclear who the culprit is, including possible user error; you can see the variants we're experimenting with in the comments.

api-1  | Traceback (most recent call last):
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/anyio/streams/memory.py", line 111, in receive
api-1  |     return self.receive_nowait()
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/anyio/streams/memory.py", line 106, in receive_nowait
api-1  |     raise WouldBlock
api-1  | anyio.WouldBlock
api-1  | 
api-1  | During handling of the above exception, another exception occurred:
api-1  | 
api-1  | Traceback (most recent call last):
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/anyio/streams/memory.py", line 124, in receive
api-1  |     return receiver.item
api-1  | AttributeError: 'MemoryObjectItemReceiver' object has no attribute 'item'
api-1  | 
api-1  | During handling of the above exception, another exception occurred:
api-1  | 
api-1  | Traceback (most recent call last):
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/middleware/base.py", line 157, in call_next
api-1  |     message = await recv_stream.receive()
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/anyio/streams/memory.py", line 126, in receive
api-1  |     raise EndOfStream
api-1  | anyio.EndOfStream
api-1  | 
api-1  | During handling of the above exception, another exception occurred:
api-1  | 
api-1  | Traceback (most recent call last):
api-1  |   File "/app/graphistrygpt/api/routes/security.py", line 61, in rate_limit_middleware
api-1  |     response = await call_next(request)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/middleware/base.py", line 164, in call_next
api-1  |     raise RuntimeError("No response returned.")
api-1  | RuntimeError: No response returned.

Triggered by:

    @app.get("/test_slow_request_fallback")
    async def test_slow_request_fallback(request: Request) -> Dict:

        async def call_task(force_slow: bool = False) -> Dict[str, Any]:
            if force_slow:
                await asyncio.sleep(10)
            return await async_langchain_task()
        
        async def call_with_retry() -> Dict[str, Any]:

            first_attempt = asyncio.create_task(call_task(force_slow=True))

            try:
                return await asyncio.wait_for(first_attempt, timeout=g_env.DT_LLM_REDISPATCH_AFTER_WAIT_S)
            except asyncio.TimeoutError:
                second_attempt = asyncio.create_task(call_task())
                done, pending = await asyncio.wait(
                    [first_attempt, second_attempt],
                    return_when=asyncio.FIRST_COMPLETED
                )

                async def cancel_pending():
                    for task in pending:
                        try:
                            task.cancel()
                            #await task
                            #await asyncio.wrap_future(task, return_exceptions=True)
                        except asyncio.CancelledError:
                            pass
                        except Exception as ex:
                            telemetry.exception_with('Canceling pending tasks failed, continue anyways', ex)
                for task in done:
                    try:
                        if task.cancelled():
                            telemetry.info('Done subtask cancelled, returning anyways, type={}', type(task))
                            #continue
                        out = task.result()
                        telemetry.info('Subtask succeeded, type={}:\n{}\n', type(out), out)
                        #await cancel_pending()
                        return out
                    except Exception as ex:
                        telemetry.exception_with(f'Done subtask failed: {str(task)}', ex)
                        continue

                #await cancel_pending()
                raise RuntimeError("All tasks failed.")

        try:
            response = await call_with_retry()
            return {"status": "OK"}
        except Exception as e:
            await log_exn(request, 'test failed', e)
            return JSONResponse(
                {"detail": f"Test failed: {str(e)}"}, status_code=500
            )

@lmeyerov

Related: we're also examining uvicorn versions, as we updated FastAPI to the latest release and uvicorn next, to help eliminate version mismatches.

@agronholm
Owner

So unlike with the original issue, this doesn't occur in the __repr__().

Why are you using old-style asyncio tasks instead of task groups, by the way? I also see you catching and not re-raising asyncio.CancelledError, and that is bad! It messes up asyncio's cancel counters.

Can we have a MWE? I feel there's too much logic here to unpack.

@agronholm
Owner

I'm not sure there's an AnyIO bug here. As you can see, the AttributeError is handled two lines after the exception is raised. If there's a problem on our side, it's about the EndOfStream containing the AttributeError in its context. We should probably do away with that, as it might be confusing (as proven here).
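The "During handling of the above exception, another exception occurred" lines in the traceback earlier in the thread come from Python's implicit exception chaining: raising inside an except block stores the handled exception as __context__. As a minimal standard-library sketch of what is described here, the code below contrasts a plain raise with raise ... from None, which keeps __context__ but suppresses it in formatted tracebacks. EndOfStream and Receiver are local stand-ins (assumptions), not imports from AnyIO, and neither function is AnyIO's actual code.

```python
import traceback


class EndOfStream(Exception):
    """Local stand-in for anyio.EndOfStream (not imported here)."""


class Receiver:
    """Hypothetical receiver whose `item` is only set when a value arrives."""


def receive_chained(receiver: Receiver):
    try:
        return receiver.item
    except AttributeError:
        # Implicit chaining: the AttributeError becomes EndOfStream.__context__
        # and is printed as "During handling of the above exception...".
        raise EndOfStream


def receive_suppressed(receiver: Receiver):
    try:
        return receiver.item
    except AttributeError:
        # `from None` sets __suppress_context__ = True: the AttributeError is
        # still stored in __context__ but omitted from formatted tracebacks.
        raise EndOfStream from None


def formatted(func) -> str:
    try:
        func(Receiver())
    except EndOfStream as exc:
        return "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    raise AssertionError("expected EndOfStream")


print("During handling" in formatted(receive_chained))     # True
print("During handling" in formatted(receive_suppressed))  # False
```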

@lmeyerov

lmeyerov commented Jan 12, 2025

A more minimal version:

The expectation is:

  • call_qa(force_slow=True) starts
  • after 0.5s, call_qa(force_slow=False) starts
  • the second call finishes, route returns in 1s
  • the first call finishes/cancels

    @app.get("/test_async")
    async def test_async(request: Request) -> Dict:

        t0_s = datetime.now().timestamp()


        async def call_qa(force_slow: bool = False) -> Dict[str, Any]:
            if force_slow:
                await asyncio.sleep(10)

            async def work() -> Dict[str, Any]:
                await asyncio.sleep(1)
                return {'ok': True}

            return await work()

        async def call_with_retry() -> Dict[str, Any]:

            first_attempt = asyncio.create_task(call_qa(force_slow=True))

            try:
                return await asyncio.wait_for(first_attempt, timeout=0.5)
            except asyncio.TimeoutError:
                telemetry.info('Starting retry')
                second_attempt = asyncio.create_task(call_qa())
                done, pending = await asyncio.wait(
                    [first_attempt, second_attempt],
                    return_when=asyncio.FIRST_COMPLETED
                )

                for task in done:
                    try:
                        if task.cancelled():
                            telemetry.info('Done subtask cancelled, returning anyways, type={}', type(task))
                        out = task.result()
                        telemetry.info('Subtask succeeded, type={}:\n{}\n', type(out), out)
                        return out
                    except Exception as ex:
                        telemetry.exception_with(f'Done subtask failed: {str(task)}', ex)
                        continue

                raise RuntimeError("All tasks failed.")

        try:
            response = await call_with_retry()
            telemetry.info('helper call response:\n{}\n', response)

            assert response['ok']

            now_s = datetime.now().timestamp()
            assert now_s - t0_s <= 3

            telemetry.info('done')
            return {"status": "OK"}
        except Exception as e:
            return JSONResponse(
                {"detail": f"test failed: {str(e)}"}, status_code=500
            )

api-1  | DEBUG /app/graphistrygpt/api/api.py:73 Incoming request @a9d20bb78f8643b9: GET http://localhost:8000/api/test_async
api-1  | DEBUG /app/graphistrygpt/api/api.py:78 span event: api_request_GET_/api/test_async 
api-1  | INFO /app/graphistrygpt/plugins/dt/router.py:1158 Starting retry
api-1  | INFO /app/graphistrygpt/plugins/dt/router.py:1178 Done subtask cancelled, returning anyways, type=<class '_asyncio.Task'>
api-1  | [2025-01-12 21:48:07 +0000] [657] [ERROR] Exception in ASGI application
api-1  | Traceback (most recent call last):
api-1  |   File "/app/graphistrygpt/plugins/dt/router.py", line 1143, in call_qa
api-1  |     await asyncio.sleep(10)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/asyncio/tasks.py", line 605, in sleep
api-1  |     return await future
api-1  | asyncio.exceptions.CancelledError
api-1  | 
api-1  | During handling of the above exception, another exception occurred:
api-1  | 
api-1  | Traceback (most recent call last):
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/uvicorn/protocols/http/httptools_impl.py", line 426, in run_asgi
api-1  |     result = await app(  # type: ignore[func-returns-value]
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
api-1  |     return await self.app(scope, receive, send)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/fastapi/applications.py", line 1054, in __call__
api-1  |     await super().__call__(scope, receive, send)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/applications.py", line 113, in __call__
api-1  |     await self.middleware_stack(scope, receive, send)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/middleware/errors.py", line 165, in __call__
api-1  |     await self.app(scope, receive, _send)
api-1  |   File "/app/graphistrygpt/api/api.py", line 86, in __call__
api-1  |     await self.app(scope, receive, send)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
api-1  |     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
api-1  |     await app(scope, receive, sender)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/routing.py", line 715, in __call__
api-1  |     await self.middleware_stack(scope, receive, send)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/routing.py", line 735, in app
api-1  |     await route.handle(scope, receive, send)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/routing.py", line 288, in handle
api-1  |     await self.app(scope, receive, send)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/routing.py", line 76, in app
api-1  |     await wrap_app_handling_exceptions(app, request)(scope, receive, send)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
api-1  |     await app(scope, receive, sender)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/routing.py", line 73, in app
api-1  |     response = await f(request)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/fastapi/routing.py", line 301, in app
api-1  |     raw_response = await run_endpoint_function(
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/fastapi/routing.py", line 212, in run_endpoint_function
api-1  |     return await dependant.call(**values)
api-1  |   File "/app/graphistrygpt/plugins/dt/router.py", line 1192, in test_async
api-1  |     response = await call_with_retry()
api-1  |   File "/app/graphistrygpt/plugins/dt/router.py", line 1180, in call_with_retry
api-1  |     out = task.result()

@agronholm
Owner

Ok, so do you have any evidence that there is an AnyIO bug involved?

@lmeyerov

TBD. Adding timing info shows that the exception is thrown on resumption at 0.5s, i.e., when the timeout triggers, but AnyIO is no longer in the stack:

api-1  | 2025-01-12 21:58:23,732 DEBUG /app/graphistrygpt/api/api.py:73 Incoming request @4fb889f3f72b9d28: GET http://localhost:8000/api/test_async
api-1  | 2025-01-12 21:58:23,732 DEBUG /app/graphistrygpt/api/api.py:78 span event: api_request_GET_/api/test_async 
api-1  | 2025-01-12 21:58:24,233 INFO /app/graphistrygpt/plugins/dt/router.py:1158 Starting retry
api-1  | 2025-01-12 21:58:24,234 INFO /app/graphistrygpt/plugins/dt/router.py:1168 Done subtask cancelled, returning anyways, type=<class '_asyncio.Task'>
api-1  | [2025-01-12 21:58:24 +0000] [17] [ERROR] Exception in ASGI application
api-1  | Traceback (most recent call last):
api-1  |   File "/app/graphistrygpt/plugins/dt/router.py", line 1143, in call_qa
api-1  |     await asyncio.sleep(10)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/asyncio/tasks.py", line 605, in sleep
api-1  |     return await future
api-1  | asyncio.exceptions.CancelledError
api-1  | 
api-1  | During handling of the above exception, another exception occurred:
api-1  | 
api-1  | Traceback (most recent call last):
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/uvicorn/protocols/http/httptools_impl.py", line 426, in run_asgi
api-1  |     result = await app(  # type: ignore[func-returns-value]
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
api-1  |     return await self.app(scope, receive, send)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/fastapi/applications.py", line 1054, in __call__
api-1  |     await super().__call__(scope, receive, send)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/applications.py", line 113, in __call__
api-1  |     await self.middleware_stack(scope, receive, send)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/middleware/errors.py", line 165, in __call__
api-1  |     await self.app(scope, receive, _send)
api-1  |   File "/app/graphistrygpt/api/api.py", line 86, in __call__
api-1  |     await self.app(scope, receive, send)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
api-1  |     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
api-1  |     await app(scope, receive, sender)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/routing.py", line 715, in __call__
api-1  |     await self.middleware_stack(scope, receive, send)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/routing.py", line 735, in app
api-1  |     await route.handle(scope, receive, send)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/routing.py", line 288, in handle
api-1  |     await self.app(scope, receive, send)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/routing.py", line 76, in app
api-1  |     await wrap_app_handling_exceptions(app, request)(scope, receive, send)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
api-1  |     await app(scope, receive, sender)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/starlette/routing.py", line 73, in app
api-1  |     response = await f(request)
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/fastapi/routing.py", line 301, in app
api-1  |     raw_response = await run_endpoint_function(
api-1  |   File "/opt/conda/envs/louie/lib/python3.10/site-packages/fastapi/routing.py", line 212, in run_endpoint_function
api-1  |     return await dependant.call(**values)
api-1  |   File "/app/graphistrygpt/plugins/dt/router.py", line 1179, in test_async
api-1  |     response = await call_with_retry()
api-1  |   File "/app/graphistrygpt/plugins/dt/router.py", line 1169, in call_with_retry
api-1  |     out = task.result()
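The timing above (the exception surfaces at about 0.5s, when the timeout fires) is consistent with standard asyncio behaviour: asyncio.wait_for() cancels the task it wraps when the timeout expires; asyncio.wait(..., return_when=FIRST_COMPLETED) then returns almost immediately because the cancelled task already counts as done; and Task.result() on a cancelled task raises asyncio.CancelledError, which has derived from BaseException rather than Exception since Python 3.8, so an except Exception clause does not catch it. The standard-library sketch below checks those three behaviours in isolation; slow is a hypothetical stand-in for call_qa(force_slow=True), with a much shorter timeout.

```python
import asyncio
from typing import Any, Dict, Tuple


async def slow() -> Dict[str, Any]:
    # Hypothetical stand-in for call_qa(force_slow=True).
    await asyncio.sleep(10)
    return {"ok": True}


async def main() -> Tuple[bool, bool, str]:
    first = asyncio.create_task(slow())
    try:
        await asyncio.wait_for(first, timeout=0.05)
    except asyncio.TimeoutError:
        pass
    # wait_for() cancelled `first` when the timeout expired.
    first_cancelled = first.cancelled()

    second = asyncio.create_task(slow())
    done, pending = await asyncio.wait(
        [first, second], return_when=asyncio.FIRST_COMPLETED
    )
    # The already-cancelled task counts as "done", so wait() returns at once.
    first_in_done = first in done

    try:
        first.result()
        outcome = "returned"
    except Exception:
        outcome = "caught by except Exception"
    except asyncio.CancelledError:
        # CancelledError is a BaseException subclass (Python 3.8+), so the
        # `except Exception` clause above does not see it.
        outcome = "escaped except Exception"

    # Clean up the still-running second task.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return first_cancelled, first_in_done, outcome


print(asyncio.run(main()))  # (True, True, 'escaped except Exception')
```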

@lmeyerov

lmeyerov commented Jan 12, 2025

I'm guessing the issue above is partly or entirely user error:

            first_attempt = asyncio.create_task(call_qa(force_slow=True))

            try:
                return await asyncio.wait_for(first_attempt, timeout=0.5)
            except asyncio.TimeoutError:
                telemetry.info('Starting retry')
                second_attempt = asyncio.create_task(call_qa())
                done, pending = await asyncio.wait(
                    [first_attempt, second_attempt],
                    return_when=asyncio.FIRST_COMPLETED
                )

                for task in done:
                    try:
                        if task.cancelled():
                            telemetry.info('Done subtask cancelled, returning anyways, type={}', type(task))
                        out = task.result()
                        telemetry.info('Subtask succeeded, type={}:\n{}\n', type(out), out)
                        return out
                    except Exception as ex:
                        telemetry.exception_with(f'Done subtask failed: {str(task)}', ex)
                        continue

                raise RuntimeError("All tasks failed.")

The wait_for(first_attempt, timeout=0.5) call cancels first_attempt when the timeout expires, rather than letting it keep running. So task.cancelled() returns True, etc.

This was the wrong pattern for a first-one-wins retry, and our locally fixed pattern now works. However, for the original code, even knowing that the task was cancelled by the timeout, I'm still unclear why FastAPI returned what it did.
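For reference, here is a minimal standard-library sketch of a first-one-wins ("hedged") retry that does not cancel the first attempt when the hedge delay expires: asyncio.wait() with a timeout simply leaves the unfinished task in its pending set instead of cancelling it. This is not the poster's actual fix, which isn't shown in the thread; first_wins, hedge_after and fake_call_qa are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


async def first_wins(
    make_attempt: Callable[[bool], Awaitable[Any]], hedge_after: float
) -> Any:
    first = asyncio.ensure_future(make_attempt(True))
    # Unlike wait_for(), wait() does not cancel anything on timeout.
    done, _ = await asyncio.wait({first}, timeout=hedge_after)
    if done:
        return first.result()

    second = asyncio.ensure_future(make_attempt(False))
    done, pending = await asyncio.wait(
        {first, second}, return_when=asyncio.FIRST_COMPLETED
    )
    # Cancel the loser and wait until it has actually finished cancelling;
    # return_exceptions=True keeps its CancelledError from propagating here.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    # If both finished in the same iteration, either result is acceptable.
    # A winner that raised re-raises here; there is no fallback to the other.
    return next(iter(done)).result()


async def fake_call_qa(force_slow: bool) -> Dict[str, Any]:
    # Hypothetical stand-in for call_qa(), with much shorter sleeps.
    if force_slow:
        await asyncio.sleep(1.0)
    await asyncio.sleep(0.05)
    return {"ok": True, "slow": force_slow}


print(asyncio.run(first_wins(fake_call_qa, hedge_after=0.05)))
# {'ok': True, 'slow': False}
```

A task-group version (AnyIO's create_task_group(), or asyncio.TaskGroup on Python 3.11+), as suggested earlier in the thread, would also cancel both attempts if the caller itself is cancelled; this sketch does not handle that case.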

@agronholm
Owner

Ok, may I then suggest that you investigate, and if this turns out to be an AnyIO problem (which I don't believe it is), report it as a new issue here?

@lmeyerov

Yes, I intentionally posted this as a continuation of this issue rather than as a new issue, to help with SEO, given this murkiness.
