diff --git a/CHANGELOG.md b/CHANGELOG.md index fbc67c8242..55ac21bcdd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ librdkafka v2.2.1 is a maintenance release: * Added Topic id to the metadata response which is part of the [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers) * Fixed ListConsumerGroupOffsets not fetching offsets for all the topics in a group with Apache Kafka version below 2.4.0. + * Add a missing destroy call; its absence leaked partition structure memory when there + are partition leader changes and a stale leader epoch is received (#4429). + * Fix a segmentation fault when closing a consumer using the + cooperative-sticky assignor before the first assignment (#4381). @@ -35,7 +39,9 @@ librdkafka v2.2.0 is a feature release: * [KIP-339](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API): IncrementalAlterConfigs API (started by @PrasanthV454, #4110). * [KIP-554](https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API): Add Broker-side SCRAM Config API (#4241). - + * [KIP-580](https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients): Added Exponential Backoff mechanism for + retriable requests with `retry.backoff.ms` as minimum backoff and `retry.backoff.max.ms` as the + maximum backoff, with jitter `RD_KAFKA_RETRY_JITTER_PERCENT` (#4422). ## Enhancements diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 127fe4c88f..897f6211d7 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -12,7 +12,7 @@ message.copy.max.bytes | * | 0 .. 1000000000 | 65535 receive.message.max.bytes | * | 1000 .. 2147483647 | 100000000 | medium | Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hickups. 
This value must be at least `fetch.max.bytes` + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set.
*Type: integer* max.in.flight.requests.per.connection | * | 1 .. 1000000 | 1000000 | low | Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one.
*Type: integer* max.in.flight | * | 1 .. 1000000 | 1000000 | low | Alias for `max.in.flight.requests.per.connection`: Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one.
*Type: integer* -topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | low | Period of time in milliseconds at which topic and broker metadata is refreshed in order to proactively discover any new brokers, topics, partitions or partition leader changes. Use -1 to disable the intervalled refresh (not recommended). If there are no locally referenced topics (no topic objects created, no messages produced, no subscription or no assignment) then only the broker list will be refreshed every interval but no more often than every 10s.
*Type: integer* +topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | low | Period of time in milliseconds at which topic and broker metadata is refreshed in order to proactively discover any new brokers, topics, partitions or partition leader changes. Use -1 to disable the intervalled refresh (not recommended). Note that `topic.metadata.refresh.fast.interval.ms`, not this property, is the one that defaults to `retry.backoff.ms` when not set explicitly. If there are no locally referenced topics (no topic objects created, no messages produced, no subscription or no assignment) then only the broker list will be refreshed every interval but no more often than every 10s.
*Type: integer* metadata.max.age.ms | * | 1 .. 86400000 | 900000 | low | Metadata cache max age. Defaults to topic.metadata.refresh.interval.ms * 3
*Type: integer* topic.metadata.refresh.fast.interval.ms | * | 1 .. 60000 | 250 | low | When a topic loses its leader a new metadata request will be enqueued with this initial interval, exponentially increasing until the topic metadata has been refreshed. This is used to recover quickly from transitioning leader brokers.
*Type: integer* topic.metadata.refresh.fast.cnt | * | 0 .. 1000 | 10 | low | **DEPRECATED** No longer used.
*Type: integer* @@ -142,7 +142,8 @@ queue.buffering.max.ms | P | 0 .. 900000 | 5 linger.ms | P | 0 .. 900000 | 5 | high | Alias for `queue.buffering.max.ms`: Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers. A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency.
*Type: float* message.send.max.retries | P | 0 .. 2147483647 | 2147483647 | high | How many times to retry sending a failing Message. **Note:** retrying may cause reordering unless `enable.idempotence` is set to true.
*Type: integer* retries | P | 0 .. 2147483647 | 2147483647 | high | Alias for `message.send.max.retries`: How many times to retry sending a failing Message. **Note:** retrying may cause reordering unless `enable.idempotence` is set to true.
*Type: integer* -retry.backoff.ms | P | 1 .. 300000 | 100 | medium | The backoff time in milliseconds before retrying a protocol request.
*Type: integer* +retry.backoff.ms | P | 1 .. 300000 | 100 | medium | The backoff time in milliseconds before retrying a protocol request. This is the first backoff time; it is increased exponentially on each subsequent retry until the number of retries is exhausted, and it is capped by `retry.backoff.max.ms`.
*Type: integer* +retry.backoff.max.ms | P | 1 .. 300000 | 1000 | medium | The maximum backoff time in milliseconds before retrying a protocol request. This is the upper limit on the backoff applied to exponentially backed off requests.
*Type: integer* queue.buffering.backpressure.threshold | P | 1 .. 1000000 | 1 | low | The threshold of outstanding not yet transmitted broker requests needed to backpressure the producer's message accumulator. If the number of not yet transmitted requests equals or exceeds this number, produce request creation that would have otherwise been triggered (for example, in accordance with linger.ms) will be delayed. A lower number yields larger and more effective batches. A higher value can improve latency when using compression on slow machines.
*Type: integer* compression.codec | P | none, gzip, snappy, lz4, zstd | none | medium | compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression.codec`.
*Type: enum value* compression.type | P | none, gzip, snappy, lz4, zstd | none | medium | Alias for `compression.codec`: compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression.codec`.
*Type: enum value* diff --git a/INTRODUCTION.md b/INTRODUCTION.md index c360719d26..1f60f148ea 100644 --- a/INTRODUCTION.md +++ b/INTRODUCTION.md @@ -319,7 +319,8 @@ error code set. The application should typically not attempt to retry producing the message on failure, but instead configure librdkafka to perform these retries -using the `retries` and `retry.backoff.ms` configuration properties. +using the `retries`, `retry.backoff.ms` and `retry.backoff.max.ms` +configuration properties. #### Error: Timed out in transmission queue @@ -1950,7 +1951,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf | KIP-559 - Make the Kafka Protocol Friendlier with L7 Proxies | 2.5.0 | Not supported | | KIP-568 - Explicit rebalance triggering on the Consumer | 2.6.0 | Not supported | | KIP-659 - Add metadata to DescribeConfigsResponse | 2.6.0 | Not supported | -| KIP-580 - Exponential backoff for Kafka clients | WIP | Partially supported | +| KIP-580 - Exponential backoff for Kafka clients | 3.7.0 (WIP) | Supported | | KIP-584 - Versioning scheme for features | WIP | Not supported | | KIP-588 - Allow producers to recover gracefully from txn timeouts | 2.8.0 (WIP) | Not supported | | KIP-601 - Configurable socket connection timeout | 2.7.0 | Supported | diff --git a/src/rdinterval.h b/src/rdinterval.h index d43ff95358..afdaa2c89f 100644 --- a/src/rdinterval.h +++ b/src/rdinterval.h @@ -30,6 +30,7 @@ #define _RDINTERVAL_H_ #include "rd.h" +#include "rdrand.h" typedef struct rd_interval_s { rd_ts_t ri_ts_last; /* last interval timestamp */ @@ -109,6 +110,23 @@ static RD_INLINE RD_UNUSED void rd_interval_reset_to_now(rd_interval_t *ri, ri->ri_backoff = 0; } +/** + * Reset the interval to 'now' with the given backoff ms and max_jitter as + * percentage. The backoff is given just for absolute jitter calculation. If now + * is 0, the time will be gathered automatically. 
+ */ +static RD_INLINE RD_UNUSED void +rd_interval_reset_to_now_with_jitter(rd_interval_t *ri, + rd_ts_t now, + int64_t backoff_ms, + int max_jitter) { + rd_interval_reset_to_now(ri, now); + /* We are multiplying by 10 as (backoff_ms * percent * 1000)/100 -> + * backoff_ms * jitter * 10 */ + ri->ri_ts_last = ri->ri_ts_last + + backoff_ms * rd_jitter(-max_jitter, max_jitter) * 10; +} + /** * Back off the next interval by `backoff_us` microseconds. */ diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index 481c21d9c5..281a01f156 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -2823,6 +2823,7 @@ int rd_kafka_send(rd_kafka_broker_t *rkb) { */ void rd_kafka_broker_buf_retry(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) { + int64_t backoff = 0; /* Restore original replyq since replyq.q will have been NULLed * by buf_callback()/replyq_enq(). */ if (!rkbuf->rkbuf_replyq.q && rkbuf->rkbuf_orig_replyq.q) { @@ -2850,9 +2851,23 @@ void rd_kafka_broker_buf_retry(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) { rkb->rkb_rk->rk_conf.retry_backoff_ms); rd_atomic64_add(&rkb->rkb_c.tx_retries, 1); + /* In some cases, failed Produce requests do not increment the retry count, see rd_kafka_handle_Produce_error. 
*/ + if (rkbuf->rkbuf_retries > 0) + backoff = (1 << (rkbuf->rkbuf_retries - 1)) * + (rkb->rkb_rk->rk_conf.retry_backoff_ms); + else + backoff = rkb->rkb_rk->rk_conf.retry_backoff_ms; + + /* We are multiplying by 10 as (backoff_ms * percent * 1000)/100 -> + * backoff_ms * jitter * 10 */ + backoff = rd_jitter(100 - RD_KAFKA_RETRY_JITTER_PERCENT, + 100 + RD_KAFKA_RETRY_JITTER_PERCENT) * + backoff * 10; + + if (backoff > rkb->rkb_rk->rk_conf.retry_backoff_max_ms * 1000) + backoff = rkb->rkb_rk->rk_conf.retry_backoff_max_ms * 1000; - rkbuf->rkbuf_ts_retry = - rd_clock() + (rkb->rkb_rk->rk_conf.retry_backoff_ms * 1000); + rkbuf->rkbuf_ts_retry = rd_clock() + backoff; /* Precaution: time out the request if it hasn't moved from the * retry queue within the retry interval (such as when the broker is * down). */ diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 9926f8632c..eb953bb56b 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -370,7 +370,8 @@ void rd_kafka_cgrp_destroy_final(rd_kafka_cgrp_t *rkcg) { rd_list_destroy(&rkcg->rkcg_toppars); rd_list_destroy(rkcg->rkcg_subscribed_topics); rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics); - if (rkcg->rkcg_assignor && rkcg->rkcg_assignor->rkas_destroy_state_cb) + if (rkcg->rkcg_assignor && rkcg->rkcg_assignor->rkas_destroy_state_cb && + rkcg->rkcg_assignor_state) rkcg->rkcg_assignor->rkas_destroy_state_cb( rkcg->rkcg_assignor_state); rd_free(rkcg); @@ -754,8 +755,11 @@ void rd_kafka_cgrp_coord_query(rd_kafka_cgrp_t *rkcg, const char *reason) { rd_kafka_broker_destroy(rkb); - /* Back off the next intervalled query since we just sent one. */ - rd_interval_reset_to_now(&rkcg->rkcg_coord_query_intvl, 0); + /* Back off the next intervalled query with a jitter since we just sent + * one. 
*/ + rd_interval_reset_to_now_with_jitter(&rkcg->rkcg_coord_query_intvl, 0, + 500, + RD_KAFKA_RETRY_JITTER_PERCENT); } /** @@ -1914,7 +1918,9 @@ static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk, "Unsupported assignment strategy \"%s\"", protocol_name); if (rkcg->rkcg_assignor) { - if (rkcg->rkcg_assignor->rkas_destroy_state_cb) + if (rkcg->rkcg_assignor + ->rkas_destroy_state_cb && + rkcg->rkcg_assignor_state) rkcg->rkcg_assignor ->rkas_destroy_state_cb( rkcg->rkcg_assignor_state); @@ -1952,7 +1958,8 @@ static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk, } if (rkcg->rkcg_assignor && rkcg->rkcg_assignor != rkas) { - if (rkcg->rkcg_assignor->rkas_destroy_state_cb) + if (rkcg->rkcg_assignor->rkas_destroy_state_cb && + rkcg->rkcg_assignor_state) rkcg->rkcg_assignor->rkas_destroy_state_cb( rkcg->rkcg_assignor_state); rkcg->rkcg_assignor_state = NULL; diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index 9200af4c6a..ea59554958 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -445,6 +445,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "metadata is refreshed in order to proactively discover any new " "brokers, topics, partitions or partition leader changes. " "Use -1 to disable the intervalled refresh (not recommended). " + "If not set explicitly, it will be defaulted to `retry.backoff.ms`. 
" "If there are no locally referenced topics " "(no topic objects created, no messages produced, " "no subscription or no assignment) then only the broker list will " @@ -1372,10 +1373,21 @@ static const struct rd_kafka_property rd_kafka_properties[] = { 0, INT32_MAX, INT32_MAX}, {_RK_GLOBAL | _RK_PRODUCER, "retries", _RK_C_ALIAS, .sdef = "message.send.max.retries"}, + {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "retry.backoff.ms", _RK_C_INT, _RK(retry_backoff_ms), - "The backoff time in milliseconds before retrying a protocol request.", 1, - 300 * 1000, 100}, + "The backoff time in milliseconds before retrying a protocol request, " + "this is the first backoff time, " + "and will be backed off exponentially until number of retries is " + "exhausted, and it's capped by retry.backoff.max.ms.", + 1, 300 * 1000, 100}, + + {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "retry.backoff.max.ms", _RK_C_INT, + _RK(retry_backoff_max_ms), + "The max backoff time in milliseconds before retrying a protocol request, " + "this is the atmost backoff allowed for exponentially backed off " + "requests.", + 1, 300 * 1000, 1000}, {_RK_GLOBAL | _RK_PRODUCER, "queue.buffering.backpressure.threshold", _RK_C_INT, _RK(queue_backpressure_thres), @@ -3928,6 +3940,10 @@ const char *rd_kafka_conf_finalize(rd_kafka_type_t cltype, conf->sparse_connect_intvl = RD_MAX(11, RD_MIN(conf->reconnect_backoff_ms / 2, 1000)); } + if (!rd_kafka_conf_is_modified( + conf, "topic.metadata.refresh.fast.interval.ms")) + conf->metadata_refresh_fast_interval_ms = + conf->retry_backoff_ms; if (!rd_kafka_conf_is_modified(conf, "connections.max.idle.ms") && conf->brokerlist && rd_strcasestr(conf->brokerlist, "azure")) { @@ -4116,6 +4132,31 @@ int rd_kafka_conf_warn(rd_kafka_t *rk) { "recommend not using set_default_topic_conf"); /* Additional warnings */ + if (rk->rk_conf.retry_backoff_ms > rk->rk_conf.retry_backoff_max_ms) { + rd_kafka_log( + rk, LOG_WARNING, "CONFWARN", + "Configuration `retry.backoff.ms` with value %d is 
greater " + "than configuration `retry.backoff.max.ms` with value %d. " + "A static backoff with value `retry.backoff.max.ms` will " + "be applied.", + rk->rk_conf.retry_backoff_ms, + rk->rk_conf.retry_backoff_max_ms); + } + + if (rd_kafka_conf_is_modified( + &rk->rk_conf, "topic.metadata.refresh.fast.interval.ms") && + rk->rk_conf.metadata_refresh_fast_interval_ms > + rk->rk_conf.retry_backoff_max_ms) { + rd_kafka_log( + rk, LOG_WARNING, "CONFWARN", + "Configuration `topic.metadata.refresh.fast.interval.ms` " + "with value %d is greater than configuration " + "`retry.backoff.max.ms` with value %d. " + "A static backoff with value `retry.backoff.max.ms` will " + "be applied.", + rk->rk_conf.metadata_refresh_fast_interval_ms, + rk->rk_conf.retry_backoff_max_ms); + } if (rk->rk_type == RD_KAFKA_CONSUMER) { if (rk->rk_conf.fetch_wait_max_ms + 1000 > rk->rk_conf.socket_timeout_ms) diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h index 01b6258d2e..bd17a261bf 100644 --- a/src/rdkafka_conf.h +++ b/src/rdkafka_conf.h @@ -423,6 +423,7 @@ struct rd_kafka_conf_s { int queue_backpressure_thres; int max_retries; int retry_backoff_ms; + int retry_backoff_max_ms; int batch_num_messages; int batch_size; rd_kafka_compression_t compression_codec; diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index 8a29c1f623..e586dd6e69 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -863,6 +863,8 @@ const char *rd_kafka_purge_flags2str(int flags); #define RD_KAFKA_DBG_ALL 0xfffff #define RD_KAFKA_DBG_NONE 0x0 +/* Jitter Percent for exponential retry backoff */ +#define RD_KAFKA_RETRY_JITTER_PERCENT 20 void rd_kafka_log0(const rd_kafka_conf_t *conf, const rd_kafka_t *rk, diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index f96edf6583..98b90d37d9 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -1533,15 +1533,12 @@ static void rd_kafka_metadata_leader_query_tmr_cb(rd_kafka_timers_t *rkts, rk, NULL, &topics, rd_true /*force*/, 
rk->rk_conf.allow_auto_create_topics, rd_false /*!cgrp_update*/, "partition leader query"); - /* Back off next query exponentially until we reach - * the standard query interval - then stop the timer - * since the intervalled querier will do the job for us. */ - if (rk->rk_conf.metadata_refresh_interval_ms > 0 && - rtmr->rtmr_interval * 2 / 1000 >= - rk->rk_conf.metadata_refresh_interval_ms) - rd_kafka_timer_stop(rkts, rtmr, 1 /*lock*/); - else - rd_kafka_timer_exp_backoff(rkts, rtmr); + + /* Back off next query exponentially till we reach + * the retry backoff max ms */ + rd_kafka_timer_exp_backoff( + rkts, rtmr, rk->rk_conf.retry_backoff_ms * 1000, + rk->rk_conf.retry_backoff_max_ms * 1000, 20); } rd_list_destroy(&topics); @@ -1571,7 +1568,7 @@ void rd_kafka_metadata_fast_leader_query(rd_kafka_t *rk) { "Starting fast leader query"); rd_kafka_timer_start( &rk->rk_timers, &rk->rk_metadata_cache.rkmc_query_tmr, - rk->rk_conf.metadata_refresh_fast_interval_ms * 1000, + 0 /* First request should be tried immediately */, rd_kafka_metadata_leader_query_tmr_cb, NULL); } } diff --git a/src/rdkafka_mock.c b/src/rdkafka_mock.c index 6ec89b8468..fdfe595708 100644 --- a/src/rdkafka_mock.c +++ b/src/rdkafka_mock.c @@ -38,7 +38,7 @@ #include "rdkafka_interceptor.h" #include "rdkafka_mock_int.h" #include "rdkafka_transport_int.h" - +#include "rdkafka_mock.h" #include static void rd_kafka_mock_cluster_destroy0(rd_kafka_mock_cluster_t *mcluster); @@ -1127,6 +1127,15 @@ rd_kafka_mock_connection_parse_request(rd_kafka_mock_connection_t *mconn, return -1; } + mtx_lock(&mcluster->lock); + if (mcluster->track_requests) { + rd_list_add(&mcluster->request_list, + rd_kafka_mock_request_new( + mconn->broker->id, rkbuf->rkbuf_reqhdr.ApiKey, + rd_clock())); + } + mtx_unlock(&mcluster->lock); + rd_kafka_dbg(rk, MOCK, "MOCK", "Broker %" PRId32 ": Received %sRequestV%hd from %s", mconn->broker->id, @@ -2525,6 +2534,7 @@ rd_kafka_mock_cluster_t *rd_kafka_mock_cluster_new(rd_kafka_t *rk, 
TAILQ_INIT(&mcluster->topics); mcluster->defaults.partition_cnt = 4; mcluster->defaults.replication_factor = RD_MIN(3, broker_cnt); + mcluster->track_requests = rd_false; TAILQ_INIT(&mcluster->cgrps); @@ -2602,3 +2612,91 @@ const char * rd_kafka_mock_cluster_bootstraps(const rd_kafka_mock_cluster_t *mcluster) { return mcluster->bootstraps; } +/** + * @struct Represents a request to the mock cluster along with a timestamp. + */ +struct rd_kafka_mock_request_s { + int32_t id; /**< Broker id */ + int16_t api_key; /**< API Key of request */ + rd_ts_t timestamp /**< Timestamp at which request was received */; +}; + +rd_kafka_mock_request_t * +rd_kafka_mock_request_new(int32_t id, int16_t api_key, rd_ts_t timestamp) { + rd_kafka_mock_request_t *request; + request = rd_malloc(sizeof(*request)); + request->id = id; + request->api_key = api_key; + request->timestamp = timestamp; + return request; +} + +rd_kafka_mock_request_t * +rd_kafka_mock_request_copy(rd_kafka_mock_request_t *mrequest) { + rd_kafka_mock_request_t *request; + request = rd_malloc(sizeof(*request)); + request->id = mrequest->id; + request->api_key = mrequest->api_key; + request->timestamp = mrequest->timestamp; + return request; +} + +void rd_kafka_mock_request_destroy(rd_kafka_mock_request_t *element) { + rd_free(element); +} + +void rd_kafka_mock_request_free(void *element) { + rd_kafka_mock_request_destroy(element); +} + +void rd_kafka_mock_start_request_tracking(rd_kafka_mock_cluster_t *mcluster) { + mtx_lock(&mcluster->lock); + mcluster->track_requests = rd_true; + rd_list_init(&mcluster->request_list, 32, rd_kafka_mock_request_free); + mtx_unlock(&mcluster->lock); +} + +void rd_kafka_mock_stop_request_tracking(rd_kafka_mock_cluster_t *mcluster) { + mtx_lock(&mcluster->lock); + mcluster->track_requests = rd_false; + rd_list_clear(&mcluster->request_list); + mtx_unlock(&mcluster->lock); +} + +rd_kafka_mock_request_t ** +rd_kafka_mock_get_requests(rd_kafka_mock_cluster_t *mcluster, size_t *cntp) { + 
size_t i; + rd_kafka_mock_request_t **ret = NULL; + + mtx_lock(&mcluster->lock); + *cntp = rd_list_cnt(&mcluster->request_list); + if (*cntp > 0) { + ret = rd_calloc(*cntp, sizeof(rd_kafka_mock_request_t *)); + for (i = 0; i < *cntp; i++) { + rd_kafka_mock_request_t *mreq = + rd_list_elem(&mcluster->request_list, i); + ret[i] = rd_kafka_mock_request_copy(mreq); + } + } + + mtx_unlock(&mcluster->lock); + return ret; +} + +void rd_kafka_mock_clear_requests(rd_kafka_mock_cluster_t *mcluster) { + mtx_lock(&mcluster->lock); + rd_list_clear(&mcluster->request_list); + mtx_unlock(&mcluster->lock); +} + +int32_t rd_kafka_mock_request_id(rd_kafka_mock_request_t *mreq) { + return mreq->id; +} + +int16_t rd_kafka_mock_request_api_key(rd_kafka_mock_request_t *mreq) { + return mreq->api_key; +} + +rd_ts_t rd_kafka_mock_request_timestamp(rd_kafka_mock_request_t *mreq) { + return mreq->timestamp; +} diff --git a/src/rdkafka_mock.h b/src/rdkafka_mock.h index a9fd86f12f..6c256e1252 100644 --- a/src/rdkafka_mock.h +++ b/src/rdkafka_mock.h @@ -364,6 +364,57 @@ rd_kafka_mock_set_apiversion(rd_kafka_mock_cluster_t *mcluster, int16_t MinVersion, int16_t MaxVersion); +/** + * @name Represents a request to the mock cluster along with a timestamp. + */ +typedef struct rd_kafka_mock_request_s rd_kafka_mock_request_t; + +RD_EXPORT +rd_kafka_mock_request_t * +rd_kafka_mock_request_new(int32_t id, int16_t api_key, rd_ts_t timestamp); + +RD_EXPORT +void rd_kafka_mock_start_request_tracking(rd_kafka_mock_cluster_t *mcluster); + +RD_EXPORT +void rd_kafka_mock_stop_request_tracking(rd_kafka_mock_cluster_t *mcluster); + +/** + * @brief Destroy a rd_kafka_mock_request_t * and deallocate memory. + */ +RD_EXPORT void rd_kafka_mock_request_destroy(rd_kafka_mock_request_t *mreq); + +/** + * @brief Get the broker id to which \p mreq was sent. + */ +RD_EXPORT int32_t rd_kafka_mock_request_id(rd_kafka_mock_request_t *mreq); + +/** + * @brief Get the ApiKey with which \p mreq was sent. 
+ */ +RD_EXPORT int16_t rd_kafka_mock_request_api_key(rd_kafka_mock_request_t *mreq); + +/** + * @brief Get the timestamp at which \p mreq was sent. + */ +RD_EXPORT rd_ts_t +rd_kafka_mock_request_timestamp(rd_kafka_mock_request_t *mreq); + +/** + * @brief Get the list of requests sent to this mock cluster. + * + * @param cntp is set to the count of requests. + * @return List of rd_kafka_mock_request_t *. + * @remark each element of the returned array must be freed with + * rd_kafka_mock_request_destroy, and the list itself must be freed too. + */ +RD_EXPORT rd_kafka_mock_request_t ** +rd_kafka_mock_get_requests(rd_kafka_mock_cluster_t *mcluster, size_t *cntp); + +/** + * @brief Clear the list of requests sent to this mock broker. + */ +RD_EXPORT void rd_kafka_mock_clear_requests(rd_kafka_mock_cluster_t *mcluster); /**@}*/ diff --git a/src/rdkafka_mock_int.h b/src/rdkafka_mock_int.h index 390e8631ff..b59c9fdeb2 100644 --- a/src/rdkafka_mock_int.h +++ b/src/rdkafka_mock_int.h @@ -393,6 +393,12 @@ struct rd_kafka_mock_cluster_s { /**< Request handlers */ struct rd_kafka_mock_api_handler api_handlers[RD_KAFKAP__NUM]; + rd_bool_t track_requests; + /**< List of API requests for this broker. 
Type: + * rd_kafka_mock_request_t* + */ + rd_list_t request_list; + /**< Mutex for: * .errstacks * .apiversions diff --git a/src/rdkafka_msg.c b/src/rdkafka_msg.c index 6b129ace1d..5e71209dbf 100644 --- a/src/rdkafka_msg.c +++ b/src/rdkafka_msg.c @@ -2033,9 +2033,11 @@ static int unittest_msgq_order(const char *what, } /* Retry the messages, which moves them back to sendq - * maintaining the original order */ + * maintaining the original order with exponential backoff + * set to false */ rd_kafka_retry_msgq(&rkmq, &sendq, 1, 1, 0, - RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp); + RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp, rd_false, 0, + 0); RD_UT_ASSERT(rd_kafka_msgq_len(&sendq) == 0, "sendq FIFO should be empty, not contain %d messages", @@ -2073,9 +2075,11 @@ static int unittest_msgq_order(const char *what, } /* Retry the messages, which should now keep the 3 first messages - * on sendq (no more retries) and just number 4 moved back. */ + * on sendq (no more retries) and just number 4 moved back. + * No exponential backoff applied. */ rd_kafka_retry_msgq(&rkmq, &sendq, 1, 1, 0, - RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp); + RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp, rd_false, 0, + 0); if (fifo) { if (ut_verify_msgq_order("readded #2", &rkmq, 4, 6, rd_true)) @@ -2094,9 +2098,10 @@ static int unittest_msgq_order(const char *what, return 1; } - /* Move all messages back on rkmq */ + /* Move all messages back on rkmq without any exponential backoff. */ rd_kafka_retry_msgq(&rkmq, &sendq, 0, 1000, 0, - RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp); + RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp, rd_false, 0, + 0); /* Move first half of messages to sendq (1,2,3). @@ -2116,11 +2121,14 @@ static int unittest_msgq_order(const char *what, rkm = ut_rd_kafka_msg_new(msgsize); rkm->rkm_u.producer.msgid = i; rd_kafka_msgq_enq_sorted0(&rkmq, rkm, cmp); - + /* No exponential backoff applied. 
*/ rd_kafka_retry_msgq(&rkmq, &sendq, 0, 1000, 0, - RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp); + RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp, rd_false, 0, + 0); + /* No exponential backoff applied. */ rd_kafka_retry_msgq(&rkmq, &sendq2, 0, 1000, 0, - RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp); + RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp, rd_false, 0, + 0); RD_UT_ASSERT(rd_kafka_msgq_len(&sendq) == 0, "sendq FIFO should be empty, not contain %d messages", diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 1a9066d3d9..61ab178c37 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -873,6 +873,10 @@ void rd_kafka_msgq_insert_msgq(rd_kafka_msgq_t *destq, * @param incr_retry Increment retry count for messages. * @param max_retries Maximum retries allowed per message. * @param backoff Absolute retry backoff for retried messages. + * @param exponential_backoff If true the backoff should be exponential with + * 2**(retry_count - 1)*retry_ms with jitter. The \p backoff is ignored. + * @param retry_ms The retry ms used for exponential backoff calculation + * @param retry_max_ms The max backoff limit for exponential backoff calculation * * @returns 0 if all messages were retried, or 1 if some messages * could not be retried. 
@@ -883,10 +887,14 @@ int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq, int max_retries, rd_ts_t backoff, rd_kafka_msg_status_t status, - int (*cmp)(const void *a, const void *b)) { + int (*cmp)(const void *a, const void *b), + rd_bool_t exponential_backoff, + int retry_ms, + int retry_max_ms) { rd_kafka_msgq_t retryable = RD_KAFKA_MSGQ_INITIALIZER(retryable); rd_kafka_msg_t *rkm, *tmp; - + int64_t jitter = rd_jitter(100 - RD_KAFKA_RETRY_JITTER_PERCENT, + 100 + RD_KAFKA_RETRY_JITTER_PERCENT); /* Scan through messages to see which ones are eligible for retry, * move the retryable ones to temporary queue and * set backoff time for first message and optionally @@ -900,8 +908,22 @@ int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq, rd_kafka_msgq_deq(srcq, rkm, 1); rd_kafka_msgq_enq(&retryable, rkm); - rkm->rkm_u.producer.ts_backoff = backoff; rkm->rkm_u.producer.retries += incr_retry; + if (exponential_backoff) { + /* In some cases, like failed Produce requests do not increment the retry count, see rd_kafka_handle_Produce_error. */ + if (rkm->rkm_u.producer.retries > 0) + backoff = + (1 << (rkm->rkm_u.producer.retries - 1)) * + retry_ms; + else + backoff = retry_ms; + /* Multiplied by 10 as backoff should be in microseconds: jitter is a percentage (divide by 100) and retry_ms is in milliseconds (multiply by 1000). 
*/ + backoff = jitter * backoff * 10; + if (backoff > retry_max_ms * 1000) + backoff = retry_max_ms * 1000; + backoff = rd_clock() + backoff; + } + rkm->rkm_u.producer.ts_backoff = backoff; /* Don't downgrade a message from any form of PERSISTED * to NOT_PERSISTED, since the original cause of indicating @@ -940,17 +962,22 @@ int rd_kafka_toppar_retry_msgq(rd_kafka_toppar_t *rktp, rd_kafka_msgq_t *rkmq, int incr_retry, rd_kafka_msg_status_t status) { - rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk; - rd_ts_t backoff = rd_clock() + (rk->rk_conf.retry_backoff_ms * 1000); + rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk; + + rd_ts_t backoff = rd_clock() + (rk->rk_conf.retry_backoff_ms * 1000); + int retry_ms = rk->rk_conf.retry_backoff_ms; + int retry_max_ms = rk->rk_conf.retry_backoff_max_ms; int r; if (rd_kafka_terminating(rk)) return 1; rd_kafka_toppar_lock(rktp); + /* Exponential backoff applied. */ r = rd_kafka_retry_msgq(&rktp->rktp_msgq, rkmq, incr_retry, - rk->rk_conf.max_retries, backoff, status, - rktp->rktp_rkt->rkt_conf.msg_order_cmp); + rk->rk_conf.max_retries, 0 /* backoff */, status, + rktp->rktp_rkt->rkt_conf.msg_order_cmp, rd_true, + retry_ms, retry_max_ms); rd_kafka_toppar_unlock(rktp); return r; diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h index f9dd686423..3c1b2f10e9 100644 --- a/src/rdkafka_partition.h +++ b/src/rdkafka_partition.h @@ -560,7 +560,10 @@ int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq, int max_retries, rd_ts_t backoff, rd_kafka_msg_status_t status, - int (*cmp)(const void *a, const void *b)); + int (*cmp)(const void *a, const void *b), + rd_bool_t exponential_backoff, + int retry_ms, + int retry_max_ms); void rd_kafka_msgq_insert_msgq(rd_kafka_msgq_t *destq, rd_kafka_msgq_t *srcq, int (*cmp)(const void *a, const void *b)); diff --git a/src/rdkafka_timer.c b/src/rdkafka_timer.c index 776b5d995f..65ff90a132 100644 --- a/src/rdkafka_timer.c +++ b/src/rdkafka_timer.c @@ -29,6 +29,7 @@ #include "rdkafka_int.h" #include "rd.h" #include 
"rdtime.h" +#include "rdrand.h" #include "rdsysqueue.h" #include "rdkafka_queue.h" @@ -195,18 +196,32 @@ void rd_kafka_timer_start0(rd_kafka_timers_t *rkts, rd_kafka_timers_unlock(rkts); } - /** * Delay the next timer invocation by '2 * rtmr->rtmr_interval' + * @param minimum_backoff the minimum backoff to be applied + * @param maximum_backoff the maximum backoff to be applied + * @param max_jitter the jitter percentage to be applied to the backoff */ void rd_kafka_timer_exp_backoff(rd_kafka_timers_t *rkts, - rd_kafka_timer_t *rtmr) { + rd_kafka_timer_t *rtmr, + rd_ts_t minimum_backoff, + rd_ts_t maximum_backoff, + int max_jitter) { + int64_t jitter; rd_kafka_timers_lock(rkts); if (rd_kafka_timer_scheduled(rtmr)) { - rtmr->rtmr_interval *= 2; rd_kafka_timer_unschedule(rkts, rtmr); } - rd_kafka_timer_schedule(rkts, rtmr, 0); + rtmr->rtmr_interval *= 2; + jitter = ( rd_jitter(-max_jitter, max_jitter) * rtmr->rtmr_interval ) / 100; + if (rtmr->rtmr_interval + jitter < minimum_backoff) { + rtmr->rtmr_interval = minimum_backoff; + jitter = 0; + } else if ((maximum_backoff != -1) && (rtmr->rtmr_interval + jitter) > maximum_backoff) { + rtmr->rtmr_interval = maximum_backoff; + jitter = 0; + } + rd_kafka_timer_schedule(rkts, rtmr, jitter); rd_kafka_timers_unlock(rkts); } diff --git a/src/rdkafka_timer.h b/src/rdkafka_timer.h index d3e8fba61e..9a273adcfa 100644 --- a/src/rdkafka_timer.h +++ b/src/rdkafka_timer.h @@ -85,7 +85,10 @@ void rd_kafka_timer_start0(rd_kafka_timers_t *rkts, callback, arg) void rd_kafka_timer_exp_backoff(rd_kafka_timers_t *rkts, - rd_kafka_timer_t *rtmr); + rd_kafka_timer_t *rtmr, + rd_ts_t minimum, + rd_ts_t maximum, + int maxjitter); rd_ts_t rd_kafka_timer_next(rd_kafka_timers_t *rkts, rd_kafka_timer_t *rtmr, int do_lock); diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index 4341637bc0..faa57a1079 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -677,6 +677,7 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, 
rktp->rktp_leader_epoch); if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_ACTIVE) { rd_kafka_toppar_unlock(rktp); + rd_kafka_toppar_destroy(rktp); /* from get() */ return 0; } } @@ -1260,6 +1261,9 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, rd_kafka_broker_t **partbrokers; int leader_cnt = 0; int old_state; + rd_bool_t topic_exists_with_no_leader_epoch = rd_false; + rd_bool_t topic_exists_with_updated_leader_epoch = rd_false; + rd_bool_t topic_exists_with_leader_change = rd_false; if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR) rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA", @@ -1325,6 +1329,8 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, int r; rd_kafka_broker_t *leader; int32_t leader_epoch = mdit->partitions[j].leader_epoch; + rd_kafka_toppar_t *rktp = + rd_kafka_toppar_get(rkt, mdt->partitions[j].id, 0); rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA", " Topic %s partition %i Leader %" PRId32 @@ -1335,6 +1341,14 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, leader = partbrokers[j]; partbrokers[j] = NULL; + if (leader_epoch == -1) + topic_exists_with_no_leader_epoch = rd_true; + else if (rktp->rktp_leader_epoch < leader_epoch) + topic_exists_with_updated_leader_epoch = rd_true; + + if (rktp->rktp_leader_id != mdt->partitions[j].leader) + topic_exists_with_leader_change = rd_true; + /* Update leader for partition */ r = rd_kafka_toppar_leader_update(rkt, mdt->partitions[j].id, mdt->partitions[j].leader, @@ -1348,10 +1362,15 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, /* Drop reference to broker (from find()) */ rd_kafka_broker_destroy(leader); } + RD_IF_FREE(rktp, rd_kafka_toppar_destroy); } - /* If all partitions have leaders we can turn off fast leader query. */ - if (mdt->partition_cnt > 0 && leader_cnt == mdt->partition_cnt) + /* If all partitions have leaders, and this metadata update was not + * stale, we can turn off fast leader query. 
*/ + if (mdt->partition_cnt > 0 && leader_cnt == mdt->partition_cnt && + (topic_exists_with_no_leader_epoch || + topic_exists_with_updated_leader_epoch || + topic_exists_with_leader_change)) rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL; if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR && rkt->rkt_partition_cnt) { diff --git a/tests/0075-retry.c b/tests/0075-retry.c index 86eeb56d15..c3ce353abf 100644 --- a/tests/0075-retry.c +++ b/tests/0075-retry.c @@ -177,6 +177,7 @@ static void do_test_low_socket_timeout(const char *topic) { test_conf_set(conf, "socket.timeout.ms", "1000"); test_conf_set(conf, "socket.max.fails", "12345"); test_conf_set(conf, "retry.backoff.ms", "5000"); + test_conf_set(conf, "retry.backoff.max.ms", "5000"); /* Avoid api version requests (with their own timeout) to get in * the way of our test */ test_conf_set(conf, "api.version.request", "false"); diff --git a/tests/0113-cooperative_rebalance.cpp b/tests/0113-cooperative_rebalance.cpp index 2ac03aafe8..c54619d714 100644 --- a/tests/0113-cooperative_rebalance.cpp +++ b/tests/0113-cooperative_rebalance.cpp @@ -2914,6 +2914,57 @@ static void r_lost_partitions_commit_illegal_generation_test_local() { test_mock_cluster_destroy(mcluster); } +/** + * @brief Test that the consumer is destroyed without segfault if + * it happens before first rebalance and there is no assignor + * state. 
See #4312
+ */
+static void s_no_segfault_before_first_rebalance(void) {
+  rd_kafka_t *c;
+  rd_kafka_conf_t *conf;
+  rd_kafka_mock_cluster_t *mcluster;
+  const char *topic;
+  const char *bootstraps;
+
+  SUB_TEST_QUICK();
+
+  TEST_SAY("Creating mock cluster\n");
+  mcluster = test_mock_cluster_new(1, &bootstraps);
+
+  topic = test_mk_topic_name("0113_s", 1);
+
+  test_conf_init(&conf, NULL, 60);
+  test_conf_set(conf, "bootstrap.servers", bootstraps);
+  /* The scenario under test is specific to the cooperative-sticky assignor. */
+  test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
+
+  TEST_SAY("Creating topic %s\n", topic);
+  TEST_CALL_ERR__(rd_kafka_mock_topic_create(
+      mcluster, topic, 2 /* partition_cnt */, 1 /* replication_factor */));
+
+  c = test_create_consumer(topic, NULL, conf, NULL);
+
+  /* Add a 1s delay to the SyncGroup response so next condition can happen.
+   * The pushed response also carries a NOT_COORDINATOR error. */
+  rd_kafka_mock_broker_push_request_error_rtts(
+      mcluster, 1 /*Broker 1*/, RD_KAFKAP_SyncGroup /*SyncGroup request*/, 1,
+      RD_KAFKA_RESP_ERR_NOT_COORDINATOR, 1000);
+
+  test_consumer_subscribe(c, topic);
+
+  /* Wait for initial rebalance 3000 ms (default) + 500 ms for processing
+   * the JoinGroup response. Consumer close must come between the JoinGroup
+   * response and the SyncGroup response, so that rkcg_assignor is set,
+   * but rkcg_assignor_state isn't. */
+  TEST_ASSERT(!test_consumer_poll_once(c, NULL, 3500), "poll should timeout");
+
+  /* Closing here is the operation that must not crash. */
+  rd_kafka_consumer_close(c);
+
+  rd_kafka_destroy(c);
+
+  TEST_SAY("Destroying mock cluster\n");
+  test_mock_cluster_destroy(mcluster);
+
+  SUB_TEST_PASS();
+}

 /**
  * @brief Rebalance callback for the v_.. test below.
@@ -3117,6 +3168,7 @@ int main_0113_cooperative_rebalance_local(int argc, char **argv) { q_lost_partitions_illegal_generation_test(rd_false /*joingroup*/); q_lost_partitions_illegal_generation_test(rd_true /*syncgroup*/); r_lost_partitions_commit_illegal_generation_test_local(); + s_no_segfault_before_first_rebalance(); return 0; } diff --git a/tests/0143-exponential_backoff.c b/tests/0143-exponential_backoff.c new file mode 100644 index 0000000000..6caeb94ab8 --- /dev/null +++ b/tests/0143-exponential_backoff.c @@ -0,0 +1,549 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2023, Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#include "test.h" +#include "../src/rdkafka_proto.h" +#include "../src/rdkafka_mock.h" + +const int32_t retry_ms = 100; +const int32_t retry_max_ms = 1000; + +static void free_mock_requests(rd_kafka_mock_request_t **requests, size_t request_cnt){ + size_t i; + for(i=0;i low - buffer) && + (time_difference < low + buffer)), + "Time difference should be close " + "to 1 second, it is %" PRId64 + " ms instead.\n", + time_difference); + retry_count++; + } + previous_request_ts = + rd_kafka_mock_request_timestamp(requests[i]); + } + } + rd_kafka_destroy(consumer); + free_mock_requests(requests,request_cnt); + rd_kafka_mock_clear_requests(mcluster); + SUB_TEST_PASS(); +} + +/** + * Exponential Backoff needs to be checked for the request_type. Also the + * request_type should only be retried if one previous has failed for correct + * execution. + */ +static void helper_exponential_backoff(rd_kafka_mock_cluster_t *mcluster, + int32_t request_type) { + rd_kafka_mock_request_t **requests = NULL; + size_t request_cnt = 0; + int64_t previous_request_ts = -1; + int32_t retry_count = 0; + size_t i; + requests = rd_kafka_mock_get_requests(mcluster, &request_cnt); + for (i = 0; i < request_cnt; i++) { + if (rd_kafka_mock_request_api_key(requests[i]) == + request_type) { + TEST_SAY( + "Broker Id : %d API Key : %d Timestamp : %" PRId64 + "\n", + rd_kafka_mock_request_id(requests[i]), + rd_kafka_mock_request_api_key(requests[i]), + rd_kafka_mock_request_timestamp(requests[i])); + if (previous_request_ts != -1) { + int64_t time_difference = + (rd_kafka_mock_request_timestamp( + requests[i]) - + previous_request_ts) / + 1000; + /* Max Jitter is 20 percent each side so buffer chosen is 25 percent to account for latency delays */ + int64_t low = + ((1 << retry_count) * (retry_ms)*75) / 100; + int64_t high = + ((1 << retry_count) * (retry_ms)*125) / 100; + if (high > ((retry_max_ms * 125) / 100)) + high = (retry_max_ms * 125) / 100; + if (low > ((retry_max_ms * 75) / 100)) + low = 
(retry_max_ms * 75) / 100; + if ((time_difference > high) || + (time_difference < low)) { + TEST_FAIL("Time Difference is not respected."); + } + retry_count++; + } + previous_request_ts = + rd_kafka_mock_request_timestamp(requests[i]); + } + } + free_mock_requests(requests,request_cnt); +} +/** + * @brief offset_commit test + * We fail the request with RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS so + * that the request is retried with the exponential backoff. The max retries + * allowed is 2 for offset_commit. The RPC calls rd_kafka_buf_retry for its + * retry attempt so this tests all such RPCs which depend on it for retrying. + * The retry number of request is deterministic i.e no fresh requests are + * spawned on its own. Also the max retries is 2 for Offset Commit. + */ +static void test_offset_commit(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + rd_kafka_conf_t *conf) { + rd_kafka_t *consumer; + rd_kafka_message_t *rkm; + rd_kafka_topic_partition_list_t *offsets; + rd_kafka_topic_partition_t *rktpar; + SUB_TEST(); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "enable.auto.commit", "false"); + + consumer = test_create_consumer(topic, NULL, conf, NULL); + test_consumer_subscribe(consumer, topic); + rkm = rd_kafka_consumer_poll(consumer, 10 * 1000); + if (rkm) + rd_kafka_message_destroy(rkm); + rd_sleep(4); + rd_kafka_mock_push_request_errors( + mcluster, RD_KAFKAP_OffsetCommit, 2, + RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS, + RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS); + + offsets = rd_kafka_topic_partition_list_new(1); + rktpar = rd_kafka_topic_partition_list_add(offsets, topic, 0); + /* Setting Offset to an arbitrary number */ + rktpar->offset = 4; + /* rd_kafka_commit will trigger OffsetCommit RPC call */ + rd_kafka_commit(consumer, offsets, 0); + rd_kafka_topic_partition_list_destroy(offsets); + rd_sleep(3); + + helper_exponential_backoff(mcluster, RD_KAFKAP_OffsetCommit); + + + 
rd_kafka_destroy(consumer); + rd_kafka_mock_clear_requests(mcluster); + SUB_TEST_PASS(); +} + +/** + * @brief produce test + * We fail the request with RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS so + * that the request is retried with the exponential backoff. The exponential + * backoff is capped at retry_max_ms with jitter. The retry number of request is + * deterministic i.e no fresh requests are spawned on its own. + */ +static void test_produce(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + rd_kafka_conf_t *conf) { + rd_kafka_t *producer; + rd_kafka_topic_t *rkt; + SUB_TEST(); + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + + producer = test_create_handle(RD_KAFKA_PRODUCER, conf); + rkt = test_create_producer_topic(producer, topic, NULL); + + rd_kafka_mock_push_request_errors( + mcluster, RD_KAFKAP_Produce, 7, + RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS, + RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS, + RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS, + RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS, + RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS, + RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS, + RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS); + + test_produce_msgs(producer, rkt, 0, RD_KAFKA_PARTITION_UA, 0, 1, + "hello", 5); + rd_sleep(3); + + helper_exponential_backoff(mcluster, RD_KAFKAP_Produce); + + + rd_kafka_topic_destroy(rkt); + rd_kafka_destroy(producer); + rd_kafka_mock_clear_requests(mcluster); + SUB_TEST_PASS(); +} + +/** + * Helper function for find coordinator trigger with the given request_type, the + * find coordinator request should be triggered after a failing request of request_type. 
+ */
+static void helper_find_coordinator_trigger(rd_kafka_mock_cluster_t *mcluster,
+                                            int32_t request_type) {
+        size_t request_cnt                 = 0;
+        rd_kafka_mock_request_t **requests = NULL;
+        /* Number of requests of type request_type seen so far. */
+        int32_t num_request                = 0;
+        /* Set once a FindCoordinator request follows the first
+         * request_type request; ends the scan. */
+        rd_bool_t coordinator_lookup_seen = rd_false;
+        size_t idx;
+
+        requests = rd_kafka_mock_get_requests(mcluster, &request_cnt);
+
+        /* NOTE(review): if the tracked requests end after exactly one
+         * request_type request and no FindCoordinator, this helper still
+         * passes - confirm whether that is intended. */
+        for (idx = 0; idx < request_cnt && !coordinator_lookup_seen; idx++) {
+                TEST_SAY("Broker Id : %d API Key : %d Timestamp : %" PRId64
+                         "\n",
+                         rd_kafka_mock_request_id(requests[idx]),
+                         rd_kafka_mock_request_api_key(requests[idx]),
+                         rd_kafka_mock_request_timestamp(requests[idx]));
+
+                switch (num_request) {
+                case 0:
+                        /* Still looking for the first request_type
+                         * request. */
+                        if (rd_kafka_mock_request_api_key(requests[idx]) ==
+                            request_type)
+                                num_request++;
+                        break;
+
+                case 1:
+                        /* First request seen: a FindCoordinator must come
+                         * before any further request_type request. */
+                        if (rd_kafka_mock_request_api_key(requests[idx]) ==
+                            RD_KAFKAP_FindCoordinator) {
+                                TEST_SAY(
+                                    "FindCoordinator request made after "
+                                    "failing request with NOT_COORDINATOR "
+                                    "error.\n");
+                                coordinator_lookup_seen = rd_true;
+                        } else if (rd_kafka_mock_request_api_key(
+                                       requests[idx]) == request_type) {
+                                num_request++;
+                                TEST_FAIL("Second request made without any FindCoordinator request.");
+                        }
+                        break;
+
+                default:
+                        /* More than one request already counted: nothing
+                         * left to check for the remaining entries. */
+                        break;
+                }
+        }
+
+        free_mock_requests(requests, request_cnt);
+
+        if (num_request != 1)
+                TEST_FAIL("No request was made.");
+}
+/**
+ * @brief heartbeat-find_coordinator test
+ * We fail the request with RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP so that
+ * the FindCoordinator request is triggered.
+ */
+static void test_heartbeat_find_coordinator(rd_kafka_mock_cluster_t *mcluster,
+                                            const char *topic,
+                                            rd_kafka_conf_t *conf) {
+        rd_kafka_t *consumer;
+        rd_kafka_message_t *rkm;
+        SUB_TEST();
+        test_conf_set(conf, "auto.offset.reset", "earliest");
+        test_conf_set(conf, "enable.auto.commit", "false");
+
+        consumer = test_create_consumer(topic, NULL, conf, NULL);
+
+        /* Queue a single NOT_COORDINATOR_FOR_GROUP error for Heartbeat. */
+        rd_kafka_mock_push_request_errors(
+            mcluster, RD_KAFKAP_Heartbeat, 1,
+            RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP);
+
+        /* Presumably resets the tracked request list so that the helper
+         * below only sees requests made from here on - confirm. */
+        rd_kafka_mock_clear_requests(mcluster);
+        test_consumer_subscribe(consumer, topic);
+        /* This will trigger a find_coordinator request */
+        rkm = rd_kafka_consumer_poll(consumer, 10 * 1000);
+        if (rkm)
+                rd_kafka_message_destroy(rkm);
+        /* Let the client run before the tracked requests are inspected. */
+        rd_sleep(6);
+
+
+        helper_find_coordinator_trigger(mcluster, RD_KAFKAP_Heartbeat);
+
+
+        rd_kafka_destroy(consumer);
+        rd_kafka_mock_clear_requests(mcluster);
+        SUB_TEST_PASS();
+}
+
+/**
+ * @brief joingroup-find_coordinator test
+ * We fail the request with RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP so that
+ * the FindCoordinator request is triggered.
+ */
+static void test_joingroup_find_coordinator(rd_kafka_mock_cluster_t *mcluster,
+                                            const char *topic,
+                                            rd_kafka_conf_t *conf) {
+        rd_kafka_t *consumer;
+        rd_kafka_message_t *rkm;
+        SUB_TEST();
+        test_conf_set(conf, "auto.offset.reset", "earliest");
+        test_conf_set(conf, "enable.auto.commit", "false");
+
+        consumer = test_create_consumer(topic, NULL, conf, NULL);
+        /* Queue a single NOT_COORDINATOR_FOR_GROUP error for JoinGroup. */
+        rd_kafka_mock_push_request_errors(
+            mcluster, RD_KAFKAP_JoinGroup, 1,
+            RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP);
+        /* Presumably resets the tracked request list so that the helper
+         * below only sees requests made from here on - confirm. */
+        rd_kafka_mock_clear_requests(mcluster);
+        test_consumer_subscribe(consumer, topic);
+        /* This will trigger a find_coordinator request */
+        rkm = rd_kafka_consumer_poll(consumer, 10 * 1000);
+        if (rkm)
+                rd_kafka_message_destroy(rkm);
+        /* Let the client run before the tracked requests are inspected. */
+        rd_sleep(4);
+
+        /* Checks that a FindCoordinator request follows the first JoinGroup
+         * request, with no second JoinGroup in between. */
+        helper_find_coordinator_trigger(mcluster, RD_KAFKAP_JoinGroup);
+
+        rd_kafka_destroy(consumer);
+        rd_kafka_mock_clear_requests(mcluster);
+        SUB_TEST_PASS();
+}
+
+/**
+ * @brief produce-fast_leader_query test
+ * We fail a Produce request with RD_KAFKA_RESP_ERR_NOT_LEADER_OR_FOLLOWER, so
+ * that it triggers a fast leader query (a Metadata request). We don't update
+ * the leader in this test, so the Metadata is always stale from the client's
+ * perspective, and the fast leader query carries on, being backed off
+ * exponentially until the max retry time is reached. The number of retried
+ * requests is non-deterministic, as retries continue until the leader changes.
+ */ +static void test_produce_fast_leader_query(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + rd_kafka_conf_t *conf) { + rd_kafka_mock_request_t **requests = NULL; + size_t request_cnt = 0; + int64_t previous_request_ts = -1; + int32_t retry_count = 0; + rd_bool_t produced = rd_false; + rd_kafka_t *producer; + rd_kafka_topic_t *rkt; + size_t i; + SUB_TEST(); + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + + producer = test_create_handle(RD_KAFKA_PRODUCER, conf); + rkt = test_create_producer_topic(producer, topic, NULL); + + rd_kafka_mock_push_request_errors( + mcluster, RD_KAFKAP_Produce, 1, + RD_KAFKA_RESP_ERR_NOT_LEADER_OR_FOLLOWER); + rd_kafka_mock_clear_requests(mcluster); + test_produce_msgs(producer, rkt, 0, RD_KAFKA_PARTITION_UA, 0, 1, + "hello", 1); + rd_sleep(10); + requests = rd_kafka_mock_get_requests(mcluster, &request_cnt); + + for (i = 0; i < request_cnt; i++) { + TEST_SAY("Broker Id : %d API Key : %d Timestamp : %" PRId64 + "\n", + rd_kafka_mock_request_id(requests[i]), + rd_kafka_mock_request_api_key(requests[i]), + rd_kafka_mock_request_timestamp(requests[i])); + if (!produced && rd_kafka_mock_request_api_key(requests[i]) == + RD_KAFKAP_Produce) + produced = rd_true; + if (rd_kafka_mock_request_api_key(requests[i]) == + RD_KAFKAP_Metadata && + produced) { + if (previous_request_ts != -1) { + int64_t time_difference = + (rd_kafka_mock_request_timestamp( + requests[i]) - + previous_request_ts) / + 1000; + /* Max Jitter is 20 percent each side so buffer chosen is 25 percent to account for latency delays */ + int64_t low = + ((1 << retry_count) * (retry_ms)*75) / 100; + int64_t high = + ((1 << retry_count) * (retry_ms)*125) / 100; + if (high > ((retry_max_ms * 125) / 100)) + high = (retry_max_ms * 125) / 100; + if (low > ((retry_max_ms * 75) / 100)) + low = (retry_max_ms * 75) / 100; + TEST_ASSERT( + (time_difference < high) && + (time_difference > low), + "Time difference is not respected!\n"); + retry_count++; + } + 
previous_request_ts = + rd_kafka_mock_request_timestamp(requests[i]); + } + } + rd_kafka_topic_destroy(rkt); + rd_kafka_destroy(producer); + free_mock_requests(requests,request_cnt); + rd_kafka_mock_clear_requests(mcluster); + SUB_TEST_PASS(); +} + +/** + * @brief fetch-fast_leader_query test + * We fail a Fetch request by causing a leader change (the leader is the same, + * but with a different leader epoch). It triggers fast leader query (Metadata + * request). The request is able to obtain an updated leader, and hence, the + * fast leader query terminates after one Metadata request. + */ +static void test_fetch_fast_leader_query(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + rd_kafka_conf_t *conf) { + rd_kafka_mock_request_t **requests = NULL; + size_t request_cnt = 0; + rd_bool_t previous_request_was_Fetch = rd_false; + rd_bool_t Metadata_after_Fetch = rd_false; + rd_kafka_t *consumer; + rd_kafka_message_t *rkm; + size_t i; + SUB_TEST(); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "enable.auto.commit", "false"); + + consumer = test_create_consumer(topic, NULL, conf, NULL); + + test_consumer_subscribe(consumer, topic); + rkm = rd_kafka_consumer_poll(consumer, 10 * 1000); + + if (rkm) + rd_kafka_message_destroy(rkm); + rd_kafka_mock_clear_requests(mcluster); + + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + rkm = rd_kafka_consumer_poll(consumer, 10 * 1000); + if (rkm) + rd_kafka_message_destroy(rkm); + rd_sleep(3); + requests = rd_kafka_mock_get_requests(mcluster, &request_cnt); + for (i = 0; i < request_cnt; i++) { + TEST_SAY("Broker Id : %d API Key : %d Timestamp : %" PRId64 + "\n", + rd_kafka_mock_request_id(requests[i]), + rd_kafka_mock_request_api_key(requests[i]), + rd_kafka_mock_request_timestamp(requests[i])); + if (rd_kafka_mock_request_api_key(requests[i]) == + RD_KAFKAP_Fetch) + previous_request_was_Fetch = rd_true; + else if (rd_kafka_mock_request_api_key(requests[i]) == + RD_KAFKAP_Metadata 
&& + previous_request_was_Fetch) { + Metadata_after_Fetch = rd_true; + break; + } else + previous_request_was_Fetch = rd_false; + } + rd_kafka_destroy(consumer); + free_mock_requests(requests,request_cnt); + rd_kafka_mock_clear_requests(mcluster); + TEST_ASSERT( + Metadata_after_Fetch, + "Metadata Request should have been made after fetch atleast once."); + SUB_TEST_PASS(); +} + +/** + * @brief Exponential Backoff (KIP 580) + * We test all the pipelines which affect the retry mechanism for both + * intervalled queries where jitter is added and backed off queries where both + * jitter and exponential backoff is applied with the max being retry_max_ms. + */ +int main_0143_exponential_backoff(int argc, char **argv) { + const char *topic = test_mk_topic_name("topic", 1); + rd_kafka_mock_cluster_t *mcluster; + rd_kafka_conf_t *conf; + const char *bootstraps; + if (test_needs_auth()) { + TEST_SKIP("Mock cluster does not support SSL/SASL.\n"); + return 0; + } + mcluster = test_mock_cluster_new(1, &bootstraps); + rd_kafka_mock_start_request_tracking(mcluster); + rd_kafka_mock_topic_create(mcluster, topic, 1, 1); + + /* This test may be slower when running with CI or Helgrind, + * restart the timeout. 
*/ + test_timeout_set(100); + + test_conf_init(&conf, NULL, 30); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "topic.metadata.refresh.interval.ms", "-1"); + + test_produce(mcluster, topic, rd_kafka_conf_dup(conf)); + test_find_coordinator(mcluster, topic, rd_kafka_conf_dup(conf)); + test_offset_commit(mcluster, topic, rd_kafka_conf_dup(conf)); + test_heartbeat_find_coordinator(mcluster, topic, + rd_kafka_conf_dup(conf)); + test_joingroup_find_coordinator(mcluster, topic, + rd_kafka_conf_dup(conf)); + test_fetch_fast_leader_query(mcluster, topic, rd_kafka_conf_dup(conf)); + test_produce_fast_leader_query(mcluster, topic, + rd_kafka_conf_dup(conf)); + test_mock_cluster_destroy(mcluster); + rd_kafka_conf_destroy(conf); + return 0; +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 66be0fbb2d..be8c466623 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -133,6 +133,7 @@ set( 0139-offset_validation_mock.c 0140-commit_metadata.cpp 0142-reauthentication.c + 0143-exponential_backoff.c 8000-idle.cpp 8001-fetch_from_follower_mock_manual.c test.c diff --git a/tests/test.c b/tests/test.c index 000e3badab..8b3b0c5b94 100644 --- a/tests/test.c +++ b/tests/test.c @@ -254,6 +254,7 @@ _TEST_DECL(0138_admin_mock); _TEST_DECL(0139_offset_validation_mock); _TEST_DECL(0140_commit_metadata); _TEST_DECL(0142_reauthentication); +_TEST_DECL(0143_exponential_backoff); /* Manual tests */ _TEST_DECL(8000_idle); @@ -505,6 +506,7 @@ struct test tests[] = { _TEST(0139_offset_validation_mock, 0), _TEST(0140_commit_metadata, 0), _TEST(0142_reauthentication, 0, TEST_BRKVER(2, 2, 0, 0)), + _TEST(0143_exponential_backoff, 0), /* Manual tests */ diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj index 8463ffdf44..40ab99fdbf 100644 --- a/win32/tests/tests.vcxproj +++ b/win32/tests/tests.vcxproj @@ -223,6 +223,7 @@ +