[KIP-580] Exponential Backoff with Mock Broker Changes to Automate Testing. (#4422)

* Rebase Commit

* Changes for Partial Comments

* Changes

* Run style fix

* Windows build fix for 0143

* Changes

* Changes

* Changes

* Style fixes

---------

Co-authored-by: Milind L <[email protected]>
Co-authored-by: Milind L <[email protected]>
3 people authored Sep 29, 2023
1 parent ba57a12 commit 6dc7c71
Showing 24 changed files with 963 additions and 61 deletions.
45 changes: 30 additions & 15 deletions CHANGELOG.md
@@ -38,23 +38,36 @@ librdkafka v2.3.0 is a feature release:
rack information on 32bit architectures.
Solved by aligning all allocations to the maximum allowed word size (#4449).


### Consumer fixes

* Stored offsets were excluded from the commit if the leader epoch was
less than committed epoch, as it's possible if leader epoch is the default -1.
This didn't happen in Python, Go and .NET bindings when stored position was
taken from the message.
Solved by checking only that the stored offset is greater
than committed one, if either stored or committed leader epoch is -1 (#4442).
* If an OffsetForLeaderEpoch request was being retried, and the leader changed
while the retry was in-flight, an infinite loop of requests was triggered,
because we weren't updating the leader epoch correctly.
Fixed by updating the leader epoch before sending the request (#4433).
* During offset validation a permanent error like host resolution failure
would cause an offset reset.
This isn't what's expected or what the Java implementation does.
Solved by retrying even in case of permanent errors (#4447).
* Stored offsets were excluded from the commit if the leader epoch was
less than committed epoch, as it's possible if leader epoch is the default -1.
This didn't happen in Python, Go and .NET bindings when stored position was
taken from the message.
Solved by checking only that the stored offset is greater
than committed one, if either stored or committed leader epoch is -1 (#4442).
* If an OffsetForLeaderEpoch request was being retried, and the leader changed
while the retry was in-flight, an infinite loop of requests was triggered,
because we weren't updating the leader epoch correctly.
Fixed by updating the leader epoch before sending the request (#4433).
* During offset validation a permanent error like host resolution failure
would cause an offset reset.
This isn't what's expected or what the Java implementation does.
Solved by retrying even in case of permanent errors (#4447).


## Upgrade considerations

* `retry.backoff.ms`:
If it is set greater than `retry.backoff.max.ms`, which has a default value of 1000 ms, then it assumes the value of `retry.backoff.max.ms`.
To change this behaviour make sure that `retry.backoff.ms` is always less than `retry.backoff.max.ms`.
If equal then the backoff will be linear instead of exponential.

* `topic.metadata.refresh.fast.interval.ms`:
If it is set greater than `retry.backoff.max.ms`, which has a default value of 1000 ms, then it assumes the value of `retry.backoff.max.ms`.
To change this behaviour make sure that `topic.metadata.refresh.fast.interval.ms` is always less than `retry.backoff.max.ms`.
If equal then the backoff will be linear instead of exponential.
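
The interaction between `retry.backoff.ms` and `retry.backoff.max.ms` described in these upgrade notes can be sketched as follows. This is a minimal illustration rather than librdkafka code: `nominal_backoff_ms` is a hypothetical helper, the attempt counter is assumed to be 1-based, and jitter is deliberately left out.

```c
#include <stdint.h>

/* Hypothetical helper, not part of librdkafka: nominal backoff in
 * milliseconds (before jitter) for a 1-based retry attempt. */
static int64_t nominal_backoff_ms(int64_t backoff_ms,
                                  int64_t backoff_max_ms,
                                  int attempt) {
        int64_t backoff = backoff_ms;
        int i;

        /* Double once per earlier attempt. Doubling stops as soon as the
         * cap is reached, so the value stays small. */
        for (i = 1; i < attempt && backoff < backoff_max_ms; i++)
                backoff *= 2;

        /* A base that is at or above the cap yields the cap every time. */
        return backoff > backoff_max_ms ? backoff_max_ms : backoff;
}
```

Under these assumptions, a base of 100 ms with a cap of 1000 ms gives 100, 200, 400, 800 and then 1000 ms for every later attempt, whereas a base of 2000 ms (or exactly 1000 ms) gives 1000 ms on every attempt.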


# librdkafka v2.2.0
@@ -85,7 +98,9 @@ librdkafka v2.2.0 is a feature release:
* [KIP-339](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API):
IncrementalAlterConfigs API (started by @PrasanthV454, #4110).
* [KIP-554](https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API): Add Broker-side SCRAM Config API (#4241).

* [KIP-580](https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients): Added Exponential Backoff mechanism for
retriable requests with `retry.backoff.ms` as minimum backoff and `retry.backoff.max.ms` as the
maximum backoff, with 20% jitter (#4422).

## Enhancements

5 changes: 3 additions & 2 deletions CONFIGURATION.md
@@ -14,7 +14,7 @@ max.in.flight.requests.per.connection | * | 1 .. 1000000 | 1000000
max.in.flight | * | 1 .. 1000000 | 1000000 | low | Alias for `max.in.flight.requests.per.connection`: Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. <br>*Type: integer*
topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | low | Period of time in milliseconds at which topic and broker metadata is refreshed in order to proactively discover any new brokers, topics, partitions or partition leader changes. Use -1 to disable the intervalled refresh (not recommended). If there are no locally referenced topics (no topic objects created, no messages produced, no subscription or no assignment) then only the broker list will be refreshed every interval but no more often than every 10s. <br>*Type: integer*
metadata.max.age.ms | * | 1 .. 86400000 | 900000 | low | Metadata cache max age. Defaults to topic.metadata.refresh.interval.ms * 3 <br>*Type: integer*
topic.metadata.refresh.fast.interval.ms | * | 1 .. 60000 | 250 | low | When a topic loses its leader a new metadata request will be enqueued with this initial interval, exponentially increasing until the topic metadata has been refreshed. This is used to recover quickly from transitioning leader brokers. <br>*Type: integer*
topic.metadata.refresh.fast.interval.ms | * | 1 .. 60000 | 100 | low | When a topic loses its leader a new metadata request will be enqueued immediately and then with this initial interval, exponentially increasing up to `retry.backoff.max.ms`, until the topic metadata has been refreshed. If not set explicitly, it defaults to `retry.backoff.ms`. This is used to recover quickly from transitioning leader brokers. <br>*Type: integer*
topic.metadata.refresh.fast.cnt | * | 0 .. 1000 | 10 | low | **DEPRECATED** No longer used. <br>*Type: integer*
topic.metadata.refresh.sparse | * | true, false | true | low | Sparse metadata requests (consumes less network bandwidth) <br>*Type: boolean*
topic.metadata.propagation.max.ms | * | 0 .. 3600000 | 30000 | low | Apache Kafka topic creation is asynchronous and it takes some time for a new topic to propagate throughout the cluster to all brokers. If a client requests topic metadata after manual topic creation but before the topic has been fully propagated to the broker the client is requesting metadata from, the topic will seem to be non-existent and the client will mark the topic as such, failing queued produced messages with `ERR__UNKNOWN_TOPIC`. This setting delays marking a topic as non-existent until the configured propagation max time has passed. The maximum propagation time is calculated from the time the topic is first referenced in the client, e.g., on produce(). <br>*Type: integer*
@@ -142,7 +142,8 @@ queue.buffering.max.ms | P | 0 .. 900000 | 5
linger.ms | P | 0 .. 900000 | 5 | high | Alias for `queue.buffering.max.ms`: Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers. A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency. <br>*Type: float*
message.send.max.retries | P | 0 .. 2147483647 | 2147483647 | high | How many times to retry sending a failing Message. **Note:** retrying may cause reordering unless `enable.idempotence` is set to true. <br>*Type: integer*
retries | P | 0 .. 2147483647 | 2147483647 | high | Alias for `message.send.max.retries`: How many times to retry sending a failing Message. **Note:** retrying may cause reordering unless `enable.idempotence` is set to true. <br>*Type: integer*
retry.backoff.ms | P | 1 .. 300000 | 100 | medium | The backoff time in milliseconds before retrying a protocol request. <br>*Type: integer*
retry.backoff.ms | P | 1 .. 300000 | 100 | medium | The backoff time in milliseconds before retrying a protocol request. This is the first backoff time; it is increased exponentially on each retry until the number of retries is exhausted, and is capped by `retry.backoff.max.ms`. <br>*Type: integer*
retry.backoff.max.ms | P | 1 .. 300000 | 1000 | medium | The maximum backoff time in milliseconds before retrying a protocol request. This is the upper bound for exponentially backed off requests. <br>*Type: integer*
queue.buffering.backpressure.threshold | P | 1 .. 1000000 | 1 | low | The threshold of outstanding not yet transmitted broker requests needed to backpressure the producer's message accumulator. If the number of not yet transmitted requests equals or exceeds this number, produce request creation that would have otherwise been triggered (for example, in accordance with linger.ms) will be delayed. A lower number yields larger and more effective batches. A higher value can improve latency when using compression on slow machines. <br>*Type: integer*
compression.codec | P | none, gzip, snappy, lz4, zstd | none | medium | compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression.codec`. <br>*Type: enum value*
compression.type | P | none, gzip, snappy, lz4, zstd | none | medium | Alias for `compression.codec`: compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression.codec`. <br>*Type: enum value*
5 changes: 3 additions & 2 deletions INTRODUCTION.md
@@ -319,7 +319,8 @@ error code set.

The application should typically not attempt to retry producing the message
on failure, but instead configure librdkafka to perform these retries
using the `retries` and `retry.backoff.ms` configuration properties.
using the `retries`, `retry.backoff.ms` and `retry.backoff.max.ms`
configuration properties.


#### Error: Timed out in transmission queue
@@ -1950,7 +1951,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-559 - Make the Kafka Protocol Friendlier with L7 Proxies | 2.5.0 | Not supported |
| KIP-568 - Explicit rebalance triggering on the Consumer | 2.6.0 | Not supported |
| KIP-659 - Add metadata to DescribeConfigsResponse | 2.6.0 | Not supported |
| KIP-580 - Exponential backoff for Kafka clients | WIP | Partially supported |
| KIP-580 - Exponential backoff for Kafka clients | 3.7.0 (WIP) | Supported |
| KIP-584 - Versioning scheme for features | WIP | Not supported |
| KIP-588 - Allow producers to recover gracefully from txn timeouts | 2.8.0 (WIP) | Not supported |
| KIP-601 - Configurable socket connection timeout | 2.7.0 | Supported |
18 changes: 18 additions & 0 deletions src/rdinterval.h
@@ -2,6 +2,7 @@
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2018-2022, Magnus Edenhill
* 2023 Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -30,6 +31,7 @@
#define _RDINTERVAL_H_

#include "rd.h"
#include "rdrand.h"

typedef struct rd_interval_s {
rd_ts_t ri_ts_last; /* last interval timestamp */
@@ -109,6 +111,22 @@ static RD_INLINE RD_UNUSED void rd_interval_reset_to_now(rd_interval_t *ri,
ri->ri_backoff = 0;
}

/**
* Reset the interval to 'now' with the given backoff ms and max_jitter as
* percentage. The backoff is given just for absolute jitter calculation. If now
* is 0, the time will be gathered automatically.
*/
static RD_INLINE RD_UNUSED void
rd_interval_reset_to_now_with_jitter(rd_interval_t *ri,
rd_ts_t now,
int64_t backoff_ms,
int max_jitter) {
rd_interval_reset_to_now(ri, now);
/* We are multiplying by 10 as (backoff_ms * percent * 1000)/100 ->
* backoff_ms * jitter * 10 */
ri->ri_backoff = backoff_ms * rd_jitter(-max_jitter, max_jitter) * 10;
}

/**
* Back off the next interval by `backoff_us` microseconds.
*/
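
The "multiply by 10" comment in `rd_interval_reset_to_now_with_jitter` is a unit conversion from a millisecond backoff and a jitter percentage to a microsecond offset. A minimal sketch of that arithmetic follows; `jitter_offset_us` is a hypothetical helper, and the percentage is passed in directly instead of being drawn from `rd_jitter()` so that the result is deterministic.

```c
#include <stdint.h>

/* Hypothetical helper, not part of librdkafka.
 * (backoff_ms * jitter_pct / 100) milliseconds, converted to microseconds
 * with * 1000, simplifies to backoff_ms * jitter_pct * 10. */
static int64_t jitter_offset_us(int64_t backoff_ms, int jitter_pct) {
        return backoff_ms * jitter_pct * 10;
}
```

For the coordinator query interval in `rdkafka_cgrp.c`, which passes a backoff of 500 ms, and assuming `RD_KAFKA_RETRY_JITTER_PERCENT` is 20 (as the changelog's "20% jitter" suggests), the offset would fall between -100000 and +100000 microseconds, that is, within 100 ms either side.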
20 changes: 18 additions & 2 deletions src/rdkafka_broker.c
@@ -2823,6 +2823,7 @@ int rd_kafka_send(rd_kafka_broker_t *rkb) {
*/
void rd_kafka_broker_buf_retry(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {

int64_t backoff = 0;
/* Restore original replyq since replyq.q will have been NULLed
* by buf_callback()/replyq_enq(). */
if (!rkbuf->rkbuf_replyq.q && rkbuf->rkbuf_orig_replyq.q) {
@@ -2850,9 +2851,24 @@ void rd_kafka_broker_buf_retry(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
rkb->rkb_rk->rk_conf.retry_backoff_ms);

rd_atomic64_add(&rkb->rkb_c.tx_retries, 1);
/* In some cases, failed Produce requests do not increment the retry
* count, see rd_kafka_handle_Produce_error. */
if (rkbuf->rkbuf_retries > 0)
backoff = (1 << (rkbuf->rkbuf_retries - 1)) *
(rkb->rkb_rk->rk_conf.retry_backoff_ms);
else
backoff = rkb->rkb_rk->rk_conf.retry_backoff_ms;

/* We are multiplying by 10 as (backoff_ms * percent * 1000)/100 ->
* backoff_ms * jitter * 10 */
backoff = rd_jitter(100 - RD_KAFKA_RETRY_JITTER_PERCENT,
100 + RD_KAFKA_RETRY_JITTER_PERCENT) *
backoff * 10;

if (backoff > rkb->rkb_rk->rk_conf.retry_backoff_max_ms * 1000)
backoff = rkb->rkb_rk->rk_conf.retry_backoff_max_ms * 1000;

rkbuf->rkbuf_ts_retry =
rd_clock() + (rkb->rkb_rk->rk_conf.retry_backoff_ms * 1000);
rkbuf->rkbuf_ts_retry = rd_clock() + backoff;
/* Precaution: time out the request if it hasn't moved from the
* retry queue within the retry interval (such as when the broker is
* down). */
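
The retry delay computed in `rd_kafka_broker_buf_retry` can be read as three steps: exponential growth from the retry count, jitter applied as a percentage around 100, and a cap at `retry.backoff.max.ms`. The sketch below restates those steps as a standalone function. It is an illustration under assumptions, not the library's code: `retry_backoff_us` is a hypothetical name, `jitter_pct` stands in for the value `rd_jitter()` would return, `backoff_ms` is assumed to stay within the documented 1 .. 300000 range, and the clamp on the shift is an addition of this sketch.

```c
#include <stdint.h>

/* Hypothetical, deterministic restatement of the retry delay computation.
 * Returns the delay in microseconds. */
static int64_t retry_backoff_us(int retries,
                                int64_t backoff_ms,
                                int64_t backoff_max_ms,
                                int jitter_pct) {
        int64_t max_us = backoff_max_ms * 1000;
        int shift      = retries > 0 ? retries - 1 : 0;
        int64_t backoff;

        /* Not in the patch: keep the shift small enough that the 64-bit
         * multiplication below cannot overflow for backoff_ms <= 300000. */
        if (shift > 30)
                shift = 30;

        /* Step 1: backoff_ms * 2^(retries - 1), or backoff_ms when the
         * retry count has not been incremented yet. */
        backoff = ((int64_t)1 << shift) * backoff_ms;

        /* Step 2: apply jitter and convert to microseconds in one go,
         * since ms * percent * 1000 / 100 == ms * percent * 10. */
        backoff = backoff * jitter_pct * 10;

        /* Step 3: never exceed the configured maximum. */
        return backoff > max_us ? max_us : backoff;
}
```

With a 100 ms base and a 1000 ms cap, the third retry lands between 320000 and 480000 microseconds for a jitter percentage between 80 and 120, and from the fifth retry onwards the cap of 1000000 microseconds applies regardless of jitter.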
7 changes: 5 additions & 2 deletions src/rdkafka_cgrp.c
@@ -755,8 +755,11 @@ void rd_kafka_cgrp_coord_query(rd_kafka_cgrp_t *rkcg, const char *reason) {

rd_kafka_broker_destroy(rkb);

/* Back off the next intervalled query since we just sent one. */
rd_interval_reset_to_now(&rkcg->rkcg_coord_query_intvl, 0);
/* Back off the next intervalled query with a jitter since we just sent
* one. */
rd_interval_reset_to_now_with_jitter(&rkcg->rkcg_coord_query_intvl, 0,
500,
RD_KAFKA_RETRY_JITTER_PERCENT);
}

/**
51 changes: 47 additions & 4 deletions src/rdkafka_conf.c
@@ -2,6 +2,7 @@
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2022, Magnus Edenhill
* 2023 Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -457,10 +458,12 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
{_RK_GLOBAL, "topic.metadata.refresh.fast.interval.ms", _RK_C_INT,
_RK(metadata_refresh_fast_interval_ms),
"When a topic loses its leader a new metadata request will be "
"enqueued with this initial interval, exponentially increasing "
"enqueued immediately and then with this initial interval, exponentially "
"increasing up to `retry.backoff.max.ms`, "
"until the topic metadata has been refreshed. "
"If not set explicitly, it defaults to `retry.backoff.ms`. "
"This is used to recover quickly from transitioning leader brokers.",
1, 60 * 1000, 250},
1, 60 * 1000, 100},
{_RK_GLOBAL | _RK_DEPRECATED, "topic.metadata.refresh.fast.cnt", _RK_C_INT,
_RK(metadata_refresh_fast_cnt), "No longer used.", 0, 1000, 10},
{_RK_GLOBAL, "topic.metadata.refresh.sparse", _RK_C_BOOL,
@@ -1372,10 +1375,21 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
0, INT32_MAX, INT32_MAX},
{_RK_GLOBAL | _RK_PRODUCER, "retries", _RK_C_ALIAS,
.sdef = "message.send.max.retries"},

{_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "retry.backoff.ms", _RK_C_INT,
_RK(retry_backoff_ms),
"The backoff time in milliseconds before retrying a protocol request.", 1,
300 * 1000, 100},
"The backoff time in milliseconds before retrying a protocol request. "
"This is the first backoff time; it is increased exponentially on each "
"retry until the number of retries is exhausted, and is capped by "
"`retry.backoff.max.ms`.",
1, 300 * 1000, 100},

{_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "retry.backoff.max.ms", _RK_C_INT,
_RK(retry_backoff_max_ms),
"The maximum backoff time in milliseconds before retrying a protocol "
"request. This is the upper bound for exponentially backed off "
"requests.",
1, 300 * 1000, 1000},

{_RK_GLOBAL | _RK_PRODUCER, "queue.buffering.backpressure.threshold",
_RK_C_INT, _RK(queue_backpressure_thres),
@@ -3928,6 +3942,10 @@ const char *rd_kafka_conf_finalize(rd_kafka_type_t cltype,
conf->sparse_connect_intvl =
RD_MAX(11, RD_MIN(conf->reconnect_backoff_ms / 2, 1000));
}
if (!rd_kafka_conf_is_modified(
conf, "topic.metadata.refresh.fast.interval.ms"))
conf->metadata_refresh_fast_interval_ms =
conf->retry_backoff_ms;

if (!rd_kafka_conf_is_modified(conf, "connections.max.idle.ms") &&
conf->brokerlist && rd_strcasestr(conf->brokerlist, "azure")) {
@@ -4116,6 +4134,31 @@ int rd_kafka_conf_warn(rd_kafka_t *rk) {
"recommend not using set_default_topic_conf");

/* Additional warnings */
if (rk->rk_conf.retry_backoff_ms > rk->rk_conf.retry_backoff_max_ms) {
rd_kafka_log(
rk, LOG_WARNING, "CONFWARN",
"Configuration `retry.backoff.ms` with value %d is greater "
"than configuration `retry.backoff.max.ms` with value %d. "
"A static backoff with value `retry.backoff.max.ms` will "
"be applied.",
rk->rk_conf.retry_backoff_ms,
rk->rk_conf.retry_backoff_max_ms);
}

if (rd_kafka_conf_is_modified(
&rk->rk_conf, "topic.metadata.refresh.fast.interval.ms") &&
rk->rk_conf.metadata_refresh_fast_interval_ms >
rk->rk_conf.retry_backoff_max_ms) {
rd_kafka_log(
rk, LOG_WARNING, "CONFWARN",
"Configuration `topic.metadata.refresh.fast.interval.ms` "
"with value %d is greater than configuration "
"`retry.backoff.max.ms` with value %d. "
"A static backoff with value `retry.backoff.max.ms` will "
"be applied.",
rk->rk_conf.metadata_refresh_fast_interval_ms,
rk->rk_conf.retry_backoff_max_ms);
}
if (rk->rk_type == RD_KAFKA_CONSUMER) {
if (rk->rk_conf.fetch_wait_max_ms + 1000 >
rk->rk_conf.socket_timeout_ms)
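
Taken together, the `rd_kafka_conf_finalize` and `rd_kafka_conf_warn` hunks amount to one defaulting rule and two sanity checks. The following sketch models them on a simplified, hypothetical struct. `struct backoff_conf`, its `fast_interval_modified` flag (standing in for `rd_kafka_conf_is_modified()`), and both function names are assumptions made for illustration, and the warnings are counted rather than logged.

```c
#include <stdbool.h>

/* Hypothetical, simplified stand-in for the relevant configuration fields. */
struct backoff_conf {
        int retry_backoff_ms;
        int retry_backoff_max_ms;
        int metadata_refresh_fast_interval_ms;
        bool fast_interval_modified; /* true if the user set it explicitly */
};

/* Defaulting rule: the fast metadata refresh interval follows
 * retry.backoff.ms unless it was set explicitly. */
static void backoff_conf_finalize(struct backoff_conf *conf) {
        if (!conf->fast_interval_modified)
                conf->metadata_refresh_fast_interval_ms =
                    conf->retry_backoff_ms;
}

/* Returns how many of the two configuration warnings would be raised. */
static int backoff_conf_warnings(const struct backoff_conf *conf) {
        int warnings = 0;

        /* Initial backoff above the cap: a static backoff results. */
        if (conf->retry_backoff_ms > conf->retry_backoff_max_ms)
                warnings++;

        /* Only checked for an explicitly set fast interval. */
        if (conf->fast_interval_modified &&
            conf->metadata_refresh_fast_interval_ms >
                conf->retry_backoff_max_ms)
                warnings++;

        return warnings;
}
```

In this model, a configuration that leaves the fast interval untouched inherits `retry.backoff.ms` and raises no warning, while one that sets both `retry.backoff.ms` and the fast interval above `retry.backoff.max.ms` raises two.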