KAFKA-16944: Rewrite Range Assignor #16504
@rreddy-22 Thanks for the PR. I left some comments for considerations.
@rreddy-22 Thanks for the update. I left more comments for consideration. I'd also like to point out that there are related failed tests. Could you please check them out?
    @Override
    public Object[] toArray() {
        throw new UnsupportedOperationException();
    }

    @Override
    public <T> T[] toArray(T[] a) {
        throw new UnsupportedOperationException();
    }
I have noticed that many tests are failing in the build. I suspect they fail because these two methods are not implemented.
From the logs:
java.lang.UnsupportedOperationException
at org.apache.kafka.coordinator.group.assignor.RangeSet.toArray(RangeSet.java:83)
at java.base/java.util.ArrayList.<init>(ArrayList.java:181)
at org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newTargetAssignmentRecord(CoordinatorRecordHelpers.java:242)
at org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.build(TargetAssignmentBuilder.java:368)
The TargetAssignmentBuilderBenchmark results are very likely impacted by this.
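For context on the stack trace above: `new ArrayList<>(collection)` copies its argument by calling `collection.toArray()`, so a `Set` whose `toArray()` throws cannot be copied into a list. Below is a minimal sketch of how both methods could be implemented for a set backed by a contiguous integer range. The class name `SimpleRangeSet` and its `from` (inclusive) / `to` (exclusive) fields are assumptions made for illustration; this is not the `RangeSet` code from the PR.

```java
import java.lang.reflect.Array;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical, simplified stand-in for a range-backed set of partition ids.
final class SimpleRangeSet extends AbstractSet<Integer> {
    private final int from; // inclusive
    private final int to;   // exclusive

    SimpleRangeSet(int from, int to) {
        if (to < from) {
            throw new IllegalArgumentException("to must be >= from");
        }
        this.from = from;
        this.to = to;
    }

    @Override
    public int size() {
        return to - from;
    }

    @Override
    public boolean contains(Object o) {
        return o instanceof Integer && (Integer) o >= from && (Integer) o < to;
    }

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int cursor = from;

            @Override
            public boolean hasNext() {
                return cursor < to;
            }

            @Override
            public Integer next() {
                if (cursor >= to) {
                    throw new NoSuchElementException();
                }
                return cursor++;
            }
        };
    }

    // Materializes the range instead of throwing, so copy constructors work.
    @Override
    public Object[] toArray() {
        Object[] array = new Object[size()];
        for (int i = 0; i < array.length; i++) {
            array[i] = from + i;
        }
        return array;
    }

    // Follows the Collection.toArray(T[]) contract: reuse the given array if
    // it is large enough, otherwise allocate one of the same component type.
    @Override
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        int size = size();
        T[] out = a.length >= size
            ? a
            : (T[]) Array.newInstance(a.getClass().getComponentType(), size);
        for (int i = 0; i < size; i++) {
            out[i] = (T) Integer.valueOf(from + i);
        }
        if (out.length > size) {
            out[size] = null; // null-terminate, as the contract specifies
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> copy = new ArrayList<>(new SimpleRangeSet(3, 6));
        System.out.println(copy); // prints [3, 4, 5]
    }
}
```

With both methods in place, the `ArrayList` copy constructor seen in the stack trace would succeed instead of surfacing an `UnsupportedOperationException`.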
LGTM, thanks.
Here are the results of the benchmarks based on the last commit:
@dajac @rreddy-22 It looks like this commit broke the build: new tests were added here a few hours before this PR was merged, and they were not updated to use the new [...]. Can we either revert this commit or publish a fix PR ASAP?
Looks like a fix has been published: #16526
        GroupSpec groupSpec,
        SubscribedTopicDescriber subscribedTopicDescriber
    ) throws PartitionAssignorException {
        List<String> memberIds = sortMemberIds(groupSpec);
It seems this sorting can be delayed. An exception may be thrown on line 159, so the sorting can be done after the loop starting on line 157 finishes.
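The suggestion amounts to running the validation that can throw first and paying for the sort only once it has passed. A minimal sketch of that ordering follows, using hypothetical names (`DeferredSortSketch`, `validateThenSort`) and a plain `IllegalStateException` standing in for `PartitionAssignorException`; the real method signature and data structures in the PR differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of "validate first, sort afterwards".
final class DeferredSortSketch {

    static List<String> validateThenSort(
        Map<String, Set<String>> subscriptionsByMember, // memberId -> subscribed topics
        Set<String> knownTopics
    ) {
        // Step 1: validation loop. It may throw, so nothing expensive runs before it.
        for (Map.Entry<String, Set<String>> entry : subscriptionsByMember.entrySet()) {
            for (String topic : entry.getValue()) {
                if (!knownTopics.contains(topic)) {
                    throw new IllegalStateException(
                        "Member " + entry.getKey() + " is subscribed to unknown topic " + topic);
                }
            }
        }

        // Step 2: sort only after validation succeeded.
        List<String> memberIds = new ArrayList<>(subscriptionsByMember.keySet());
        Collections.sort(memberIds);
        return memberIds;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> subscriptions = new HashMap<>();
        subscriptions.put("member-b", new HashSet<>(Arrays.asList("orders")));
        subscriptions.put("member-a", new HashSet<>(Arrays.asList("orders")));

        Set<String> knownTopics = new HashSet<>(Arrays.asList("orders"));
        System.out.println(validateThenSort(subscriptions, knownTopics));
        // prints [member-a, member-b]
    }
}
```

If validation fails, the sort is skipped entirely, which is the saving the comment points at.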
The server side range assignor was made to be sticky i.e. partitions from the existing assignment are retained as much as possible. During a rebalance, the expected behavior is to achieve co-partitioning for members that are subscribed to the same set of topics with equal number of partitions.

However, there are cases where this cannot be achieved efficiently with the current algorithm. There is no easy way to implement stickiness and co-partitioning and hence we have resorted to recomputing the target assignment every time.

In case of static membership, instanceIds are leveraged to ensure some form of stickiness.

```
Benchmark                                 (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score   Error  Units
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false            100                         10         HOMOGENEOUS           100  avgt    5    0.052 ± 0.001  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false            100                         10         HOMOGENEOUS          1000  avgt    5    0.454 ± 0.003  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false           1000                         10         HOMOGENEOUS           100  avgt    5    0.476 ± 0.046  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false           1000                         10         HOMOGENEOUS          1000  avgt    5    3.102 ± 0.055  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false          10000                         10         HOMOGENEOUS           100  avgt    5    5.640 ± 0.223  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false          10000                         10         HOMOGENEOUS          1000  avgt    5   37.947 ± 1.000  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false            100                         10       HETEROGENEOUS           100  avgt    5    0.172 ± 0.001  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false            100                         10       HETEROGENEOUS          1000  avgt    5    1.882 ± 0.006  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false           1000                         10       HETEROGENEOUS           100  avgt    5    1.730 ± 0.036  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false           1000                         10       HETEROGENEOUS          1000  avgt    5   17.654 ± 1.160  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false          10000                         10       HETEROGENEOUS           100  avgt    5   18.595 ± 0.316  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false          10000                         10       HETEROGENEOUS          1000  avgt    5  172.398 ± 2.251  ms/op
JMH benchmarks done

Benchmark                               (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt   Score   Error  Units
TargetAssignmentBuilderBenchmark.build            100                         10           100  avgt    5   0.071 ± 0.004  ms/op
TargetAssignmentBuilderBenchmark.build            100                         10          1000  avgt    5   0.428 ± 0.026  ms/op
TargetAssignmentBuilderBenchmark.build           1000                         10           100  avgt    5   0.659 ± 0.028  ms/op
TargetAssignmentBuilderBenchmark.build           1000                         10          1000  avgt    5   3.346 ± 0.102  ms/op
TargetAssignmentBuilderBenchmark.build          10000                         10           100  avgt    5   8.947 ± 0.386  ms/op
TargetAssignmentBuilderBenchmark.build          10000                         10          1000  avgt    5  40.240 ± 3.113  ms/op
JMH benchmarks done
```

Reviewers: David Jacot
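To illustrate why recomputing the assignment from scratch still gives co-partitioning, here is a minimal sketch of a range-style split for a single topic. It is an assumption-laden simplification and not the assignor from this PR: the names `RangeSplitSketch` and `assignRanges` are hypothetical, members are ordered by plain string comparison, and stickiness and static-member handling are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: split one topic's partitions into contiguous ranges.
final class RangeSplitSketch {

    static Map<String, List<Integer>> assignRanges(List<String> memberIds, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        if (memberIds.isEmpty()) {
            return assignment;
        }

        // A deterministic member order is what makes the result reproducible.
        List<String> sorted = new ArrayList<>(memberIds);
        Collections.sort(sorted);

        int quota = numPartitions / sorted.size(); // partitions every member gets
        int extra = numPartitions % sorted.size(); // first `extra` members get one more

        int start = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = quota + (i < extra ? 1 : 0);
            List<Integer> range = new ArrayList<>(count);
            for (int p = start; p < start + count; p++) {
                range.add(p);
            }
            assignment.put(sorted.get(i), range);
            start += count;
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assignRanges(Arrays.asList("b", "a", "c"), 7));
        // prints {a=[0, 1, 2], b=[3, 4], c=[5, 6]}
    }
}
```

Because the split depends only on the sorted member list and the partition count, running it for two topics that have the same subscribers and the same number of partitions yields identical ranges per member, which is the co-partitioning property described above.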