Long-running consumer stops receiving messages and its state is not closed #1037
Comments
Yes, I tested on version 0.11.0 as well and updated the test results in the issue description above. It seems that once the RequestTimedOutError happens, the consumer can't receive messages, so no "New Messages Received..." is printed to the console. After changing the wait_for call to asyncio.wait_for, it receives messages and no RequestTimedOutError is printed to the console.
I'm also experiencing the same issue
@Tinanuaa any luck? I also tried upgrading to 0.11 with no luck.
For anyone else that ends up here, it was so dumb. Just turned out to be timeout vars, try that first (added all the timeout ones under
Has anyone found workarounds besides increasing timeouts or reverting to 0.7.2 (and by extension Python 3.7)?
Describe the bug
I have a long-running FastAPI app that starts a consumer on app startup and consumes messages from Kafka. The consumer works fine for about a week and then suddenly stops receiving messages, even though the consumer is not closed. This has happened twice. I checked the logs, and the common error before the consumer stopped consuming messages was:
Failed fetch messages from 1001: [Error 7] RequestTimedOutError
After I restarted the service, the consumer picked up from the offset where it had stopped and reconsumed all the missed messages.
Expected behaviour
A long-running consumer should keep running for a long time and keep consuming messages.
Environment (please complete the following information):
Reproducible example
I use the following script to reproduce the problem. The sleep(40) makes the request time out: in versions 0.10.0 and 0.11.0, conn.send() raises a timeout exception, which makes client.send() raise RequestTimedOutError. This does not happen in aiokafka 0.7.2. I went through the related issues (#983 and #802) and found that if conn.send() uses asyncio.wait_for instead of utils.wait_for, the consumer keeps working after the timeout. My guess is that some difference between these two functions causes the problem. I tried to find the root cause but failed, so I am just posting my findings here.
With the above script, I tested aiokafka 0.7.2, aiokafka 0.10.0, and aiokafka 0.10.0 with an updated conn.py, changing line 453 from
return wait_for(fut, self._request_timeout)
to
return asyncio.wait_for(fut, self._request_timeout)
The output is shown below.
aiokafka 0.7.2
aiokafka 0.10.0
aiokafka 0.11.0
aiokafka 0.10.0 after changing wait_for to asyncio.wait_for() in conn.py::send()
aiokafka 0.11.0 after changing wait_for to asyncio.wait_for() in conn.py::send()
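For reference, here is a minimal stdlib-only sketch of what asyncio.wait_for does when the timeout elapses, since that is the call swapped into conn.send() above. It does not use aiokafka at all: the bare future and the function name demo_wait_for_timeout are made up for illustration and only stand in for a pending response future. It shows only the standard library behaviour (the awaited future is cancelled and a timeout error is raised) and says nothing about how utils.wait_for behaves or about the root cause.

```python
import asyncio


async def demo_wait_for_timeout():
    """Await a future that never completes, with a short timeout.

    Hypothetical stand-in for a response that never arrives; this is
    NOT aiokafka code, only an illustration of asyncio.wait_for.
    """
    loop = asyncio.get_running_loop()
    fut = loop.create_future()  # nobody ever sets a result on this future

    try:
        await asyncio.wait_for(fut, timeout=0.05)
    except asyncio.TimeoutError:
        timed_out = True
    else:
        timed_out = False

    # On timeout, asyncio.wait_for cancels the awaited future before raising.
    return timed_out, fut.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo_wait_for_timeout()))
```

The point of the sketch is only that, with asyncio.wait_for, the caller gets a timeout error and the pending future ends up cancelled rather than left hanging.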