Add field to rd_kafka_queue which denotes if it contains fetched msgs #4256
Conversation
As we are locking on polling, please check performance once.
Co-authored-by: Emanuele Sabellico <[email protected]>
Great work, Milind! It was difficult to find the correct combination of changes that solves both problems by resetting the timer only when needed and still fits with the existing code.
Thanks Emanuele. Will merge after #4262 for CI.
LGTM!
An issue was fixed in v2.1.0 in which max.poll.interval.ms was not honored,
because the timer was reset on any queue poll, not just a consumer poll.
It was changed so that only certain polling functions in rdkafka.h
would reset the timer.
However, librdkafka also exposes a method, rd_kafka_queue_get_consumer, which
returns the consumer queue, and the application can poll this queue for events
rather than calling consumer poll. There was no way to distinguish a poll on
this queue from a poll on an arbitrary queue, so such polls would not reset the timer.
So, a new field is maintained inside the queue denoting whether it might
contain fetched messages. It also deals with forwarding of queues, so
if a queue which receives fetched messages is forwarded multiple times,
calling poll on the forwardee will also reset the timer.
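For context, here is a minimal sketch of the usage pattern this addresses: polling the consumer queue obtained via rd_kafka_queue_get_consumer() instead of calling rd_kafka_consumer_poll(). It assumes `rk` is a subscribed consumer that has been configured to emit fetch events (e.g. via rd_kafka_conf_set_events()); with this change such polls should also reset the max.poll.interval.ms timer.

```c
#include <inttypes.h>
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch: poll the consumer queue directly rather than using
 * rd_kafka_consumer_poll(). Assumes `rk` is a subscribed consumer
 * configured to emit RD_KAFKA_EVENT_FETCH events. */
static void poll_consumer_queue(rd_kafka_t *rk) {
        rd_kafka_queue_t *rkqu = rd_kafka_queue_get_consumer(rk);
        rd_kafka_event_t *rkev;

        /* Loop until a 1 s poll times out and returns NULL. */
        while ((rkev = rd_kafka_queue_poll(rkqu, 1000))) {
                if (rd_kafka_event_type(rkev) == RD_KAFKA_EVENT_FETCH) {
                        const rd_kafka_message_t *rkm;
                        while ((rkm = rd_kafka_event_message_next(rkev)))
                                printf("message at offset %" PRId64 "\n",
                                       rkm->offset);
                }
                rd_kafka_event_destroy(rkev);
        }

        rd_kafka_queue_destroy(rkqu); /* release our queue reference */
}
```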
Note that there are a few points where someone might forward/unforward a queue while polling it (see the cases with
does_q_contain_consumer_msgs = rd_kafka_q_consumer_cnt(rkq, 1);
), which might make the timer reset incorrectly on those occasions. I don't think it matters much, since that is a somewhat convoluted use case, and fixing it would require holding a lock for the duration of the entire poll. But let me know what you think.
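And a minimal sketch of the forwarding case, assuming the application keeps its own queue (here named `app_q`) that it polls itself; since the new flag follows forwarding, polls on the forwardee should reset the timer as well.

```c
#include <librdkafka/rdkafka.h>

/* Sketch: forward the consumer queue to an application-owned queue.
 * Because the "contains fetched messages" property follows forwarding,
 * polling the returned queue (the forwardee) should also reset the
 * max.poll.interval.ms timer. Assumes `rk` is a subscribed consumer. */
static rd_kafka_queue_t *forward_consumer_queue(rd_kafka_t *rk) {
        rd_kafka_queue_t *consumer_q = rd_kafka_queue_get_consumer(rk);
        rd_kafka_queue_t *app_q      = rd_kafka_queue_new(rk);

        /* Route events destined for the consumer queue to app_q instead. */
        rd_kafka_queue_forward(consumer_q, app_q);

        rd_kafka_queue_destroy(consumer_q); /* drop our reference */
        return app_q; /* poll with rd_kafka_queue_poll() or rd_kafka_consume_queue() */
}
```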
Fixes confluentinc/confluent-kafka-go#980