diff --git a/CHANGELOG.md b/CHANGELOG.md
index 31e9ded0ad..a8e3dc5721 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -38,23 +38,36 @@ librdkafka v2.3.0 is a feature release:
   rack information on 32bit architectures.
   Solved by aligning all allocations to the maximum allowed word size (#4449).

+
 ### Consumer fixes

- * Stored offsets were excluded from the commit if the leader epoch was
-   less than committed epoch, as it's possible if leader epoch is the default -1.
-   This didn't happen in Python, Go and .NET bindings when stored position was
-   taken from the message.
-   Solved by checking only that the stored offset is greater
-   than committed one, if either stored or committed leader epoch is -1 (#4442).
- * If an OffsetForLeaderEpoch request was being retried, and the leader changed
-   while the retry was in-flight, an infinite loop of requests was triggered,
-   because we weren't updating the leader epoch correctly.
-   Fixed by updating the leader epoch before sending the request (#4433).
- * During offset validation a permanent error like host resolution failure
-   would cause an offset reset.
-   This isn't what's expected or what the Java implementation does.
-   Solved by retrying even in case of permanent errors (#4447).
+ * Stored offsets were excluded from the commit if the leader epoch was
+   less than committed epoch, as it's possible if leader epoch is the default -1.
+   This didn't happen in Python, Go and .NET bindings when stored position was
+   taken from the message.
+   Solved by checking only that the stored offset is greater
+   than committed one, if either stored or committed leader epoch is -1 (#4442).
+ * If an OffsetForLeaderEpoch request was being retried, and the leader changed
+   while the retry was in-flight, an infinite loop of requests was triggered,
+   because we weren't updating the leader epoch correctly.
+   Fixed by updating the leader epoch before sending the request (#4433).
+ * During offset validation a permanent error like host resolution failure
+   would cause an offset reset.
+   This isn't what's expected or what the Java implementation does.
+   Solved by retrying even in case of permanent errors (#4447).
+
+## Upgrade considerations
+
+ * `retry.backoff.ms`:
+   If it is set greater than `retry.backoff.max.ms`, which has a default value
+   of 1000 ms, then it assumes the value of `retry.backoff.max.ms`.
+   To change this behaviour, make sure that `retry.backoff.ms` is always less
+   than `retry.backoff.max.ms`.
+   If the two are equal, the backoff will be linear instead of exponential.
+
+ * `topic.metadata.refresh.fast.interval.ms`:
+   If it is set greater than `retry.backoff.max.ms`, which has a default value
+   of 1000 ms, then it assumes the value of `retry.backoff.max.ms`.
+   To change this behaviour, make sure that
+   `topic.metadata.refresh.fast.interval.ms` is always less than
+   `retry.backoff.max.ms`.
+   If the two are equal, the backoff will be linear instead of exponential.


 # librdkafka v2.2.0

@@ -85,7 +98,9 @@ librdkafka v2.2.0 is a feature release:
 * [KIP-339](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API): IncrementalAlterConfigs API (started by @PrasanthV454, #4110).
 * [KIP-554](https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API): Add Broker-side SCRAM Config API (#4241).
-
+ * [KIP-580](https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients): Added an exponential backoff mechanism for
+   retriable requests with `retry.backoff.ms` as the minimum backoff and `retry.backoff.max.ms` as the
+   maximum backoff, with 20% jitter (#4422).

 ## Enhancements

diff --git a/CONFIGURATION.md b/CONFIGURATION.md
index 127fe4c88f..4a75378b53 100644
--- a/CONFIGURATION.md
+++ b/CONFIGURATION.md
@@ -14,7 +14,7 @@ max.in.flight.requests.per.connection | * | 1 .. 1000000 | 1000000
max.in.flight | * | 1 .. 1000000 | 1000000 | low | Alias for `max.in.flight.requests.per.connection`: Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one.
*Type: integer* topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | low | Period of time in milliseconds at which topic and broker metadata is refreshed in order to proactively discover any new brokers, topics, partitions or partition leader changes. Use -1 to disable the intervalled refresh (not recommended). If there are no locally referenced topics (no topic objects created, no messages produced, no subscription or no assignment) then only the broker list will be refreshed every interval but no more often than every 10s.
*Type: integer* metadata.max.age.ms | * | 1 .. 86400000 | 900000 | low | Metadata cache max age. Defaults to topic.metadata.refresh.interval.ms * 3
*Type: integer* -topic.metadata.refresh.fast.interval.ms | * | 1 .. 60000 | 250 | low | When a topic loses its leader a new metadata request will be enqueued with this initial interval, exponentially increasing until the topic metadata has been refreshed. This is used to recover quickly from transitioning leader brokers.
*Type: integer*
+topic.metadata.refresh.fast.interval.ms | * | 1 .. 60000 | 100 | low | When a topic loses its leader a new metadata request will be enqueued immediately and then with this initial interval, exponentially increasing up to `retry.backoff.max.ms`, until the topic metadata has been refreshed. If not set explicitly, it defaults to `retry.backoff.ms`. This is used to recover quickly from transitioning leader brokers.
*Type: integer* topic.metadata.refresh.fast.cnt | * | 0 .. 1000 | 10 | low | **DEPRECATED** No longer used.
*Type: integer* topic.metadata.refresh.sparse | * | true, false | true | low | Sparse metadata requests (consumes less network bandwidth)
*Type: boolean* topic.metadata.propagation.max.ms | * | 0 .. 3600000 | 30000 | low | Apache Kafka topic creation is asynchronous and it takes some time for a new topic to propagate throughout the cluster to all brokers. If a client requests topic metadata after manual topic creation but before the topic has been fully propagated to the broker the client is requesting metadata from, the topic will seem to be non-existent and the client will mark the topic as such, failing queued produced messages with `ERR__UNKNOWN_TOPIC`. This setting delays marking a topic as non-existent until the configured propagation max time has passed. The maximum propagation time is calculated from the time the topic is first referenced in the client, e.g., on produce().
*Type: integer* @@ -142,7 +142,8 @@ queue.buffering.max.ms | P | 0 .. 900000 | 5 linger.ms | P | 0 .. 900000 | 5 | high | Alias for `queue.buffering.max.ms`: Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers. A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency.
*Type: float* message.send.max.retries | P | 0 .. 2147483647 | 2147483647 | high | How many times to retry sending a failing Message. **Note:** retrying may cause reordering unless `enable.idempotence` is set to true.
*Type: integer* retries | P | 0 .. 2147483647 | 2147483647 | high | Alias for `message.send.max.retries`: How many times to retry sending a failing Message. **Note:** retrying may cause reordering unless `enable.idempotence` is set to true.
*Type: integer* -retry.backoff.ms | P | 1 .. 300000 | 100 | medium | The backoff time in milliseconds before retrying a protocol request.
*Type: integer*
+retry.backoff.ms | P | 1 .. 300000 | 100 | medium | The backoff time in milliseconds before retrying a protocol request. This is the first backoff time, and will be backed off exponentially until the number of retries is exhausted; it is capped by `retry.backoff.max.ms`.
*Type: integer*
+retry.backoff.max.ms | P | 1 .. 300000 | 1000 | medium | The maximum backoff time in milliseconds before retrying a protocol request; this is the upper bound applied to exponentially backed off requests.
*Type: integer* queue.buffering.backpressure.threshold | P | 1 .. 1000000 | 1 | low | The threshold of outstanding not yet transmitted broker requests needed to backpressure the producer's message accumulator. If the number of not yet transmitted requests equals or exceeds this number, produce request creation that would have otherwise been triggered (for example, in accordance with linger.ms) will be delayed. A lower number yields larger and more effective batches. A higher value can improve latency when using compression on slow machines.
*Type: integer* compression.codec | P | none, gzip, snappy, lz4, zstd | none | medium | compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression.codec`.
*Type: enum value* compression.type | P | none, gzip, snappy, lz4, zstd | none | medium | Alias for `compression.codec`: compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression.codec`.
*Type: enum value*
diff --git a/INTRODUCTION.md b/INTRODUCTION.md
index 49bd2950d5..f90d8f7a0e 100644
--- a/INTRODUCTION.md
+++ b/INTRODUCTION.md
@@ -319,7 +319,8 @@ error code set.

 The application should typically not attempt to retry producing
 the message on failure, but instead configure librdkafka to perform these retries
-using the `retries` and `retry.backoff.ms` configuration properties.
+using the `retries`, `retry.backoff.ms` and `retry.backoff.max.ms`
+configuration properties.


 #### Error: Timed out in transmission queue

@@ -1950,7 +1951,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
 | KIP-559 - Make the Kafka Protocol Friendlier with L7 Proxies | 2.5.0 | Not supported |
 | KIP-568 - Explicit rebalance triggering on the Consumer | 2.6.0 | Not supported |
 | KIP-659 - Add metadata to DescribeConfigsResponse | 2.6.0 | Not supported |
-| KIP-580 - Exponential backoff for Kafka clients | WIP | Partially supported |
+| KIP-580 - Exponential backoff for Kafka clients | 3.7.0 (WIP) | Supported |
 | KIP-584 - Versioning scheme for features | WIP | Not supported |
 | KIP-588 - Allow producers to recover gracefully from txn timeouts | 2.8.0 (WIP) | Not supported |
 | KIP-601 - Configurable socket connection timeout | 2.7.0 | Supported |
diff --git a/src/rdinterval.h b/src/rdinterval.h
index d43ff95358..95cdf3c2d7 100644
--- a/src/rdinterval.h
+++ b/src/rdinterval.h
@@ -2,6 +2,7 @@
  * librdkafka - Apache Kafka C library
  *
  * Copyright (c) 2018-2022, Magnus Edenhill
+ *               2023 Confluent Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -30,6 +31,7 @@
 #define _RDINTERVAL_H_

 #include "rd.h"
+#include "rdrand.h"

 typedef struct rd_interval_s {
         rd_ts_t ri_ts_last; /* last interval timestamp */
@@ -109,6 +111,22 @@ static RD_INLINE RD_UNUSED void rd_interval_reset_to_now(rd_interval_t *ri,
         ri->ri_backoff = 0;
 }

+/**
+ * Reset the interval to 'now' with the given backoff (in ms) and max_jitter
+ * as a percentage. The backoff is used only to compute the absolute jitter.
+ * If now is 0, the time will be gathered automatically.
+ */
+static RD_INLINE RD_UNUSED void
+rd_interval_reset_to_now_with_jitter(rd_interval_t *ri,
+                                     rd_ts_t now,
+                                     int64_t backoff_ms,
+                                     int max_jitter) {
+        rd_interval_reset_to_now(ri, now);
+        /* Convert to microseconds: (backoff_ms * jitter_percent * 1000) / 100
+         * == backoff_ms * jitter * 10. */
+        ri->ri_backoff = backoff_ms * rd_jitter(-max_jitter, max_jitter) * 10;
+}
+
 /**
  * Back off the next interval by `backoff_us` microseconds.
  */
diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c
index 481c21d9c5..e92f008bfc 100644
--- a/src/rdkafka_broker.c
+++ b/src/rdkafka_broker.c
@@ -2823,6 +2823,7 @@ int rd_kafka_send(rd_kafka_broker_t *rkb) {
  */
 void rd_kafka_broker_buf_retry(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {

+        int64_t backoff = 0;
         /* Restore original replyq since replyq.q will have been NULLed
          * by buf_callback()/replyq_enq(). */
         if (!rkbuf->rkbuf_replyq.q && rkbuf->rkbuf_orig_replyq.q) {
@@ -2850,9 +2851,24 @@ void rd_kafka_broker_buf_retry(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
                    rkb->rkb_rk->rk_conf.retry_backoff_ms);

         rd_atomic64_add(&rkb->rkb_c.tx_retries, 1);
+        /* In some cases, failed Produce requests do not increment the retry
+         * count, see rd_kafka_handle_Produce_error. 
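In that case the backoff
+         * starts over from retry.backoff.ms (the else branch below). 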
*/
+        if (rkbuf->rkbuf_retries > 0)
+                backoff = (1 << (rkbuf->rkbuf_retries - 1)) *
+                          (rkb->rkb_rk->rk_conf.retry_backoff_ms);
+        else
+                backoff = rkb->rkb_rk->rk_conf.retry_backoff_ms;
+
+        /* Convert to microseconds: (backoff_ms * jitter_percent * 1000) / 100
+         * == backoff_ms * jitter * 10. */
+        backoff = rd_jitter(100 - RD_KAFKA_RETRY_JITTER_PERCENT,
+                            100 + RD_KAFKA_RETRY_JITTER_PERCENT) *
+                  backoff * 10;
+
+        if (backoff > rkb->rkb_rk->rk_conf.retry_backoff_max_ms * 1000)
+                backoff = rkb->rkb_rk->rk_conf.retry_backoff_max_ms * 1000;

-        rkbuf->rkbuf_ts_retry =
-            rd_clock() + (rkb->rkb_rk->rk_conf.retry_backoff_ms * 1000);
+        rkbuf->rkbuf_ts_retry = rd_clock() + backoff;
         /* Precaution: time out the request if it hasn't moved from the
          * retry queue within the retry interval (such as when the broker is
          * down). */
diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c
index 50d3ec24e1..eb953bb56b 100644
--- a/src/rdkafka_cgrp.c
+++ b/src/rdkafka_cgrp.c
@@ -755,8 +755,11 @@ void rd_kafka_cgrp_coord_query(rd_kafka_cgrp_t *rkcg, const char *reason) {

         rd_kafka_broker_destroy(rkb);

-        /* Back off the next intervalled query since we just sent one. */
-        rd_interval_reset_to_now(&rkcg->rkcg_coord_query_intvl, 0);
+        /* Back off the next intervalled query with a jitter since we just sent
+         * one. */
+        rd_interval_reset_to_now_with_jitter(&rkcg->rkcg_coord_query_intvl, 0,
+                                             500,
+                                             RD_KAFKA_RETRY_JITTER_PERCENT);
 }

 /**
diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c
index 9200af4c6a..154582d6fc 100644
--- a/src/rdkafka_conf.c
+++ b/src/rdkafka_conf.c
@@ -2,6 +2,7 @@
  * librdkafka - Apache Kafka C library
  *
  * Copyright (c) 2012-2022, Magnus Edenhill
+ *               2023 Confluent Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -457,10 +458,12 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
     {_RK_GLOBAL, "topic.metadata.refresh.fast.interval.ms", _RK_C_INT,
      _RK(metadata_refresh_fast_interval_ms),
      "When a topic loses its leader a new metadata request will be "
-     "enqueued with this initial interval, exponentially increasing "
+     "enqueued immediately and then with this initial interval, exponentially "
+     "increasing up to `retry.backoff.max.ms`, "
      "until the topic metadata has been refreshed. "
+     "If not set explicitly, it defaults to `retry.backoff.ms`. "
      "This is used to recover quickly from transitioning leader brokers.",
-     1, 60 * 1000, 250},
+     1, 60 * 1000, 100},
     {_RK_GLOBAL | _RK_DEPRECATED, "topic.metadata.refresh.fast.cnt", _RK_C_INT,
      _RK(metadata_refresh_fast_cnt), "No longer used.", 0, 1000, 10},
     {_RK_GLOBAL, "topic.metadata.refresh.sparse", _RK_C_BOOL,
@@ -1372,10 +1375,21 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
      0, INT32_MAX, INT32_MAX},
     {_RK_GLOBAL | _RK_PRODUCER, "retries", _RK_C_ALIAS,
      .sdef = "message.send.max.retries"},
+
     {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "retry.backoff.ms", _RK_C_INT,
      _RK(retry_backoff_ms),
-     "The backoff time in milliseconds before retrying a protocol request.", 1,
-     300 * 1000, 100},
+     "The backoff time in milliseconds before retrying a protocol request. "
+     "This is the first backoff time, "
+     "and will be backed off exponentially until the number of retries is "
+     "exhausted; it is capped by `retry.backoff.max.ms`.",
+     1, 300 * 1000, 100},
+
+    {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "retry.backoff.max.ms", _RK_C_INT,
+     _RK(retry_backoff_max_ms),
+     "The maximum backoff time in milliseconds before retrying a protocol "
+     "request; this is the upper bound applied to exponentially backed off "
+     "requests.",
+     1, 300 * 1000, 1000},

     {_RK_GLOBAL | _RK_PRODUCER, "queue.buffering.backpressure.threshold",
      _RK_C_INT, _RK(queue_backpressure_thres),
@@ -3928,6 +3942,10 @@ const char *rd_kafka_conf_finalize(rd_kafka_type_t cltype,
                 conf->sparse_connect_intvl =
                     RD_MAX(11, RD_MIN(conf->reconnect_backoff_ms / 2, 1000));
         }
+        if (!rd_kafka_conf_is_modified(
+                conf, "topic.metadata.refresh.fast.interval.ms"))
+                conf->metadata_refresh_fast_interval_ms =
+                    conf->retry_backoff_ms;

         if (!rd_kafka_conf_is_modified(conf, "connections.max.idle.ms") &&
             conf->brokerlist && rd_strcasestr(conf->brokerlist, "azure")) {
@@ -4116,6 +4134,31 @@ int rd_kafka_conf_warn(rd_kafka_t *rk) {
                             "recommend not using set_default_topic_conf");

         /* Additional warnings */
+        if (rk->rk_conf.retry_backoff_ms > rk->rk_conf.retry_backoff_max_ms) {
+                rd_kafka_log(
+                    rk, LOG_WARNING, "CONFWARN",
+                    "Configuration `retry.backoff.ms` with value %d is greater "
+                    "than configuration `retry.backoff.max.ms` with value %d. "
+                    "A static backoff with value `retry.backoff.max.ms` will "
+                    "be applied.",
+                    rk->rk_conf.retry_backoff_ms,
+                    rk->rk_conf.retry_backoff_max_ms);
+        }
+
+        if (rd_kafka_conf_is_modified(
+                &rk->rk_conf, "topic.metadata.refresh.fast.interval.ms") &&
+            rk->rk_conf.metadata_refresh_fast_interval_ms >
+                rk->rk_conf.retry_backoff_max_ms) {
+                rd_kafka_log(
+                    rk, LOG_WARNING, "CONFWARN",
+                    "Configuration `topic.metadata.refresh.fast.interval.ms` "
+                    "with value %d is greater than configuration "
+                    "`retry.backoff.max.ms` with value %d. "
+                    "A static backoff with value `retry.backoff.max.ms` will "
+                    "be applied.",
+                    rk->rk_conf.metadata_refresh_fast_interval_ms,
+                    rk->rk_conf.retry_backoff_max_ms);
+        }
         if (rk->rk_type == RD_KAFKA_CONSUMER) {
                 if (rk->rk_conf.fetch_wait_max_ms + 1000 >
                     rk->rk_conf.socket_timeout_ms)
diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h
index 01b6258d2e..bd17a261bf 100644
--- a/src/rdkafka_conf.h
+++ b/src/rdkafka_conf.h
@@ -423,6 +423,7 @@ struct rd_kafka_conf_s {
         int queue_backpressure_thres;
         int max_retries;
         int retry_backoff_ms;
+        int retry_backoff_max_ms;
         int batch_num_messages;
         int batch_size;
         rd_kafka_compression_t compression_codec;
diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h
index 8a29c1f623..e586dd6e69 100644
--- a/src/rdkafka_int.h
+++ b/src/rdkafka_int.h
@@ -863,6 +863,8 @@ const char *rd_kafka_purge_flags2str(int flags);
 #define RD_KAFKA_DBG_ALL  0xfffff
 #define RD_KAFKA_DBG_NONE 0x0

+/* Jitter percentage for exponential retry backoff */
+#define RD_KAFKA_RETRY_JITTER_PERCENT 20

 void rd_kafka_log0(const rd_kafka_conf_t *conf,
                    const rd_kafka_t *rk,
diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c
index 9119d41f27..de90b166e6 100644
--- a/src/rdkafka_metadata.c
+++ b/src/rdkafka_metadata.c
@@ -1625,15 +1625,13 @@ static void rd_kafka_metadata_leader_query_tmr_cb(rd_kafka_timers_t *rkts,
                     rk, NULL, &topics, rd_true /*force*/,
                     rk->rk_conf.allow_auto_create_topics,
                     rd_false /*!cgrp_update*/, "partition leader query");
-                /* Back off next query exponentially until we reach
-                 * the standard query interval - then stop the timer
-                 * since the intervalled querier will do the job for us. */
-                if (rk->rk_conf.metadata_refresh_interval_ms > 0 &&
-                    rtmr->rtmr_interval * 2 / 1000 >=
-                        rk->rk_conf.metadata_refresh_interval_ms)
-                        rd_kafka_timer_stop(rkts, rtmr, 1 /*lock*/);
-                else
-                        rd_kafka_timer_exp_backoff(rkts, rtmr);
+
+                /* Back off the next query exponentially until we reach
+                 * `retry.backoff.max.ms`. */
+                rd_kafka_timer_exp_backoff(
+                    rkts, rtmr, rk->rk_conf.retry_backoff_ms * 1000,
+                    rk->rk_conf.retry_backoff_max_ms * 1000,
+                    RD_KAFKA_RETRY_JITTER_PERCENT);
         }

         rd_list_destroy(&topics);
@@ -1663,7 +1661,7 @@ void rd_kafka_metadata_fast_leader_query(rd_kafka_t *rk) {
                              "Starting fast leader query");
                 rd_kafka_timer_start(
                     &rk->rk_timers, &rk->rk_metadata_cache.rkmc_query_tmr,
-                    rk->rk_conf.metadata_refresh_fast_interval_ms * 1000,
+                    0 /* First request should be tried immediately */,
                     rd_kafka_metadata_leader_query_tmr_cb, NULL);
         }
 }
diff --git a/src/rdkafka_mock.c b/src/rdkafka_mock.c
index 6ec89b8468..fdc11ec5da 100644
--- a/src/rdkafka_mock.c
+++ b/src/rdkafka_mock.c
@@ -38,7 +38,7 @@
 #include "rdkafka_interceptor.h"
 #include "rdkafka_mock_int.h"
 #include "rdkafka_transport_int.h"
-
+#include "rdkafka_mock.h"
 #include <stdarg.h>

 static void rd_kafka_mock_cluster_destroy0(rd_kafka_mock_cluster_t *mcluster);
@@ -1127,6 +1127,15 @@ rd_kafka_mock_connection_parse_request(rd_kafka_mock_connection_t *mconn,
                 return -1;
         }

+        mtx_lock(&mcluster->lock);
+        if (mcluster->track_requests) {
+                rd_list_add(&mcluster->request_list,
+                            rd_kafka_mock_request_new(
+                                mconn->broker->id, rkbuf->rkbuf_reqhdr.ApiKey,
+                                rd_clock()));
+        }
+        mtx_unlock(&mcluster->lock);
+
         rd_kafka_dbg(rk, MOCK, "MOCK",
                      "Broker %" PRId32 ": Received %sRequestV%hd from %s",
                      mconn->broker->id,
@@ -2525,6 +2534,7 @@ rd_kafka_mock_cluster_t *rd_kafka_mock_cluster_new(rd_kafka_t *rk,
         TAILQ_INIT(&mcluster->topics);
         mcluster->defaults.partition_cnt      = 4;
         mcluster->defaults.replication_factor = RD_MIN(3, broker_cnt);
         mcluster->track_requests              = 
rd_false;

         TAILQ_INIT(&mcluster->cgrps);

@@ -2602,3 +2612,92 @@ const char *
 rd_kafka_mock_cluster_bootstraps(const rd_kafka_mock_cluster_t *mcluster) {
         return mcluster->bootstraps;
 }
+
+/**
+ * @struct rd_kafka_mock_request_s
+ * @brief Represents a request to the mock cluster along with a timestamp.
+ */
+struct rd_kafka_mock_request_s {
+        int32_t id;        /**< Broker id */
+        int16_t api_key;   /**< API Key of request */
+        rd_ts_t timestamp; /**< Timestamp at which request was received */
+};
+
+rd_kafka_mock_request_t *
+rd_kafka_mock_request_new(int32_t id, int16_t api_key, rd_ts_t timestamp) {
+        rd_kafka_mock_request_t *request;
+        request            = rd_malloc(sizeof(*request));
+        request->id        = id;
+        request->api_key   = api_key;
+        request->timestamp = timestamp;
+        return request;
+}
+
+rd_kafka_mock_request_t *
+rd_kafka_mock_request_copy(rd_kafka_mock_request_t *mrequest) {
+        rd_kafka_mock_request_t *request;
+        request            = rd_malloc(sizeof(*request));
+        request->id        = mrequest->id;
+        request->api_key   = mrequest->api_key;
+        request->timestamp = mrequest->timestamp;
+        return request;
+}
+
+void rd_kafka_mock_request_destroy(rd_kafka_mock_request_t *element) {
+        rd_free(element);
+}
+
+void rd_kafka_mock_request_free(void *element) {
+        rd_kafka_mock_request_destroy(element);
+}
+
+void rd_kafka_mock_start_request_tracking(rd_kafka_mock_cluster_t *mcluster) {
+        mtx_lock(&mcluster->lock);
+        mcluster->track_requests = rd_true;
+        rd_list_init(&mcluster->request_list, 32, rd_kafka_mock_request_free);
+        mtx_unlock(&mcluster->lock);
+}
+
+void rd_kafka_mock_stop_request_tracking(rd_kafka_mock_cluster_t *mcluster) {
+        mtx_lock(&mcluster->lock);
+        mcluster->track_requests = rd_false;
+        rd_list_clear(&mcluster->request_list);
+        mtx_unlock(&mcluster->lock);
+}
+
+rd_kafka_mock_request_t **
+rd_kafka_mock_get_requests(rd_kafka_mock_cluster_t *mcluster, size_t *cntp) {
+        size_t i;
+        rd_kafka_mock_request_t **ret = NULL;
+
+        mtx_lock(&mcluster->lock);
+        *cntp = rd_list_cnt(&mcluster->request_list);
+        if (*cntp > 0) {
+                ret = rd_calloc(*cntp, sizeof(rd_kafka_mock_request_t *));
+                for (i = 0; i < *cntp; i++) {
+                        rd_kafka_mock_request_t *mreq =
+                            rd_list_elem(&mcluster->request_list, i);
+                        ret[i] = rd_kafka_mock_request_copy(mreq);
+                }
+        }
+
+        mtx_unlock(&mcluster->lock);
+        return ret;
+}
+
+void rd_kafka_mock_clear_requests(rd_kafka_mock_cluster_t *mcluster) {
+        mtx_lock(&mcluster->lock);
+        rd_list_clear(&mcluster->request_list);
+        mtx_unlock(&mcluster->lock);
+}
+
+int32_t rd_kafka_mock_request_id(rd_kafka_mock_request_t *mreq) {
+        return mreq->id;
+}
+
+int16_t rd_kafka_mock_request_api_key(rd_kafka_mock_request_t *mreq) {
+        return mreq->api_key;
+}
+
+rd_ts_t rd_kafka_mock_request_timestamp(rd_kafka_mock_request_t *mreq) {
+        return mreq->timestamp;
+}
diff --git a/src/rdkafka_mock.h b/src/rdkafka_mock.h
index a9fd86f12f..6c256e1252 100644
--- a/src/rdkafka_mock.h
+++ b/src/rdkafka_mock.h
@@ -364,6 +364,57 @@ rd_kafka_mock_set_apiversion(rd_kafka_mock_cluster_t *mcluster,
                              int16_t MinVersion,
                              int16_t MaxVersion);

+/**
+ * @brief Represents a request to the mock cluster along with a timestamp.
+ */
+typedef struct rd_kafka_mock_request_s rd_kafka_mock_request_t;
+
+RD_EXPORT
+rd_kafka_mock_request_t *
+rd_kafka_mock_request_new(int32_t id, int16_t api_key, rd_ts_t timestamp);
+
+RD_EXPORT
+void rd_kafka_mock_start_request_tracking(rd_kafka_mock_cluster_t *mcluster);
+
+RD_EXPORT
+void rd_kafka_mock_stop_request_tracking(rd_kafka_mock_cluster_t *mcluster);
+
+/**
+ * @brief Destroy a rd_kafka_mock_request_t * and deallocate memory. 
*/
+RD_EXPORT void rd_kafka_mock_request_destroy(rd_kafka_mock_request_t *mreq);
+
+/**
+ * @brief Get the broker id to which \p mreq was sent.
+ */
+RD_EXPORT int32_t rd_kafka_mock_request_id(rd_kafka_mock_request_t *mreq);
+
+/**
+ * @brief Get the ApiKey with which \p mreq was sent.
+ */
+RD_EXPORT int16_t rd_kafka_mock_request_api_key(rd_kafka_mock_request_t *mreq);
+
+/**
+ * @brief Get the timestamp at which \p mreq was sent.
+ */
+RD_EXPORT rd_ts_t
+rd_kafka_mock_request_timestamp(rd_kafka_mock_request_t *mreq);
+
+/**
+ * @brief Get the list of requests sent to this mock cluster.
+ *
+ * @param cntp is set to the count of requests.
+ * @return List of rd_kafka_mock_request_t *.
+ * @remark Each element of the returned array must be freed with
+ *         rd_kafka_mock_request_destroy, and the list itself must be freed
+ *         too.
+ */
+RD_EXPORT rd_kafka_mock_request_t **
+rd_kafka_mock_get_requests(rd_kafka_mock_cluster_t *mcluster, size_t *cntp);
+
+/**
+ * @brief Clear the list of requests sent to this mock cluster.
+ */
+RD_EXPORT void rd_kafka_mock_clear_requests(rd_kafka_mock_cluster_t *mcluster);

 /**@}*/
diff --git a/src/rdkafka_mock_int.h b/src/rdkafka_mock_int.h
index 390e8631ff..87da2d4e31 100644
--- a/src/rdkafka_mock_int.h
+++ b/src/rdkafka_mock_int.h
@@ -393,9 +393,19 @@ struct rd_kafka_mock_cluster_s {
         /**< Request handlers */
         struct rd_kafka_mock_api_handler api_handlers[RD_KAFKAP__NUM];

+        /**< If set to true, received requests are appended to the request
+         *   list. Defaults to false to reduce memory usage. */
+        rd_bool_t track_requests;
+        /**< List of API requests received by this cluster. Type:
+         *   rd_kafka_mock_request_t*
+         */
+        rd_list_t request_list;
+
         /**< Mutex for:
          *   .errstacks
          *   .apiversions
+         *   .track_requests
+         *   .request_list
          */
         mtx_t lock;

diff --git a/src/rdkafka_msg.c b/src/rdkafka_msg.c
index 6b129ace1d..5e71209dbf 100644
--- a/src/rdkafka_msg.c
+++ b/src/rdkafka_msg.c
@@ -2033,9 +2033,11 @@ static int unittest_msgq_order(const char *what,
         }

         /* Retry the messages, which moves them back to sendq
-         * maintaining the original order */
+         * maintaining the original order, with exponential backoff
+         * disabled */
         rd_kafka_retry_msgq(&rkmq, &sendq, 1, 1, 0,
-                            RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp);
+                            RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp, rd_false, 0,
+                            0);

         RD_UT_ASSERT(rd_kafka_msgq_len(&sendq) == 0,
                      "sendq FIFO should be empty, not contain %d messages",
@@ -2073,9 +2075,11 @@ static int unittest_msgq_order(const char *what,
         }

         /* Retry the messages, which should now keep the 3 first messages
-         * on sendq (no more retries) and just number 4 moved back. */
+         * on sendq (no more retries) and just number 4 moved back.
+         * No exponential backoff applied. */
         rd_kafka_retry_msgq(&rkmq, &sendq, 1, 1, 0,
-                            RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp);
+                            RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp, rd_false, 0,
+                            0);

         if (fifo) {
                 if (ut_verify_msgq_order("readded #2", &rkmq, 4, 6, rd_true))
@@ -2094,9 +2098,10 @@ static int unittest_msgq_order(const char *what,
                 return 1;
         }

-        /* Move all messages back on rkmq */
+        /* Move all messages back on rkmq without any exponential backoff. */
         rd_kafka_retry_msgq(&rkmq, &sendq, 0, 1000, 0,
-                            RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp);
+                            RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp, rd_false, 0,
+                            0);


         /* Move first half of messages to sendq (1,2,3).
@@ -2116,11 +2121,14 @@ static int unittest_msgq_order(const char *what,
         rkm                       = ut_rd_kafka_msg_new(msgsize);
         rkm->rkm_u.producer.msgid = i;
         rd_kafka_msgq_enq_sorted0(&rkmq, rkm, cmp);
-
+        /* No exponential backoff applied. 
*/
         rd_kafka_retry_msgq(&rkmq, &sendq, 0, 1000, 0,
-                            RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp);
+                            RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp, rd_false, 0,
+                            0);
+        /* No exponential backoff applied. */
         rd_kafka_retry_msgq(&rkmq, &sendq2, 0, 1000, 0,
-                            RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp);
+                            RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp, rd_false, 0,
+                            0);

         RD_UT_ASSERT(rd_kafka_msgq_len(&sendq) == 0,
                      "sendq FIFO should be empty, not contain %d messages",
diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c
index 1a9066d3d9..76baa3cfa3 100644
--- a/src/rdkafka_partition.c
+++ b/src/rdkafka_partition.c
@@ -873,6 +873,11 @@ void rd_kafka_msgq_insert_msgq(rd_kafka_msgq_t *destq,
  * @param incr_retry Increment retry count for messages.
  * @param max_retries Maximum retries allowed per message.
  * @param backoff Absolute retry backoff for retried messages.
+ * @param exponential_backoff If true the backoff should be exponential with
+ *                            2**(retry_count - 1)*retry_ms with jitter. The
+ *                            \p backoff is ignored.
+ * @param retry_ms The retry ms used for exponential backoff calculation.
+ * @param retry_max_ms The max backoff limit for exponential backoff
+ *                     calculation.
  *
  * @returns 0 if all messages were retried, or 1 if some messages
  *          could not be retried.
@@ -883,10 +888,14 @@ int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq,
                         int max_retries,
                         rd_ts_t backoff,
                         rd_kafka_msg_status_t status,
-                        int (*cmp)(const void *a, const void *b)) {
+                        int (*cmp)(const void *a, const void *b),
+                        rd_bool_t exponential_backoff,
+                        int retry_ms,
+                        int retry_max_ms) {
         rd_kafka_msgq_t retryable = RD_KAFKA_MSGQ_INITIALIZER(retryable);
         rd_kafka_msg_t *rkm, *tmp;
-
+        int64_t jitter = rd_jitter(100 - RD_KAFKA_RETRY_JITTER_PERCENT,
+                                   100 + RD_KAFKA_RETRY_JITTER_PERCENT);
         /* Scan through messages to see which ones are eligible for retry,
          * move the retryable ones to temporary queue and
          * set backoff time for first message and optionally
@@ -900,8 +909,25 @@ int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq,
                 rd_kafka_msgq_deq(srcq, rkm, 1);
                 rd_kafka_msgq_enq(&retryable, rkm);

-                rkm->rkm_u.producer.ts_backoff = backoff;
                 rkm->rkm_u.producer.retries += incr_retry;
+                if (exponential_backoff) {
+                        /* In some cases, such as failed Produce requests, the
+                         * retry count is not incremented, see
+                         * rd_kafka_handle_Produce_error. */
+                        if (rkm->rkm_u.producer.retries > 0)
+                                backoff =
+                                    (1 << (rkm->rkm_u.producer.retries - 1)) *
+                                    retry_ms;
+                        else
+                                backoff = retry_ms;
+                        /* Convert to microseconds: jitter is a percentage, so
+                         * jitter * backoff * 10 == backoff * 1000 *
+                         * (jitter / 100). */
+                        backoff = jitter * backoff * 10;
+                        if (backoff > retry_max_ms * 1000)
+                                backoff = retry_max_ms * 1000;
+                        backoff = rd_clock() + backoff;
+                }
+                rkm->rkm_u.producer.ts_backoff = backoff;

                 /* Don't downgrade a message from any form of PERSISTED
                  * to NOT_PERSISTED, since the original cause of indicating
@@ -940,17 +966,21 @@ int rd_kafka_toppar_retry_msgq(rd_kafka_toppar_t *rktp,
                                rd_kafka_msgq_t *rkmq,
                                int incr_retry,
                                rd_kafka_msg_status_t status) {
-        rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
-        rd_ts_t backoff = rd_clock() + (rk->rk_conf.retry_backoff_ms * 1000);
+        rd_kafka_t *rk   = rktp->rktp_rkt->rkt_rk;
+        int retry_ms     = rk->rk_conf.retry_backoff_ms;
+        int retry_max_ms = rk->rk_conf.retry_backoff_max_ms;
         int r;

         if (rd_kafka_terminating(rk))
                 return 1;

         rd_kafka_toppar_lock(rktp);
+        /* Exponential backoff applied. 
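The backoff argument below is a
+         * placeholder: with exponential_backoff set it is recomputed per
+         * message from retry_ms and retry_max_ms. 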
*/
        r = rd_kafka_retry_msgq(&rktp->rktp_msgq, rkmq, incr_retry,
                                rk->rk_conf.max_retries,
                                0 /* backoff will be calculated */, status,
                                rktp->rktp_rkt->rkt_conf.msg_order_cmp, rd_true,
                                retry_ms, retry_max_ms);
         rd_kafka_toppar_unlock(rktp);

         return r;
diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h
index 6fbb86db04..638c86eb35 100644
--- a/src/rdkafka_partition.h
+++ b/src/rdkafka_partition.h
@@ -566,7 +566,10 @@ int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq,
                         int max_retries,
                         rd_ts_t backoff,
                         rd_kafka_msg_status_t status,
-                        int (*cmp)(const void *a, const void *b));
+                        int (*cmp)(const void *a, const void *b),
+                        rd_bool_t exponential_backoff,
+                        int retry_ms,
+                        int retry_max_ms);
 void rd_kafka_msgq_insert_msgq(rd_kafka_msgq_t *destq,
                                rd_kafka_msgq_t *srcq,
                                int (*cmp)(const void *a, const void *b));
diff --git a/src/rdkafka_timer.c b/src/rdkafka_timer.c
index 776b5d995f..b62343269d 100644
--- a/src/rdkafka_timer.c
+++ b/src/rdkafka_timer.c
@@ -29,6 +29,7 @@
 #include "rdkafka_int.h"
 #include "rd.h"
 #include "rdtime.h"
+#include "rdrand.h"
 #include "rdsysqueue.h"

 #include "rdkafka_queue.h"
@@ -198,15 +199,32 @@ void rd_kafka_timer_start0(rd_kafka_timers_t *rkts,

 /**
  * Delay the next timer invocation by '2 * rtmr->rtmr_interval'
+ * @param minimum_backoff the minimum backoff to be applied
+ * @param maximum_backoff the maximum backoff to be applied, or -1 for no
+ *        maximum
+ * @param max_jitter the jitter percentage to be applied to the backoff
  */
 void rd_kafka_timer_exp_backoff(rd_kafka_timers_t *rkts,
-                                rd_kafka_timer_t *rtmr) {
+                                rd_kafka_timer_t *rtmr,
+                                rd_ts_t minimum_backoff,
+                                rd_ts_t maximum_backoff,
+                                int max_jitter) {
+        int64_t jitter;
         rd_kafka_timers_lock(rkts);
         if (rd_kafka_timer_scheduled(rtmr)) {
-                rtmr->rtmr_interval *= 2;
                 rd_kafka_timer_unschedule(rkts, rtmr);
         }
-        rd_kafka_timer_schedule(rkts, rtmr, 0);
+        rtmr->rtmr_interval *= 2;
+        jitter =
+            (rd_jitter(-max_jitter, max_jitter) * rtmr->rtmr_interval) / 100;
+        if (rtmr->rtmr_interval + jitter < minimum_backoff) {
+                rtmr->rtmr_interval = minimum_backoff;
+                jitter              = 0;
+        } else if ((maximum_backoff != -1) &&
+                   (rtmr->rtmr_interval + jitter) > maximum_backoff) {
+                rtmr->rtmr_interval = maximum_backoff;
+                jitter              = 0;
+        }
+        rd_kafka_timer_schedule(rkts, rtmr, jitter);
         rd_kafka_timers_unlock(rkts);
 }

diff --git a/src/rdkafka_timer.h b/src/rdkafka_timer.h
index d3e8fba61e..9a273adcfa 100644
--- a/src/rdkafka_timer.h
+++ b/src/rdkafka_timer.h
@@ -85,7 +85,10 @@ void rd_kafka_timer_start0(rd_kafka_timers_t *rkts,
                               callback, arg)

 void rd_kafka_timer_exp_backoff(rd_kafka_timers_t *rkts,
-                                rd_kafka_timer_t *rtmr);
+                                rd_kafka_timer_t *rtmr,
+                                rd_ts_t minimum,
+                                rd_ts_t maximum,
+                                int maxjitter);

 rd_ts_t rd_kafka_timer_next(rd_kafka_timers_t *rkts,
                             rd_kafka_timer_t *rtmr,
                             int do_lock);
diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c
index 2b68ee0204..5a161db9ac 100644
--- a/src/rdkafka_topic.c
+++ b/src/rdkafka_topic.c
@@ -1261,6 +1261,8 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
         rd_kafka_broker_t **partbrokers;
         int leader_cnt = 0;
         int old_state;
+        rd_bool_t partition_exists_with_no_leader_epoch      = rd_false;
+        rd_bool_t partition_exists_with_updated_leader_epoch = rd_false;

         if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR)
                 rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA",
@@ -1326,6 +1328,8 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
                 int r;
                 rd_kafka_broker_t *leader;
                 int32_t leader_epoch = mdit->partitions[j].leader_epoch;
+
rd_kafka_toppar_t *rktp =
+                    rd_kafka_toppar_get(rkt, mdt->partitions[j].id, 0);

                 rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA",
                              "  Topic %s partition %i Leader %" PRId32
@@ -1336,6 +1340,14 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
                 leader         = partbrokers[j];
                 partbrokers[j] = NULL;

+                /* If the broker does not support leaderEpoch (KIP-320) it is
+                 * set to -1; in that case we assume the metadata is not
+                 * stale. */
+                if (leader_epoch == -1)
+                        partition_exists_with_no_leader_epoch = rd_true;
+                else if (rktp->rktp_leader_epoch < leader_epoch)
+                        partition_exists_with_updated_leader_epoch = rd_true;
+
+                /* Update leader for partition */
                 r = rd_kafka_toppar_leader_update(rkt, mdt->partitions[j].id,
                                                   mdt->partitions[j].leader,
@@ -1349,10 +1361,14 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
                         /* Drop reference to broker (from find()) */
                         rd_kafka_broker_destroy(leader);
                 }
+                RD_IF_FREE(rktp, rd_kafka_toppar_destroy);
         }

-        /* If all partitions have leaders we can turn off fast leader query. */
-        if (mdt->partition_cnt > 0 && leader_cnt == mdt->partition_cnt)
+        /* If all partitions have leaders, and this metadata update was not
+         * stale, we can turn off fast leader query. */
+        if (mdt->partition_cnt > 0 && leader_cnt == mdt->partition_cnt &&
+            (partition_exists_with_no_leader_epoch ||
+             partition_exists_with_updated_leader_epoch))
                 rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;

         if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR && rkt->rkt_partition_cnt) {
diff --git a/tests/0075-retry.c b/tests/0075-retry.c
index 86eeb56d15..c3ce353abf 100644
--- a/tests/0075-retry.c
+++ b/tests/0075-retry.c
@@ -177,6 +177,7 @@ static void do_test_low_socket_timeout(const char *topic) {
         test_conf_set(conf, "socket.timeout.ms", "1000");
         test_conf_set(conf, "socket.max.fails", "12345");
         test_conf_set(conf, "retry.backoff.ms", "5000");
+        test_conf_set(conf, "retry.backoff.max.ms", "5000");
         /* Avoid api version requests (with their own timeout) to get in
          * the way of our test */
         test_conf_set(conf, "api.version.request", "false");
diff --git a/tests/0143-exponential_backoff_mock.c b/tests/0143-exponential_backoff_mock.c
new file mode 100644
index 0000000000..80ae817d5c
--- /dev/null
+++ b/tests/0143-exponential_backoff_mock.c
@@ -0,0 +1,561 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2023, Confluent Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+#include "../src/rdkafka_proto.h"
+#include "../src/rdkafka_mock.h"
+
+const int32_t retry_ms     = 100;
+const int32_t retry_max_ms = 1000;
+
+static void free_mock_requests(rd_kafka_mock_request_t **requests,
+                               size_t request_cnt) {
+        size_t i;
+        for (i = 0; i < request_cnt; i++)
+                rd_kafka_mock_request_destroy(requests[i]);
+        rd_free(requests);
+}
+/**
+ * @brief find_coordinator test
+ * We fail the request with RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
+ * so that the request is retried via the intervalled mechanism. The
+ * intervalling is done at 500 ms, with 20% jitter. However, the actual code
+ * that retries the request runs inside rd_kafka_cgrp_serve, which is called
+ * every second; hence the retry always happens after about 1 second, no matter
+ * what the jitter is. This will be fixed once rd_kafka_cgrp_serve is timer
+ * triggered. The exponential backoff does not apply in this case; we just
+ * apply the jitter to the backoff of the intervalled query. The retry count is
+ * non-deterministic, as fresh requests are spawned on their own.
+ */
+static void test_find_coordinator(rd_kafka_mock_cluster_t *mcluster,
+                                  const char *topic,
+                                  rd_kafka_conf_t *conf) {
+        rd_kafka_mock_request_t **requests = NULL;
+        size_t request_cnt                 = 0;
+        int64_t previous_request_ts        = -1;
+        int32_t retry_count                = 0;
+        int32_t num_retries                = 4;
+        const int32_t low                  = 1000;
+        int32_t buffer                     = 200; /* 200 ms buffer added */
+        rd_kafka_t *consumer;
+        rd_kafka_message_t *rkm;
+        size_t i;
+
+        SUB_TEST();
+        test_conf_set(conf, "auto.offset.reset", "earliest");
+        test_conf_set(conf, "enable.auto.commit", "false");
+
+        consumer = test_create_consumer(topic, NULL, conf, NULL);
+
+        rd_kafka_mock_push_request_errors(
+            mcluster, RD_KAFKAP_FindCoordinator, num_retries,
+            RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
+            RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
+            RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
+            RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE);
+        /* This will trigger a find_coordinator request */
+        rkm = rd_kafka_consumer_poll(consumer, 10 * 1000);
+        if (rkm)
+                rd_kafka_message_destroy(rkm);
+        rd_sleep(4);
+        requests = rd_kafka_mock_get_requests(mcluster, &request_cnt);
+        for (i = 0; (i < request_cnt) && (retry_count < num_retries); i++) {
+                TEST_SAY("Broker Id : %d API Key : %d Timestamp : %" PRId64
+                         "\n",
+                         rd_kafka_mock_request_id(requests[i]),
+                         rd_kafka_mock_request_api_key(requests[i]),
+                         rd_kafka_mock_request_timestamp(requests[i]));
+
+                if (rd_kafka_mock_request_api_key(requests[i]) !=
+                    RD_KAFKAP_FindCoordinator)
+                        continue;
+
+                if (previous_request_ts != -1) {
+                        int64_t time_difference =
+                            (rd_kafka_mock_request_timestamp(requests[i]) -
+                             previous_request_ts) /
+                            1000;
+                        TEST_ASSERT(((time_difference > low - buffer) &&
+                                     (time_difference < low + buffer)),
+                                    "Time difference should be close "
+                                    "to 1 second, it is %" PRId64
+                                    " ms instead.\n",
+                                    time_difference);
+                        retry_count++;
+                }
+                previous_request_ts = 
rd_kafka_mock_request_timestamp(requests[i]);
+        }
+        rd_kafka_destroy(consumer);
+        free_mock_requests(requests, request_cnt);
+        rd_kafka_mock_clear_requests(mcluster);
+        SUB_TEST_PASS();
+}
+
+/**
+ * Checks that exponential backoff is applied to retries of the given
+ * request_type. For a correct measurement, a request of request_type should
+ * only be retried after a previous one has failed.
+ */
+static void helper_exponential_backoff(rd_kafka_mock_cluster_t *mcluster,
+                                       int32_t request_type) {
+        rd_kafka_mock_request_t **requests = NULL;
+        size_t request_cnt                 = 0;
+        int64_t previous_request_ts        = -1;
+        int32_t retry_count                = 0;
+        size_t i;
+        requests = rd_kafka_mock_get_requests(mcluster, &request_cnt);
+        for (i = 0; i < request_cnt; i++) {
+                TEST_SAY("Broker Id : %d API Key : %d Timestamp : %" PRId64
+                         "\n",
+                         rd_kafka_mock_request_id(requests[i]),
+                         rd_kafka_mock_request_api_key(requests[i]),
+                         rd_kafka_mock_request_timestamp(requests[i]));
+
+                if (rd_kafka_mock_request_api_key(requests[i]) != request_type)
+                        continue;
+
+                if (previous_request_ts != -1) {
+                        int64_t time_difference =
+                            (rd_kafka_mock_request_timestamp(requests[i]) -
+                             previous_request_ts) /
+                            1000;
+                        /* Max jitter is 20 percent each side, so the buffer
+                         * chosen is 25 percent to account for latency
+                         * delays */
+                        int64_t low =
+                            ((1 << retry_count) * (retry_ms)*75) / 100;
+                        int64_t high =
+                            ((1 << retry_count) * (retry_ms)*125) / 100;
+                        if (high > ((retry_max_ms * 125) / 100))
+                                high = (retry_max_ms * 125) / 100;
+                        if (low > ((retry_max_ms * 75) / 100))
+                                low = (retry_max_ms * 75) / 100;
+                        TEST_ASSERT((time_difference < high) &&
+                                        (time_difference > low),
+                                    "Time difference is not respected, should "
+                                    "be between %" PRId64 " and %" PRId64
+                                    " where time difference is %" PRId64 "\n",
+                                    low, high, time_difference);
+                        retry_count++;
+                }
+                previous_request_ts =
+                    rd_kafka_mock_request_timestamp(requests[i]);
+        }
+        free_mock_requests(requests, request_cnt);
+}
+/**
+ * @brief offset_commit test
+ * We fail the request with RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS so
+ * that the request is retried with exponential backoff. The RPC calls
+ * rd_kafka_buf_retry for its retry attempts, so this tests all RPCs that
+ * depend on it for retrying. The number of retries is deterministic, i.e. no
+ * fresh requests are spawned on their own. The max retries allowed is 2 for
+ * OffsetCommit. 
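For example, with retry_ms = 100 and 20% jitter, the gap
+ * between the first attempt and the first retry should be roughly 80-120 ms,
+ * and 160-240 ms between the first and second retry. 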
+ */
+static void test_offset_commit(rd_kafka_mock_cluster_t *mcluster,
+                               const char *topic,
+                               rd_kafka_conf_t *conf) {
+        rd_kafka_t *consumer;
+        rd_kafka_message_t *rkm;
+        rd_kafka_topic_partition_list_t *offsets;
+        rd_kafka_topic_partition_t *rktpar;
+        SUB_TEST();
+        test_conf_set(conf, "auto.offset.reset", "earliest");
+        test_conf_set(conf, "enable.auto.commit", "false");
+
+        consumer = test_create_consumer(topic, NULL, conf, NULL);
+        test_consumer_subscribe(consumer, topic);
+        rkm = rd_kafka_consumer_poll(consumer, 10 * 1000);
+        if (rkm)
+                rd_kafka_message_destroy(rkm);
+        rd_sleep(4);
+        rd_kafka_mock_push_request_errors(
+            mcluster, RD_KAFKAP_OffsetCommit, 2,
+            RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
+            RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);
+
+        offsets = rd_kafka_topic_partition_list_new(1);
+        rktpar  = rd_kafka_topic_partition_list_add(offsets, topic, 0);
+        /* Setting offset to an arbitrary number */
+        rktpar->offset = 4;
+        /* rd_kafka_commit will trigger the OffsetCommit RPC call */
+        rd_kafka_commit(consumer, offsets, 0);
+        rd_kafka_topic_partition_list_destroy(offsets);
+        rd_sleep(3);
+
+        helper_exponential_backoff(mcluster, RD_KAFKAP_OffsetCommit);
+
+
+        rd_kafka_destroy(consumer);
+        rd_kafka_mock_clear_requests(mcluster);
+        SUB_TEST_PASS();
+}
+
+/**
+ * @brief produce test
+ * We fail the request with RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS so
+ * that the request is retried with exponential backoff. The exponential
+ * backoff is capped at retry_max_ms, with jitter. The number of retries is
+ * deterministic, i.e. no fresh requests are spawned on their own.
+ */
+static void test_produce(rd_kafka_mock_cluster_t *mcluster,
+                         const char *topic,
+                         rd_kafka_conf_t *conf) {
+        rd_kafka_t *producer;
+        rd_kafka_topic_t *rkt;
+        SUB_TEST();
+        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
+
+        producer = test_create_handle(RD_KAFKA_PRODUCER, conf);
+        rkt      = test_create_producer_topic(producer, topic, NULL);
+
+        rd_kafka_mock_push_request_errors(
+            mcluster, RD_KAFKAP_Produce, 7,
+            RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
+            RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
+            RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
+            RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
+            RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
+            RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
+            RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS);
+
+        test_produce_msgs(producer, rkt, 0, RD_KAFKA_PARTITION_UA, 0, 1,
+                          "hello", 5);
+        rd_sleep(3);
+
+        helper_exponential_backoff(mcluster, RD_KAFKAP_Produce);
+
+
+        rd_kafka_topic_destroy(rkt);
+        rd_kafka_destroy(producer);
+        rd_kafka_mock_clear_requests(mcluster);
+        SUB_TEST_PASS();
+}
+
+/**
+ * Helper function for the FindCoordinator trigger: a FindCoordinator request
+ * should be triggered after a failing request of the given request_type. 
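A
+ * second request of the same type arriving before any FindCoordinator request
+ * fails the test. 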
+ */ +static void helper_find_coordinator_trigger(rd_kafka_mock_cluster_t *mcluster, + int32_t request_type) { + rd_kafka_mock_request_t **requests = NULL; + size_t request_cnt = 0; + int32_t num_request = 0; + size_t i; + requests = rd_kafka_mock_get_requests(mcluster, &request_cnt); + for (i = 0; i < request_cnt; i++) { + TEST_SAY("Broker Id : %d API Key : %d Timestamp : %" PRId64 + "\n", + rd_kafka_mock_request_id(requests[i]), + rd_kafka_mock_request_api_key(requests[i]), + rd_kafka_mock_request_timestamp(requests[i])); + if (num_request == 0) { + if (rd_kafka_mock_request_api_key(requests[i]) == + request_type) { + num_request++; + } + } else if (num_request == 1) { + if (rd_kafka_mock_request_api_key(requests[i]) == + RD_KAFKAP_FindCoordinator) { + TEST_SAY( + "FindCoordinator request made after " + "failing request with NOT_COORDINATOR " + "error.\n"); + break; + } else if (rd_kafka_mock_request_api_key(requests[i]) == + request_type) { + num_request++; + TEST_FAIL( + "Second request made without any " + "FindCoordinator request."); + } + } + } + free_mock_requests(requests, request_cnt); + if (num_request != 1) + TEST_FAIL("No request was made."); +} +/** + * @brief heartbeat-find_coordinator test + * We fail the request with RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP so that + * the FindCoordinator request is triggered. + */ +static void test_heartbeat_find_coordinator(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + rd_kafka_conf_t *conf) { + rd_kafka_t *consumer; + rd_kafka_message_t *rkm; + SUB_TEST(); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "enable.auto.commit", "false"); + + consumer = test_create_consumer(topic, NULL, conf, NULL); + + rd_kafka_mock_push_request_errors( + mcluster, RD_KAFKAP_Heartbeat, 1, + RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP); + + rd_kafka_mock_clear_requests(mcluster); + test_consumer_subscribe(consumer, topic); + /* This will trigger a find_coordinator request */ + rkm = rd_kafka_consumer_poll(consumer, 10 * 1000); + if (rkm) + rd_kafka_message_destroy(rkm); + rd_sleep(6); + + + helper_find_coordinator_trigger(mcluster, RD_KAFKAP_Heartbeat); + + + rd_kafka_destroy(consumer); + rd_kafka_mock_clear_requests(mcluster); + SUB_TEST_PASS(); +} + +/** + * @brief joingroup-find_coordinator test + * We fail the request with RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP so that + * the FindCoordinator request is triggered. + */ +static void test_joingroup_find_coordinator(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + rd_kafka_conf_t *conf) { + rd_kafka_t *consumer; + rd_kafka_message_t *rkm; + SUB_TEST(); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "enable.auto.commit", "false"); + + consumer = test_create_consumer(topic, NULL, conf, NULL); + rd_kafka_mock_push_request_errors( + mcluster, RD_KAFKAP_JoinGroup, 1, + RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP); + rd_kafka_mock_clear_requests(mcluster); + test_consumer_subscribe(consumer, topic); + /* This will trigger a find_coordinator request */ + rkm = rd_kafka_consumer_poll(consumer, 10 * 1000); + if (rkm) + rd_kafka_message_destroy(rkm); + rd_sleep(4); + + helper_find_coordinator_trigger(mcluster, RD_KAFKAP_JoinGroup); + + rd_kafka_destroy(consumer); + rd_kafka_mock_clear_requests(mcluster); + SUB_TEST_PASS(); +} + +/** + * @brief produce-fast_leader_query test + * We fail a Produce request with RD_KAFKA_RESP_ERR_NOT_LEADER_OR_FOLLOWER, so + * that it triggers a fast leader query (a Metadata request). 
We don't update
+ * the leader in this test, so the Metadata is always stale from the client's
+ * perspective, and the fast leader query carries on, being backed off
+ * exponentially until the max retry time is reached. The number of retries is
+ * non-deterministic, as it will keep retrying until the leader changes.
+ */
+static void test_produce_fast_leader_query(rd_kafka_mock_cluster_t *mcluster,
+                                           const char *topic,
+                                           rd_kafka_conf_t *conf) {
+        rd_kafka_mock_request_t **requests = NULL;
+        size_t request_cnt                 = 0;
+        int64_t previous_request_ts        = -1;
+        int32_t retry_count                = 0;
+        rd_bool_t produced                 = rd_false;
+        rd_kafka_t *producer;
+        rd_kafka_topic_t *rkt;
+        size_t i;
+        SUB_TEST();
+        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
+
+        producer = test_create_handle(RD_KAFKA_PRODUCER, conf);
+        rkt      = test_create_producer_topic(producer, topic, NULL);
+
+        rd_kafka_mock_push_request_errors(
+            mcluster, RD_KAFKAP_Produce, 1,
+            RD_KAFKA_RESP_ERR_NOT_LEADER_OR_FOLLOWER);
+        rd_kafka_mock_clear_requests(mcluster);
+        test_produce_msgs(producer, rkt, 0, RD_KAFKA_PARTITION_UA, 0, 1,
+                          "hello", 1);
+        rd_sleep(10);
+        requests = rd_kafka_mock_get_requests(mcluster, &request_cnt);
+
+        for (i = 0; i < request_cnt; i++) {
+                TEST_SAY("Broker Id : %d API Key : %d Timestamp : %" PRId64
+                         "\n",
+                         rd_kafka_mock_request_id(requests[i]),
+                         rd_kafka_mock_request_api_key(requests[i]),
+                         rd_kafka_mock_request_timestamp(requests[i]));
+
+                if (!produced && rd_kafka_mock_request_api_key(requests[i]) ==
+                                     RD_KAFKAP_Produce)
+                        produced = rd_true;
+                else if (rd_kafka_mock_request_api_key(requests[i]) ==
+                             RD_KAFKAP_Metadata &&
+                         produced) {
+                        if (previous_request_ts != -1) {
+                                int64_t time_difference =
+                                    (rd_kafka_mock_request_timestamp(
+                                         requests[i]) -
+                                     previous_request_ts) /
+                                    1000;
+                                /* Max jitter is 20 percent each side, so the
+                                 * buffer chosen is 25 percent to account for
+                                 * latency delays */
+                                int64_t low =
+                                    ((1 << retry_count) * (retry_ms)*75) / 100;
+                                int64_t high =
+                                    ((1 << retry_count) * (retry_ms)*125) / 100;
+                                if (high > ((retry_max_ms * 125) / 100))
+                                        high = (retry_max_ms * 125) / 100;
+                                if (low > ((retry_max_ms * 75) / 100))
+                                        low = (retry_max_ms * 75) / 100;
+                                TEST_ASSERT(
+                                    (time_difference < high) &&
+                                        (time_difference > low),
+                                    "Time difference is not respected, should "
+                                    "be between %" PRId64 " and %" PRId64
+                                    " where time difference is %" PRId64 "\n",
+                                    low, high, time_difference);
+                                retry_count++;
+                        }
+                        previous_request_ts =
+                            rd_kafka_mock_request_timestamp(requests[i]);
+                }
+        }
+        rd_kafka_topic_destroy(rkt);
+        rd_kafka_destroy(producer);
+        free_mock_requests(requests, request_cnt);
+        rd_kafka_mock_clear_requests(mcluster);
+        SUB_TEST_PASS();
+}
+
+/**
+ * @brief fetch-fast_leader_query test
+ * We fail a Fetch request by causing a leader change (the leader is the same,
+ * but with a different leader epoch). This triggers a fast leader query (a
+ * Metadata request). The request obtains an updated leader, and hence the
+ * fast leader query terminates after one Metadata request. 
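The check below is
+ * therefore order based (a Metadata request following a Fetch), not timing
+ * based. 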
+ */
+static void test_fetch_fast_leader_query(rd_kafka_mock_cluster_t *mcluster,
+                                         const char *topic,
+                                         rd_kafka_conf_t *conf) {
+        rd_kafka_mock_request_t **requests = NULL;
+        size_t request_cnt                 = 0;
+        rd_bool_t previous_request_was_Fetch = rd_false;
+        rd_bool_t Metadata_after_Fetch       = rd_false;
+        rd_kafka_t *consumer;
+        rd_kafka_message_t *rkm;
+        size_t i;
+        SUB_TEST();
+        test_conf_set(conf, "auto.offset.reset", "earliest");
+        test_conf_set(conf, "enable.auto.commit", "false");
+
+        consumer = test_create_consumer(topic, NULL, conf, NULL);
+
+        test_consumer_subscribe(consumer, topic);
+        rkm = rd_kafka_consumer_poll(consumer, 10 * 1000);
+
+        if (rkm)
+                rd_kafka_message_destroy(rkm);
+        rd_kafka_mock_clear_requests(mcluster);
+
+        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
+        rkm = rd_kafka_consumer_poll(consumer, 10 * 1000);
+        if (rkm)
+                rd_kafka_message_destroy(rkm);
+        rd_sleep(3);
+        requests = rd_kafka_mock_get_requests(mcluster, &request_cnt);
+        for (i = 0; i < request_cnt; i++) {
+                TEST_SAY("Broker Id : %d API Key : %d Timestamp : %" PRId64
+                         "\n",
+                         rd_kafka_mock_request_id(requests[i]),
+                         rd_kafka_mock_request_api_key(requests[i]),
+                         rd_kafka_mock_request_timestamp(requests[i]));
+
+                if (rd_kafka_mock_request_api_key(requests[i]) ==
+                    RD_KAFKAP_Fetch)
+                        previous_request_was_Fetch = rd_true;
+                else if (rd_kafka_mock_request_api_key(requests[i]) ==
+                             RD_KAFKAP_Metadata &&
+                         previous_request_was_Fetch) {
+                        Metadata_after_Fetch = rd_true;
+                        break;
+                } else
+                        previous_request_was_Fetch = rd_false;
+        }
+        rd_kafka_destroy(consumer);
+        free_mock_requests(requests, request_cnt);
+        rd_kafka_mock_clear_requests(mcluster);
+        TEST_ASSERT(
+            Metadata_after_Fetch,
+            "Metadata request should have been made after Fetch at least "
+            "once.");
+        SUB_TEST_PASS();
+}
+
+/**
+ * @brief Exponential backoff (KIP-580)
+ * We test all the pipelines that affect the retry mechanism: both intervalled
+ * queries, where jitter is added, and backed-off queries, where both jitter
+ * and exponential backoff are applied, with the max being retry_max_ms.
+ */
+int main_0143_exponential_backoff_mock(int argc, char **argv) {
+        const char *topic = test_mk_topic_name("topic", 1);
+        rd_kafka_mock_cluster_t *mcluster;
+        rd_kafka_conf_t *conf;
+        const char *bootstraps;
+        if (test_needs_auth()) {
+                TEST_SKIP("Mock cluster does not support SSL/SASL.\n");
+                return 0;
+        }
+        mcluster = test_mock_cluster_new(1, &bootstraps);
+        rd_kafka_mock_start_request_tracking(mcluster);
+        rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
+
+        test_conf_init(&conf, NULL, 30);
+        /* This test may be slower when running with CI or Helgrind,
+         * restart the timeout. 
*/ + test_timeout_set(100); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "topic.metadata.refresh.interval.ms", "-1"); + + test_produce(mcluster, topic, rd_kafka_conf_dup(conf)); + test_find_coordinator(mcluster, topic, rd_kafka_conf_dup(conf)); + test_offset_commit(mcluster, topic, rd_kafka_conf_dup(conf)); + test_heartbeat_find_coordinator(mcluster, topic, + rd_kafka_conf_dup(conf)); + test_joingroup_find_coordinator(mcluster, topic, + rd_kafka_conf_dup(conf)); + test_fetch_fast_leader_query(mcluster, topic, rd_kafka_conf_dup(conf)); + test_produce_fast_leader_query(mcluster, topic, + rd_kafka_conf_dup(conf)); + test_mock_cluster_destroy(mcluster); + rd_kafka_conf_destroy(conf); + return 0; +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 66be0fbb2d..248772da83 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -133,6 +133,7 @@ set( 0139-offset_validation_mock.c 0140-commit_metadata.cpp 0142-reauthentication.c + 0143-exponential_backoff_mock.c 8000-idle.cpp 8001-fetch_from_follower_mock_manual.c test.c diff --git a/tests/test.c b/tests/test.c index 5a9c4fce89..7c5b15d638 100644 --- a/tests/test.c +++ b/tests/test.c @@ -254,6 +254,7 @@ _TEST_DECL(0138_admin_mock); _TEST_DECL(0139_offset_validation_mock); _TEST_DECL(0140_commit_metadata); _TEST_DECL(0142_reauthentication); +_TEST_DECL(0143_exponential_backoff_mock); /* Manual tests */ _TEST_DECL(8000_idle); @@ -505,6 +506,7 @@ struct test tests[] = { _TEST(0139_offset_validation_mock, 0), _TEST(0140_commit_metadata, 0), _TEST(0142_reauthentication, 0, TEST_BRKVER(2, 2, 0, 0)), + _TEST(0143_exponential_backoff_mock, TEST_F_LOCAL), /* Manual tests */ diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj index 8463ffdf44..7badb2788e 100644 --- a/win32/tests/tests.vcxproj +++ b/win32/tests/tests.vcxproj @@ -223,6 +223,7 @@ +
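Reviewer note, not part of the patch: a minimal sketch of how the new backoff
properties and the mock request-tracking API introduced above fit together,
mirroring tests/0143-exponential_backoff_mock.c. Cluster and client setup are
elided; example_backoff_and_tracking is a hypothetical helper, and rd_free()
is the internal allocator wrapper the test suite uses to free the returned
array.

    #include <inttypes.h>
    #include <stdio.h>
    #include "rdkafka.h"
    #include "rdkafka_mock.h"

    static void example_backoff_and_tracking(rd_kafka_mock_cluster_t *mcluster,
                                             rd_kafka_conf_t *conf) {
            char errstr[512];
            rd_kafka_mock_request_t **reqs;
            size_t cnt, i;

            /* Bound the exponential backoff: first retry ~100 ms (with 20%
             * jitter), doubling per retry and capped at 1000 ms. These are
             * the defaults introduced by this patch. */
            rd_kafka_conf_set(conf, "retry.backoff.ms", "100", errstr,
                              sizeof(errstr));
            rd_kafka_conf_set(conf, "retry.backoff.max.ms", "1000", errstr,
                              sizeof(errstr));

            rd_kafka_mock_start_request_tracking(mcluster);
            /* ... drive client traffic against the mock cluster here ... */
            reqs = rd_kafka_mock_get_requests(mcluster, &cnt);
            for (i = 0; i < cnt; i++) {
                    /* Print broker id, ApiKey and receive timestamp of each
                     * tracked request, then free the element. */
                    printf("broker %" PRId32 " api %" PRId16 " at %" PRId64
                           "\n",
                           rd_kafka_mock_request_id(reqs[i]),
                           rd_kafka_mock_request_api_key(reqs[i]),
                           rd_kafka_mock_request_timestamp(reqs[i]));
                    rd_kafka_mock_request_destroy(reqs[i]);
            }
            rd_free(reqs); /* as in the test's free_mock_requests() helper */
            rd_kafka_mock_stop_request_tracking(mcluster);
    }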