Skip to content

Commit

Permalink
Switch from int to flag
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Apr 19, 2023
1 parent dfe6e6f commit 68aaa51
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 81 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ librdkafka v2.1.1 is a bugfix release:

* Fixes certain cases where polling would not rejoin the consumer group
when the cause of leaving the group was exceeding `max.poll.interval.ms`.


## Fixes

### Consumer fixes
Expand Down
45 changes: 23 additions & 22 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -3050,16 +3050,16 @@ static rd_kafka_op_res_t rd_kafka_consume_callback0(
void *opaque) {
struct consume_ctx ctx = {.consume_cb = consume_cb, .opaque = opaque};
rd_kafka_op_res_t res;
const int does_q_contain_consumer_msgs =
rd_kafka_q_consumer_cnt(rkq, 1);
const rd_bool_t can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rkq, RD_DO_LOCK);

if (timeout_ms && does_q_contain_consumer_msgs)
if (timeout_ms && can_q_contain_fetched_msgs)
rd_kafka_app_poll_blocking(rkq->rkq_rk);

res = rd_kafka_q_serve(rkq, timeout_ms, max_cnt, RD_KAFKA_Q_CB_RETURN,
rd_kafka_consume_cb, &ctx);

if (does_q_contain_consumer_msgs)
if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rkq->rkq_rk);

return res;
Expand Down Expand Up @@ -3124,10 +3124,10 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
rd_kafka_op_t *rko;
rd_kafka_message_t *rkmessage = NULL;
rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
const int does_q_contain_consumer_msgs =
rd_kafka_q_consumer_cnt(rkq, 1);
const rd_bool_t can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rkq, RD_DO_LOCK);

if (timeout_ms && does_q_contain_consumer_msgs)
if (timeout_ms && can_q_contain_fetched_msgs)
rd_kafka_app_poll_blocking(rk);

rd_kafka_yield_thread = 0;
Expand All @@ -3146,7 +3146,7 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
/* Callback called rd_kafka_yield(), we must
* stop dispatching the queue and return. */
rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR, EINTR);
if (does_q_contain_consumer_msgs)
if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rk);
return NULL;
}
Expand All @@ -3159,7 +3159,7 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
/* Timeout reached with no op returned. */
rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
ETIMEDOUT);
if (does_q_contain_consumer_msgs)
if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rk);
return NULL;
}
Expand All @@ -3175,7 +3175,7 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {

rd_kafka_set_last_error(0, 0);

if (does_q_contain_consumer_msgs)
if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rk);

return rkmessage;
Expand Down Expand Up @@ -4011,16 +4011,16 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,

int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms) {
int r;
const int does_q_contain_consumer_msgs =
rd_kafka_q_consumer_cnt(rk->rk_rep, 1);
const rd_bool_t can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rk->rk_rep, RD_DO_LOCK);

if (timeout_ms && does_q_contain_consumer_msgs)
if (timeout_ms && can_q_contain_fetched_msgs)
rd_kafka_app_poll_blocking(rk);

r = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0, RD_KAFKA_Q_CB_CALLBACK,
rd_kafka_poll_cb, NULL);

if (does_q_contain_consumer_msgs)
if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rk);

return r;
Expand All @@ -4029,16 +4029,17 @@ int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms) {

rd_kafka_event_t *rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms) {
rd_kafka_op_t *rko;
const int does_q_contain_consumer_msgs =
rd_kafka_q_consumer_cnt(rkqu->rkqu_q, 1);
const rd_bool_t can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rkqu->rkqu_q, RD_DO_LOCK);

if (timeout_ms && does_q_contain_consumer_msgs)

if (timeout_ms && can_q_contain_fetched_msgs)
rd_kafka_app_poll_blocking(rkqu->rkqu_rk);

rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, rd_timeout_us(timeout_ms), 0,
RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL);

if (does_q_contain_consumer_msgs)
if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rkqu->rkqu_rk);

if (!rko)
Expand All @@ -4049,16 +4050,16 @@ rd_kafka_event_t *rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms) {

int rd_kafka_queue_poll_callback(rd_kafka_queue_t *rkqu, int timeout_ms) {
int r;
const int does_q_contain_consumer_msgs =
rd_kafka_q_consumer_cnt(rkqu->rkqu_q, 1);
const rd_bool_t can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rkqu->rkqu_q, RD_DO_LOCK);

if (timeout_ms && does_q_contain_consumer_msgs)
if (timeout_ms && can_q_contain_fetched_msgs)
rd_kafka_app_poll_blocking(rkqu->rkqu_rk);

r = rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0,
RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);

if (does_q_contain_consumer_msgs)
if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rkqu->rkqu_rk);

return r;
Expand Down
8 changes: 7 additions & 1 deletion src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -3427,10 +3427,16 @@ rd_kafka_error_t *rd_kafka_sasl_set_credentials(rd_kafka_t *rk,
* @returns a reference to the librdkafka consumer queue.
* This is the queue served by rd_kafka_consumer_poll().
*
* Use rd_kafka_queue_destroy() to loose the reference.
* Use rd_kafka_queue_destroy() to lose the reference.
*
* @remark rd_kafka_queue_destroy() MUST be called on this queue
* prior to calling rd_kafka_consumer_close().
* @remark Polling the returned queue counts as a consumer poll, and will reset
* the timer for max.poll.interval.ms. If this queue is forwarded to a
* "destq", polling destq also counts as a consumer poll (this works
* for any number of forwards). However, even if this queue is
* unforwarded or forwarded elsewhere, polling destq will continue
* to count as a consumer poll.
*/
RD_EXPORT
rd_kafka_queue_t *rd_kafka_queue_get_consumer(rd_kafka_t *rk);
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk,
rkcg->rkcg_wait_coord_q = rd_kafka_q_new(rk);
rkcg->rkcg_wait_coord_q->rkq_serve = rkcg->rkcg_ops->rkq_serve;
rkcg->rkcg_wait_coord_q->rkq_opaque = rkcg->rkcg_ops->rkq_opaque;
rkcg->rkcg_q = rd_kafka_q_new_consume(rk);
rkcg->rkcg_q = rd_kafka_consume_q_new(rk);
rkcg->rkcg_group_instance_id =
rd_kafkap_str_new(rk->rk_conf.group_instance_id, -1);

Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ rd_kafka_toppar_t *rd_kafka_toppar_new0(rd_kafka_topic_t *rkt,
mtx_init(&rktp->rktp_lock, mtx_plain);

rd_refcnt_init(&rktp->rktp_refcnt, 0);
rktp->rktp_fetchq = rd_kafka_q_new_consume(rkt->rkt_rk);
rktp->rktp_fetchq = rd_kafka_consume_q_new(rkt->rkt_rk);
rktp->rktp_ops = rd_kafka_q_new(rkt->rkt_rk);
rktp->rktp_ops->rkq_serve = rd_kafka_toppar_op_serve;
rktp->rktp_ops->rkq_opaque = rktp;
Expand Down
74 changes: 39 additions & 35 deletions src/rdkafka_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,20 @@ void rd_kafka_q_destroy_final(rd_kafka_q_t *rkq) {
*/
void rd_kafka_q_init0(rd_kafka_q_t *rkq,
rd_kafka_t *rk,
int is_consume,
rd_bool_t for_consume,
const char *func,
int line) {
rd_kafka_q_reset(rkq);
rkq->rkq_fwdq = NULL;
rkq->rkq_refcnt = 1;
rkq->rkq_flags = RD_KAFKA_Q_F_READY;
rkq->rkq_rk = rk;
rkq->rkq_consumer_cnt = is_consume > 0;
rkq->rkq_qio = NULL;
rkq->rkq_serve = NULL;
rkq->rkq_opaque = NULL;
rkq->rkq_fwdq = NULL;
rkq->rkq_refcnt = 1;
rkq->rkq_flags = RD_KAFKA_Q_F_READY;
if (for_consume) {
rkq->rkq_flags |= RD_KAFKA_Q_F_CONSUMER;
}
rkq->rkq_rk = rk;
rkq->rkq_qio = NULL;
rkq->rkq_serve = NULL;
rkq->rkq_opaque = NULL;
mtx_init(&rkq->rkq_lock, mtx_plain);
cnd_init(&rkq->rkq_cond);
#if ENABLE_DEVEL
Expand All @@ -108,13 +110,15 @@ void rd_kafka_q_init0(rd_kafka_q_t *rkq,
/**
* Allocate a new queue and initialize it.
*/
rd_kafka_q_t *
rd_kafka_q_new0(rd_kafka_t *rk, int is_consume, const char *func, int line) {
rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk,
rd_bool_t for_consume,
const char *func,
int line) {
rd_kafka_q_t *rkq = rd_malloc(sizeof(*rkq));
if (!is_consume)
if (!for_consume)
rd_kafka_q_init(rkq, rk);
else
rd_kafka_q_init_consume(rkq, rk);
rd_kafka_consume_q_init(rkq, rk);
rkq->rkq_flags |= RD_KAFKA_Q_F_ALLOCATED;
#if ENABLE_DEVEL
rd_snprintf(rkq->rkq_name, sizeof(rkq->rkq_name), "%s:%d", func, line);
Expand All @@ -125,29 +129,29 @@ rd_kafka_q_new0(rd_kafka_t *rk, int is_consume, const char *func, int line) {
}

/*
* Handles the logic for increment/decrement of rkq_consumer_cnt.
* Only used internally when forwarding queues.
* Sets the flag RD_KAFKA_Q_F_CONSUMER for rkq, any queues it's being forwarded
* to, recursively.
* Setting this flag indicates that polling this queue is equivalent to calling
* consumer poll, and will reset the max.poll.interval.ms timer. Only used
* internally when forwarding queues.
* @locks rd_kafka_q_lock(rkq)
*/
static void rd_kafka_q_consumer_cnt_propagate(rd_kafka_q_t *rkq,
int cnt_delta) {
rd_assert(rkq);
if (cnt_delta == 0)
return;

static void rd_kafka_q_consumer_propagate(rd_kafka_q_t *rkq) {
mtx_lock(&rkq->rkq_lock);
rkq->rkq_consumer_cnt += cnt_delta;
rd_assert(rkq->rkq_consumer_cnt >= 0);
rkq->rkq_flags |= RD_KAFKA_Q_F_CONSUMER;

if (!rkq->rkq_fwdq) {
mtx_unlock(&rkq->rkq_lock);
return;
}

/* Recursively propagate the change in the count. There is a chance of a
* deadlock here but only if the user is forwarding queues in a circular
* manner, which already is an incorrect mode of operation.*/
rd_kafka_q_consumer_cnt_propagate(rkq->rkq_fwdq, cnt_delta);
/* Recursively propagate the flag to any queues rkq is already
* forwarding to. There will be a deadlock here if the queues are being
* forwarded circularly, but that is a user error. We can't resolve this
* deadlock by unlocking before the recursive call, because that leads
* to incorrectness if the rkq_fwdq is forwarded elsewhere and the old
* one destroyed between recursive calls. */
rd_kafka_q_consumer_propagate(rkq->rkq_fwdq);
mtx_unlock(&rkq->rkq_lock);
}

Expand All @@ -171,8 +175,6 @@ void rd_kafka_q_fwd_set0(rd_kafka_q_t *srcq,
if (fwd_app)
srcq->rkq_flags |= RD_KAFKA_Q_F_FWD_APP;
if (srcq->rkq_fwdq) {
rd_kafka_q_consumer_cnt_propagate(srcq->rkq_fwdq,
-1 * srcq->rkq_consumer_cnt);
rd_kafka_q_destroy(srcq->rkq_fwdq);
srcq->rkq_fwdq = NULL;
}
Expand All @@ -187,8 +189,9 @@ void rd_kafka_q_fwd_set0(rd_kafka_q_t *srcq,
}

srcq->rkq_fwdq = destq;
rd_kafka_q_consumer_cnt_propagate(destq,
srcq->rkq_consumer_cnt);

if (srcq->rkq_flags & RD_KAFKA_Q_F_CONSUMER)
rd_kafka_q_consumer_propagate(destq);
}
if (do_lock)
mtx_unlock(&srcq->rkq_lock);
Expand Down Expand Up @@ -636,7 +639,7 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
rd_kafka_q_t *fwdq;
struct timespec timeout_tspec;
int i;
int does_q_contain_consumer_msgs;
rd_bool_t can_q_contain_fetched_msgs;

mtx_lock(&rkq->rkq_lock);
if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
Expand All @@ -649,10 +652,11 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
return cnt;
}

does_q_contain_consumer_msgs = rd_kafka_q_consumer_cnt(rkq, 0);
can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rkq, RD_DONT_LOCK);
mtx_unlock(&rkq->rkq_lock);

if (timeout_ms && does_q_contain_consumer_msgs)
if (timeout_ms && can_q_contain_fetched_msgs)
rd_kafka_app_poll_blocking(rk);

rd_timeout_init_timespec(&timeout_tspec, timeout_ms);
Expand Down Expand Up @@ -759,7 +763,7 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
rd_kafka_op_destroy(rko);
}

if (does_q_contain_consumer_msgs)
if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rk);

return cnt;
Expand Down
43 changes: 22 additions & 21 deletions src/rdkafka_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,11 @@ struct rd_kafka_q_s {
* by triggering the cond-var \
* but without having to enqueue \
* an op. */

/* Set to 1 for queues which contain consumer messages, and 0 otherwise.
* If a queue A forwards its messages to a queue B, B.rkq_consumer_cnt
* += A.rkq_consumer_cnt. When queue A is set to forward to NULL, this
* is reverted. */
int rkq_consumer_cnt;
#define RD_KAFKA_Q_F_CONSUMER \
0x10 /* If this flag is set, this queue might contain fetched messages \
from partitions. Polling this queue will reset the \
max.poll.interval.ms timer. Once set, this flag is never \
reset. */

rd_kafka_t *rkq_rk;
struct rd_kafka_q_io *rkq_qio; /* FD-based application signalling */
Expand Down Expand Up @@ -129,18 +128,20 @@ static RD_INLINE RD_UNUSED int rd_kafka_q_ready(rd_kafka_q_t *rkq) {

void rd_kafka_q_init0(rd_kafka_q_t *rkq,
rd_kafka_t *rk,
int is_consume,
rd_bool_t for_consume,
const char *func,
int line);
#define rd_kafka_q_init(rkq, rk) \
rd_kafka_q_init0(rkq, rk, 0, __FUNCTION__, __LINE__)
#define rd_kafka_q_init_consume(rkq, rk) \
rd_kafka_q_init0(rkq, rk, 1, __FUNCTION__, __LINE__)
rd_kafka_q_t *
rd_kafka_q_new0(rd_kafka_t *rk, int is_consume, const char *func, int line);
#define rd_kafka_q_new(rk) rd_kafka_q_new0(rk, 0, __FUNCTION__, __LINE__)
#define rd_kafka_q_new_consume(rk) \
rd_kafka_q_new0(rk, 1, __FUNCTION__, __LINE__)
rd_kafka_q_init0(rkq, rk, rd_false, __FUNCTION__, __LINE__)
#define rd_kafka_consume_q_init(rkq, rk) \
rd_kafka_q_init0(rkq, rk, rd_true, __FUNCTION__, __LINE__)
rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk,
rd_bool_t for_consume,
const char *func,
int line);
#define rd_kafka_q_new(rk) rd_kafka_q_new0(rk, rd_false, __FUNCTION__, __LINE__)
#define rd_kafka_consume_q_new(rk) \
rd_kafka_q_new0(rk, rd_true, __FUNCTION__, __LINE__)
void rd_kafka_q_destroy_final(rd_kafka_q_t *rkq);

#define rd_kafka_q_lock(rkqu) mtx_lock(&(rkqu)->rkq_lock)
Expand Down Expand Up @@ -1177,19 +1178,19 @@ rd_kafka_enq_once_disable(rd_kafka_enq_once_t *eonce) {
}

/**
* @brief Returns the rkq's rkq_consumer_cnt.
* @brief Returns true if the queue can contain fetched messages.
*
* @locks rd_kafka_q_lock(rkq) if do_lock is set.
*/
static RD_INLINE RD_UNUSED int rd_kafka_q_consumer_cnt(rd_kafka_q_t *rkq,
int do_lock) {
int cnt;
static RD_INLINE RD_UNUSED rd_bool_t
rd_kafka_q_can_contain_fetched_msgs(rd_kafka_q_t *rkq, rd_bool_t do_lock) {
rd_bool_t val;
if (do_lock)
mtx_lock(&rkq->rkq_lock);
cnt = rkq->rkq_consumer_cnt;
val = rkq->rkq_flags & RD_KAFKA_Q_F_CONSUMER;
if (do_lock)
mtx_unlock(&rkq->rkq_lock);
return cnt;
return val;
}


Expand Down

0 comments on commit 68aaa51

Please sign in to comment.