Clean buffer after rebalancing for batch queue #3269
Good work!
See comments.
Issue: When messages are polled with the batch queue method, the buffer is not cleaned after a rebalance, so the consumer can still receive old messages.
Solution: When an assign happens, a new op of type RD_KAFKA_OP_BARRIER is enqueued and a new version is created at the same time. When the consumer encounters this op, it cleans the buffer by comparing the version of each message with the newly created version.
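The barrier idea described above can be sketched as a small single-threaded model. This is only an illustration under stated assumptions: `op_t`, `OP_FETCH`, `OP_BARRIER`, `consume_batch` and `barrier_demo` are hypothetical names, not librdkafka's internal types or functions, and the real queue code is considerably more involved.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical op types: a fetched message or a version barrier. */
typedef enum { OP_FETCH, OP_BARRIER } op_type_t;

/* Hypothetical queue entry, not librdkafka's rd_kafka_op_t. */
typedef struct op_s {
        op_type_t type;
        int version; /* assignment version the op was created under */
        int payload; /* stand-in for the message contents */
} op_t;

/* Collect up to max fetched ops from ops[0..op_cnt) into out[].
 * When a barrier is seen, everything collected so far that carries
 * an older version is dropped from the batch. */
static size_t consume_batch(const op_t *ops, size_t op_cnt,
                            const op_t **out, size_t max) {
        size_t cnt = 0, i, j, valid;
        int version = 0;

        assert(ops != NULL && out != NULL);

        for (i = 0; i < op_cnt && cnt < max; i++) {
                const op_t *op = &ops[i];

                if (op->type == OP_BARRIER) {
                        /* New assignment: purge buffered old messages. */
                        version = op->version;
                        valid = 0;
                        for (j = 0; j < cnt; j++) {
                                if (out[j]->version >= version)
                                        out[valid++] = out[j];
                        }
                        cnt = valid;
                        continue;
                }

                /* Skip stale messages that arrive after the barrier. */
                if (op->version < version)
                        continue;

                out[cnt++] = op;
        }
        return cnt;
}

/* Returns 1 if only the messages of the new version survive. */
static int barrier_demo(void) {
        const op_t ops[] = {
                {OP_FETCH, 1, 1},   {OP_FETCH, 1, 2}, {OP_BARRIER, 2, 0},
                {OP_FETCH, 2, 3},   {OP_FETCH, 1, 4}, {OP_FETCH, 2, 5},
        };
        const op_t *out[8];
        size_t n = consume_batch(ops, sizeof(ops) / sizeof(ops[0]), out, 8);

        return n == 2 && out[0]->payload == 3 && out[1]->payload == 5;
}
```

In this model the two version-1 messages buffered before the barrier are discarded when the barrier is reached, which is the behaviour the fix aims for in the batch consume path.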
Almost there! Good work!
Good stuff!
I'm a bit concerned with the test, I don't think it actually triggers/verifies the buggy behaviour.
src/rdkafka_queue.c
if (rd_kafka_op_version_outdated(rko, version)) {
        /* This also destroys the corresponding rkmessage. */
        rd_kafka_op_destroy(rko);
} else {
Perhaps as an optimization to avoid a write shuffle: else if (i > valid_count++)
Hi Magnus, thanks for pointing this out! I think the copy should happen before valid_count is incremented, so I changed this part to the logic below:
if (rd_kafka_op_version_outdated(rko, version)) {
        /* This also destroys the corresponding rkmessage. */
        rd_kafka_op_destroy(rko);
} else if ((size_t)i > valid_count) {
        rkmessages[valid_count++] = rkmessages[i];
} else {
        valid_count++;
}
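For readers who want to try the compaction on its own, here is a minimal, self-contained sketch of the same pattern. It assumes a hypothetical `msg_t` type and a hypothetical `msg_outdated()` check standing in for `rd_kafka_op_version_outdated()`; none of these names come from librdkafka.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for a buffered message. */
typedef struct msg_s {
        int version; /* assignment version the message was fetched under */
        int payload;
} msg_t;

/* Hypothetical stand-in for rd_kafka_op_version_outdated(). */
static int msg_outdated(const msg_t *m, int version) {
        return m->version < version;
}

/* Remove outdated messages from msgs[0..cnt) in place, preserving
 * order. Returns the number of messages kept. A slot is only written
 * when an earlier entry was dropped (i > valid_count), which avoids
 * copying an element onto itself. */
static size_t purge_outdated(msg_t **msgs, size_t cnt, int version) {
        size_t valid_count = 0;
        size_t i;

        for (i = 0; i < cnt; i++) {
                if (msg_outdated(msgs[i], version)) {
                        free(msgs[i]);
                } else if (i > valid_count) {
                        msgs[valid_count++] = msgs[i];
                } else {
                        valid_count++;
                }
        }
        return valid_count;
}

static msg_t *msg_new(int version, int payload) {
        msg_t *m = malloc(sizeof(*m));
        assert(m != NULL);
        m->version = version;
        m->payload = payload;
        return m;
}

/* Returns 1 if purging version-1 messages leaves payloads 20 and 40. */
static int purge_demo(void) {
        msg_t *msgs[4];
        size_t n, i;
        int ok;

        msgs[0] = msg_new(1, 10);
        msgs[1] = msg_new(2, 20);
        msgs[2] = msg_new(1, 30);
        msgs[3] = msg_new(2, 40);

        n = purge_outdated(msgs, 4, 2);
        ok = n == 2 && msgs[0]->payload == 20 && msgs[1]->payload == 40;

        for (i = 0; i < n; i++)
                free(msgs[i]);
        return ok;
}
```

Note that the compare and the increment are kept separate here: with `i > valid_count++` the counter would be bumped by the comparison itself, so the copy would land one slot too far.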
Really close now!
This looks great! Good work on this rather hairy bug, Jing!
/**
 * @brief Produce 400 messages and consume 500 messages totally by 2 consumers
Very good!
Summary: This is a bug fix for confluentinc/confluent-kafka-python#1013
The test passes locally:
/ 63.161s] ALL-TESTS: duration 63160.668msTesting:
[0122_buffer_cleaning_after_rebalance/ 62.283s] ================= Test 0122_buffer_cleaning_after_rebalance PASSED =================
[
TEST 20210224223610 (bare, scenario default) SUMMARY
#==================================================================#
| | PASSED | 63.161s |
| 0122_buffer_cleaning_after_rebalance | PASSED | 62.283s |
#==================================================================#
[ / 63.163s] 0 thread(s) in use by librdkafka
[ / 63.163s]
============== ALL TESTS PASSED ==============
./test-runner in bare mode PASSED!