diff --git a/CHANGELOG.md b/CHANGELOG.md index 49f528f53d..953864d514 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,8 @@ librdkafka v2.3.0 is a feature release: * Fix to add leader epoch to control messages, to make sure they're stored for committing even without a subsequent fetch message (#4434). * Fix for stored offsets not being committed if they lacked the leader epoch (#4442). + * Fix to ensure permanent errors during offset validation continue being retried and + don't cause an offset reset (#4447). ## Fixes @@ -46,6 +48,10 @@ librdkafka v2.3.0 is a feature release: while the retry was in-flight, an infinite loop of requests was triggered, because we weren't updating the leader epoch correctly. Fixed by updating the leader epoch before sending the request (#4433). + * During offset validation a permanent error like host resolution failure + would cause an offset reset. + This isn't what's expected or what the Java implementation does. + Solved by retrying even in case of permanent errors (#4447). ## Upgrade considerations diff --git a/src/rdkafka_offset.c b/src/rdkafka_offset.c index 00cf8638f5..701a41613d 100644 --- a/src/rdkafka_offset.c +++ b/src/rdkafka_offset.c @@ -991,25 +991,15 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk, rd_kafka_topic_leader_query0(rk, rktp->rktp_rkt, 1, rd_true /* force */); - if (actions & RD_KAFKA_ERR_ACTION_RETRY) { - /* No need for refcnt on rktp for timer opaque - * since the timer resides on the rktp and will be - * stopped on toppar remove. */ - rd_kafka_timer_start_oneshot( - &rk->rk_timers, &rktp->rktp_validate_tmr, rd_false, - 500 * 1000 /* 500ms */, - rd_kafka_offset_validate_tmr_cb, rktp); - - } else if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) { - /* Permanent error */ - rd_kafka_offset_reset( - rktp, rd_kafka_broker_id(rkb), - RD_KAFKA_FETCH_POS(RD_KAFKA_OFFSET_INVALID, - rktp->rktp_leader_epoch), - RD_KAFKA_RESP_ERR__LOG_TRUNCATION, - "Unable to validate offset and epoch: %s", - rd_kafka_err2str(err)); - } + /* No need for refcnt on rktp for timer opaque + * since the timer resides on the rktp and will be + * stopped on toppar remove. + * Retries the validation with a new call even in + * case of permanent error. */ + rd_kafka_timer_start_oneshot( + &rk->rk_timers, &rktp->rktp_validate_tmr, rd_false, + 500 * 1000 /* 500ms */, rd_kafka_offset_validate_tmr_cb, + rktp); goto done; } diff --git a/tests/0139-offset_validation_mock.c b/tests/0139-offset_validation_mock.c index 0fa2665b6b..967563fd70 100644 --- a/tests/0139-offset_validation_mock.c +++ b/tests/0139-offset_validation_mock.c @@ -140,10 +140,11 @@ static void do_test_no_duplicates_during_offset_validation(void) { /** - * @brief Test that an SSL error doesn't cause an offset reset. - * See issue #4293. + * @brief Test that a permanent error doesn't cause an offset reset. + * See issues #4293, #4427. + * @param err The error OffsetForLeaderEpoch fails with. */ -static void do_test_ssl_error_retried(void) { +static void do_test_permanent_error_retried(rd_kafka_resp_err_t err) { rd_kafka_mock_cluster_t *mcluster; rd_kafka_conf_t *conf; const char *bootstraps; @@ -155,7 +156,7 @@ static void do_test_ssl_error_retried(void) { int msg_count = 5; uint64_t testid = test_id_generate(); - SUB_TEST_QUICK(); + SUB_TEST_QUICK("err: %s", rd_kafka_err2name(err)); mcluster = test_mock_cluster_new(3, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 1, 1); @@ -165,10 +166,9 @@ static void do_test_ssl_error_retried(void) { "bootstrap.servers", bootstraps, "batch.num.messages", "1", NULL); - /* Make OffsetForLeaderEpoch fail with the _SSL error */ - rd_kafka_mock_push_request_errors(mcluster, - RD_KAFKAP_OffsetForLeaderEpoch, 1, - RD_KAFKA_RESP_ERR__SSL); + /* Make OffsetForLeaderEpoch fail with the corresponding error code */ + rd_kafka_mock_push_request_errors( + mcluster, RD_KAFKAP_OffsetForLeaderEpoch, 1, err); test_conf_init(&conf, NULL, 60); @@ -434,7 +434,8 @@ int main_0139_offset_validation_mock(int argc, char **argv) { do_test_no_duplicates_during_offset_validation(); - do_test_ssl_error_retried(); + do_test_permanent_error_retried(RD_KAFKA_RESP_ERR__SSL); + do_test_permanent_error_retried(RD_KAFKA_RESP_ERR__RESOLVE); do_test_two_leader_changes();