Add flag to rd_kafka_queue which denotes if it contains fetched msgs (#4256)

Add failing tests

An issue in v2.1.0 was fixed in which max.poll.interval.ms was not honored,
because the timer was reset on any queue poll, not just a consumer poll.
The fix changed it so that only certain polling functions in rdkafka.h
would reset the timer.

However, librdkafka exposes a method, rd_kafka_queue_get_consumer, which
returns the consumer queue, and the application can poll this queue for events
rather than calling a consumer poll function. There was no way to distinguish
a poll on this queue from a poll on an arbitrary queue, so polling it would
not reset the timer.

So, a new flag is now maintained inside the queue denoting whether it might
contain fetched messages. The flag also handles queue forwarding: if a queue
which receives fetched messages is forwarded, even multiple times, calling
poll on the forwarded-to queue will also reset the timer.
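
To illustrate the affected pattern, here is a minimal sketch (not part of this
change) of event-based consumption via the consumer queue; `run` and
`handle_message` are hypothetical application-side names, while all rd_kafka_*
calls are existing public API:

```c
/* Minimal sketch: consuming via the consumer queue instead of
 * rd_kafka_consumer_poll(). Assumes `rk` is an already-subscribed consumer. */
rd_kafka_queue_t *cq = rd_kafka_queue_get_consumer(rk);

while (run) { /* `run` is an assumed application flag */
        /* With this fix, polling cq counts as a consumer poll and resets
         * the max.poll.interval.ms timer. */
        rd_kafka_event_t *ev = rd_kafka_queue_poll(cq, 100 /*ms*/);
        if (!ev)
                continue;

        if (rd_kafka_event_type(ev) == RD_KAFKA_EVENT_FETCH) {
                const rd_kafka_message_t *rkm;
                while ((rkm = rd_kafka_event_message_next(ev)))
                        handle_message(rkm); /* hypothetical handler */
        }
        rd_kafka_event_destroy(ev);
}

/* Must be destroyed prior to rd_kafka_consumer_close(). */
rd_kafka_queue_destroy(cq);
```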

---------

Co-authored-by: Emanuele Sabellico <[email protected]>
milindl and emasab authored Apr 21, 2023
1 parent d16fe07 commit b0b5bfe
Showing 8 changed files with 201 additions and 8 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
@@ -12,6 +12,8 @@ librdkafka v2.1.1 is a maintenance release:
timeout. That timeout can't be infinite.
* Fix CMake pkg-config cURL require and use
pkg-config `Requires.private` field (@FantasqueX, @stertingen, #4180).
* Fixes certain cases where polling would not keep the consumer
in the group or make it rejoin it (#4256).

## Fixes

@@ -26,7 +28,11 @@ librdkafka v2.1.1 is a maintenance release:
allowing threads different from the main one to call
the `rd_kafka_toppar_set_fetch_state` function, given they hold
the lock on the `rktp`.

* In v2.1.0, a bug was fixed which caused polling any queue to reset the
`max.poll.interval.ms` timer. Only certain functions were made to reset the
timer, but it was still possible for the application to obtain the queue
containing messages from the broker and poll it without going through those
functions. This was fixed by encoding in the queue itself whether polling it
should reset the timer.


# librdkafka v2.1.0
25 changes: 25 additions & 0 deletions src/rdkafka.c
@@ -4003,20 +4003,37 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,

int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms) {
int r;
const rd_bool_t can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rk->rk_rep, RD_DO_LOCK);

if (timeout_ms && can_q_contain_fetched_msgs)
rd_kafka_app_poll_blocking(rk);

r = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0, RD_KAFKA_Q_CB_CALLBACK,
rd_kafka_poll_cb, NULL);

if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rk);

return r;
}


rd_kafka_event_t *rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms) {
rd_kafka_op_t *rko;
const rd_bool_t can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rkqu->rkqu_q, RD_DO_LOCK);


if (timeout_ms && can_q_contain_fetched_msgs)
rd_kafka_app_poll_blocking(rkqu->rkqu_rk);

rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, rd_timeout_us(timeout_ms), 0,
RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL);

if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rkqu->rkqu_rk);

if (!rko)
return NULL;

@@ -4025,10 +4042,18 @@ rd_kafka_event_t *rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms) {

int rd_kafka_queue_poll_callback(rd_kafka_queue_t *rkqu, int timeout_ms) {
int r;
const rd_bool_t can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rkqu->rkqu_q, RD_DO_LOCK);

if (timeout_ms && can_q_contain_fetched_msgs)
rd_kafka_app_poll_blocking(rkqu->rkqu_rk);

r = rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0,
RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);

if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rkqu->rkqu_rk);

return r;
}

6 changes: 6 additions & 0 deletions src/rdkafka.h
@@ -3431,6 +3431,12 @@ rd_kafka_error_t *rd_kafka_sasl_set_credentials(rd_kafka_t *rk,
*
* @remark rd_kafka_queue_destroy() MUST be called on this queue
* prior to calling rd_kafka_consumer_close().
* @remark Polling the returned queue counts as a consumer poll, and will reset
* the timer for max.poll.interval.ms. If this queue is forwarded to a
* "destq", polling destq also counts as a consumer poll (this works
* for any number of forwards). However, even if this queue is
* unforwarded or forwarded elsewhere, polling destq will continue
* to count as a consumer poll.
*/
RD_EXPORT
rd_kafka_queue_t *rd_kafka_queue_get_consumer(rd_kafka_t *rk);
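To make the remark above concrete, a minimal sketch of the forwarding
behavior (assuming `rk` is an existing consumer instance; all calls are
existing public API):

```c
rd_kafka_queue_t *consumer_q = rd_kafka_queue_get_consumer(rk);
rd_kafka_queue_t *my_q       = rd_kafka_queue_new(rk);

/* Forward the consumer queue to my_q: polling my_q now also counts
 * as a consumer poll and resets the max.poll.interval.ms timer. */
rd_kafka_queue_forward(consumer_q, my_q);

rd_kafka_event_t *ev = rd_kafka_queue_poll(my_q, 1000 /*ms*/);
if (ev)
        rd_kafka_event_destroy(ev);

/* Per the remark: even after unforwarding, polls on my_q keep counting
 * as consumer polls, since the flag is never cleared. */
rd_kafka_queue_forward(consumer_q, NULL);

rd_kafka_queue_destroy(my_q);
rd_kafka_queue_destroy(consumer_q);
```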
2 changes: 1 addition & 1 deletion src/rdkafka_cgrp.c
@@ -415,7 +415,7 @@ rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk,
rkcg->rkcg_wait_coord_q = rd_kafka_q_new(rk);
rkcg->rkcg_wait_coord_q->rkq_serve = rkcg->rkcg_ops->rkq_serve;
rkcg->rkcg_wait_coord_q->rkq_opaque = rkcg->rkcg_ops->rkq_opaque;
rkcg->rkcg_q = rd_kafka_q_new(rk);
rkcg->rkcg_q = rd_kafka_consume_q_new(rk);
rkcg->rkcg_group_instance_id =
rd_kafkap_str_new(rk->rk_conf.group_instance_id, -1);

2 changes: 1 addition & 1 deletion src/rdkafka_partition.c
@@ -252,7 +252,7 @@ rd_kafka_toppar_t *rd_kafka_toppar_new0(rd_kafka_topic_t *rkt,
mtx_init(&rktp->rktp_lock, mtx_plain);

rd_refcnt_init(&rktp->rktp_refcnt, 0);
rktp->rktp_fetchq = rd_kafka_q_new(rkt->rkt_rk);
rktp->rktp_fetchq = rd_kafka_consume_q_new(rkt->rkt_rk);
rktp->rktp_ops = rd_kafka_q_new(rkt->rkt_rk);
rktp->rktp_ops->rkq_serve = rd_kafka_toppar_op_serve;
rktp->rktp_ops->rkq_opaque = rktp;
44 changes: 42 additions & 2 deletions src/rdkafka_queue.c
@@ -83,12 +83,15 @@ void rd_kafka_q_destroy_final(rd_kafka_q_t *rkq) {
*/
void rd_kafka_q_init0(rd_kafka_q_t *rkq,
rd_kafka_t *rk,
rd_bool_t for_consume,
const char *func,
int line) {
rd_kafka_q_reset(rkq);
rkq->rkq_fwdq = NULL;
rkq->rkq_refcnt = 1;
rkq->rkq_flags = RD_KAFKA_Q_F_READY;
if (for_consume)
rkq->rkq_flags |= RD_KAFKA_Q_F_CONSUMER;
rkq->rkq_rk = rk;
rkq->rkq_qio = NULL;
rkq->rkq_serve = NULL;
@@ -106,9 +109,15 @@ void rd_kafka_q_init0(rd_kafka_q_t *rkq,
/**
* Allocate a new queue and initialize it.
*/
rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk, const char *func, int line) {
rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk,
rd_bool_t for_consume,
const char *func,
int line) {
rd_kafka_q_t *rkq = rd_malloc(sizeof(*rkq));
rd_kafka_q_init(rkq, rk);
if (!for_consume)
rd_kafka_q_init(rkq, rk);
else
rd_kafka_consume_q_init(rkq, rk);
rkq->rkq_flags |= RD_KAFKA_Q_F_ALLOCATED;
#if ENABLE_DEVEL
rd_snprintf(rkq->rkq_name, sizeof(rkq->rkq_name), "%s:%d", func, line);
@@ -118,6 +127,33 @@ rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk, const char *func, int line) {
return rkq;
}

/**
 * Sets the flag RD_KAFKA_Q_F_CONSUMER for rkq and, recursively, for any
 * queues it is being forwarded to.
 * Setting this flag indicates that polling this queue is equivalent to calling
 * consumer poll, and will reset the max.poll.interval.ms timer. Only used
 * internally when forwarding queues.
 * @locks rd_kafka_q_lock(rkq)
 */
static void rd_kafka_q_consumer_propagate(rd_kafka_q_t *rkq) {
mtx_lock(&rkq->rkq_lock);
rkq->rkq_flags |= RD_KAFKA_Q_F_CONSUMER;

if (!rkq->rkq_fwdq) {
mtx_unlock(&rkq->rkq_lock);
return;
}

/* Recursively propagate the flag to any queues rkq is already
* forwarding to. There will be a deadlock here if the queues are being
* forwarded circularly, but that is a user error. We can't resolve this
* deadlock by unlocking before the recursive call, because that leads
* to incorrectness if the rkq_fwdq is forwarded elsewhere and the old
* one destroyed between recursive calls. */
rd_kafka_q_consumer_propagate(rkq->rkq_fwdq);
mtx_unlock(&rkq->rkq_lock);
}

/**
* Set/clear forward queue.
* Queue forwarding enables message routing inside rdkafka.
@@ -152,6 +188,9 @@ void rd_kafka_q_fwd_set0(rd_kafka_q_t *srcq,
}

srcq->rkq_fwdq = destq;

if (srcq->rkq_flags & RD_KAFKA_Q_F_CONSUMER)
rd_kafka_q_consumer_propagate(destq);
}
if (do_lock)
mtx_unlock(&srcq->rkq_lock);
@@ -610,6 +649,7 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
rd_kafka_q_destroy(fwdq);
return cnt;
}

mtx_unlock(&rkq->rkq_lock);

if (timeout_ms)
35 changes: 32 additions & 3 deletions src/rdkafka_queue.h
@@ -75,6 +75,11 @@ struct rd_kafka_q_s {
* by triggering the cond-var \
* but without having to enqueue \
* an op. */
#define RD_KAFKA_Q_F_CONSUMER \
0x10 /* If this flag is set, this queue might contain fetched messages \
from partitions. Polling this queue will reset the \
max.poll.interval.ms timer. Once set, this flag is never \
reset. */

rd_kafka_t *rkq_rk;
struct rd_kafka_q_io *rkq_qio; /* FD-based application signalling */
@@ -123,12 +128,20 @@ static RD_INLINE RD_UNUSED int rd_kafka_q_ready(rd_kafka_q_t *rkq) {

void rd_kafka_q_init0(rd_kafka_q_t *rkq,
rd_kafka_t *rk,
rd_bool_t for_consume,
const char *func,
int line);
#define rd_kafka_q_init(rkq, rk) \
rd_kafka_q_init0(rkq, rk, __FUNCTION__, __LINE__)
rd_kafka_q_init0(rkq, rk, rd_false, __FUNCTION__, __LINE__)
#define rd_kafka_consume_q_init(rkq, rk) \
rd_kafka_q_init0(rkq, rk, rd_true, __FUNCTION__, __LINE__)
rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk, const char *func, int line);
rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk,
rd_bool_t for_consume,
const char *func,
int line);
#define rd_kafka_q_new(rk) rd_kafka_q_new0(rk, __FUNCTION__, __LINE__)
#define rd_kafka_q_new(rk) rd_kafka_q_new0(rk, rd_false, __FUNCTION__, __LINE__)
#define rd_kafka_consume_q_new(rk) \
rd_kafka_q_new0(rk, rd_true, __FUNCTION__, __LINE__)
void rd_kafka_q_destroy_final(rd_kafka_q_t *rkq);

#define rd_kafka_q_lock(rkqu) mtx_lock(&(rkqu)->rkq_lock)
@@ -1164,6 +1177,22 @@ rd_kafka_enq_once_disable(rd_kafka_enq_once_t *eonce) {
return rko;
}

/**
* @brief Returns true if the queue can contain fetched messages.
*
* @locks rd_kafka_q_lock(rkq) if do_lock is set.
*/
static RD_INLINE RD_UNUSED rd_bool_t
rd_kafka_q_can_contain_fetched_msgs(rd_kafka_q_t *rkq, rd_bool_t do_lock) {
rd_bool_t val;
if (do_lock)
mtx_lock(&rkq->rkq_lock);
val = rkq->rkq_flags & RD_KAFKA_Q_F_CONSUMER;
if (do_lock)
mtx_unlock(&rkq->rkq_lock);
return val;
}


/**@}*/

87 changes: 87 additions & 0 deletions tests/0089-max_poll_interval.c
@@ -351,8 +351,95 @@ static void do_test_with_log_queue(void) {
SUB_TEST_PASS();
}


/**
 * @brief The consumer should be able to rejoin the group just by polling,
 * after leaving due to a max.poll.interval.ms timeout. The poll does not
 * need to go through any special function; polling any queue that may
 * contain consumer messages should suffice.
 * We test both with the queue returned by rd_kafka_queue_get_consumer and
 * with an arbitrary queue that the consumer queue is forwarded to.
 */
static void
do_test_rejoin_after_interval_expire(rd_bool_t forward_to_another_q) {
const char *topic = test_mk_topic_name("0089_max_poll_interval", 1);
rd_kafka_conf_t *conf;
char groupid[64];
rd_kafka_t *rk = NULL;
rd_kafka_queue_t *consumer_queue = NULL;
rd_kafka_event_t *event = NULL;
rd_kafka_queue_t *polling_queue = NULL;

SUB_TEST("Testing with forward_to_another_q = %d",
forward_to_another_q);

test_create_topic(NULL, topic, 1, 1);

test_str_id_generate(groupid, sizeof(groupid));
test_conf_init(&conf, NULL, 60);
test_conf_set(conf, "session.timeout.ms", "6000");
test_conf_set(conf, "max.poll.interval.ms", "10000" /*10s*/);
test_conf_set(conf, "partition.assignment.strategy", "range");

/* We need to specify a non-NULL rebalance CB to get events of type
* RD_KAFKA_EVENT_REBALANCE. */
rk = test_create_consumer(groupid, test_rebalance_cb, conf, NULL);

consumer_queue = rd_kafka_queue_get_consumer(rk);

test_consumer_subscribe(rk, topic);

if (forward_to_another_q) {
polling_queue = rd_kafka_queue_new(rk);
rd_kafka_queue_forward(consumer_queue, polling_queue);
} else
polling_queue = consumer_queue;

event = test_wait_event(polling_queue, RD_KAFKA_EVENT_REBALANCE,
(int)(test_timeout_multiplier * 10000));
TEST_ASSERT(event,
"Did not get a rebalance event for initial group join");
TEST_ASSERT(rd_kafka_event_error(event) ==
RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
"Group join should assign partitions");
rd_kafka_assign(rk, rd_kafka_event_topic_partition_list(event));
rd_kafka_event_destroy(event);

rd_sleep(10 + 1); /* Exceed max.poll.interval.ms. */

/* Note that by polling for the group leave, we're also polling the
* consumer queue, and hence it should trigger a rejoin. */
event = test_wait_event(polling_queue, RD_KAFKA_EVENT_REBALANCE,
(int)(test_timeout_multiplier * 10000));
TEST_ASSERT(event, "Did not get a rebalance event for the group leave");
TEST_ASSERT(rd_kafka_event_error(event) ==
RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
"Group leave should revoke partitions");
rd_kafka_assign(rk, NULL);
rd_kafka_event_destroy(event);

event = test_wait_event(polling_queue, RD_KAFKA_EVENT_REBALANCE,
(int)(test_timeout_multiplier * 10000));
TEST_ASSERT(event, "Should get a rebalance event for the group rejoin");
TEST_ASSERT(rd_kafka_event_error(event) ==
RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
"Group rejoin should assign partitions");
rd_kafka_assign(rk, rd_kafka_event_topic_partition_list(event));
rd_kafka_event_destroy(event);

if (forward_to_another_q)
rd_kafka_queue_destroy(polling_queue);
rd_kafka_queue_destroy(consumer_queue);
test_consumer_close(rk);
rd_kafka_destroy(rk);

SUB_TEST_PASS();
}

int main_0089_max_poll_interval(int argc, char **argv) {
do_test();
do_test_with_log_queue();
do_test_rejoin_after_interval_expire(rd_false);
do_test_rejoin_after_interval_expire(rd_true);
return 0;
}
