Skip to content

Commit

Permalink
KAFKA-16944; Rewrite Range Assignor (apache#16504)
Browse files Browse the repository at this point in the history
The server side range assignor was made to be sticky i.e. partitions from the existing assignment are retained as much as possible. During a rebalance, the expected behavior is to achieve co-partitioning for members that are subscribed to the same set of topics with equal number of partitions.

However, there are cases where this cannot be achieved efficiently with the current algorithm. There is no easy way to implement stickiness and co-partitioning and hence we have resorted to recomputing the target assignment every time.

In case of static membership, instanceIds are leveraged to ensure some form of stickiness.

```
Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score    Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10         HOMOGENEOUS           100  avgt    5    0.052 ±  0.001  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10         HOMOGENEOUS          1000  avgt    5    0.454 ±  0.003  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10         HOMOGENEOUS           100  avgt    5    0.476 ±  0.046  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10         HOMOGENEOUS          1000  avgt    5    3.102 ±  0.055  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10         HOMOGENEOUS           100  avgt    5    5.640 ±  0.223  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10         HOMOGENEOUS          1000  avgt    5   37.947 ±  1.000  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10       HETEROGENEOUS           100  avgt    5    0.172 ±  0.001  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10       HETEROGENEOUS          1000  avgt    5    1.882 ±  0.006  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10       HETEROGENEOUS           100  avgt    5    1.730 ±  0.036  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10       HETEROGENEOUS          1000  avgt    5   17.654 ±  1.160  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10       HETEROGENEOUS           100  avgt    5   18.595 ±  0.316  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10       HETEROGENEOUS          1000  avgt    5  172.398 ±  2.251  ms/op
JMH benchmarks done

Benchmark                                     (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt   Score   Error  Units
TargetAssignmentBuilderBenchmark.build                  100                         10           100  avgt    5   0.071 ± 0.004  ms/op
TargetAssignmentBuilderBenchmark.build                  100                         10          1000  avgt    5   0.428 ± 0.026  ms/op
TargetAssignmentBuilderBenchmark.build                 1000                         10           100  avgt    5   0.659 ± 0.028  ms/op
TargetAssignmentBuilderBenchmark.build                 1000                         10          1000  avgt    5   3.346 ± 0.102  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10           100  avgt    5   8.947 ± 0.386  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10          1000  avgt    5  40.240 ± 3.113  ms/op
JMH benchmarks done
```

Reviewers: David Jacot <[email protected]>
  • Loading branch information
rreddy-22 authored Jul 4, 2024
1 parent e0dcfa7 commit 42f267a
Show file tree
Hide file tree
Showing 13 changed files with 845 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ public interface MemberSubscription {
*/
Optional<String> rackId();

/**
* Gets the instance Id if present.
*
* @return An Optional containing the instance Id, or an empty Optional if not present.
*/
Optional<String> instanceId();

/**
* Gets the set of subscribed topic Ids.
*
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;

import java.lang.reflect.Array;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;

/**
* A {@code RangeSet} represents a range of integers from {@code from} (inclusive)
* to {@code to} (exclusive).
* This implementation provides a view over a continuous range of integers without actually storing them.
*/
class RangeSet implements Set<Integer> {
private final int from;
private final int to;

/**
* Constructs a {@code RangeSet} with the specified range.
*
* @param from The starting value (inclusive) of the range.
* @param to The ending value (exclusive) of the range.
*/
public RangeSet(int from, int to) {
this.from = from;
this.to = to;
}

@Override
public int size() {
return to - from;
}

@Override
public boolean isEmpty() {
return size() == 0;
}

@Override
public boolean contains(Object o) {
if (o instanceof Integer) {
int value = (Integer) o;
return value >= from && value < to;
}
return false;
}

@Override
public Iterator<Integer> iterator() {
return new Iterator<Integer>() {
private int current = from;

@Override
public boolean hasNext() {
return current < to;
}

@Override
public Integer next() {
if (!hasNext()) throw new NoSuchElementException();
return current++;
}
};
}

@Override
public Object[] toArray() {
Object[] array = new Object[size()];
for (int i = 0; i < size(); i++) {
array[i] = from + i;
}
return array;
}

@Override
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
int size = size();
if (a.length < size) {
// Create a new array of the same type as a with the correct size
a = (T[]) Array.newInstance(a.getClass().getComponentType(), size);
}
for (int i = 0; i < size; i++) {
a[i] = (T) Integer.valueOf(from + i);
}
if (a.length > size) {
a[size] = null;
}
return a;
}

@Override
public boolean add(Integer integer) {
throw new UnsupportedOperationException();
}

@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}

@Override
public boolean containsAll(Collection<?> c) {
for (Object o : c) {
if (!contains(o)) return false;
}
return true;
}

@Override
public boolean addAll(Collection<? extends Integer> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException();
}

@Override
public void clear() {
throw new UnsupportedOperationException();
}

@Override
public String toString() {
return "RangeSet(from=" + from + " (inclusive), to=" + to + " (exclusive))";
}

/**
* Compares the specified object with this set for equality.
* Returns {@code true} if the specified object is also a set,
* the two sets have the same size, and every member of the specified
* set is contained in this set.
*
* @param o object to be compared for equality with this set
* @return {@code true} if the specified object is equal to this set
*/
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Set)) return false;

if (o instanceof RangeSet) {
RangeSet other = (RangeSet) o;
return this.from == other.from && this.to == other.to;
}

Set<?> otherSet = (Set<?>) o;
if (otherSet.size() != this.size()) return false;

for (int i = from; i < to; i++) {
if (!otherSet.contains(i)) return false;
}
return true;
}

@Override
public int hashCode() {
int result = from;
result = 31 * result + to;
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
*/
public class MemberSubscriptionAndAssignmentImpl implements MemberSubscription, MemberAssignment {
private final Optional<String> rackId;
private final Optional<String> instanceId;
private final Set<Uuid> subscribedTopicIds;
private final Assignment memberAssignment;

Expand All @@ -42,10 +43,12 @@ public class MemberSubscriptionAndAssignmentImpl implements MemberSubscription,
*/
public MemberSubscriptionAndAssignmentImpl(
Optional<String> rackId,
Optional<String> instanceId,
Set<Uuid> subscribedTopicIds,
Assignment memberAssignment
) {
this.rackId = Objects.requireNonNull(rackId);
this.instanceId = Objects.requireNonNull(instanceId);
this.subscribedTopicIds = Objects.requireNonNull(subscribedTopicIds);
this.memberAssignment = Objects.requireNonNull(memberAssignment);
}
Expand All @@ -55,6 +58,11 @@ public Optional<String> rackId() {
return rackId;
}

@Override
public Optional<String> instanceId() {
return instanceId;
}

@Override
public Set<Uuid> subscribedTopicIds() {
return subscribedTopicIds;
Expand All @@ -71,13 +79,15 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
MemberSubscriptionAndAssignmentImpl that = (MemberSubscriptionAndAssignmentImpl) o;
return rackId.equals(that.rackId) &&
instanceId.equals(that.instanceId) &&
subscribedTopicIds.equals(that.subscribedTopicIds) &&
memberAssignment.equals(that.memberAssignment);
}

@Override
public int hashCode() {
int result = rackId.hashCode();
result = 31 * result + instanceId.hashCode();
result = 31 * result + subscribedTopicIds.hashCode();
result = 31 * result + memberAssignment.hashCode();
return result;
Expand All @@ -86,6 +96,7 @@ public int hashCode() {
@Override
public String toString() {
return "MemberSubscriptionAndAssignmentImpl(rackId=" + rackId.orElse("N/A") +
", instanceId=" + instanceId +
", subscribedTopicIds=" + subscribedTopicIds +
", memberAssignment=" + memberAssignment +
')';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ static <T extends ModernGroupMember> MemberSubscriptionAndAssignmentImpl createM
) {
return new MemberSubscriptionAndAssignmentImpl(
Optional.ofNullable(member.rackId()),
Optional.ofNullable(member.instanceId()),
new TopicIds(member.subscribedTopicNames(), topicsImage),
memberAssignment
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ void setUp() {
topicId = Uuid.randomUuid();

members.put(TEST_MEMBER, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
mkSet(topicId),
Assignment.EMPTY
Expand Down Expand Up @@ -101,6 +102,7 @@ void testMemberAssignment() {
mkSet(0, 1)
);
members.put(TEST_MEMBER, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
mkSet(topicId),
new Assignment(topicPartitions)
Expand Down
Loading

0 comments on commit 42f267a

Please sign in to comment.