Consumer stopped consuming, task Fetcher._fetch_task has finished #983
When we replaced the fetch task with a fresh one, it did not receive new messages (there were messages in the assigned partition) and got finished too, without logging anything. I see only one way for this task to get to the finished state, and that is by swallowing the `CancelledError`.
The only place where […]
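To illustrate the mechanism being described, here is a hypothetical sketch (not aiokafka's actual code): a background routine whose `except asyncio.CancelledError` handler returns instead of re-raising, so cancelling its task leaves it "finished" rather than "cancelled", with nothing logged:

```python
import asyncio


async def fetch_requests_routine_sketch():
    # Hypothetical stand-in for a background fetch loop.
    try:
        while True:
            await asyncio.sleep(1)  # placeholder for real fetch work
    except asyncio.CancelledError:
        # The cancellation is swallowed here: the coroutine returns normally,
        # so task.cancelled() is False while task.done() is True -- the task
        # looks "finished", exactly as observed in this issue.
        return


async def main():
    task = asyncio.create_task(fetch_requests_routine_sketch())
    await asyncio.sleep(0)
    task.cancel()
    await task  # does not raise, because CancelledError was suppressed
    print(task.done(), task.cancelled())  # True False


asyncio.run(main())
```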
Hmm, there is another option for the task to get to the finished state:

```python
for task in self._pending_tasks:
    # Those tasks should have proper handling for
    # cancellation
    if not task.done():
        task.cancel()
        await task
```
Can you confirm this is a bug in aiokafka? On our side, we had to implement a workaround:

```python
if consumer._fetcher._fetch_task.done():
    # restart app
```
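For reference, a minimal sketch of such a workaround watchdog; it relies on the same private `consumer._fetcher._fetch_task` attribute (not a public API, so it may break between aiokafka versions), and the restart action is just a placeholder:

```python
import asyncio

import aiokafka


async def fetch_task_watchdog(consumer: aiokafka.AIOKafkaConsumer, interval: float = 30.0) -> None:
    # Periodically check the consumer's private fetch task; if it has finished
    # while the consumer is still supposed to be running, signal the
    # application so it can restart the consumer (placeholder action below).
    while True:
        await asyncio.sleep(interval)
        fetch_task = consumer._fetcher._fetch_task  # private attribute
        if fetch_task is not None and fetch_task.done():
            raise RuntimeError(
                "aiokafka fetch task finished unexpectedly; restart the consumer"
            )
```

It can be run alongside the consume loop, e.g. as another `asyncio.create_task(...)` whose failure triggers the application restart.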
(originally posted this on #847, but on further investigation I think this is the correct issue)

@ods I have bisected this issue to #802.

Consider the following script:

```python
import asyncio
import logging
import time

import aiokafka


async def consume_task(consumer):
    async for msg in consumer:
        print(msg)
        await consumer.commit()


async def lag_task(producer):
    while True:
        print("sleeping")
        await asyncio.sleep(10)
        print("inducing lag")
        time.sleep(40)
        print("sending message")
        await producer.send_and_wait(
            topic='some_topic',
            value=b'a message',
        )


async def main():
    async with (
        aiokafka.AIOKafkaProducer() as producer,
        aiokafka.AIOKafkaConsumer(
            'some_topic',
            group_id='some_group',
            enable_auto_commit=False,
        ) as consumer,
    ):
        await consumer.seek_to_end()
        task1 = asyncio.create_task(consume_task(consumer))
        task2 = asyncio.create_task(lag_task(producer))
        consumer._fetcher._fetch_task.add_done_callback(lambda t: print('fetch task done'))
        await asyncio.wait([task1, task2], return_when=asyncio.FIRST_COMPLETED)
        print("something finished")


if __name__ == '__main__':
    # logging.basicConfig(level=logging.DEBUG)
    asyncio.run(main())
```

Prior to #802 this will print:
After #802 this will print:

Notice the `fetch task done` line. If you turn on debug logging, you will see that after the induced lag we stop getting:
No new messages will ever be received by the consumer.

Reverting this single line from #802 restores the original behavior:

```diff
diff --git a/aiokafka/conn.py b/aiokafka/conn.py
index da27fd2..2ceb9ba 100644
--- a/aiokafka/conn.py
+++ b/aiokafka/conn.py
@@ -450,7 +450,7 @@ class AIOKafkaConnection:
             return self._writer.drain()
         fut = self._loop.create_future()
         self._requests.append((correlation_id, request.RESPONSE_TYPE, fut))
-        return wait_for(fut, self._request_timeout)
+        return asyncio.wait_for(fut, self._request_timeout)
 
     def _send_sasl_token(self, payload, expect_response=True):
         if self._writer is None:
```
Here is a snippet to demonstrate the problem with just suppressing `CancelledError`:

```python
import asyncio
from time import time


async def task_with_cleanup():
    try:
        await asyncio.sleep(1000)
    finally:
        print("Cleanup", time() - started)
        await asyncio.sleep(2)


async def worker():
    task = asyncio.create_task(task_with_cleanup())
    await asyncio.sleep(0)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("CancelledError is suppressed", time() - started)
    await asyncio.sleep(10)


async def main():
    await asyncio.wait_for(worker(), timeout=1)
    elapsed = time() - started
    assert elapsed < 2, elapsed


started = time()
asyncio.run(main())
```
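For contrast, a minimal sketch (not aiokafka code) of the handling the asyncio documentation recommends: do any cleanup in the handler and then re-raise, so the cancellation propagates and the surrounding `asyncio.wait_for()` times out promptly:

```python
import asyncio
from time import time


async def worker_that_reraises():
    inner = asyncio.create_task(asyncio.sleep(1000))
    try:
        await inner
    except asyncio.CancelledError:
        # Do any cleanup, then re-raise so the cancellation propagates and
        # asyncio.wait_for() sees this task as cancelled, not "finished".
        inner.cancel()
        raise


async def main():
    try:
        await asyncio.wait_for(worker_that_reraises(), timeout=1)
    except asyncio.TimeoutError:
        print("Timed out promptly", time() - started)  # ~1 second, not ~11


started = time()
asyncio.run(main())
```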
Describe the bug

Under unclear conditions, our consumer has stopped receiving messages. We investigated and found that, when it happens, `_fetch_task` is finished despite the `Fetcher` object still being alive (`Fetcher.close()` was not called). We suspect the issue might be in `_fetch_requests_routine()`, which incorrectly handles `CancelledError`. According to the Python documentation, `CancelledError` should be re-raised, while this routine just suppresses it.

Environment (please complete the following information):