Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KIP-580] Exponential Backoff with Mock Broker Changes to Automate Testing. #4422

Merged
merged 15 commits into from
Sep 29, 2023
Merged
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ librdkafka v2.2.0 is a feature release:
* [KIP-339](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API):
IncrementalAlterConfigs API (started by @PrasanthV454, #4110).
* [KIP-554](https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API): Add Broker-side SCRAM Config API (#4241).

* [KIP-580](https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients): Added Exponential Backoff mechanism for
retriable requests with `retry.backoff.ms` as minimum backoff and `retry.backoff.max.ms` as the
maximum backoff, with jitter `RD_KAFKA_RETRY_JITTER_PERCENT`(#4422).
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved

## Enhancements

Expand Down
5 changes: 3 additions & 2 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ message.copy.max.bytes | * | 0 .. 1000000000 | 65535
receive.message.max.bytes | * | 1000 .. 2147483647 | 100000000 | medium | Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hickups. This value must be at least `fetch.max.bytes` + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set. <br>*Type: integer*
max.in.flight.requests.per.connection | * | 1 .. 1000000 | 1000000 | low | Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. <br>*Type: integer*
max.in.flight | * | 1 .. 1000000 | 1000000 | low | Alias for `max.in.flight.requests.per.connection`: Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. <br>*Type: integer*
topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | low | Period of time in milliseconds at which topic and broker metadata is refreshed in order to proactively discover any new brokers, topics, partitions or partition leader changes. Use -1 to disable the intervalled refresh (not recommended). If there are no locally referenced topics (no topic objects created, no messages produced, no subscription or no assignment) then only the broker list will be refreshed every interval but no more often than every 10s. <br>*Type: integer*
topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | low | Period of time in milliseconds at which topic and broker metadata is refreshed in order to proactively discover any new brokers, topics, partitions or partition leader changes. Use -1 to disable the intervalled refresh (not recommended). If not set explicitly, it will be defaulted to `retry.backoff.ms`. If there are no locally referenced topics (no topic objects created, no messages produced, no subscription or no assignment) then only the broker list will be refreshed every interval but no more often than every 10s. <br>*Type: integer*
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved
metadata.max.age.ms | * | 1 .. 86400000 | 900000 | low | Metadata cache max age. Defaults to topic.metadata.refresh.interval.ms * 3 <br>*Type: integer*
topic.metadata.refresh.fast.interval.ms | * | 1 .. 60000 | 250 | low | When a topic loses its leader a new metadata request will be enqueued with this initial interval, exponentially increasing until the topic metadata has been refreshed. This is used to recover quickly from transitioning leader brokers. <br>*Type: integer*
topic.metadata.refresh.fast.cnt | * | 0 .. 1000 | 10 | low | **DEPRECATED** No longer used. <br>*Type: integer*
Expand Down Expand Up @@ -142,7 +142,8 @@ queue.buffering.max.ms | P | 0 .. 900000 | 5
linger.ms | P | 0 .. 900000 | 5 | high | Alias for `queue.buffering.max.ms`: Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers. A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency. <br>*Type: float*
message.send.max.retries | P | 0 .. 2147483647 | 2147483647 | high | How many times to retry sending a failing Message. **Note:** retrying may cause reordering unless `enable.idempotence` is set to true. <br>*Type: integer*
retries | P | 0 .. 2147483647 | 2147483647 | high | Alias for `message.send.max.retries`: How many times to retry sending a failing Message. **Note:** retrying may cause reordering unless `enable.idempotence` is set to true. <br>*Type: integer*
retry.backoff.ms | P | 1 .. 300000 | 100 | medium | The backoff time in milliseconds before retrying a protocol request. <br>*Type: integer*
retry.backoff.ms | P | 1 .. 300000 | 100 | medium | The backoff time in milliseconds before retrying a protocol request, this is the first backoff time, and will be backed off exponentially until number of retries is exhausted, and it's capped by retry.backoff.max.ms. <br>*Type: integer*
retry.backoff.max.ms | P | 1 .. 300000 | 1000 | medium | The max backoff time in milliseconds before retrying a protocol request, this is the atmost backoff allowed for exponentially backed off requests. <br>*Type: integer*
queue.buffering.backpressure.threshold | P | 1 .. 1000000 | 1 | low | The threshold of outstanding not yet transmitted broker requests needed to backpressure the producer's message accumulator. If the number of not yet transmitted requests equals or exceeds this number, produce request creation that would have otherwise been triggered (for example, in accordance with linger.ms) will be delayed. A lower number yields larger and more effective batches. A higher value can improve latency when using compression on slow machines. <br>*Type: integer*
compression.codec | P | none, gzip, snappy, lz4, zstd | none | medium | compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression.codec`. <br>*Type: enum value*
compression.type | P | none, gzip, snappy, lz4, zstd | none | medium | Alias for `compression.codec`: compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression.codec`. <br>*Type: enum value*
Expand Down
5 changes: 3 additions & 2 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,8 @@ error code set.

The application should typically not attempt to retry producing the message
on failure, but instead configure librdkafka to perform these retries
using the `retries` and `retry.backoff.ms` configuration properties.
using the `retries`, `retry.backoff.ms` and `retry.backoff.max.ms`
configuration properties.


#### Error: Timed out in transmission queue
Expand Down Expand Up @@ -1950,7 +1951,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-559 - Make the Kafka Protocol Friendlier with L7 Proxies | 2.5.0 | Not supported |
| KIP-568 - Explicit rebalance triggering on the Consumer | 2.6.0 | Not supported |
| KIP-659 - Add metadata to DescribeConfigsResponse | 2.6.0 | Not supported |
| KIP-580 - Exponential backoff for Kafka clients | WIP | Partially supported |
| KIP-580 - Exponential backoff for Kafka clients | 3.7.0 (WIP) | Supported supported |
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved
| KIP-584 - Versioning scheme for features | WIP | Not supported |
| KIP-588 - Allow producers to recover gracefully from txn timeouts | 2.8.0 (WIP) | Not supported |
| KIP-601 - Configurable socket connection timeout | 2.7.0 | Supported |
Expand Down
18 changes: 18 additions & 0 deletions src/rdinterval.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#define _RDINTERVAL_H_

#include "rd.h"
#include "rdrand.h"
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved

typedef struct rd_interval_s {
rd_ts_t ri_ts_last; /* last interval timestamp */
Expand Down Expand Up @@ -109,6 +110,23 @@ static RD_INLINE RD_UNUSED void rd_interval_reset_to_now(rd_interval_t *ri,
ri->ri_backoff = 0;
}

/**
* Reset the interval to 'now' with the given backoff ms and max_jitter as
* percentage. The backoff is given just for absolute jitter calculation. If now
* is 0, the time will be gathered automatically.
*/
static RD_INLINE RD_UNUSED void
rd_interval_reset_to_now_with_jitter(rd_interval_t *ri,
rd_ts_t now,
int64_t backoff_ms,
int max_jitter) {
rd_interval_reset_to_now(ri, now);
/* We are multiplying by 10 as (backoff_ms * percent * 1000)/100 ->
* backoff_ms * jitter * 10 */
ri->ri_ts_last = ri->ri_ts_last +
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved
backoff_ms * rd_jitter(-max_jitter, max_jitter) * 10;
}

/**
* Back off the next interval by `backoff_us` microseconds.
*/
Expand Down
19 changes: 17 additions & 2 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -2823,6 +2823,7 @@ int rd_kafka_send(rd_kafka_broker_t *rkb) {
*/
void rd_kafka_broker_buf_retry(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {

int64_t backoff = 0;
/* Restore original replyq since replyq.q will have been NULLed
* by buf_callback()/replyq_enq(). */
if (!rkbuf->rkbuf_replyq.q && rkbuf->rkbuf_orig_replyq.q) {
Expand Down Expand Up @@ -2850,9 +2851,23 @@ void rd_kafka_broker_buf_retry(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
rkb->rkb_rk->rk_conf.retry_backoff_ms);

rd_atomic64_add(&rkb->rkb_c.tx_retries, 1);
/* In some cases, failed Produce requests do not increment the retry count, see rd_kafka_handle_Produce_error. */
if (rkbuf->rkbuf_retries > 0)
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved
backoff = (1 << (rkbuf->rkbuf_retries - 1)) *
(rkb->rkb_rk->rk_conf.retry_backoff_ms);
else
backoff = rkb->rkb_rk->rk_conf.retry_backoff_ms;

/* We are multiplying by 10 as (backoff_ms * percent * 1000)/100 ->
* backoff_ms * jitter * 10 */
backoff = rd_jitter(100 - RD_KAFKA_RETRY_JITTER_PERCENT,
100 + RD_KAFKA_RETRY_JITTER_PERCENT) *
backoff * 10;

if (backoff > rkb->rkb_rk->rk_conf.retry_backoff_max_ms * 1000)
backoff = rkb->rkb_rk->rk_conf.retry_backoff_max_ms * 1000;

rkbuf->rkbuf_ts_retry =
rd_clock() + (rkb->rkb_rk->rk_conf.retry_backoff_ms * 1000);
rkbuf->rkbuf_ts_retry = rd_clock() + backoff;
/* Precaution: time out the request if it hasn't moved from the
* retry queue within the retry interval (such as when the broker is
* down). */
Expand Down
7 changes: 5 additions & 2 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -755,8 +755,11 @@ void rd_kafka_cgrp_coord_query(rd_kafka_cgrp_t *rkcg, const char *reason) {

rd_kafka_broker_destroy(rkb);

/* Back off the next intervalled query since we just sent one. */
rd_interval_reset_to_now(&rkcg->rkcg_coord_query_intvl, 0);
/* Back off the next intervalled query with a jitter since we just sent
* one. */
rd_interval_reset_to_now_with_jitter(&rkcg->rkcg_coord_query_intvl, 0,
500,
RD_KAFKA_RETRY_JITTER_PERCENT);
}

/**
Expand Down
45 changes: 43 additions & 2 deletions src/rdkafka_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"metadata is refreshed in order to proactively discover any new "
"brokers, topics, partitions or partition leader changes. "
"Use -1 to disable the intervalled refresh (not recommended). "
"If not set explicitly, it will be defaulted to `retry.backoff.ms`. "
"If there are no locally referenced topics "
"(no topic objects created, no messages produced, "
"no subscription or no assignment) then only the broker list will "
Expand Down Expand Up @@ -1372,10 +1373,21 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
0, INT32_MAX, INT32_MAX},
{_RK_GLOBAL | _RK_PRODUCER, "retries", _RK_C_ALIAS,
.sdef = "message.send.max.retries"},

{_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "retry.backoff.ms", _RK_C_INT,
_RK(retry_backoff_ms),
"The backoff time in milliseconds before retrying a protocol request.", 1,
300 * 1000, 100},
"The backoff time in milliseconds before retrying a protocol request, "
"this is the first backoff time, "
"and will be backed off exponentially until number of retries is "
"exhausted, and it's capped by retry.backoff.max.ms.",
1, 300 * 1000, 100},

{_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "retry.backoff.max.ms", _RK_C_INT,
_RK(retry_backoff_max_ms),
"The max backoff time in milliseconds before retrying a protocol request, "
"this is the atmost backoff allowed for exponentially backed off "
"requests.",
1, 300 * 1000, 1000},

{_RK_GLOBAL | _RK_PRODUCER, "queue.buffering.backpressure.threshold",
_RK_C_INT, _RK(queue_backpressure_thres),
Expand Down Expand Up @@ -3928,6 +3940,10 @@ const char *rd_kafka_conf_finalize(rd_kafka_type_t cltype,
conf->sparse_connect_intvl =
RD_MAX(11, RD_MIN(conf->reconnect_backoff_ms / 2, 1000));
}
if (!rd_kafka_conf_is_modified(
conf, "topic.metadata.refresh.fast.interval.ms"))
conf->metadata_refresh_fast_interval_ms =
conf->retry_backoff_ms;

if (!rd_kafka_conf_is_modified(conf, "connections.max.idle.ms") &&
conf->brokerlist && rd_strcasestr(conf->brokerlist, "azure")) {
Expand Down Expand Up @@ -4116,6 +4132,31 @@ int rd_kafka_conf_warn(rd_kafka_t *rk) {
"recommend not using set_default_topic_conf");

/* Additional warnings */
if (rk->rk_conf.retry_backoff_ms > rk->rk_conf.retry_backoff_max_ms) {
rd_kafka_log(
rk, LOG_WARNING, "CONFWARN",
"Configuration `retry.backoff.ms` with value %d is greater "
"than configuration `retry.backoff.max.ms` with value %d. "
"A static backoff with value `retry.backoff.max.ms` will "
"be applied.",
rk->rk_conf.retry_backoff_ms,
rk->rk_conf.retry_backoff_max_ms);
}

if (rd_kafka_conf_is_modified(
&rk->rk_conf, "topic.metadata.refresh.fast.interval.ms") &&
rk->rk_conf.metadata_refresh_fast_interval_ms >
rk->rk_conf.retry_backoff_max_ms) {
rd_kafka_log(
rk, LOG_WARNING, "CONFWARN",
"Configuration `topic.metadata.refresh.fast.interval.ms` "
"with value %d is greater than configuration "
"`retry.backoff.max.ms` with value %d. "
"A static backoff with value `retry.backoff.max.ms` will "
"be applied.",
rk->rk_conf.metadata_refresh_fast_interval_ms,
rk->rk_conf.retry_backoff_max_ms);
}
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved
if (rk->rk_type == RD_KAFKA_CONSUMER) {
if (rk->rk_conf.fetch_wait_max_ms + 1000 >
rk->rk_conf.socket_timeout_ms)
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ struct rd_kafka_conf_s {
int queue_backpressure_thres;
int max_retries;
int retry_backoff_ms;
int retry_backoff_max_ms;
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved
int batch_num_messages;
int batch_size;
rd_kafka_compression_t compression_codec;
Expand Down
2 changes: 2 additions & 0 deletions src/rdkafka_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,8 @@ const char *rd_kafka_purge_flags2str(int flags);
#define RD_KAFKA_DBG_ALL 0xfffff
#define RD_KAFKA_DBG_NONE 0x0

/* Jitter Percent for exponential retry backoff */
#define RD_KAFKA_RETRY_JITTER_PERCENT 20
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved

void rd_kafka_log0(const rd_kafka_conf_t *conf,
const rd_kafka_t *rk,
Expand Down
17 changes: 7 additions & 10 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -1622,15 +1622,12 @@ static void rd_kafka_metadata_leader_query_tmr_cb(rd_kafka_timers_t *rkts,
rk, NULL, &topics, rd_true /*force*/,
rk->rk_conf.allow_auto_create_topics,
rd_false /*!cgrp_update*/, "partition leader query");
/* Back off next query exponentially until we reach
* the standard query interval - then stop the timer
* since the intervalled querier will do the job for us. */
if (rk->rk_conf.metadata_refresh_interval_ms > 0 &&
rtmr->rtmr_interval * 2 / 1000 >=
rk->rk_conf.metadata_refresh_interval_ms)
rd_kafka_timer_stop(rkts, rtmr, 1 /*lock*/);
else
rd_kafka_timer_exp_backoff(rkts, rtmr);

/* Back off next query exponentially till we reach
* the retry backoff max ms */
rd_kafka_timer_exp_backoff(
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved
rkts, rtmr, rk->rk_conf.retry_backoff_ms * 1000,
rk->rk_conf.retry_backoff_max_ms * 1000, 20);
}

rd_list_destroy(&topics);
Expand Down Expand Up @@ -1660,7 +1657,7 @@ void rd_kafka_metadata_fast_leader_query(rd_kafka_t *rk) {
"Starting fast leader query");
rd_kafka_timer_start(
&rk->rk_timers, &rk->rk_metadata_cache.rkmc_query_tmr,
rk->rk_conf.metadata_refresh_fast_interval_ms * 1000,
0 /* First request should be tried immediately */,
rd_kafka_metadata_leader_query_tmr_cb, NULL);
}
}
Expand Down
Loading