From 788cd0c868f1a9872394b87716fbec4504a3b95b Mon Sep 17 00:00:00 2001 From: Milind L Date: Fri, 29 Sep 2023 15:53:21 +0530 Subject: [PATCH 1/3] Fix loop of OffsetForLeaderEpoch requests on quick leader changes (#4433) --- CHANGELOG.md | 8 +++ src/rdkafka_topic.c | 8 +-- tests/0139-offset_validation_mock.c | 85 +++++++++++++++++++++++++++++ 3 files changed, 97 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ea741ddc76..540b7f7da6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ librdkafka v2.3.0 is a feature release: * Fix a segmentation fault when closing a consumer using the cooperative-sticky assignor before the first assignment (#4381). * Fix for insufficient buffer allocation when allocating rack information (@wolfchimneyrock, #4449). + * Fix for infinite loop of OffsetForLeaderEpoch requests on quick leader changes. (#4433). * Fix to add leader epoch to control messages, to make sure they're stored for committing even without a subsequent fetch message (#4434). * Fix for stored offsets not being committed if they lacked the leader epoch (#4442). @@ -42,6 +43,13 @@ librdkafka v2.3.0 is a feature release: than committed one, if either stored or committed leader epoch is -1 (#4442). +### Consumer Fixes + + * If an OffsetForLeaderEpoch request was being retried, and the leader changed + while the retry was in-flight, an infinite loop of requests was triggered, + because we weren't updating the leader epoch correctly. + Fixed by updating the leader epoch before sending the request (#4433). + # librdkafka v2.2.0 diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index b63a0bbea4..2b68ee0204 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -682,9 +682,7 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, } } - if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) - need_epoch_validation = rd_true; - else if (leader_epoch > rktp->rktp_leader_epoch) { + if (leader_epoch > rktp->rktp_leader_epoch) { rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", "%s [%" PRId32 "]: leader %" PRId32 " epoch %" PRId32 " -> leader %" PRId32 @@ -694,7 +692,9 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, rktp->rktp_leader_epoch, leader_id, leader_epoch); rktp->rktp_leader_epoch = leader_epoch; need_epoch_validation = rd_true; - } + } else if (rktp->rktp_fetch_state == + RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) + need_epoch_validation = rd_true; fetching_from_follower = leader != NULL && rktp->rktp_broker != NULL && diff --git a/tests/0139-offset_validation_mock.c b/tests/0139-offset_validation_mock.c index 48f5cc7e51..0fa2665b6b 100644 --- a/tests/0139-offset_validation_mock.c +++ b/tests/0139-offset_validation_mock.c @@ -212,6 +212,89 @@ static void do_test_ssl_error_retried(void) { } +/** + * @brief If there's an OffsetForLeaderEpoch request which fails, and the leader + * changes meanwhile, we end up in an infinite loop of OffsetForLeaderEpoch + * requests. + * Specifically: + * a. Leader Change - causes OffsetForLeaderEpoch + * request 'A'. + * b. Request 'A' fails with a retriable error, and we retry it. + * c. While waiting for Request 'A', the leader changes again, and we send a + * Request 'B', but the leader epoch is not updated correctly in this + * request, causing a loop. + * + * See #4425. + */ +static void do_test_two_leader_changes(void) { + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + const char *c1_groupid = topic; + rd_kafka_t *c1; + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + int msg_cnt = 5; + uint64_t testid = test_id_generate(); + rd_kafka_conf_t *conf; + + SUB_TEST_QUICK(); + + mcluster = test_mock_cluster_new(2, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 1, 2); + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + + /* Seed the topic with messages */ + test_produce_msgs_easy_v(topic, testid, 0, 0, msg_cnt, 10, + "bootstrap.servers", bootstraps, + "batch.num.messages", "1", NULL); + + test_conf_init(&conf, NULL, 60); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "auto.offset.reset", "earliest"); + + c1 = test_create_consumer(c1_groupid, NULL, conf, NULL); + test_consumer_subscribe(c1, topic); + + /* Consume initial messages and join the group, etc. */ + test_consumer_poll("MSG_INIT", c1, testid, 0, 0, msg_cnt, NULL); + + /* The leader will change from 1->2, and the OffsetForLeaderEpoch will + * be sent to broker 2. We need to first fail it with + * an error, and then give enough time to change the leader before + * returning a success. */ + rd_kafka_mock_broker_push_request_error_rtts( + mcluster, 2, RD_KAFKAP_OffsetForLeaderEpoch, 2, + RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, 900, + RD_KAFKA_RESP_ERR_NO_ERROR, 1000); + + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); + rd_kafka_poll(c1, 1000); + /* Enough time to make a request, fail with a retriable error, and + * retry. */ + rd_sleep(1); + + /* Reset leader. */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + rd_kafka_poll(c1, 1000); + rd_sleep(1); + + /* There should be no infinite loop of OffsetForLeaderEpoch, and + * consequently, we should be able to consume these messages as a sign + * of success. */ + test_produce_msgs_easy_v(topic, testid, 0, 0, msg_cnt, 10, + "bootstrap.servers", bootstraps, + "batch.num.messages", "1", NULL); + + test_consumer_poll("MSG_INIT", c1, testid, 0, 0, msg_cnt, NULL); + + + rd_kafka_destroy(c1); + + test_mock_cluster_destroy(mcluster); + + TEST_LATER_CHECK(); + SUB_TEST_PASS(); +} + /** * @brief Storing an offset without leader epoch should still be allowed * and the greater than check should apply only to the offset. @@ -353,6 +436,8 @@ int main_0139_offset_validation_mock(int argc, char **argv) { do_test_ssl_error_retried(); + do_test_two_leader_changes(); + do_test_store_offset_without_leader_epoch(); return 0; From bd2afcfb7c47f35617cf208c0a860ded12278d8a Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Fri, 29 Sep 2023 16:14:25 +0530 Subject: [PATCH 2/3] Increased flexver request size for Metadata request to include topic_id size (#4453) Co-authored-by: Emanuele Sabellico Co-authored-by: Milind L --- src/rdkafka_request.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index c1a650d984..de44677885 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -2231,9 +2231,11 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb, ApiVersion = rd_kafka_broker_ApiVersion_supported( rkb, RD_KAFKAP_Metadata, 0, metadata_max_version, &features); - rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_Metadata, 1, - 4 + (50 * topic_cnt) + 1, - ApiVersion >= 9); + rkbuf = rd_kafka_buf_new_flexver_request( + rkb, RD_KAFKAP_Metadata, 1, + 4 + (66 /* 50 for topic name and 16 for topic id */ * topic_cnt) + + 1, + ApiVersion >= 9); if (!reason) reason = ""; From e2d79e1935dc6822a034b5965e4f0fa2617d7a1c Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Fri, 29 Sep 2023 13:13:37 +0200 Subject: [PATCH 3/3] Permanent errors during offset validation should be retried (#4447) --- CHANGELOG.md | 23 +++++++++++++---------- src/rdkafka_offset.c | 28 +++++++++------------------- tests/0139-offset_validation_mock.c | 19 ++++++++++--------- 3 files changed, 32 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 540b7f7da6..9084312fad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,8 @@ librdkafka v2.3.0 is a feature release: * Fix to add leader epoch to control messages, to make sure they're stored for committing even without a subsequent fetch message (#4434). * Fix for stored offsets not being committed if they lacked the leader epoch (#4442). + * Fix to ensure permanent errors during offset validation continue being retried and + don't cause an offset reset (#4447). ## Fixes @@ -35,20 +37,21 @@ librdkafka v2.3.0 is a feature release: ### Consumer fixes - * Stored offsets were excluded from the commit if the leader epoch was - less than committed epoch, as it's possible if leader epoch is the default -1. - This didn't happen in Python, Go and .NET bindings when stored position was - taken from the message. - Solved by checking only that the stored offset is greater - than committed one, if either stored or committed leader epoch is -1 (#4442). - - -### Consumer Fixes - + * Stored offsets were excluded from the commit if the leader epoch was + less than committed epoch, as it's possible if leader epoch is the default -1. + This didn't happen in Python, Go and .NET bindings when stored position was + taken from the message. + Solved by checking only that the stored offset is greater + than committed one, if either stored or committed leader epoch is -1 (#4442). * If an OffsetForLeaderEpoch request was being retried, and the leader changed while the retry was in-flight, an infinite loop of requests was triggered, because we weren't updating the leader epoch correctly. Fixed by updating the leader epoch before sending the request (#4433). + * During offset validation a permanent error like host resolution failure + would cause an offset reset. + This isn't what's expected or what the Java implementation does. + Solved by retrying even in case of permanent errors (#4447). + # librdkafka v2.2.0 diff --git a/src/rdkafka_offset.c b/src/rdkafka_offset.c index 00cf8638f5..701a41613d 100644 --- a/src/rdkafka_offset.c +++ b/src/rdkafka_offset.c @@ -991,25 +991,15 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk, rd_kafka_topic_leader_query0(rk, rktp->rktp_rkt, 1, rd_true /* force */); - if (actions & RD_KAFKA_ERR_ACTION_RETRY) { - /* No need for refcnt on rktp for timer opaque - * since the timer resides on the rktp and will be - * stopped on toppar remove. */ - rd_kafka_timer_start_oneshot( - &rk->rk_timers, &rktp->rktp_validate_tmr, rd_false, - 500 * 1000 /* 500ms */, - rd_kafka_offset_validate_tmr_cb, rktp); - - } else if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) { - /* Permanent error */ - rd_kafka_offset_reset( - rktp, rd_kafka_broker_id(rkb), - RD_KAFKA_FETCH_POS(RD_KAFKA_OFFSET_INVALID, - rktp->rktp_leader_epoch), - RD_KAFKA_RESP_ERR__LOG_TRUNCATION, - "Unable to validate offset and epoch: %s", - rd_kafka_err2str(err)); - } + /* No need for refcnt on rktp for timer opaque + * since the timer resides on the rktp and will be + * stopped on toppar remove. + * Retries the validation with a new call even in + * case of permanent error. */ + rd_kafka_timer_start_oneshot( + &rk->rk_timers, &rktp->rktp_validate_tmr, rd_false, + 500 * 1000 /* 500ms */, rd_kafka_offset_validate_tmr_cb, + rktp); goto done; } diff --git a/tests/0139-offset_validation_mock.c b/tests/0139-offset_validation_mock.c index 0fa2665b6b..967563fd70 100644 --- a/tests/0139-offset_validation_mock.c +++ b/tests/0139-offset_validation_mock.c @@ -140,10 +140,11 @@ static void do_test_no_duplicates_during_offset_validation(void) { /** - * @brief Test that an SSL error doesn't cause an offset reset. - * See issue #4293. + * @brief Test that a permanent error doesn't cause an offset reset. + * See issues #4293, #4427. + * @param err The error OffsetForLeaderEpoch fails with. */ -static void do_test_ssl_error_retried(void) { +static void do_test_permanent_error_retried(rd_kafka_resp_err_t err) { rd_kafka_mock_cluster_t *mcluster; rd_kafka_conf_t *conf; const char *bootstraps; @@ -155,7 +156,7 @@ static void do_test_ssl_error_retried(void) { int msg_count = 5; uint64_t testid = test_id_generate(); - SUB_TEST_QUICK(); + SUB_TEST_QUICK("err: %s", rd_kafka_err2name(err)); mcluster = test_mock_cluster_new(3, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 1, 1); @@ -165,10 +166,9 @@ static void do_test_ssl_error_retried(void) { "bootstrap.servers", bootstraps, "batch.num.messages", "1", NULL); - /* Make OffsetForLeaderEpoch fail with the _SSL error */ - rd_kafka_mock_push_request_errors(mcluster, - RD_KAFKAP_OffsetForLeaderEpoch, 1, - RD_KAFKA_RESP_ERR__SSL); + /* Make OffsetForLeaderEpoch fail with the corresponding error code */ + rd_kafka_mock_push_request_errors( + mcluster, RD_KAFKAP_OffsetForLeaderEpoch, 1, err); test_conf_init(&conf, NULL, 60); @@ -434,7 +434,8 @@ int main_0139_offset_validation_mock(int argc, char **argv) { do_test_no_duplicates_during_offset_validation(); - do_test_ssl_error_retried(); + do_test_permanent_error_retried(RD_KAFKA_RESP_ERR__SSL); + do_test_permanent_error_retried(RD_KAFKA_RESP_ERR__RESOLVE); do_test_two_leader_changes();