diff --git a/CHANGELOG.md b/CHANGELOG.md
index 92f4c2d20b..ab6a72aa68 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,8 @@ librdkafka v2.1.1 is a bugfix release:
 * Fixes certain cases where polling would not rejoin the consumer group
   when the cause of leaving the group was exceeding
   `max.poll.interval.ms`.
+
+
 ## Fixes
 
 ### Consumer fixes
diff --git a/src/rdkafka.c b/src/rdkafka.c
index 0f2d3bf9e0..fbc216874f 100644
--- a/src/rdkafka.c
+++ b/src/rdkafka.c
@@ -3050,16 +3050,16 @@ static rd_kafka_op_res_t rd_kafka_consume_callback0(
     void *opaque) {
         struct consume_ctx ctx = {.consume_cb = consume_cb, .opaque = opaque};
         rd_kafka_op_res_t res;
-        const int does_q_contain_consumer_msgs =
-            rd_kafka_q_consumer_cnt(rkq, 1);
+        const rd_bool_t can_q_contain_fetched_msgs =
+            rd_kafka_q_can_contain_fetched_msgs(rkq, RD_DO_LOCK);
 
-        if (timeout_ms && does_q_contain_consumer_msgs)
+        if (timeout_ms && can_q_contain_fetched_msgs)
                 rd_kafka_app_poll_blocking(rkq->rkq_rk);
 
         res = rd_kafka_q_serve(rkq, timeout_ms, max_cnt, RD_KAFKA_Q_CB_RETURN,
                                rd_kafka_consume_cb, &ctx);
 
-        if (does_q_contain_consumer_msgs)
+        if (can_q_contain_fetched_msgs)
                 rd_kafka_app_polled(rkq->rkq_rk);
 
         return res;
@@ -3124,10 +3124,10 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
         rd_kafka_op_t *rko;
         rd_kafka_message_t *rkmessage = NULL;
         rd_ts_t abs_timeout           = rd_timeout_init(timeout_ms);
-        const int does_q_contain_consumer_msgs =
-            rd_kafka_q_consumer_cnt(rkq, 1);
+        const rd_bool_t can_q_contain_fetched_msgs =
+            rd_kafka_q_can_contain_fetched_msgs(rkq, RD_DO_LOCK);
 
-        if (timeout_ms && does_q_contain_consumer_msgs)
+        if (timeout_ms && can_q_contain_fetched_msgs)
                 rd_kafka_app_poll_blocking(rk);
 
         rd_kafka_yield_thread = 0;
@@ -3146,7 +3146,7 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
                         /* Callback called rd_kafka_yield(), we must
                          * stop dispatching the queue and return. */
                         rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR, EINTR);
-                        if (does_q_contain_consumer_msgs)
+                        if (can_q_contain_fetched_msgs)
                                 rd_kafka_app_polled(rk);
                         return NULL;
                 }
@@ -3159,7 +3159,7 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
                 /* Timeout reached with no op returned. */
                 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
                                         ETIMEDOUT);
-                if (does_q_contain_consumer_msgs)
+                if (can_q_contain_fetched_msgs)
                         rd_kafka_app_polled(rk);
                 return NULL;
         }
@@ -3175,7 +3175,7 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
 
         rd_kafka_set_last_error(0, 0);
 
-        if (does_q_contain_consumer_msgs)
+        if (can_q_contain_fetched_msgs)
                 rd_kafka_app_polled(rk);
 
         return rkmessage;
@@ -4011,16 +4011,16 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,
 
 int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms) {
         int r;
-        const int does_q_contain_consumer_msgs =
-            rd_kafka_q_consumer_cnt(rk->rk_rep, 1);
+        const rd_bool_t can_q_contain_fetched_msgs =
+            rd_kafka_q_can_contain_fetched_msgs(rk->rk_rep, RD_DO_LOCK);
 
-        if (timeout_ms && does_q_contain_consumer_msgs)
+        if (timeout_ms && can_q_contain_fetched_msgs)
                 rd_kafka_app_poll_blocking(rk);
 
         r = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0, RD_KAFKA_Q_CB_CALLBACK,
                              rd_kafka_poll_cb, NULL);
 
-        if (does_q_contain_consumer_msgs)
+        if (can_q_contain_fetched_msgs)
                 rd_kafka_app_polled(rk);
 
         return r;
@@ -4029,16 +4029,17 @@ int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms) {
 
 rd_kafka_event_t *rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms) {
         rd_kafka_op_t *rko;
-        const int does_q_contain_consumer_msgs =
-            rd_kafka_q_consumer_cnt(rkqu->rkqu_q, 1);
+        const rd_bool_t can_q_contain_fetched_msgs =
+            rd_kafka_q_can_contain_fetched_msgs(rkqu->rkqu_q, RD_DO_LOCK);
 
-        if (timeout_ms && does_q_contain_consumer_msgs)
+
+        if (timeout_ms && can_q_contain_fetched_msgs)
                 rd_kafka_app_poll_blocking(rkqu->rkqu_rk);
 
         rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, rd_timeout_us(timeout_ms), 0,
                                    RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL);
 
-        if (does_q_contain_consumer_msgs)
+        if (can_q_contain_fetched_msgs)
                 rd_kafka_app_polled(rkqu->rkqu_rk);
 
         if (!rko)
@@ -4049,16 +4050,16 @@ rd_kafka_event_t *rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms) {
 
 int rd_kafka_queue_poll_callback(rd_kafka_queue_t *rkqu, int timeout_ms) {
         int r;
-        const int does_q_contain_consumer_msgs =
-            rd_kafka_q_consumer_cnt(rkqu->rkqu_q, 1);
+        const rd_bool_t can_q_contain_fetched_msgs =
+            rd_kafka_q_can_contain_fetched_msgs(rkqu->rkqu_q, RD_DO_LOCK);
 
-        if (timeout_ms && does_q_contain_consumer_msgs)
+        if (timeout_ms && can_q_contain_fetched_msgs)
                 rd_kafka_app_poll_blocking(rkqu->rkqu_rk);
 
         r = rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0,
                              RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);
 
-        if (does_q_contain_consumer_msgs)
+        if (can_q_contain_fetched_msgs)
                 rd_kafka_app_polled(rkqu->rkqu_rk);
 
         return r;
diff --git a/src/rdkafka.h b/src/rdkafka.h
index e3474e50ff..0d0fad5e6e 100644
--- a/src/rdkafka.h
+++ b/src/rdkafka.h
@@ -3427,10 +3427,16 @@ rd_kafka_error_t *rd_kafka_sasl_set_credentials(rd_kafka_t *rk,
  * @returns a reference to the librdkafka consumer queue.
  * This is the queue served by rd_kafka_consumer_poll().
  *
- * Use rd_kafka_queue_destroy() to loose the reference.
+ * Use rd_kafka_queue_destroy() to lose the reference.
  *
  * @remark rd_kafka_queue_destroy() MUST be called on this queue
  *         prior to calling rd_kafka_consumer_close().
+ * @remark Polling the returned queue counts as a consumer poll, and will reset
+ *         the timer for max.poll.interval.ms. If this queue is forwarded to a
+ *         "destq", polling destq also counts as a consumer poll (this works
+ *         for any number of forwards). However, even if this queue is
+ *         unforwarded or forwarded elsewhere, polling destq will continue
+ *         to count as a consumer poll.
  */
 RD_EXPORT
 rd_kafka_queue_t *rd_kafka_queue_get_consumer(rd_kafka_t *rk);
diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c
index b48d35fe3c..c2824fd71c 100644
--- a/src/rdkafka_cgrp.c
+++ b/src/rdkafka_cgrp.c
@@ -415,7 +415,7 @@ rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk,
         rkcg->rkcg_wait_coord_q             = rd_kafka_q_new(rk);
         rkcg->rkcg_wait_coord_q->rkq_serve  = rkcg->rkcg_ops->rkq_serve;
         rkcg->rkcg_wait_coord_q->rkq_opaque = rkcg->rkcg_ops->rkq_opaque;
-        rkcg->rkcg_q                        = rd_kafka_q_new_consume(rk);
+        rkcg->rkcg_q                        = rd_kafka_consume_q_new(rk);
         rkcg->rkcg_group_instance_id =
             rd_kafkap_str_new(rk->rk_conf.group_instance_id, -1);
 
diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c
index 56f945dfd9..bd2904c1e8 100644
--- a/src/rdkafka_partition.c
+++ b/src/rdkafka_partition.c
@@ -252,7 +252,7 @@ rd_kafka_toppar_t *rd_kafka_toppar_new0(rd_kafka_topic_t *rkt,
         mtx_init(&rktp->rktp_lock, mtx_plain);
 
         rd_refcnt_init(&rktp->rktp_refcnt, 0);
-        rktp->rktp_fetchq          = rd_kafka_q_new_consume(rkt->rkt_rk);
+        rktp->rktp_fetchq          = rd_kafka_consume_q_new(rkt->rkt_rk);
         rktp->rktp_ops             = rd_kafka_q_new(rkt->rkt_rk);
         rktp->rktp_ops->rkq_serve  = rd_kafka_toppar_op_serve;
         rktp->rktp_ops->rkq_opaque = rktp;
diff --git a/src/rdkafka_queue.c b/src/rdkafka_queue.c
index 47ce46ed8d..3e114c0ce0 100644
--- a/src/rdkafka_queue.c
+++ b/src/rdkafka_queue.c
@@ -83,18 +83,20 @@ void rd_kafka_q_destroy_final(rd_kafka_q_t *rkq) {
  */
 void rd_kafka_q_init0(rd_kafka_q_t *rkq,
                       rd_kafka_t *rk,
-                      int is_consume,
+                      rd_bool_t for_consume,
                       const char *func,
                       int line) {
         rd_kafka_q_reset(rkq);
-        rkq->rkq_fwdq         = NULL;
-        rkq->rkq_refcnt       = 1;
-        rkq->rkq_flags        = RD_KAFKA_Q_F_READY;
-        rkq->rkq_rk           = rk;
-        rkq->rkq_consumer_cnt = is_consume > 0;
-        rkq->rkq_qio          = NULL;
-        rkq->rkq_serve        = NULL;
-        rkq->rkq_opaque       = NULL;
+        rkq->rkq_fwdq   = NULL;
+        rkq->rkq_refcnt = 1;
+        rkq->rkq_flags  = RD_KAFKA_Q_F_READY;
+        if (for_consume) {
+                rkq->rkq_flags |= RD_KAFKA_Q_F_CONSUMER;
+        }
+        rkq->rkq_rk     = rk;
+        rkq->rkq_qio    = NULL;
+        rkq->rkq_serve  = NULL;
+        rkq->rkq_opaque = NULL;
         mtx_init(&rkq->rkq_lock, mtx_plain);
         cnd_init(&rkq->rkq_cond);
 #if ENABLE_DEVEL
@@ -108,13 +110,15 @@ void rd_kafka_q_init0(rd_kafka_q_t *rkq,
 /**
  * Allocate a new queue and initialize it.
  */
-rd_kafka_q_t *
-rd_kafka_q_new0(rd_kafka_t *rk, int is_consume, const char *func, int line) {
+rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk,
+                              rd_bool_t for_consume,
+                              const char *func,
+                              int line) {
         rd_kafka_q_t *rkq = rd_malloc(sizeof(*rkq));
-        if (!is_consume)
+        if (!for_consume)
                 rd_kafka_q_init(rkq, rk);
         else
-                rd_kafka_q_init_consume(rkq, rk);
+                rd_kafka_consume_q_init(rkq, rk);
         rkq->rkq_flags |= RD_KAFKA_Q_F_ALLOCATED;
 #if ENABLE_DEVEL
         rd_snprintf(rkq->rkq_name, sizeof(rkq->rkq_name), "%s:%d", func, line);
@@ -125,29 +129,29 @@ rd_kafka_q_new0(rd_kafka_t *rk, int is_consume, const char *func, int line) {
 }
 
 /*
- * Handles the logic for increment/decrement of rkq_consumer_cnt.
- * Only used internally when forwarding queues.
+ * Sets the flag RD_KAFKA_Q_F_CONSUMER for rkq and, recursively, for any
+ * queues it is forwarded to.
+ * Setting this flag indicates that polling this queue is equivalent to calling
+ * consumer poll, and will reset the max.poll.interval.ms timer. Only used
+ * internally when forwarding queues.
  * @locks rd_kafka_q_lock(rkq)
  */
-static void rd_kafka_q_consumer_cnt_propagate(rd_kafka_q_t *rkq,
-                                              int cnt_delta) {
-        rd_assert(rkq);
-        if (cnt_delta == 0)
-                return;
-
+static void rd_kafka_q_consumer_propagate(rd_kafka_q_t *rkq) {
         mtx_lock(&rkq->rkq_lock);
-        rkq->rkq_consumer_cnt += cnt_delta;
-        rd_assert(rkq->rkq_consumer_cnt >= 0);
+        rkq->rkq_flags |= RD_KAFKA_Q_F_CONSUMER;
 
         if (!rkq->rkq_fwdq) {
                 mtx_unlock(&rkq->rkq_lock);
                 return;
         }
 
-        /* Recursively propagate the change in the count. There is a chance of a
-         * deadlock here but only if the user is forwarding queues in a circular
-         * manner, which already is an incorrect mode of operation.*/
-        rd_kafka_q_consumer_cnt_propagate(rkq->rkq_fwdq, cnt_delta);
+        /* Recursively propagate the flag to any queues rkq is already
+         * forwarding to. There will be a deadlock here if the queues are being
+         * forwarded circularly, but that is a user error. We can't resolve this
+         * deadlock by unlocking before the recursive call, because that leads
+         * to incorrectness if the rkq_fwdq is forwarded elsewhere and the old
+         * one destroyed between recursive calls. */
+        rd_kafka_q_consumer_propagate(rkq->rkq_fwdq);
 
         mtx_unlock(&rkq->rkq_lock);
 }
@@ -171,8 +175,6 @@ void rd_kafka_q_fwd_set0(rd_kafka_q_t *srcq,
         if (fwd_app)
                 srcq->rkq_flags |= RD_KAFKA_Q_F_FWD_APP;
         if (srcq->rkq_fwdq) {
-                rd_kafka_q_consumer_cnt_propagate(srcq->rkq_fwdq,
-                                                  -1 * srcq->rkq_consumer_cnt);
                 rd_kafka_q_destroy(srcq->rkq_fwdq);
                 srcq->rkq_fwdq = NULL;
         }
@@ -187,8 +189,9 @@ void rd_kafka_q_fwd_set0(rd_kafka_q_t *srcq,
                 }
 
                 srcq->rkq_fwdq = destq;
-                rd_kafka_q_consumer_cnt_propagate(destq,
-                                                  srcq->rkq_consumer_cnt);
+
+                if (srcq->rkq_flags & RD_KAFKA_Q_F_CONSUMER)
+                        rd_kafka_q_consumer_propagate(destq);
         }
         if (do_lock)
                 mtx_unlock(&srcq->rkq_lock);
@@ -636,7 +639,7 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
         rd_kafka_q_t *fwdq;
         struct timespec timeout_tspec;
         int i;
-        int does_q_contain_consumer_msgs;
+        rd_bool_t can_q_contain_fetched_msgs;
 
         mtx_lock(&rkq->rkq_lock);
         if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
@@ -649,10 +652,11 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
                 return cnt;
         }
 
-        does_q_contain_consumer_msgs = rd_kafka_q_consumer_cnt(rkq, 0);
+        can_q_contain_fetched_msgs =
+            rd_kafka_q_can_contain_fetched_msgs(rkq, RD_DONT_LOCK);
         mtx_unlock(&rkq->rkq_lock);
 
-        if (timeout_ms && does_q_contain_consumer_msgs)
+        if (timeout_ms && can_q_contain_fetched_msgs)
                 rd_kafka_app_poll_blocking(rk);
 
         rd_timeout_init_timespec(&timeout_tspec, timeout_ms);
@@ -759,7 +763,7 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
                 rd_kafka_op_destroy(rko);
         }
 
-        if (does_q_contain_consumer_msgs)
+        if (can_q_contain_fetched_msgs)
                 rd_kafka_app_polled(rk);
 
         return cnt;
diff --git a/src/rdkafka_queue.h b/src/rdkafka_queue.h
index b692b702d3..82abe4deef 100644
--- a/src/rdkafka_queue.h
+++ b/src/rdkafka_queue.h
@@ -75,12 +75,11 @@ struct rd_kafka_q_s {
              * by triggering the cond-var                                     \
              * but without having to enqueue                                  \
              * an op. */
-
-        /* Set to 1 for queues which contain consumer messages, and 0 otherwise.
-         * If a queue A forwards its messages to a queue B, B.rkq_consumer_cnt
-         * += A.rkq_consumer_cnt. When queue A is set to forward to NULL, this
-         * is reverted. */
-        int rkq_consumer_cnt;
+#define RD_KAFKA_Q_F_CONSUMER                                                  \
+        0x10 /* If this flag is set, this queue might contain fetched messages \
+                from partitions. Polling this queue will reset the             \
+                max.poll.interval.ms timer. Once set, this flag is never       \
+                reset. */
 
         rd_kafka_t *rkq_rk;
         struct rd_kafka_q_io *rkq_qio; /* FD-based application signalling */
@@ -129,18 +128,20 @@ static RD_INLINE RD_UNUSED int rd_kafka_q_ready(rd_kafka_q_t *rkq) {
 
 void rd_kafka_q_init0(rd_kafka_q_t *rkq,
                       rd_kafka_t *rk,
-                      int is_consume,
+                      rd_bool_t for_consume,
                       const char *func,
                       int line);
 #define rd_kafka_q_init(rkq, rk)                                               \
-        rd_kafka_q_init0(rkq, rk, 0, __FUNCTION__, __LINE__)
-#define rd_kafka_q_init_consume(rkq, rk)                                       \
-        rd_kafka_q_init0(rkq, rk, 1, __FUNCTION__, __LINE__)
-rd_kafka_q_t *
-rd_kafka_q_new0(rd_kafka_t *rk, int is_consume, const char *func, int line);
-#define rd_kafka_q_new(rk) rd_kafka_q_new0(rk, 0, __FUNCTION__, __LINE__)
-#define rd_kafka_q_new_consume(rk)                                             \
-        rd_kafka_q_new0(rk, 1, __FUNCTION__, __LINE__)
+        rd_kafka_q_init0(rkq, rk, rd_false, __FUNCTION__, __LINE__)
+#define rd_kafka_consume_q_init(rkq, rk)                                       \
+        rd_kafka_q_init0(rkq, rk, rd_true, __FUNCTION__, __LINE__)
+rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk,
+                              rd_bool_t for_consume,
+                              const char *func,
+                              int line);
+#define rd_kafka_q_new(rk) rd_kafka_q_new0(rk, rd_false, __FUNCTION__, __LINE__)
+#define rd_kafka_consume_q_new(rk)                                             \
+        rd_kafka_q_new0(rk, rd_true, __FUNCTION__, __LINE__)
 
 void rd_kafka_q_destroy_final(rd_kafka_q_t *rkq);
 #define rd_kafka_q_lock(rkqu) mtx_lock(&(rkqu)->rkq_lock)
@@ -1177,19 +1178,19 @@ rd_kafka_enq_once_disable(rd_kafka_enq_once_t *eonce) {
 }
 
 /**
- * @brief Returns the rkq's rkq_consumer_cnt.
+ * @brief Returns true if the queue can contain fetched messages.
  *
  * @locks rd_kafka_q_lock(rkq) if do_lock is set.
  */
-static RD_INLINE RD_UNUSED int rd_kafka_q_consumer_cnt(rd_kafka_q_t *rkq,
-                                                       int do_lock) {
-        int cnt;
+static RD_INLINE RD_UNUSED rd_bool_t
+rd_kafka_q_can_contain_fetched_msgs(rd_kafka_q_t *rkq, rd_bool_t do_lock) {
+        rd_bool_t val;
 
         if (do_lock)
                 mtx_lock(&rkq->rkq_lock);
-        cnt = rkq->rkq_consumer_cnt;
+        val = rkq->rkq_flags & RD_KAFKA_Q_F_CONSUMER;
 
         if (do_lock)
                 mtx_unlock(&rkq->rkq_lock);
-        return cnt;
+        return val;
 }
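
Reviewer note (not part of the patch): the sketch below illustrates the forwarding semantics documented in the new `rd_kafka_queue_get_consumer()` remark, using only existing public API. `serve_consumer_events()`, the 100 ms timeout, and the `running` flag are illustrative assumptions; `rk` is assumed to be an already-configured, subscribed consumer handle.

```c
#include <librdkafka/rdkafka.h>

/* Poll the consumer queue indirectly, via a forwarded destination queue.
 * With this patch, dest_q inherits RD_KAFKA_Q_F_CONSUMER from the consumer
 * queue, so polling it resets the max.poll.interval.ms timer just like
 * rd_kafka_consumer_poll() would. */
static void serve_consumer_events(rd_kafka_t *rk, volatile int *running) {
        rd_kafka_queue_t *consumer_q = rd_kafka_queue_get_consumer(rk);
        rd_kafka_queue_t *dest_q     = rd_kafka_queue_new(rk);

        /* Forward the consumer queue to dest_q: the flag is propagated
         * recursively, so forwarding dest_q onwards keeps the behaviour. */
        rd_kafka_queue_forward(consumer_q, dest_q);

        while (*running) {
                /* Counts as a consumer poll. */
                rd_kafka_event_t *ev = rd_kafka_queue_poll(dest_q, 100);
                if (!ev)
                        continue; /* poll timeout */

                if (rd_kafka_event_type(ev) == RD_KAFKA_EVENT_FETCH) {
                        const rd_kafka_message_t *rkm;
                        while ((rkm = rd_kafka_event_message_next(ev))) {
                                /* Process the fetched message here. */
                        }
                }
                rd_kafka_event_destroy(ev);
        }

        /* The consumer queue reference MUST be destroyed prior to
         * rd_kafka_consumer_close(). */
        rd_kafka_queue_destroy(dest_q);
        rd_kafka_queue_destroy(consumer_q);
}
```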
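Similarly, since `rktp_fetchq` is now created with `rd_kafka_consume_q_new()`, a partition queue obtained through the public API should carry the flag as well. A minimal sketch under the same assumptions; `"mytopic"` and partition `0` are placeholders:

```c
/* Polling a partition's fetch queue also resets the max.poll.interval.ms
 * timer, because rktp_fetchq is created with rd_kafka_consume_q_new(). */
rd_kafka_queue_t *part_q = rd_kafka_queue_get_partition(rk, "mytopic", 0);
if (part_q) {
        rd_kafka_event_t *ev = rd_kafka_queue_poll(part_q, 100);
        if (ev)
                rd_kafka_event_destroy(ev);
        rd_kafka_queue_destroy(part_q);
}
```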