Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix assignment lost, on illegal generation, during a commit #4908

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ librdkafka v2.6.2 is a maintenance release:
trusted root certificates (#4900).
* Fixes to allow to migrate partitions to leaders with same leader epoch,
or NULL leader epoch (#4901).
* Commits during a cooperative incremental rebalance aren't causing
an assignment lost if the generation id was bumped in between (#4908).


## Fixes
Expand Down Expand Up @@ -40,6 +42,13 @@ librdkafka v2.6.2 is a maintenance release:
temporarily migrated to the internal broker (#4804), or if broker implementation
never bumps it, as it's not needed to validate the offsets.
Happening since v2.4.0 (#4901).
* Issues: #4059
Commits during a cooperative incremental rebalance could cause an
assignment lost if the generation id was bumped by a second join
group request.
Solved by not rejoining the group in case an illegal generation error happens
during a rebalance.
Happening since v1.6.0 (#4908)



Expand Down
7 changes: 6 additions & 1 deletion src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -3793,7 +3793,12 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit(rd_kafka_t *rk,
break;

case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
/* Revoke assignment and rebalance on illegal generation */
/* Revoke assignment and rebalance on illegal generation,
* only if not rebalancing, because a new generation id
* can be received soon after this error. */
if (RD_KAFKA_CGRP_REBALANCING(rkcg))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting rk->rk_cgrp->rkcg_generation_id to -1, even if not initiating a rejoin, was causing an error in next SyncGroup, that was causing the lost assignment.

break;

rk->rk_cgrp->rkcg_generation_id = -1;
rd_kafka_cgrp_revoke_all_rejoin_maybe(
rkcg, rd_true /*assignment is lost*/,
Expand Down
46 changes: 41 additions & 5 deletions tests/0113-cooperative_rebalance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3171,8 +3171,11 @@ static void v_rebalance_cb(rd_kafka_t *rk,
if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
test_consumer_incremental_assign("assign", rk, parts);
} else {
test_consumer_incremental_unassign("unassign", rk, parts);

TEST_ASSERT(!rd_kafka_assignment_lost(rk),
"Assignment must not be lost, "
" that is a sign that an ILLEGAL_GENERATION error, "
" during a commit happening during a rebalance is "
"causing the assignment to be lost.");
if (!*auto_commitp) {
rd_kafka_resp_err_t commit_err;

Expand All @@ -3181,10 +3184,14 @@ static void v_rebalance_cb(rd_kafka_t *rk,
rd_sleep(2);
commit_err = rd_kafka_commit(rk, NULL, 0 /*sync*/);
TEST_ASSERT(!commit_err || commit_err == RD_KAFKA_RESP_ERR__NO_OFFSET ||
commit_err == RD_KAFKA_RESP_ERR__DESTROY,
commit_err == RD_KAFKA_RESP_ERR__DESTROY ||
commit_err == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
"%s: manual commit failed: %s", rd_kafka_name(rk),
rd_kafka_err2str(commit_err));
}

/* Unassign must be done after manual commit. */
test_consumer_incremental_unassign("unassign", rk, parts);
}
}

Expand All @@ -3198,11 +3205,23 @@ static void v_commit_cb(rd_kafka_t *rk,
TEST_SAY("%s offset commit for %d offsets: %s\n", rd_kafka_name(rk),
offsets ? offsets->cnt : -1, rd_kafka_err2name(err));
TEST_ASSERT(!err || err == RD_KAFKA_RESP_ERR__NO_OFFSET ||
err == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION ||
err == RD_KAFKA_RESP_ERR__DESTROY /* consumer was closed */,
"%s offset commit failed: %s", rd_kafka_name(rk),
rd_kafka_err2str(err));
}

/**
* @brief Log callback for the v_.. test.
*/
static void v_log_cb(const rd_kafka_t *rk,
int level,
const char *fac,
const char *buf) {
/* Slow down logging to make ILLEGAL_GENERATION errors caused by
* manual commit more likely. */
rd_usleep(1000, 0);
}

static void v_commit_during_rebalance(bool with_rebalance_cb,
bool auto_commit) {
Expand Down Expand Up @@ -3240,8 +3259,13 @@ static void v_commit_during_rebalance(bool with_rebalance_cb,


test_conf_set(conf, "auto.offset.reset", "earliest");
test_conf_set(conf, "debug", "consumer,cgrp,topic,fetch");
test_conf_set(conf, "enable.auto.commit", auto_commit ? "true" : "false");
test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
if (!auto_commit)
/* Slowing down logging is necessary only to make assignment lost
* errors more evident. */
rd_kafka_conf_set_log_cb(conf, v_log_cb);
rd_kafka_conf_set_offset_commit_cb(conf, v_commit_cb);
rd_kafka_conf_set_opaque(conf, (void *)&auto_commit);

Expand All @@ -3266,8 +3290,20 @@ static void v_commit_during_rebalance(bool with_rebalance_cb,

/* Poll both consumers */
for (i = 0; i < 10; i++) {
test_consumer_poll_once(c1, NULL, 1000);
test_consumer_poll_once(c2, NULL, 1000);
int poll_result1, poll_result2;
do {
poll_result1 = test_consumer_poll_once(c1, NULL, 1000);
poll_result2 = test_consumer_poll_once(c2, NULL, 1000);

if (poll_result1 == 1 && !auto_commit) {
rd_kafka_resp_err_t err;
TEST_SAY("Attempting manual commit after poll\n");
err = rd_kafka_commit(c1, NULL, 0);
TEST_ASSERT(!err || err == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
"Expected not error or ILLEGAL_GENERATION, got: %s",
rd_kafka_err2str(err));
}
} while (poll_result1 == 0 || poll_result2 == 0);
}

TEST_SAY("Closing consumers\n");
Expand Down