Skip to content

Commit

Permalink
Change queue wakeup rate-limiting to be once per poll-period instead …
Browse files Browse the repository at this point in the history
…of based on rate (#2912)

The IO-based wakeup (rkq_qio) now has a `sent` boolean that tracks whether
a wakeup has been sent. This boolean is reset by the queue reader each time
it polls the queue, effectively allowing only one wakeup event per non-polling
period and obviating the need for rate-limiting the wakeups.
  • Loading branch information
edenhill committed Apr 14, 2021
1 parent b31363f commit 01310aa
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 25 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ librdkafka v1.7.0 is feature release:

### Consumer fixes

* If a rebalance happened during a `consume_batch..()` call the already
accumulated messages for revoked partitions were not purged, which would
pass messages to the application for partitions that were no longer owned
by the consumer. Fixed by @jliunyu. #3340.
* The consumer group deemed cached metadata up to date by checking
`topic.metadata.refresh.interval.ms`: if this property was set too low
it would cause cached metadata to be unusable and new metadata to be fetched,
Expand All @@ -69,6 +73,10 @@ librdkafka v1.7.0 is feature release:
created partition objects, or partitions that were changing leaders, to
not have their message queues purged. This could cause
`abort_transaction()` to time out. This issue is now fixed.
* In certain high-throughput produce rate patterns producing could stall for
1 second, regardless of `linger.ms`, due to rate-limiting of internal
queue wakeups. This is now fixed by not rate-limiting queue wakeups but
instead limiting them to one wakeup per queue reader poll. #2912.

### Transactional Producer fixes

Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
rd_kafka_toppar_unlock(rktp);

if (wakeup_q) {
rd_kafka_q_yield(wakeup_q, rd_true/*rate-limit*/);
rd_kafka_q_yield(wakeup_q);
rd_kafka_q_destroy(wakeup_q);
}
}
Expand Down
22 changes: 18 additions & 4 deletions src/rdkafka_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ int rd_kafka_q_purge0 (rd_kafka_q_t *rkq, int do_lock) {
* by locks taken from rd_kafka_op_destroy(). */
TAILQ_MOVE(&tmpq, &rkq->rkq_q, rko_link);

rd_kafka_q_mark_served(rkq);

/* Zero out queue */
rd_kafka_q_reset(rkq);

Expand Down Expand Up @@ -226,6 +228,7 @@ void rd_kafka_q_purge_toppar_version (rd_kafka_q_t *rkq,
size += rko->rko_len;
}

rd_kafka_q_mark_served(rkq);

rkq->rkq_qlen -= cnt;
rkq->rkq_qsize -= size;
Expand Down Expand Up @@ -256,7 +259,7 @@ int rd_kafka_q_move_cnt (rd_kafka_q_t *dstq, rd_kafka_q_t *srcq,

if (!dstq->rkq_fwdq && !srcq->rkq_fwdq) {
if (cnt > 0 && dstq->rkq_qlen == 0)
rd_kafka_q_io_event(dstq, rd_false/*no rate-limiting*/);
rd_kafka_q_io_event(dstq);

/* Optimization, if 'cnt' is equal/larger than all
* items of 'srcq' we can move the entire queue. */
Expand Down Expand Up @@ -284,6 +287,9 @@ int rd_kafka_q_move_cnt (rd_kafka_q_t *dstq, rd_kafka_q_t *srcq,
mcnt++;
}
}

rd_kafka_q_mark_served(srcq);

} else
mcnt = rd_kafka_q_move_cnt(dstq->rkq_fwdq ? dstq->rkq_fwdq:dstq,
srcq->rkq_fwdq ? srcq->rkq_fwdq:srcq,
Expand Down Expand Up @@ -363,6 +369,8 @@ rd_kafka_op_t *rd_kafka_q_pop_serve (rd_kafka_q_t *rkq, rd_ts_t timeout_us,
!(rko = rd_kafka_op_filter(rkq, rko, version)))
;

rd_kafka_q_mark_served(rkq);

if (rko) {
/* Proper versioned op */
rd_kafka_q_deq0(rkq, rko);
Expand Down Expand Up @@ -475,6 +483,8 @@ int rd_kafka_q_serve (rd_kafka_q_t *rkq, int timeout_ms,
&timeout_tspec) == thrd_success)
;

rd_kafka_q_mark_served(rkq);

if (!rko) {
mtx_unlock(&rkq->rkq_lock);
return 0;
Expand Down Expand Up @@ -591,6 +601,8 @@ int rd_kafka_q_serve_rkmessages (rd_kafka_q_t *rkq, int timeout_ms,
&timeout_tspec) == thrd_success)
;

rd_kafka_q_mark_served(rkq);

if (!rko) {
mtx_unlock(&rkq->rkq_lock);
break; /* Timed out */
Expand Down Expand Up @@ -771,8 +783,7 @@ void rd_kafka_q_io_event_enable (rd_kafka_q_t *rkq, rd_socket_t fd,
qio->fd = fd;
qio->size = size;
qio->payload = (void *)(qio+1);
qio->ts_rate = rkq->rkq_rk->rk_conf.buffering_max_us;
qio->ts_last = 0;
qio->sent = rd_false;
qio->event_cb = NULL;
qio->event_cb_opaque = NULL;
memcpy(qio->payload, payload, size);
Expand All @@ -799,7 +810,7 @@ void rd_kafka_queue_io_event_enable (rd_kafka_queue_t *rkqu, int fd,


void rd_kafka_queue_yield (rd_kafka_queue_t *rkqu) {
rd_kafka_q_yield(rkqu->rkqu_q, rd_true);
rd_kafka_q_yield(rkqu->rkqu_q);
}


Expand Down Expand Up @@ -894,6 +905,9 @@ int rd_kafka_q_apply (rd_kafka_q_t *rkq,
next = TAILQ_NEXT(next, rko_link);
cnt += callback(rkq, rko, opaque);
}

rd_kafka_q_mark_served(rkq);

mtx_unlock(&rkq->rkq_lock);

return cnt;
Expand Down
67 changes: 48 additions & 19 deletions src/rdkafka_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@

TAILQ_HEAD(rd_kafka_op_tailq, rd_kafka_op_s);

/**
* @struct Queue for rd_kafka_op_t*.
*
* @remark All readers of the queue must call rd_kafka_q_mark_served()
* after reading the queue (while still holding the queue lock) to
* clear the wakeup-sent flag.
*/
struct rd_kafka_q_s {
mtx_t rkq_lock;
cnd_t rkq_cond;
Expand Down Expand Up @@ -90,8 +97,10 @@ struct rd_kafka_q_io {
rd_socket_t fd;
void *payload;
size_t size;
rd_ts_t ts_rate; /**< How often the IO wakeup may be performed (us) */
rd_ts_t ts_last; /**< Last IO wakeup */
rd_bool_t sent; /**< Wake-up has been sent.
* This field is reset to false by the queue
* reader, allowing a new wake-up to be sent by a
* subsequent writer. */
/* For callback-based signalling */
void (*event_cb) (rd_kafka_t *rk, void *opaque);
void *event_cb_opaque;
Expand Down Expand Up @@ -286,31 +295,32 @@ static RD_INLINE RD_UNUSED int rd_kafka_q_is_fwded (rd_kafka_q_t *rkq) {
/**
* @brief Trigger an IO event for this queue.
*
* @param rate_limit if true, rate limit IO-based wakeups.
*
* @remark Queue MUST be locked
*/
static RD_INLINE RD_UNUSED
void rd_kafka_q_io_event (rd_kafka_q_t *rkq, rd_bool_t rate_limit) {
void rd_kafka_q_io_event (rd_kafka_q_t *rkq) {

if (likely(!rkq->rkq_qio))
return;

if (rkq->rkq_qio->event_cb) {
rkq->rkq_qio->event_cb(rkq->rkq_rk, rkq->rkq_qio->event_cb_opaque);
rkq->rkq_qio->event_cb(rkq->rkq_rk,
rkq->rkq_qio->event_cb_opaque);
return;
}


if (rate_limit) {
rd_ts_t now = rd_clock();
if (likely(rkq->rkq_qio->ts_last + rkq->rkq_qio->ts_rate > now))
return;
/* Only one wake-up event should be sent per non-polling period.
* As the queue reader calls poll/reads the channel it calls to
* rd_kafka_q_mark_served() to reset the wakeup sent flag, allowing
* further wakeups in the next non-polling period. */
if (rkq->rkq_qio->sent)
return; /* Wake-up event already written */

rkq->rkq_qio->ts_last = now;
}
rkq->rkq_qio->sent = rd_true;

/* Ignore errors, not much to do anyway. */
/* Write wake-up event to socket.
* Ignore errors, not much to do anyway. */
if (rd_write(rkq->rkq_qio->fd, rkq->rkq_qio->payload,
(int)rkq->rkq_qio->size) == -1)
;
Expand All @@ -333,7 +343,7 @@ int rd_kafka_op_cmp_prio (const void *_a, const void *_b) {
* @brief Wake up waiters without enqueuing an op.
*/
static RD_INLINE RD_UNUSED void
rd_kafka_q_yield (rd_kafka_q_t *rkq, rd_bool_t rate_limit) {
rd_kafka_q_yield (rd_kafka_q_t *rkq) {
rd_kafka_q_t *fwdq;

mtx_lock(&rkq->rkq_lock);
Expand All @@ -350,12 +360,12 @@ rd_kafka_q_yield (rd_kafka_q_t *rkq, rd_bool_t rate_limit) {
rkq->rkq_flags |= RD_KAFKA_Q_F_YIELD;
cnd_broadcast(&rkq->rkq_cond);
if (rkq->rkq_qlen == 0)
rd_kafka_q_io_event(rkq, rate_limit);
rd_kafka_q_io_event(rkq);

mtx_unlock(&rkq->rkq_lock);
} else {
mtx_unlock(&rkq->rkq_lock);
rd_kafka_q_yield(fwdq, rate_limit);
rd_kafka_q_yield(fwdq);
rd_kafka_q_destroy(fwdq);
}

Expand Down Expand Up @@ -426,7 +436,7 @@ int rd_kafka_q_enq1 (rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
rd_kafka_q_enq0(rkq, rko, at_head);
cnd_signal(&rkq->rkq_cond);
if (rkq->rkq_qlen == 1)
rd_kafka_q_io_event(rkq, rd_false/*no rate-limiting*/);
rd_kafka_q_io_event(rkq);

if (do_lock)
mtx_unlock(&rkq->rkq_lock);
Expand Down Expand Up @@ -490,6 +500,23 @@ void rd_kafka_q_deq0 (rd_kafka_q_t *rkq, rd_kafka_op_t *rko) {
rkq->rkq_qsize -= rko->rko_len;
}


/**
 * @brief Flag the queue as having been served (read) by its reader.
 *
 * Clears the io-event `sent` flag, if the queue has IO-event state
 * attached, so that a later rd_kafka_q_io_event() call is allowed to
 * write a new wake-up event.
 *
 * Every queue reader should call this after reading the queue.
 *
 * @locks_required rkq must be locked.
 */
static RD_INLINE RD_UNUSED void rd_kafka_q_mark_served (rd_kafka_q_t *rkq) {
        if (!rkq->rkq_qio)
                return; /* No IO-event state attached: nothing to reset */

        rkq->rkq_qio->sent = rd_false;
}


/**
* Concat all elements of 'srcq' onto tail of 'rkq'.
* 'rkq' will be locked (if 'do_lock'==1), but 'srcq' will not.
Expand Down Expand Up @@ -531,11 +558,12 @@ int rd_kafka_q_concat0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq, int do_lock) {

TAILQ_CONCAT(&rkq->rkq_q, &srcq->rkq_q, rko_link);
if (rkq->rkq_qlen == 0)
rd_kafka_q_io_event(rkq, rd_false/*no rate-limiting*/);
rd_kafka_q_io_event(rkq);
rkq->rkq_qlen += srcq->rkq_qlen;
rkq->rkq_qsize += srcq->rkq_qsize;
cnd_signal(&rkq->rkq_cond);

rd_kafka_q_mark_served(srcq);
rd_kafka_q_reset(srcq);
} else
r = rd_kafka_q_concat0(rkq->rkq_fwdq ? rkq->rkq_fwdq : rkq,
Expand Down Expand Up @@ -572,10 +600,11 @@ void rd_kafka_q_prepend0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq,
/* Move srcq to rkq */
TAILQ_MOVE(&rkq->rkq_q, &srcq->rkq_q, rko_link);
if (rkq->rkq_qlen == 0 && srcq->rkq_qlen > 0)
rd_kafka_q_io_event(rkq, rd_false/*no rate-limiting*/);
rd_kafka_q_io_event(rkq);
rkq->rkq_qlen += srcq->rkq_qlen;
rkq->rkq_qsize += srcq->rkq_qsize;

rd_kafka_q_mark_served(srcq);
rd_kafka_q_reset(srcq);
} else
rd_kafka_q_prepend0(rkq->rkq_fwdq ? rkq->rkq_fwdq : rkq,
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ static void rd_kafka_timer_schedule (rd_kafka_timers_t *rkts,
TAILQ_INSERT_HEAD(&rkts->rkts_timers, rtmr, rtmr_link);
cnd_signal(&rkts->rkts_cond);
if (rkts->rkts_wakeq)
rd_kafka_q_yield(rkts->rkts_wakeq, rd_true);
rd_kafka_q_yield(rkts->rkts_wakeq);
} else
TAILQ_INSERT_SORTED(&rkts->rkts_timers, rtmr,
rd_kafka_timer_t *, rtmr_link,
Expand Down

0 comments on commit 01310aa

Please sign in to comment.