Consumer stopped consuming, task Fetcher._fetch_task has finished #983

Closed
pavelschon opened this issue Feb 23, 2024 · 8 comments · Fixed by #1007

Comments

@pavelschon

pavelschon commented Feb 23, 2024

Describe the bug
Under conditions we have not been able to pin down, our consumer stopped receiving messages.
When this happens, we found that _fetch_task has finished even though the Fetcher object is still alive (Fetcher.close() was not called).

 '_fetch_task': <Task finished name='Task-29' coro=<Fetcher._fetch_requests_routine() done, defined at /opt/poetry/venv/lib/python3.12/site-packages/aiokafka/consumer/fetcher.py:457> result=None>,

We suspect the issue is in _fetch_requests_routine(), which may handle CancelledError incorrectly.
According to the Python documentation, CancelledError should be re-raised, whereas this routine just suppresses it.
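
To illustrate the difference, here is a minimal sketch that is independent of aiokafka. The two routines (`swallowing_routine`, `reraising_routine`) and the helper `cancel_and_inspect` are hypothetical names made up for this example. A cancelled task that suppresses CancelledError ends up "finished" with result=None, like the _fetch_task shown above, while one that re-raises ends up "cancelled".

```python
import asyncio


async def swallowing_routine():
    try:
        await asyncio.sleep(1000)
    except asyncio.CancelledError:
        pass  # suppressed: the task ends as "finished" with result None


async def reraising_routine():
    try:
        await asyncio.sleep(1000)
    except asyncio.CancelledError:
        raise  # propagated: the task ends as "cancelled"


async def cancel_and_inspect(coro):
    task = asyncio.create_task(coro)
    await asyncio.sleep(0)      # let the task start running
    task.cancel()
    await asyncio.wait([task])  # wait for completion without raising
    return task.cancelled()


async def main():
    swallowed = await cancel_and_inspect(swallowing_routine())
    reraised = await cancel_and_inspect(reraising_routine())
    return swallowed, reraised


swallowed_cancelled, reraised_cancelled = asyncio.run(main())
print(swallowed_cancelled, reraised_cancelled)  # prints: False True
```

From the outside, the swallowing task cannot be told apart from one that returned normally.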

Environment (please complete the following information):

  • aiokafka version: 0.10.0
  • Kafka Broker version: Confluent Kafka
  • Other information (Confluent Cloud version, etc.):
@AxTheB

AxTheB commented Feb 23, 2024

When we replaced the fetch task with a fresh one, it did not receive new messages (there were messages in the assigned partition) and it also finished, without logging anything. I see only one way for this task to reach the finished state, and that is by swallowing the CancelledError.
If the error were propagated, the app could restart and quickly resume consuming messages.

@ods
Collaborator

ods commented Feb 24, 2024

The only place where _fetch_task is cancelled is the Fetcher.close() method. Or do you see other options? Also, for this exception to be propagated, you have to await the task, which is also done only in the close() method.

@ods
Collaborator

ods commented Feb 24, 2024

Hmm, there is another way to get CancelledError here:

                    for task in self._pending_tasks:
                        # Those tasks should have proper handling for
                        # cancellation
                        if not task.done():
                            task.cancel()
                        await task

@pavelschon
Author

Can you confirm this is a bug in aiokafka?

On our side, we had to implement a workaround

if consumer._fetcher._fetch_task.done():
    ...  # restart app
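
Instead of polling, the same check could be driven by a done callback. This is only a sketch: `notify_when_done` is a hypothetical helper, and a short-lived dummy task stands in for `consumer._fetcher._fetch_task`, which is a private aiokafka attribute and may change between releases.

```python
import asyncio


def notify_when_done(task: asyncio.Task) -> asyncio.Event:
    """Return an event that is set once `task` finishes for any reason."""
    finished = asyncio.Event()
    task.add_done_callback(lambda _task: finished.set())
    return finished


async def demo() -> bool:
    # Stand-in for consumer._fetcher._fetch_task (private attribute, assumption).
    fake_fetch_task = asyncio.create_task(asyncio.sleep(0.01))
    finished = notify_when_done(fake_fetch_task)
    # A real application would trigger its restart logic after this wait.
    await asyncio.wait_for(finished.wait(), timeout=1)
    return fake_fetch_task.done()


fetch_task_stopped = asyncio.run(demo())
print(fetch_task_stopped)
```

This only detects the symptom; it does not address why the fetch task finished.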

@apmorton
Contributor

apmorton commented May 9, 2024

(originally posted this on #847, but on further investigation I think this is the correct issue)

@ods I have bisected this issue to #802

Consider the following script:

import asyncio
import logging
import time

import aiokafka


async def consume_task(consumer):
    async for msg in consumer:
        print(msg)
        await consumer.commit()


async def lag_task(producer):
    while True:
        print("sleeping")
        await asyncio.sleep(10)
        print("inducing lag")
        time.sleep(40)
        print("sending message")
        await producer.send_and_wait(
            topic='some_topic',
            value=b'a message',
        )


async def main():
    async with (
        aiokafka.AIOKafkaProducer() as producer,
        aiokafka.AIOKafkaConsumer(
            'some_topic',
            group_id='some_group',
            enable_auto_commit=False,
        ) as consumer,
    ):
        await consumer.seek_to_end()

        task1 = asyncio.create_task(consume_task(consumer))
        task2 = asyncio.create_task(lag_task(producer))

        consumer._fetcher._fetch_task.add_done_callback(lambda t: print('fetch task done'))

        await asyncio.wait([task1, task2], return_when=asyncio.FIRST_COMPLETED)
        print("something finished")


if __name__ == '__main__':
    # logging.basicConfig(level=logging.DEBUG)
    asyncio.run(main())

Prior to #802 this will print:

sleeping
inducing lag
sending message
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 0)for group some_group.
sleeping
ConsumerRecord(topic='some_topic', partition=0, offset=10, timestamp=1715232041461, timestamp_type=0, key=None, value=b'a message', checksum=None, serialized_key_size=-1, serialized_value_size=9, headers=())

After #802 this will print:

sleeping
inducing lag
sending message
Failed fetch messages from 0: [Error 7] RequestTimedOutError
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 0)for group some_group.
sleeping
fetch task done

Notice the ConsumerRecord is not printed and fetch task done is.

If you turn on debug logging you will see that after the induced lag we stop getting:

DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition

No new messages will ever be received by consume_task after this point.

Reverting this single line from #802 restores the original behavior:

diff --git a/aiokafka/conn.py b/aiokafka/conn.py
index da27fd2..2ceb9ba 100644
--- a/aiokafka/conn.py
+++ b/aiokafka/conn.py
@@ -450,7 +450,7 @@ class AIOKafkaConnection:
             return self._writer.drain()
         fut = self._loop.create_future()
         self._requests.append((correlation_id, request.RESPONSE_TYPE, fut))
-        return wait_for(fut, self._request_timeout)
+        return asyncio.wait_for(fut, self._request_timeout)

     def _send_sasl_token(self, payload, expect_response=True):
         if self._writer is None:

@ods
Collaborator

ods commented May 12, 2024

Hi @apmorton,
Thank you for reproducing the problem. Right, starting with Python 3.8.6 there was a bug in asyncio.wait_for(), which was fixed in 3.12 using the same approach as the one used here. It is no surprise that one bug that fails to handle an exception and another that swallows it can compensate for each other.

@ods
Collaborator

ods commented May 12, 2024

Here is a snippet to demonstrate the problem with just suppressing CancelledError:

import asyncio
from time import time

async def task_with_cleanup():
    try:
        await asyncio.sleep(1000)
    finally:
        print("Cleanup", time() - started)
        await asyncio.sleep(2)

async def worker():
    task = asyncio.create_task(task_with_cleanup())
    await asyncio.sleep(0)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("CancelledError is suppressed", time() - started)
    await asyncio.sleep(10)

async def main():
    await asyncio.wait_for(worker(), timeout=1)
    elapsed = time() - started
    assert elapsed < 2, elapsed

started = time()
asyncio.run(main())
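
For contrast, here is one possible way to cancel and wait for a child task without hiding the caller's own cancellation. It is a sketch under stated assumptions, not necessarily the fix adopted in aiokafka: `cancel_and_wait` is a hypothetical helper, it relies on `Task.cancelling()` (available from Python 3.11, with a conservative fallback of always re-raising on older versions), and the delays are scaled down so that it runs quickly.

```python
import asyncio
import time


async def task_with_cleanup():
    try:
        await asyncio.sleep(1000)
    finally:
        # Cleanup that itself takes a while (0.2 s here).
        await asyncio.sleep(0.2)


async def cancel_and_wait(task):
    """Cancel `task` and wait for it, re-raising if we were cancelled too."""
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        current = asyncio.current_task()
        # Task.cancelling() exists on Python 3.11+; without it we cannot
        # tell the two cases apart, so re-raise unconditionally.
        cancelling = getattr(current, "cancelling", None)
        if cancelling is None or cancelling() > 0:
            raise


async def worker():
    task = asyncio.create_task(task_with_cleanup())
    await asyncio.sleep(0)
    await cancel_and_wait(task)
    await asyncio.sleep(10)  # not reached when the timeout fires during cleanup


async def main():
    started = time.monotonic()
    try:
        await asyncio.wait_for(worker(), timeout=0.1)
    except asyncio.TimeoutError:
        pass
    return time.monotonic() - started


elapsed = asyncio.run(main())
print(f"elapsed: {elapsed:.2f}s")
```

Here the timeout interrupts the worker while it is waiting for the child's cleanup, the CancelledError is re-raised, and wait_for() returns shortly after the timeout instead of waiting for the trailing sleep.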
