From 9832f1d3f34c564e1ecbfaaf63c64135bd445fc5 Mon Sep 17 00:00:00 2001
From: Magnus Edenhill
Date: Wed, 14 Apr 2021 09:40:00 +0200
Subject: [PATCH] Change queue wakeup rate-limiting to be once per poll-period
 instead of based on rate (#2912)

The IO-based wakeup (rkq_qio) now has a `sent` boolean that tracks whether
a wakeup has been sent, this boolean is reset by the queue reader each time
it polls the queue - effectively allowing only one wakeup-event per
non-polling period and voiding the need for rate-limiting the wakeups.
---
 CHANGELOG.md            |  8 +++++
 src/rdkafka_partition.c |  2 +-
 src/rdkafka_queue.c     | 22 +++++++++++---
 src/rdkafka_queue.h     | 67 +++++++++++++++++++++++++++++------------
 src/rdkafka_timer.c     |  2 +-
 5 files changed, 76 insertions(+), 25 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f0fb197a46..e4a59f211b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -49,6 +49,10 @@ librdkafka v1.7.0 is feature release:
 
 ### Consumer fixes
 
+ * If a rebalance happened during a `consume_batch..()` call the already
+   accumulated messages for revoked partitions were not purged, which would
+   pass messages to the application for partitions that were no longer owned
+   by the consumer. Fixed by @jliunyu. #3340.
  * The consumer group deemed cached metadata up to date by checking
    `topic.metadata.refresh.interval.ms`: if this property was set too low it
    would cause cached metadata to be unusable and new metadata to be fetched,
@@ -69,6 +73,10 @@
    created partition objects, or partitions that were changing leaders, to
    not have their message queues purged. This could cause
    `abort_transaction()` to time out. This issue is now fixed.
+ * In certain high-thruput produce rate patterns the producing could stall for
+   1 second, regardless of `linger.ms`, due to rate-limiting of internal
+   queue wakeups. This is now fixed by not rate-limiting queue wakeups but
+   instead limiting them to one wakeup per queue reader poll. #2912.
 
 ### Transactional Producer fixes
 
diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c
index 984ddcbf5a..3c455a6a08 100644
--- a/src/rdkafka_partition.c
+++ b/src/rdkafka_partition.c
@@ -709,7 +709,7 @@ void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
         rd_kafka_toppar_unlock(rktp);
 
         if (wakeup_q) {
-                rd_kafka_q_yield(wakeup_q, rd_true/*rate-limit*/);
+                rd_kafka_q_yield(wakeup_q);
                 rd_kafka_q_destroy(wakeup_q);
         }
 }
diff --git a/src/rdkafka_queue.c b/src/rdkafka_queue.c
index c975ea2057..b43225a009 100644
--- a/src/rdkafka_queue.c
+++ b/src/rdkafka_queue.c
@@ -175,6 +175,8 @@ int rd_kafka_q_purge0 (rd_kafka_q_t *rkq, int do_lock) {
          * by locks taken from rd_kafka_op_destroy(). */
         TAILQ_MOVE(&tmpq, &rkq->rkq_q, rko_link);
 
+        rd_kafka_q_mark_served(rkq);
+
         /* Zero out queue */
         rd_kafka_q_reset(rkq);
 
@@ -226,6 +228,7 @@ void rd_kafka_q_purge_toppar_version (rd_kafka_q_t *rkq,
                 size += rko->rko_len;
         }
 
+        rd_kafka_q_mark_served(rkq);
 
         rkq->rkq_qlen  -= cnt;
         rkq->rkq_qsize -= size;
@@ -256,7 +259,7 @@ int rd_kafka_q_move_cnt (rd_kafka_q_t *dstq, rd_kafka_q_t *srcq,
 
         if (!dstq->rkq_fwdq && !srcq->rkq_fwdq) {
                 if (cnt > 0 && dstq->rkq_qlen == 0)
-                        rd_kafka_q_io_event(dstq, rd_false/*no rate-limiting*/);
+                        rd_kafka_q_io_event(dstq);
 
                 /* Optimization, if 'cnt' is equal/larger than all
                  * items of 'srcq' we can move the entire queue. */
@@ -284,6 +287,9 @@ int rd_kafka_q_move_cnt (rd_kafka_q_t *dstq, rd_kafka_q_t *srcq,
                                 mcnt++;
                         }
                 }
+
+                rd_kafka_q_mark_served(srcq);
+
         } else
                 mcnt = rd_kafka_q_move_cnt(dstq->rkq_fwdq ? dstq->rkq_fwdq:dstq,
                                            srcq->rkq_fwdq ? srcq->rkq_fwdq:srcq,
@@ -363,6 +369,8 @@ rd_kafka_op_t *rd_kafka_q_pop_serve (rd_kafka_q_t *rkq, rd_ts_t timeout_us,
                        !(rko = rd_kafka_op_filter(rkq, rko, version)))
                         ;
 
+                rd_kafka_q_mark_served(rkq);
+
                 if (rko) {
                         /* Proper versioned op */
                         rd_kafka_q_deq0(rkq, rko);
@@ -475,6 +483,8 @@ int rd_kafka_q_serve (rd_kafka_q_t *rkq, int timeout_ms,
                                  &timeout_tspec) == thrd_success)
                 ;
 
+        rd_kafka_q_mark_served(rkq);
+
         if (!rko) {
                 mtx_unlock(&rkq->rkq_lock);
                 return 0;
@@ -591,6 +601,8 @@ int rd_kafka_q_serve_rkmessages (rd_kafka_q_t *rkq, int timeout_ms,
                                  &timeout_tspec) == thrd_success)
                 ;
 
+        rd_kafka_q_mark_served(rkq);
+
         if (!rko) {
                 mtx_unlock(&rkq->rkq_lock);
                 break; /* Timed out */
@@ -771,8 +783,7 @@ void rd_kafka_q_io_event_enable (rd_kafka_q_t *rkq, rd_socket_t fd,
         qio->fd = fd;
         qio->size = size;
         qio->payload = (void *)(qio+1);
-        qio->ts_rate = rkq->rkq_rk->rk_conf.buffering_max_us;
-        qio->ts_last = 0;
+        qio->sent = rd_false;
         qio->event_cb = NULL;
         qio->event_cb_opaque = NULL;
         memcpy(qio->payload, payload, size);
@@ -799,7 +810,7 @@ void rd_kafka_queue_io_event_enable (rd_kafka_queue_t *rkqu, int fd,
 
 
 void rd_kafka_queue_yield (rd_kafka_queue_t *rkqu) {
-        rd_kafka_q_yield(rkqu->rkqu_q, rd_true);
+        rd_kafka_q_yield(rkqu->rkqu_q);
 }
 
 
@@ -894,6 +905,9 @@ int rd_kafka_q_apply (rd_kafka_q_t *rkq,
                 next = TAILQ_NEXT(next, rko_link);
                 cnt += callback(rkq, rko, opaque);
         }
+
+        rd_kafka_q_mark_served(rkq);
+
         mtx_unlock(&rkq->rkq_lock);
 
         return cnt;
diff --git a/src/rdkafka_queue.h b/src/rdkafka_queue.h
index e68a52584c..90216768be 100644
--- a/src/rdkafka_queue.h
+++ b/src/rdkafka_queue.h
@@ -42,6 +42,13 @@
 
 TAILQ_HEAD(rd_kafka_op_tailq, rd_kafka_op_s);
 
+/**
+ * @struct Queue for rd_kafka_op_t*.
+ *
+ * @remark All readers of the queue must call rd_kafka_q_mark_served()
+ *         after reading the queue (while still holding the queue lock) to
+ *         clear the wakeup-sent flag.
+ */
 struct rd_kafka_q_s {
         mtx_t  rkq_lock;
         cnd_t  rkq_cond;
@@ -90,8 +97,10 @@ struct rd_kafka_q_io {
         rd_socket_t fd;
         void *payload;
         size_t size;
-        rd_ts_t ts_rate;  /**< How often the IO wakeup may be performed (us) */
-        rd_ts_t ts_last;  /**< Last IO wakeup */
+        rd_bool_t sent;   /**< Wake-up has been sent.
+                           *   This field is reset to false by the queue
+                           *   reader, allowing a new wake-up to be sent by a
+                           *   subsequent writer. */
         /* For callback-based signalling */
         void (*event_cb) (rd_kafka_t *rk, void *opaque);
         void *event_cb_opaque;
@@ -286,31 +295,32 @@ static RD_INLINE RD_UNUSED int rd_kafka_q_is_fwded (rd_kafka_q_t *rkq) {
 /**
  * @brief Trigger an IO event for this queue.
  *
- * @param rate_limit if true, rate limit IO-based wakeups.
- *
  * @remark Queue MUST be locked
  */
 static RD_INLINE RD_UNUSED
-void rd_kafka_q_io_event (rd_kafka_q_t *rkq, rd_bool_t rate_limit) {
+void rd_kafka_q_io_event (rd_kafka_q_t *rkq) {
 
         if (likely(!rkq->rkq_qio))
                 return;
 
         if (rkq->rkq_qio->event_cb) {
-                rkq->rkq_qio->event_cb(rkq->rkq_rk, rkq->rkq_qio->event_cb_opaque);
+                rkq->rkq_qio->event_cb(rkq->rkq_rk,
+                                       rkq->rkq_qio->event_cb_opaque);
                 return;
         }
 
-        if (rate_limit) {
-                rd_ts_t now = rd_clock();
-                if (likely(rkq->rkq_qio->ts_last + rkq->rkq_qio->ts_rate > now))
-                        return;
+        /* Only one wake-up event should be sent per non-polling period.
+         * As the queue reader calls poll/reads the channel it calls to
+         * rd_kafka_q_mark_served() to reset the wakeup sent flag, allowing
+         * further wakeups in the next non-polling period. */
+        if (rkq->rkq_qio->sent)
+                return; /* Wake-up event already written */
 
-                rkq->rkq_qio->ts_last = now;
-        }
+        rkq->rkq_qio->sent = rd_true;
 
-        /* Ignore errors, not much to do anyway. */
+        /* Write wake-up event to socket.
+         * Ignore errors, not much to do anyway. */
         if (rd_write(rkq->rkq_qio->fd, rkq->rkq_qio->payload,
                      (int)rkq->rkq_qio->size) == -1)
                 ;
@@ -333,7 +343,7 @@ int rd_kafka_op_cmp_prio (const void *_a, const void *_b) {
  * @brief Wake up waiters without enqueuing an op.
  */
 static RD_INLINE RD_UNUSED void
-rd_kafka_q_yield (rd_kafka_q_t *rkq, rd_bool_t rate_limit) {
+rd_kafka_q_yield (rd_kafka_q_t *rkq) {
         rd_kafka_q_t *fwdq;
 
         mtx_lock(&rkq->rkq_lock);
@@ -350,12 +360,12 @@ rd_kafka_q_yield (rd_kafka_q_t *rkq, rd_bool_t rate_limit) {
                 rkq->rkq_flags |= RD_KAFKA_Q_F_YIELD;
                 cnd_broadcast(&rkq->rkq_cond);
                 if (rkq->rkq_qlen == 0)
-                        rd_kafka_q_io_event(rkq, rate_limit);
+                        rd_kafka_q_io_event(rkq);
 
                 mtx_unlock(&rkq->rkq_lock);
         } else {
                 mtx_unlock(&rkq->rkq_lock);
-                rd_kafka_q_yield(fwdq, rate_limit);
+                rd_kafka_q_yield(fwdq);
                 rd_kafka_q_destroy(fwdq);
         }
@@ -426,7 +436,7 @@ int rd_kafka_q_enq1 (rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                 rd_kafka_q_enq0(rkq, rko, at_head);
                 cnd_signal(&rkq->rkq_cond);
                 if (rkq->rkq_qlen == 1)
-                        rd_kafka_q_io_event(rkq, rd_false/*no rate-limiting*/);
+                        rd_kafka_q_io_event(rkq);
 
                 if (do_lock)
                         mtx_unlock(&rkq->rkq_lock);
@@ -490,6 +500,23 @@ void rd_kafka_q_deq0 (rd_kafka_q_t *rkq, rd_kafka_op_t *rko) {
         rkq->rkq_qsize -= rko->rko_len;
 }
 
+
+/**
+ * @brief Mark queue as served / read.
+ *
+ * This is currently used by the queue reader side to reset the io-event
+ * wakeup flag.
+ *
+ * Should be called by all queue readers.
+ *
+ * @locks_required rkq must be locked.
+*/
+static RD_INLINE RD_UNUSED void rd_kafka_q_mark_served (rd_kafka_q_t *rkq) {
+        if (rkq->rkq_qio)
+                rkq->rkq_qio->sent = rd_false;
+}
+
+
 /**
  * Concat all elements of 'srcq' onto tail of 'rkq'.
  * 'rkq' will be be locked (if 'do_lock'==1), but 'srcq' will not.
@@ -531,11 +558,12 @@ int rd_kafka_q_concat0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq, int do_lock) {
 
                 TAILQ_CONCAT(&rkq->rkq_q, &srcq->rkq_q, rko_link);
                 if (rkq->rkq_qlen == 0)
-                        rd_kafka_q_io_event(rkq, rd_false/*no rate-limiting*/);
+                        rd_kafka_q_io_event(rkq);
 
                 rkq->rkq_qlen  += srcq->rkq_qlen;
                 rkq->rkq_qsize += srcq->rkq_qsize;
                 cnd_signal(&rkq->rkq_cond);
+                rd_kafka_q_mark_served(srcq);
                 rd_kafka_q_reset(srcq);
         } else
                 r = rd_kafka_q_concat0(rkq->rkq_fwdq ? rkq->rkq_fwdq : rkq,
@@ -572,10 +600,11 @@ void rd_kafka_q_prepend0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq,
                 /* Move srcq to rkq */
                 TAILQ_MOVE(&rkq->rkq_q, &srcq->rkq_q, rko_link);
                 if (rkq->rkq_qlen == 0 && srcq->rkq_qlen > 0)
-                        rd_kafka_q_io_event(rkq, rd_false/*no rate-limiting*/);
+                        rd_kafka_q_io_event(rkq);
 
                 rkq->rkq_qlen  += srcq->rkq_qlen;
                 rkq->rkq_qsize += srcq->rkq_qsize;
+                rd_kafka_q_mark_served(srcq);
                 rd_kafka_q_reset(srcq);
         } else
                 rd_kafka_q_prepend0(rkq->rkq_fwdq ? rkq->rkq_fwdq : rkq,
diff --git a/src/rdkafka_timer.c b/src/rdkafka_timer.c
index 9ee41da810..2657808a2f 100644
--- a/src/rdkafka_timer.c
+++ b/src/rdkafka_timer.c
@@ -82,7 +82,7 @@ static void rd_kafka_timer_schedule (rd_kafka_timers_t *rkts,
                 TAILQ_INSERT_HEAD(&rkts->rkts_timers, rtmr, rtmr_link);
                 cnd_signal(&rkts->rkts_cond);
                 if (rkts->rkts_wakeq)
-                        rd_kafka_q_yield(rkts->rkts_wakeq, rd_true);
+                        rd_kafka_q_yield(rkts->rkts_wakeq);
         } else
                 TAILQ_INSERT_SORTED(&rkts->rkts_timers, rtmr,
                                     rd_kafka_timer_t *, rtmr_link,
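
The standalone sketch below illustrates, in isolation, the one-wakeup-per-non-polling-period
scheme the commit message describes: a writer sends at most one IO wakeup while the reader is
away, and the reader re-arms the wakeup when it serves the queue. It is not part of the patch;
the names wakeup_q, q_enqueue and q_serve are hypothetical, and a plain pipe plus a pthread
mutex stand in for librdkafka's rkq_lock, rkq_qio->sent, rd_kafka_q_io_event() and
rd_kafka_q_mark_served().

/* Hypothetical sketch of the "sent/served" wakeup pattern (not librdkafka code).
 * Build with: cc -pthread wakeup_sketch.c */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

struct wakeup_q {
        pthread_mutex_t lock;
        int qlen;          /* number of queued items (payloads omitted) */
        int pipe_fds[2];   /* [0] polled by the reader, [1] written by writers */
        bool sent;         /* wakeup byte already written this non-polling period */
};

/* Writer side: enqueue an item and wake the reader at most once per
 * non-polling period (the role played by rd_kafka_q_io_event()). */
static void q_enqueue (struct wakeup_q *q) {
        pthread_mutex_lock(&q->lock);
        q->qlen++;
        if (!q->sent) {
                q->sent = true;
                if (write(q->pipe_fds[1], "1", 1) == -1)
                        ; /* ignore errors, as the real code does */
        }
        pthread_mutex_unlock(&q->lock);
}

/* Reader side: drain the queue and clear the sent flag so the next writer
 * may wake us again (the role played by rd_kafka_q_mark_served()). */
static int q_serve (struct wakeup_q *q) {
        char buf[8];
        int served;

        if (read(q->pipe_fds[0], buf, sizeof(buf)) == -1)
                ; /* a real reader would poll() the fd before reading */

        pthread_mutex_lock(&q->lock);
        served = q->qlen;
        q->qlen = 0;
        q->sent = false;   /* re-arm the wakeup for the next period */
        pthread_mutex_unlock(&q->lock);

        return served;
}

int main (void) {
        struct wakeup_q q = { .qlen = 0, .sent = false };

        pthread_mutex_init(&q.lock, NULL);
        if (pipe(q.pipe_fds) == -1)
                return 1;

        /* Three enqueues before the reader polls: only the first writes a
         * wakeup byte, the later ones see sent==true and skip it. */
        q_enqueue(&q);
        q_enqueue(&q);
        q_enqueue(&q);

        printf("served %d ops after a single wakeup\n", q_serve(&q));
        return 0;
}

Because suppression here depends only on the "sent but not yet served" state rather than on a
time-based rate limit, a writer can always wake an idle reader immediately, which is what
removes the up-to-1-second producer stalls described in the CHANGELOG entry.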