Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve (and fix) producer queue wakeups #3798

Merged
merged 16 commits into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ librdkafka v1.9.0 is a feature release:

## Enhancements

* Improved producer queue scheduling. Fixes the performance regression
introduced in v1.7.0 for some produce patterns. (#3538, #2912)
* Windows: Added native Win32 IO/Queue scheduling. This removes the
internal TCP loopback connections that were previously used for timely
queue wakeups.
Expand All @@ -43,12 +45,17 @@ librdkafka v1.9.0 is a feature release:
if not already created.
* Bundled zlib upgraded to version 1.2.12.
* Bundled OpenSSL upgraded to 1.1.1n.
* Added `test.mock.broker.rtt` to simulate RTT/latency for mock brokers.


## Fixes

### General fixes

* Fix various 1 second delays due to internal broker threads blocking on IO
even though there are events to handle.
  These delays could be seen randomly in any of the non-produce/consume
  request APIs, such as `commit_transaction()`, `list_groups()`, etc.
* Windows: some applications would crash with an error message like
`no OPENSSL_Applink()` written to the console if `ssl.keystore.location`
was configured.
Expand Down Expand Up @@ -120,7 +127,10 @@ librdkafka v1.9.0 is a feature release:
broker (added in Apache Kafka 2.8), which could cause the producer to
seemingly hang.
This error code is now correctly handled by raising a fatal error.
* The logic for enforcing that `message.timeout.ms` is greater than
* Improved producer queue wakeup scheduling. This should significantly
decrease the number of wakeups and thus syscalls for high message rate
producers. (#3538, #2912)
* The logic for enforcing that `message.timeout.ms` is greater than
an explicitly configured `linger.ms` was incorrect and instead of
erroring out early the lingering time was automatically adjusted to the
message timeout, ignoring the configured `linger.ms`.
Expand Down
2 changes: 1 addition & 1 deletion STATISTICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ rxidle | int | | Microseconds since last socket receive (or -1 if no receives ye
req | object | | Request type counters. Object key is the request name, value is the number of requests sent.
zbuf_grow | int | | Total number of decompression buffer size increases
buf_grow | int | | Total number of buffer size increases (deprecated, unused)
wakeups | int | | Broker thread poll wakeups
wakeups | int | | Broker thread poll loop wakeups
connects | int | | Number of connection attempts, including successful and failed, and name resolution failures.
disconnects | int | | Number of disconnects (triggered by broker, network, load-balancer, etc.).
int_latency | object | | Internal producer queue latency in microseconds. See *Window stats* below
Expand Down
12 changes: 9 additions & 3 deletions examples/rdkafka_performance.c
Original file line number Diff line number Diff line change
Expand Up @@ -216,17 +216,23 @@ static void msg_delivered(rd_kafka_t *rk,
!last || msgs_wait_cnt < 5 || !(msgs_wait_cnt % dr_disp_div) ||
(now - last) >= dispintvl * 1000 || verbosity >= 3) {
if (rkmessage->err && verbosity >= 2)
printf("%% Message delivery failed: %s [%" PRId32
printf("%% Message delivery failed (broker %" PRId32
"): "
"%s [%" PRId32
"]: "
"%s (%li remain)\n",
rd_kafka_message_broker_id(rkmessage),
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition,
rd_kafka_err2str(rkmessage->err), msgs_wait_cnt);
else if (verbosity > 2)
printf("%% Message delivered (offset %" PRId64
", broker %" PRId32
"): "
"%li remain\n",
rkmessage->offset, msgs_wait_cnt);
rkmessage->offset,
rd_kafka_message_broker_id(rkmessage),
msgs_wait_cnt);
if (verbosity >= 3 && do_seq)
printf(" --> \"%.*s\"\n", (int)rkmessage->len,
(const char *)rkmessage->payload);
Expand Down Expand Up @@ -1485,7 +1491,7 @@ int main(int argc, char **argv) {
(int)RD_MAX(0, (next - rd_clock()) /
1000));
} while (next > rd_clock());
} else {
} else if (cnt.msgs % 1000 == 0) {
rd_kafka_poll(rk, 0);
}

Expand Down
20 changes: 15 additions & 5 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -2312,6 +2312,7 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
/* Create Mock cluster */
rd_atomic32_init(&rk->rk_mock.cluster_cnt, 0);
if (rk->rk_conf.mock.broker_cnt > 0) {
const char *mock_bootstraps;
rk->rk_mock.cluster =
rd_kafka_mock_cluster_new(rk, rk->rk_conf.mock.broker_cnt);

Expand All @@ -2323,23 +2324,32 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
goto fail;
}

mock_bootstraps =
rd_kafka_mock_cluster_bootstraps(rk->rk_mock.cluster),
rd_kafka_log(rk, LOG_NOTICE, "MOCK",
"Mock cluster enabled: "
"original bootstrap.servers and security.protocol "
"ignored and replaced");
"ignored and replaced with %s",
mock_bootstraps);

/* Overwrite bootstrap.servers and connection settings */
if (rd_kafka_conf_set(
&rk->rk_conf, "bootstrap.servers",
rd_kafka_mock_cluster_bootstraps(rk->rk_mock.cluster),
NULL, 0) != RD_KAFKA_CONF_OK)
if (rd_kafka_conf_set(&rk->rk_conf, "bootstrap.servers",
mock_bootstraps, NULL,
0) != RD_KAFKA_CONF_OK)
rd_assert(!"failed to replace mock bootstrap.servers");

if (rd_kafka_conf_set(&rk->rk_conf, "security.protocol",
"plaintext", NULL, 0) != RD_KAFKA_CONF_OK)
rd_assert(!"failed to reset mock security.protocol");

rk->rk_conf.security_protocol = RD_KAFKA_PROTO_PLAINTEXT;

/* Apply default RTT to brokers */
if (rk->rk_conf.mock.broker_rtt)
rd_kafka_mock_broker_set_rtt(
rk->rk_mock.cluster,
-1/*all brokers*/,
rk->rk_conf.mock.broker_rtt);
}

if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL ||
Expand Down
119 changes: 79 additions & 40 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -3417,7 +3417,13 @@ rd_kafka_broker_ops_io_serve(rd_kafka_broker_t *rkb, rd_ts_t abs_timeout) {
*
* The return value indicates if ops_serve() below should
* use a timeout or not.
*
* If there are ops enqueued cut the timeout short so
* that they're processed as soon as possible.
*/
if (abs_timeout > 0 && rd_kafka_q_len(rkb->rkb_ops) > 0)
abs_timeout = RD_POLL_NOWAIT;

if (rd_kafka_transport_io_serve(
rkb->rkb_transport, rkb->rkb_ops,
rd_timeout_remains(abs_timeout)))
Expand All @@ -3429,6 +3435,8 @@ rd_kafka_broker_ops_io_serve(rd_kafka_broker_t *rkb, rd_ts_t abs_timeout) {
wakeup =
rd_kafka_broker_ops_serve(rkb, rd_timeout_remains_us(abs_timeout));

rd_atomic64_add(&rkb->rkb_c.wakeups, 1);

/* An op might have triggered the need for a connection, if so
* transition to TRY_CONNECT state. */
if (unlikely(rd_kafka_broker_needs_connection(rkb) &&
Expand Down Expand Up @@ -3627,11 +3635,29 @@ rd_kafka_broker_outbufs_space(rd_kafka_broker_t *rkb) {
}



/**
* @brief Update \p *next_wakeup_ptr to \p maybe_next_wakeup if it is sooner.
*
* Both parameters are absolute timestamps.
* \p maybe_next_wakeup must not be 0.
*/
#define rd_kafka_set_next_wakeup(next_wakeup_ptr, maybe_next_wakeup) \
do { \
rd_ts_t *__n = (next_wakeup_ptr); \
rd_ts_t __m = (maybe_next_wakeup); \
rd_dassert(__m != 0); \
if (__m < *__n) \
*__n = __m; \
} while (0)


/**
* @brief Serve a toppar for producing.
*
* @param next_wakeup will be updated to when the next wake-up/attempt is
* desired, only lower (sooner) values will be set.
* desired. Does not take the current value into
* consideration, even if it is lower.
* @param do_timeout_scan perform msg timeout scan
* @param may_send if set to false there is something on the global level
* that prohibits sending messages, such as a transactional
Expand Down Expand Up @@ -3659,6 +3685,7 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
int reqcnt;
int inflight = 0;
uint64_t epoch_base_msgid = 0;
rd_bool_t batch_ready = rd_false;

/* By limiting the number of not-yet-sent buffers (rkb_outbufs) we
* provide a backpressure mechanism to the producer loop
Expand All @@ -3685,8 +3712,8 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
timeoutcnt =
rd_kafka_broker_toppar_msgq_scan(rkb, rktp, now, &next);

if (next && next < *next_wakeup)
*next_wakeup = next;
if (next)
rd_kafka_set_next_wakeup(next_wakeup, next);

if (rd_kafka_is_idempotent(rkb->rkb_rk)) {
if (!rd_kafka_pid_valid(pid)) {
Expand Down Expand Up @@ -3732,10 +3759,32 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
} else if (max_requests > 0) {
/* Move messages from locked partition produce queue
* to broker-local xmit queue. */
if ((move_cnt = rktp->rktp_msgq.rkmq_msg_cnt) > 0)
if ((move_cnt = rktp->rktp_msgq.rkmq_msg_cnt) > 0) {

rd_kafka_msgq_insert_msgq(
&rktp->rktp_xmit_msgq, &rktp->rktp_msgq,
rktp->rktp_rkt->rkt_conf.msg_order_cmp);
}

/* Calculate maximum wait-time to honour
* queue.buffering.max.ms contract.
* Unless flushing in which case immediate
* wakeups are allowed. */
batch_ready = rd_kafka_msgq_allow_wakeup_at(
&rktp->rktp_msgq, &rktp->rktp_xmit_msgq,
/* Only update the broker thread wakeup time
* if connection is up and messages can actually be
* sent, otherwise the wakeup can't do much. */
rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP ? next_wakeup
: NULL,
now, flushing ? 1 : rkb->rkb_rk->rk_conf.buffering_max_us,
/* Batch message count threshold */
rkb->rkb_rk->rk_conf.batch_num_messages,
/* Batch size threshold.
* When compression is enabled the
* threshold is increased by x8. */
(rktp->rktp_rkt->rkt_conf.compression_codec ? 1 : 8) *
(int64_t)rkb->rkb_rk->rk_conf.batch_size);
}

rd_kafka_toppar_unlock(rktp);
Expand Down Expand Up @@ -3870,30 +3919,9 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,

/* Attempt to fill the batch size, but limit our waiting
* to queue.buffering.max.ms, batch.num.messages, and batch.size. */
if (!flushing && r < rkb->rkb_rk->rk_conf.batch_num_messages &&
rktp->rktp_xmit_msgq.rkmq_msg_bytes <
(int64_t)rkb->rkb_rk->rk_conf.batch_size) {
rd_ts_t wait_max;

/* Calculate maximum wait-time to honour
* queue.buffering.max.ms contract. */
wait_max = rd_kafka_msg_enq_time(rkm) +
rkb->rkb_rk->rk_conf.buffering_max_us;

if (wait_max > now) {
/* Wait for more messages or queue.buffering.max.ms
* to expire. */
if (wait_max < *next_wakeup)
*next_wakeup = wait_max;
return 0;
}
}

/* Honour retry.backoff.ms. */
if (unlikely(rkm->rkm_u.producer.ts_backoff > now)) {
if (rkm->rkm_u.producer.ts_backoff < *next_wakeup)
*next_wakeup = rkm->rkm_u.producer.ts_backoff;
/* Wait for backoff to expire */
if (!batch_ready) {
/* Wait for more messages or queue.buffering.max.ms
* to expire. */
return 0;
}

Expand All @@ -3907,10 +3935,22 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
break;
}

/* If there are messages still in the queue, make the next
* wakeup immediate. */
if (rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) > 0)
*next_wakeup = now;
/* Update the allowed wake-up time based on remaining messages
* in the queue. */
if (cnt > 0) {
rd_kafka_toppar_lock(rktp);
batch_ready = rd_kafka_msgq_allow_wakeup_at(
&rktp->rktp_msgq, &rktp->rktp_xmit_msgq, next_wakeup, now,
flushing ? 1 : rkb->rkb_rk->rk_conf.buffering_max_us,
/* Batch message count threshold */
rkb->rkb_rk->rk_conf.batch_num_messages,
/* Batch size threshold.
* When compression is enabled the
* threshold is increased by x8. */
(rktp->rktp_rkt->rkt_conf.compression_codec ? 1 : 8) *
(int64_t)rkb->rkb_rk->rk_conf.batch_size);
rd_kafka_toppar_unlock(rktp);
}

return cnt;
}
Expand All @@ -3921,7 +3961,7 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
* @brief Produce from all toppars assigned to this broker.
*
* @param next_wakeup is updated if the next IO/ops timeout should be
* less than the input value.
* less than the input value (i.e., sooner).
*
* @returns the total number of messages produced.
*/
Expand Down Expand Up @@ -3970,8 +4010,7 @@ static int rd_kafka_broker_produce_toppars(rd_kafka_broker_t *rkb,
rkb, rktp, pid, now, &this_next_wakeup, do_timeout_scan,
may_send, flushing);

if (this_next_wakeup < ret_next_wakeup)
ret_next_wakeup = this_next_wakeup;
rd_kafka_set_next_wakeup(&ret_next_wakeup, this_next_wakeup);

} while ((rktp = CIRCLEQ_LOOP_NEXT(&rkb->rkb_active_toppars, rktp,
rktp_activelink)) !=
Expand Down Expand Up @@ -4008,16 +4047,16 @@ static void rd_kafka_broker_producer_serve(rd_kafka_broker_t *rkb,
(abs_timeout > (now = rd_clock()))) {
rd_bool_t do_timeout_scan;
rd_ts_t next_wakeup = abs_timeout;
int overshoot;
rd_bool_t overshot;

rd_kafka_broker_unlock(rkb);

/* Perform timeout scan on first iteration, thus
* on each state change, to make sure messages in
* partition rktp_xmit_msgq are timed out before
* being attempted to re-transmit. */
overshoot = rd_interval(&timeout_scan, 1000 * 1000, now);
do_timeout_scan = cnt++ == 0 || overshoot >= 0;
overshot = rd_interval(&timeout_scan, 1000 * 1000, now) >= 0;
do_timeout_scan = cnt++ == 0 || overshot;

rd_kafka_broker_produce_toppars(rkb, now, &next_wakeup,
do_timeout_scan);
Expand Down Expand Up @@ -5403,9 +5442,9 @@ void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb) {
rd_kafka_sasl_broker_term(rkb);

if (rkb->rkb_wakeup_fd[0] != -1)
rd_close(rkb->rkb_wakeup_fd[0]);
rd_socket_close(rkb->rkb_wakeup_fd[0]);
if (rkb->rkb_wakeup_fd[1] != -1)
rd_close(rkb->rkb_wakeup_fd[1]);
rd_socket_close(rkb->rkb_wakeup_fd[1]);

if (rkb->rkb_recv_buf)
rd_kafka_buf_destroy(rkb->rkb_recv_buf);
Expand Down
Loading