SKYEDEN-3020 | Handle retransmission zookeeper race condition #1936
Why:
The current implementation of the Retransmitter::reloadOffsets method contains a race condition. Multiple consumer instances can execute this code simultaneously, which may lead to unpredictable behavior.
Offsets are stored in ZooKeeper per partition, for example [1:{123}, 2:{345}, 3:{567}, 4:{678}]. The method first fetches the entire list of available partitions from ZooKeeper ([1,2,3,4]), then retrieves the offset for each of those partitions. The fetched offsets are processed only for the partitions assigned to the consumer and are then deleted from ZooKeeper. For instance, if consumer1 is assigned partitions [1,2], it processes these offsets and deletes them, leaving only partitions [3,4] in ZooKeeper.
The issue arises when multiple consumers execute this process simultaneously. A consumer may fetch the list of available partitions, but by the time it attempts to fetch the offset for a given partition, that offset may no longer exist because another consumer has already deleted it.
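The interleaving described above can be reproduced deterministically with a small sketch. This is not the Retransmitter code: the in-memory map is only a stand-in for the per-partition offset nodes in ZooKeeper, and the class and method names (ReloadOffsetsRace, listPartitions, missingAfterInterleaving) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ReloadOffsetsRace {
    // Hypothetical in-memory stand-in for the offsets stored in ZooKeeper.
    static final Map<Integer, Long> store = new TreeMap<>();

    // Step 1 of the old flow: list every partition that currently has an offset.
    static List<Integer> listPartitions() {
        return new ArrayList<>(store.keySet());
    }

    // Replays the problematic interleaving and returns the partitions whose
    // offsets were listed by consumer2 but were gone when it tried to read them.
    static List<Integer> missingAfterInterleaving() {
        store.clear();
        store.put(1, 123L);
        store.put(2, 345L);
        store.put(3, 567L);
        store.put(4, 678L);

        // consumer2 lists all partitions: [1, 2, 3, 4]
        List<Integer> listedByConsumer2 = listPartitions();

        // consumer1 (assigned [1, 2]) finishes processing and deletes its offsets
        store.remove(1);
        store.remove(2);

        // consumer2 now fetches the offset for every partition it listed earlier
        List<Integer> missing = new ArrayList<>();
        for (int partition : listedByConsumer2) {
            if (store.get(partition) == null) {
                missing.add(partition); // a real ZooKeeper read would fail here
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        System.out.println("Offsets missing for partitions: " + missingAfterInterleaving());
    }
}
```

In this replay consumer2 finds no offset for partitions 1 and 2, which it does not even own. Against a real ZooKeeper the read of a deleted node would fail rather than return null.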
Fix:
To resolve this, the step of listing all available partitions from ZooKeeper was removed. Instead, the method now directly fetches offsets only for the partitions assigned to the consumer. This ensures that the consumer only retrieves data for partitions it is responsible for, eliminating the possibility of fetching offsets that may have been deleted by another consumer.
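A minimal sketch of the idea behind the fix, under the same assumption that an in-memory map stands in for ZooKeeper. The class AssignedOnlyReload and the signature of reloadOffsets here are hypothetical and do not mirror the actual Retransmitter API; the point is only that each consumer touches nothing outside its own assignment.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

class AssignedOnlyReload {
    // Reads and deletes offsets only for the partitions assigned to this consumer.
    // `store` is a hypothetical stand-in for the offsets kept in ZooKeeper.
    static Map<Integer, Long> reloadOffsets(Map<Integer, Long> store, Set<Integer> assigned) {
        Map<Integer, Long> reloaded = new TreeMap<>();
        for (int partition : assigned) {
            Long offset = store.get(partition);
            if (offset == null) {
                continue; // no retransmission offset was stored for this partition
            }
            reloaded.put(partition, offset); // placeholder for "process the offset"
            store.remove(partition);         // delete once processed
        }
        return reloaded;
    }

    public static void main(String[] args) {
        Map<Integer, Long> store = new ConcurrentHashMap<>();
        store.put(1, 123L);
        store.put(2, 345L);
        store.put(3, 567L);
        store.put(4, 678L);

        System.out.println(reloadOffsets(store, Set.of(1, 2))); // {1=123, 2=345}
        System.out.println(reloadOffsets(store, Set.of(3, 4))); // {3=567, 4=678}
    }
}
```

Because the two consumers work on disjoint sets of partitions, neither can observe a deletion made by the other, regardless of how their calls interleave.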