KAFKA-16944: Rewrite Range Assignor #16504

Merged: 9 commits, merged on Jul 4, 2024

Contributor

@rreddy-22 rreddy-22 commented Jul 1, 2024

The server-side range assignor was made sticky, i.e. partitions from the existing assignment are retained as much as possible. During a rebalance, the expected behavior is to achieve co-partitioning for members that are subscribed to the same set of topics with an equal number of partitions.

However, there are cases where this cannot be achieved efficiently with the current algorithm. There is no easy way to implement both stickiness and co-partitioning, so we have resorted to recomputing the target assignment every time.

In the case of static membership, instanceIds are leveraged to ensure some form of stickiness.
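
To illustrate what recomputing a co-partitioned range assignment can look like, here is a minimal sketch. It is not the code from this PR: the class `RangeAssignmentSketch` and its `assign` method are hypothetical, and it assumes every member is subscribed to every topic. Members are sorted by a stable key, and each topic's partitions are split into contiguous ranges in that order, so topics with the same partition count give each member the same partition numbers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not the RangeAssignor implementation from this PR.
final class RangeAssignmentSketch {

    /**
     * Recomputes an assignment from scratch.
     *
     * @param memberKeys      one stable sort key per member (assumed to be subscribed to all topics)
     * @param partitionCounts topic name mapped to its number of partitions
     * @return member key mapped to (topic mapped to assigned partition numbers)
     */
    static Map<String, Map<String, List<Integer>>> assign(
        List<String> memberKeys,
        Map<String, Integer> partitionCounts
    ) {
        // Sorting gives every topic the same member order, which is what
        // produces co-partitioning for topics with equal partition counts.
        List<String> sorted = new ArrayList<>(memberKeys);
        Collections.sort(sorted);

        Map<String, Map<String, List<Integer>>> result = new TreeMap<>();
        for (String member : sorted) {
            result.put(member, new TreeMap<>());
        }
        if (sorted.isEmpty()) {
            return result;
        }

        for (Map.Entry<String, Integer> topic : partitionCounts.entrySet()) {
            int numPartitions = topic.getValue();
            int quota = numPartitions / sorted.size();  // minimum partitions per member
            int extra = numPartitions % sorted.size();  // first 'extra' members get one more
            int next = 0;
            for (int i = 0; i < sorted.size(); i++) {
                int size = quota + (i < extra ? 1 : 0);
                List<Integer> range = new ArrayList<>(size);
                for (int p = next; p < next + size; p++) {
                    range.add(p);
                }
                next += size;
                result.get(sorted.get(i)).put(topic.getKey(), range);
            }
        }
        return result;
    }
}
```

With member keys `a` and `b` and two topics of three partitions each, `a` receives partitions 0 and 1 of both topics and `b` receives partition 2 of both. One way to picture the static-membership remark is to use a member's instanceId as its sort key, so that a restarted static member lands in the same position; that reading is an assumption of this sketch.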

@rreddy-22 rreddy-22 marked this pull request as ready for review July 1, 2024 23:47
@dajac dajac added the KIP-848 The Next Generation of the Consumer Rebalance Protocol label Jul 2, 2024
Member

@dajac dajac left a comment

@rreddy-22 Thanks for the PR. I left some comments for consideration.

@rreddy-22
Contributor Author

Benchmark                               (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt    Score   Error  Units
TargetAssignmentBuilderBenchmark.build            100                         10           100  avgt    5    0.432 ± 0.003  ms/op
TargetAssignmentBuilderBenchmark.build            100                         10          1000  avgt    5    4.139 ± 0.013  ms/op
TargetAssignmentBuilderBenchmark.build           1000                         10           100  avgt    5    4.332 ± 0.049  ms/op
TargetAssignmentBuilderBenchmark.build           1000                         10          1000  avgt    5   43.449 ± 0.058  ms/op
TargetAssignmentBuilderBenchmark.build          10000                         10           100  avgt    5   47.766 ± 0.389  ms/op
TargetAssignmentBuilderBenchmark.build          10000                         10          1000  avgt    5  487.833 ± 3.459  ms/op

Member

@dajac dajac left a comment

@rreddy-22 Thanks for the update. I left more comments for consideration. I'd also like to point out that there are related failed tests. Could you please check them out?

Comment on lines 81 to 89
@Override
public Object[] toArray() {
    throw new UnsupportedOperationException();
}

@Override
public <T> T[] toArray(T[] a) {
    throw new UnsupportedOperationException();
}
Member

I have noticed that many tests are failing in the build. I suspect they fail because those two methods are not implemented.

Member

From the logs:

java.lang.UnsupportedOperationException
	at org.apache.kafka.coordinator.group.assignor.RangeSet.toArray(RangeSet.java:83)
	at java.base/java.util.ArrayList.<init>(ArrayList.java:181)
	at org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newTargetAssignmentRecord(CoordinatorRecordHelpers.java:242)
	at org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.build(TargetAssignmentBuilder.java:368)

Member

The TargetAssignmentBuilderBenchmark results are very likely impacted by this.
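
As the stack trace in this thread shows, `ArrayList`'s copy constructor calls `toArray()` on the source collection, so a `Set` that throws from it cannot be copied into a list. A minimal sketch of a range-backed set that supports both `toArray` overloads is given here for illustration. The class `IntRangeSetSketch` is hypothetical and is not the `RangeSet` from this PR; extending `AbstractSet` is an assumption made to keep the example short.

```java
import java.lang.reflect.Array;
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical illustration only; not the RangeSet class from this PR.
final class IntRangeSetSketch extends AbstractSet<Integer> {
    private final int from; // inclusive
    private final int to;   // exclusive

    IntRangeSetSketch(int from, int to) {
        if (to < from) {
            throw new IllegalArgumentException("to must be >= from");
        }
        this.from = from;
        this.to = to;
    }

    @Override
    public int size() {
        return to - from;
    }

    @Override
    public boolean contains(Object o) {
        return o instanceof Integer && (Integer) o >= from && (Integer) o < to;
    }

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int current = from;

            @Override
            public boolean hasNext() {
                return current < to;
            }

            @Override
            public Integer next() {
                if (current >= to) {
                    throw new NoSuchElementException();
                }
                return current++;
            }
        };
    }

    // Materializes the range without going through the iterator.
    @Override
    public Object[] toArray() {
        Object[] array = new Object[size()];
        for (int i = 0; i < array.length; i++) {
            array[i] = from + i;
        }
        return array;
    }

    // Follows the Collection contract: reuse 'a' if it is large enough,
    // otherwise allocate an array of the same runtime component type.
    @Override
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        int size = size();
        T[] array = a.length >= size
            ? a
            : (T[]) Array.newInstance(a.getClass().getComponentType(), size);
        for (int i = 0; i < size; i++) {
            array[i] = (T) Integer.valueOf(from + i);
        }
        if (array.length > size) {
            array[size] = null; // marks the end, as the contract specifies
        }
        return array;
    }
}
```

With this in place, `new ArrayList<>(new IntRangeSetSketch(2, 5))` yields a list containing 2, 3 and 4 instead of throwing.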

Member

@dajac dajac left a comment

LGTM, thanks.

@dajac
Member

dajac commented Jul 4, 2024

Here are the results of the benchmarks based on the last commit:


Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score    Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10         HOMOGENEOUS           100  avgt    5    0.052 ±  0.001  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10         HOMOGENEOUS          1000  avgt    5    0.454 ±  0.003  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10         HOMOGENEOUS           100  avgt    5    0.476 ±  0.046  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10         HOMOGENEOUS          1000  avgt    5    3.102 ±  0.055  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10         HOMOGENEOUS           100  avgt    5    5.640 ±  0.223  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10         HOMOGENEOUS          1000  avgt    5   37.947 ±  1.000  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10       HETEROGENEOUS           100  avgt    5    0.172 ±  0.001  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10       HETEROGENEOUS          1000  avgt    5    1.882 ±  0.006  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10       HETEROGENEOUS           100  avgt    5    1.730 ±  0.036  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10       HETEROGENEOUS          1000  avgt    5   17.654 ±  1.160  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10       HETEROGENEOUS           100  avgt    5   18.595 ±  0.316  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10       HETEROGENEOUS          1000  avgt    5  172.398 ±  2.251  ms/op
JMH benchmarks done

Benchmark                                     (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt   Score   Error  Units
TargetAssignmentBuilderBenchmark.build                  100                         10           100  avgt    5   0.071 ± 0.004  ms/op
TargetAssignmentBuilderBenchmark.build                  100                         10          1000  avgt    5   0.428 ± 0.026  ms/op
TargetAssignmentBuilderBenchmark.build                 1000                         10           100  avgt    5   0.659 ± 0.028  ms/op
TargetAssignmentBuilderBenchmark.build                 1000                         10          1000  avgt    5   3.346 ± 0.102  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10           100  avgt    5   8.947 ± 0.386  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10          1000  avgt    5  40.240 ± 3.113  ms/op
JMH benchmarks done

@dajac dajac merged commit 42f267a into apache:trunk Jul 4, 2024
1 check failed
@C0urante
Contributor

C0urante commented Jul 4, 2024

@dajac @rreddy-22 It looks like this commit broke the build since there were new tests added here a few hours before this PR was merged that were not updated to use the new MemberSubscriptionAndAssignmentImpl constructor.

Can we either revert this commit or publish a fix PR ASAP?

@C0urante
Contributor

C0urante commented Jul 4, 2024

Looks like a fix has been published: #16526

    GroupSpec groupSpec,
    SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException {
    List<String> memberIds = sortMemberIds(groupSpec);
Contributor

It seems this sorting can be delayed. An exception may be thrown on line 159, so the sorting can be done after the loop starting on line 157 finishes.
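
The suggestion can be sketched roughly as follows. This is a hypothetical illustration, not the code under review: `DeferredSortSketch`, `validateThenSort`, and the use of `IllegalArgumentException` in place of `PartitionAssignorException` are all assumptions, as is the guess that the loop in question validates subscribed topics. The point is only that the sort cost is not paid when validation fails.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; names and exception type are stand-ins.
final class DeferredSortSketch {

    /**
     * Validates every subscription first and sorts member ids only afterwards.
     *
     * @param subscriptions member id mapped to the topics it subscribes to
     * @param knownTopics   topics that exist in the cluster metadata
     */
    static List<String> validateThenSort(
        Map<String, Set<String>> subscriptions,
        Set<String> knownTopics
    ) {
        // Validation loop: may throw, in which case no sorting work is done.
        for (Map.Entry<String, Set<String>> entry : subscriptions.entrySet()) {
            for (String topic : entry.getValue()) {
                if (!knownTopics.contains(topic)) {
                    throw new IllegalArgumentException(
                        "Member " + entry.getKey() + " subscribed to unknown topic " + topic);
                }
            }
        }

        // Sorting happens only once validation has succeeded.
        List<String> memberIds = new ArrayList<>(subscriptions.keySet());
        Collections.sort(memberIds);
        return memberIds;
    }
}
```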

abhi-ksolves pushed a commit to ksolves/kafka that referenced this pull request Jul 31, 2024

Reviewers: David Jacot <[email protected]>