Skip to content

Commit

Permalink
Merge branch 'master' into feature/KIP-580
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Sep 29, 2023
2 parents 95ffaf8 + e2d79e1 commit cd5d802
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 28 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ librdkafka v2.3.0 is a feature release:
* Fix to add leader epoch to control messages, to make sure they're stored
for committing even without a subsequent fetch message (#4434).
* Fix for stored offsets not being committed if they lacked the leader epoch (#4442).
* Fix to ensure permanent errors during offset validation continue being retried and
don't cause an offset reset (#4447).


## Fixes
Expand All @@ -46,6 +48,10 @@ librdkafka v2.3.0 is a feature release:
while the retry was in-flight, an infinite loop of requests was triggered,
because we weren't updating the leader epoch correctly.
Fixed by updating the leader epoch before sending the request (#4433).
* During offset validation a permanent error like host resolution failure
would cause an offset reset.
This isn't what's expected or what the Java implementation does.
Solved by retrying even in case of permanent errors (#4447).


## Upgrade considerations
Expand Down
28 changes: 9 additions & 19 deletions src/rdkafka_offset.c
Original file line number Diff line number Diff line change
Expand Up @@ -991,25 +991,15 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk,
rd_kafka_topic_leader_query0(rk, rktp->rktp_rkt, 1,
rd_true /* force */);

if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
/* No need for refcnt on rktp for timer opaque
* since the timer resides on the rktp and will be
* stopped on toppar remove. */
rd_kafka_timer_start_oneshot(
&rk->rk_timers, &rktp->rktp_validate_tmr, rd_false,
500 * 1000 /* 500ms */,
rd_kafka_offset_validate_tmr_cb, rktp);

} else if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) {
/* Permanent error */
rd_kafka_offset_reset(
rktp, rd_kafka_broker_id(rkb),
RD_KAFKA_FETCH_POS(RD_KAFKA_OFFSET_INVALID,
rktp->rktp_leader_epoch),
RD_KAFKA_RESP_ERR__LOG_TRUNCATION,
"Unable to validate offset and epoch: %s",
rd_kafka_err2str(err));
}
/* No need for refcnt on rktp for timer opaque
* since the timer resides on the rktp and will be
* stopped on toppar remove.
* Retries the validation with a new call even in
* case of permanent error. */
rd_kafka_timer_start_oneshot(
&rk->rk_timers, &rktp->rktp_validate_tmr, rd_false,
500 * 1000 /* 500ms */, rd_kafka_offset_validate_tmr_cb,
rktp);
goto done;
}

Expand Down
19 changes: 10 additions & 9 deletions tests/0139-offset_validation_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,11 @@ static void do_test_no_duplicates_during_offset_validation(void) {


/**
* @brief Test that an SSL error doesn't cause an offset reset.
* See issue #4293.
* @brief Test that a permanent error doesn't cause an offset reset.
* See issues #4293, #4427.
* @param err The error OffsetForLeaderEpoch fails with.
*/
static void do_test_ssl_error_retried(void) {
static void do_test_permanent_error_retried(rd_kafka_resp_err_t err) {
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_conf_t *conf;
const char *bootstraps;
Expand All @@ -155,7 +156,7 @@ static void do_test_ssl_error_retried(void) {
int msg_count = 5;
uint64_t testid = test_id_generate();

SUB_TEST_QUICK();
SUB_TEST_QUICK("err: %s", rd_kafka_err2name(err));

mcluster = test_mock_cluster_new(3, &bootstraps);
rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
Expand All @@ -165,10 +166,9 @@ static void do_test_ssl_error_retried(void) {
"bootstrap.servers", bootstraps,
"batch.num.messages", "1", NULL);

/* Make OffsetForLeaderEpoch fail with the _SSL error */
rd_kafka_mock_push_request_errors(mcluster,
RD_KAFKAP_OffsetForLeaderEpoch, 1,
RD_KAFKA_RESP_ERR__SSL);
/* Make OffsetForLeaderEpoch fail with the corresponding error code */
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_OffsetForLeaderEpoch, 1, err);

test_conf_init(&conf, NULL, 60);

Expand Down Expand Up @@ -434,7 +434,8 @@ int main_0139_offset_validation_mock(int argc, char **argv) {

do_test_no_duplicates_during_offset_validation();

do_test_ssl_error_retried();
do_test_permanent_error_retried(RD_KAFKA_RESP_ERR__SSL);
do_test_permanent_error_retried(RD_KAFKA_RESP_ERR__RESOLVE);

do_test_two_leader_changes();

Expand Down

0 comments on commit cd5d802

Please sign in to comment.