diff --git a/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/MemberSubscription.java b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/MemberSubscription.java index 4f3cf45eee47b..fcf7b84baf0b7 100644 --- a/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/MemberSubscription.java +++ b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/MemberSubscription.java @@ -34,6 +34,13 @@ public interface MemberSubscription { */ Optional rackId(); + /** + * Gets the instance Id if present. + * + * @return An Optional containing the instance Id, or an empty Optional if not present. + */ + Optional instanceId(); + /** * Gets the set of subscribed topic Ids. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java index b11959de79591..ee33212b01fd2 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java @@ -21,40 +21,67 @@ import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.MemberSubscription; import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; -import static java.lang.Math.min; -import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; - /** - * This Range Assignor inherits properties of both the range assignor and the sticky assignor. - * The properties are as follows: + * A range assignor assigns contiguous partition ranges to members of a consumer group such that: *
    - *
  1. Each member must get at least one partition from every topic that it is subscribed to. - * The only exception is when the number of subscribed members is greater than the - * number of partitions for that topic. (Range)
  2. - *
  3. Partitions should be assigned to members in a way that facilitates the join operation when required. (Range) - * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned. - * Two streams are co-partitioned if the following conditions are met: - *
      - *
    • The keys must have the same schemas.
    • - *
    • The topics involved must have the same number of partitions.
    • - *
    - *
  4. - *
  5. Members should retain as much of their previous assignment as possible to reduce the number of partition - * movements during reassignment. (Sticky)
  6. + *
  7. Each subscribed member receives at least one partition from that topic.
  8. + *
  9. Each member receives the same partition number from every subscribed topic when co-partitioning is possible.
  10. *
+ * + * Co-partitioning is possible when the below conditions are satisfied: + *
    + *
  1. All the members are subscribed to the same set of topics.
  2. + *
  3. All the topics have the same number of partitions.
  4. + *
+ * + * Co-partitioning is useful in performing joins on data streams. + * + *

For example, suppose there are two members M0 and M1, two topics T1 and T2, and each topic has 3 partitions. + * + *

The co-partitioned assignment will be: + *

+ * + * Since the introduction of static membership, we could leverage member.instance.id to make the + * assignment behavior more sticky. + * For the above example, after one rolling bounce, the group coordinator will attempt to assign new member Ids towards + * members, for example if M0 -> M3 M1 -> M2. + * + *

The assignment could be completely shuffled to: + *

+ * + * The assignment change was caused by the change of member.id relative order, and + * can be avoided by setting the instance.id. + * Members will have individual instance Ids I0, I1. As long as + * 1. Number of members remain the same. + * 2. Topic metadata doesn't change. + * 3. Subscription pattern doesn't change for any member. + * + *

The assignment will always be: + *

+ *

*/ public class RangeAssignor implements ConsumerGroupPartitionAssignor { public static final String RANGE_ASSIGNOR_NAME = "range"; @@ -65,191 +92,225 @@ public String name() { } /** - * Pair of memberId and remaining partitions to meet the quota. + * Metadata for a topic including partition and subscription details. */ - private static class MemberWithRemainingAssignments { + private static class TopicMetadata { + private final Uuid topicId; + private final int numPartitions; + private int numMembers; + private int minQuota = -1; + private int extraPartitions = -1; + private int nextRange = 0; + /** - * Member Id. + * Constructs a new TopicMetadata instance. + * + * @param topicId The topic Id. + * @param numPartitions The number of partitions. + * @param numMembers The number of subscribed members. */ - private final String memberId; + private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) { + this.topicId = topicId; + this.numPartitions = numPartitions; + this.numMembers = numMembers; + } /** - * Number of partitions required to meet the assignment quota. + * Computes the minimum partition quota per member and the extra partitions, if not already computed. */ - private final int remaining; + private void maybeComputeQuota() { + if (minQuota != -1) return; - public MemberWithRemainingAssignments(String memberId, int remaining) { - this.memberId = memberId; - this.remaining = remaining; + // The minimum number of partitions each member should receive for a balanced assignment. + minQuota = numPartitions / numMembers; + + // Extra partitions to be distributed one to each member. + extraPartitions = numPartitions % numMembers; + } + + @Override + public String toString() { + return "TopicMetadata(topicId=" + topicId + + ", numPartitions=" + numPartitions + + ", numMembers=" + numMembers + + ", minQuota=" + minQuota + + ", extraPartitions=" + extraPartitions + + ", nextRange=" + nextRange + + ')'; } } /** - * Returns a map of topic Ids to a list of members subscribed to them, - * based on the given assignment specification and metadata. - * - * @param groupSpec The specification required for group assignments. - * @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters. - * @return A map of topic Ids to a list of member Ids subscribed to them. - * - * @throws PartitionAssignorException If a member is subscribed to a non-existent topic. + * Assigns partitions to members of a homogeneous group. All members are subscribed to the same set of topics. + * Assignment will be co-partitioned when all the topics have an equal number of partitions. */ - private Map> membersPerTopic( - final GroupSpec groupSpec, - final SubscribedTopicDescriber subscribedTopicDescriber - ) { - Map> membersPerTopic = new HashMap<>(); - - if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) { - Collection allMembers = groupSpec.memberIds(); - Collection topics = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next()) - .subscribedTopicIds(); - - for (Uuid topicId : topics) { - if (subscribedTopicDescriber.numPartitions(topicId) == -1) { - throw new PartitionAssignorException("Member is subscribed to a non-existent topic"); - } - membersPerTopic.put(topicId, allMembers); + private GroupAssignment assignHomogeneousGroup( + GroupSpec groupSpec, + SubscribedTopicDescriber subscribedTopicDescriber + ) throws PartitionAssignorException { + List memberIds = sortMemberIds(groupSpec); + int numMembers = groupSpec.memberIds().size(); + + MemberSubscription subs = groupSpec.memberSubscription(memberIds.get(0)); + List topics = new ArrayList<>(subs.subscribedTopicIds().size()); + + for (Uuid topicId : subs.subscribedTopicIds()) { + int numPartitions = subscribedTopicDescriber.numPartitions(topicId); + if (numPartitions == -1) { + throw new PartitionAssignorException("Member is subscribed to a non-existent topic"); } - } else { - groupSpec.memberIds().forEach(memberId -> { - Collection topics = groupSpec.memberSubscription(memberId).subscribedTopicIds(); - for (Uuid topicId : topics) { - if (subscribedTopicDescriber.numPartitions(topicId) == -1) { - throw new PartitionAssignorException("Member is subscribed to a non-existent topic"); - } - membersPerTopic - .computeIfAbsent(topicId, k -> new ArrayList<>()) - .add(memberId); - } - }); + TopicMetadata m = new TopicMetadata( + topicId, + numPartitions, + numMembers + ); + topics.add(m); } - return membersPerTopic; + Map assignments = new HashMap<>((int) ((groupSpec.memberIds().size() / 0.75f) + 1)); + int memberAssignmentInitialCapacity = (int) ((topics.size() / 0.75f) + 1); + + for (String memberId : memberIds) { + Map> assignment = new HashMap<>(memberAssignmentInitialCapacity); + for (TopicMetadata topicMetadata : topics) { + topicMetadata.maybeComputeQuota(); + addPartitionsToAssignment(topicMetadata, assignment); + } + assignments.put(memberId, new MemberAssignmentImpl(assignment)); + } + + return new GroupAssignment(assignments); } /** - * The algorithm includes the following steps: - *

    - *
  1. Generate a map of members per topic using the given member subscriptions.
  2. - *
  3. Generate a list of members called potentially unfilled members, which consists of members that have not - * met the minimum required quota of partitions for the assignment AND get a list called assigned sticky - * partitions for topic, which has the partitions that will be retained in the new assignment.
  4. - *
  5. Generate a list of unassigned partitions by calculating the difference between the total partitions - * for the topic and the assigned (sticky) partitions.
  6. - *
  7. Find members from the potentially unfilled members list that haven't met the total required quota - * i.e. minRequiredQuota + 1, if the member is designated to receive one of the excess partitions OR - * minRequiredQuota otherwise.
  8. - *
  9. Assign partitions to them in ranges from the unassigned partitions per topic - * based on the remaining partitions value.
  10. - *
+ * Assigns partitions to members of a heterogeneous group. Not all members are subscribed to the same topics. */ - @Override - public GroupAssignment assign( - final GroupSpec groupSpec, - final SubscribedTopicDescriber subscribedTopicDescriber + private GroupAssignment assignHeterogeneousGroup( + GroupSpec groupSpec, + SubscribedTopicDescriber subscribedTopicDescriber ) throws PartitionAssignorException { + List memberIds = sortMemberIds(groupSpec); + + Map topics = new HashMap<>(); - Map newTargetAssignment = new HashMap<>(); - - // Step 1 - Map> membersPerTopic = membersPerTopic( - groupSpec, - subscribedTopicDescriber - ); - - membersPerTopic.forEach((topicId, membersForTopic) -> { - int numPartitionsForTopic = subscribedTopicDescriber.numPartitions(topicId); - int minRequiredQuota = numPartitionsForTopic / membersForTopic.size(); - // Each member can get only ONE extra partition per topic after receiving the minimum quota. - int numMembersWithExtraPartition = numPartitionsForTopic % membersForTopic.size(); - - // Step 2 - Set assignedStickyPartitionsForTopic = new HashSet<>(); - List potentiallyUnfilledMembers = new ArrayList<>(); - - for (String memberId : membersForTopic) { - Set assignedPartitionsForTopic = groupSpec - .memberAssignment(memberId) - .partitions() - .getOrDefault(topicId, Collections.emptySet()); - - int currentAssignmentSize = assignedPartitionsForTopic.size(); - List currentAssignmentListForTopic = new ArrayList<>(assignedPartitionsForTopic); - - // If there were partitions from this topic that were previously assigned to this member, retain as many as possible. - // Sort the current assignment in ascending order since we want the same partition numbers from each topic - // to go to the same member, in order to facilitate joins in case of co-partitioned topics. - if (currentAssignmentSize > 0) { - int retainedPartitionsCount = min(currentAssignmentSize, minRequiredQuota); - Collections.sort(currentAssignmentListForTopic); - for (int i = 0; i < retainedPartitionsCount; i++) { - assignedStickyPartitionsForTopic - .add(currentAssignmentListForTopic.get(i)); - newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignmentImpl(new HashMap<>())) - .partitions() - .computeIfAbsent(topicId, k -> new HashSet<>()) - .add(currentAssignmentListForTopic.get(i)); + for (String memberId : memberIds) { + MemberSubscription subs = groupSpec.memberSubscription(memberId); + for (Uuid topicId : subs.subscribedTopicIds()) { + TopicMetadata topicMetadata = topics.computeIfAbsent(topicId, __ -> { + int numPartitions = subscribedTopicDescriber.numPartitions(topicId); + if (numPartitions == -1) { + throw new PartitionAssignorException("Member is subscribed to a non-existent topic"); } - } - - // Number of partitions required to meet the minRequiredQuota. - // There are 3 cases w.r.t the value of remaining: - // 1) remaining < 0: this means that the member has more than the min required amount. - // 2) If remaining = 0: member has the minimum required partitions, but it may get an extra partition, so it is a potentially unfilled member. - // 3) If remaining > 0: member doesn't have the minimum required partitions, so it should be added to potentiallyUnfilledMembers. - int remaining = minRequiredQuota - currentAssignmentSize; - - // Retain extra partitions as well when applicable. - if (remaining < 0 && numMembersWithExtraPartition > 0) { - numMembersWithExtraPartition--; - // Since we already added the minimumRequiredQuota of partitions in the previous step (until minReq - 1), we just need to - // add the extra partition that will be present at the index right after min quota was satisfied. - assignedStickyPartitionsForTopic - .add(currentAssignmentListForTopic.get(minRequiredQuota)); - newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignmentImpl(new HashMap<>())) - .partitions() - .computeIfAbsent(topicId, k -> new HashSet<>()) - .add(currentAssignmentListForTopic.get(minRequiredQuota)); - } else { - MemberWithRemainingAssignments newPair = new MemberWithRemainingAssignments(memberId, remaining); - potentiallyUnfilledMembers.add(newPair); - } + + return new TopicMetadata( + topicId, + numPartitions, + 0 + ); + }); + topicMetadata.numMembers++; } + } + + Map assignments = new HashMap<>((int) ((groupSpec.memberIds().size() / 0.75f) + 1)); - // Step 3 - // Find the difference between the total partitions per topic and the already assigned sticky partitions for the topic to get the unassigned partitions. - // List of unassigned partitions for topic contains the partitions in ascending order. - List unassignedPartitionsForTopic = new ArrayList<>(); - for (int i = 0; i < numPartitionsForTopic; i++) { - if (!assignedStickyPartitionsForTopic.contains(i)) { - unassignedPartitionsForTopic.add(i); - } + for (String memberId : memberIds) { + MemberSubscription subs = groupSpec.memberSubscription(memberId); + Map> assignment = new HashMap<>((int) ((subs.subscribedTopicIds().size() / 0.75f) + 1)); + for (Uuid topicId : subs.subscribedTopicIds()) { + TopicMetadata metadata = topics.get(topicId); + metadata.maybeComputeQuota(); + addPartitionsToAssignment(metadata, assignment); } + assignments.put(memberId, new MemberAssignmentImpl(assignment)); + } + + return new GroupAssignment(assignments); + } - // Step 4 and Step 5 - // Account for the extra partitions if necessary and increase the required quota by 1. - // If remaining > 0 after increasing the required quota, assign the remaining number of partitions from the unassigned partitions list. - int unassignedPartitionsListStartPointer = 0; - for (MemberWithRemainingAssignments pair : potentiallyUnfilledMembers) { - String memberId = pair.memberId; - int remaining = pair.remaining; - if (numMembersWithExtraPartition > 0) { - remaining++; - numMembersWithExtraPartition--; - } - if (remaining > 0) { - List partitionsToAssign = unassignedPartitionsForTopic - .subList(unassignedPartitionsListStartPointer, unassignedPartitionsListStartPointer + remaining); - unassignedPartitionsListStartPointer += remaining; - newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignmentImpl(new HashMap<>())) - .partitions() - .computeIfAbsent(topicId, k -> new HashSet<>()) - .addAll(partitionsToAssign); - } + /** + * Sorts members based on their instance Ids if available or by member Ids if not. + * + * Static members are placed first and non-static members follow. + * + * Prioritizing static members helps them retain their partitions, enhancing stickiness + * and stability. Non-static members, which do not have guaranteed rejoining Ids, are placed + * later, allowing for more dynamic and flexible partition assignments. + * + * @param groupSpec The group specification containing the member information. + * @return A sorted list of member Ids. + */ + private List sortMemberIds( + GroupSpec groupSpec + ) { + List sortedMemberIds = new ArrayList<>(groupSpec.memberIds()); + + sortedMemberIds.sort((memberId1, memberId2) -> { + Optional instanceId1 = groupSpec.memberSubscription(memberId1).instanceId(); + Optional instanceId2 = groupSpec.memberSubscription(memberId2).instanceId(); + + if (instanceId1.isPresent() && instanceId2.isPresent()) { + return instanceId1.get().compareTo(instanceId2.get()); + } else if (instanceId1.isPresent()) { + return -1; + } else if (instanceId2.isPresent()) { + return 1; + } else { + return memberId1.compareTo(memberId2); } }); + return sortedMemberIds; + } + + /** + * Assigns a range of partitions to the specified topic based on the provided metadata. + * + * @param topicMetadata Metadata containing the topic details, including the number of partitions, + * the next range to assign, minQuota, and extra partitions. + * @param memberAssignment Map from topic Id to the set of assigned partition Ids. + */ + private void addPartitionsToAssignment( + TopicMetadata topicMetadata, + Map> memberAssignment + ) { + int start = topicMetadata.nextRange; + int quota = topicMetadata.minQuota; + + // Adjust quota to account for extra partitions if available. + if (topicMetadata.extraPartitions > 0) { + quota++; + topicMetadata.extraPartitions--; + } + + // Calculate the end using the quota. + int end = Math.min(start + quota, topicMetadata.numPartitions); + + topicMetadata.nextRange = end; + + if (start < end) { + memberAssignment.put(topicMetadata.topicId, new RangeSet(start, end)); + } + } - return new GroupAssignment(newTargetAssignment); + /** + * Assigns partitions to members based on their topic subscriptions and the properties of a range assignor: + * + * @param groupSpec The group specification containing the member information. + * @param subscribedTopicDescriber The describer for subscribed topics to get the number of partitions. + * @return The group's assignment with the partition assignments for each member. + * @throws PartitionAssignorException if any member is subscribed to a non-existent topic. + */ + @Override + public GroupAssignment assign( + GroupSpec groupSpec, + SubscribedTopicDescriber subscribedTopicDescriber + ) throws PartitionAssignorException { + if (groupSpec.memberIds().isEmpty()) { + return new GroupAssignment(Collections.emptyMap()); + } else if (groupSpec.subscriptionType() == SubscriptionType.HOMOGENEOUS) { + return assignHomogeneousGroup(groupSpec, subscribedTopicDescriber); + } else { + return assignHeterogeneousGroup(groupSpec, subscribedTopicDescriber); + } } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java new file mode 100644 index 0000000000000..867cdd9c55619 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import java.lang.reflect.Array; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; + +/** + * A {@code RangeSet} represents a range of integers from {@code from} (inclusive) + * to {@code to} (exclusive). + * This implementation provides a view over a continuous range of integers without actually storing them. + */ +class RangeSet implements Set { + private final int from; + private final int to; + + /** + * Constructs a {@code RangeSet} with the specified range. + * + * @param from The starting value (inclusive) of the range. + * @param to The ending value (exclusive) of the range. + */ + public RangeSet(int from, int to) { + this.from = from; + this.to = to; + } + + @Override + public int size() { + return to - from; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public boolean contains(Object o) { + if (o instanceof Integer) { + int value = (Integer) o; + return value >= from && value < to; + } + return false; + } + + @Override + public Iterator iterator() { + return new Iterator() { + private int current = from; + + @Override + public boolean hasNext() { + return current < to; + } + + @Override + public Integer next() { + if (!hasNext()) throw new NoSuchElementException(); + return current++; + } + }; + } + + @Override + public Object[] toArray() { + Object[] array = new Object[size()]; + for (int i = 0; i < size(); i++) { + array[i] = from + i; + } + return array; + } + + @Override + @SuppressWarnings("unchecked") + public T[] toArray(T[] a) { + int size = size(); + if (a.length < size) { + // Create a new array of the same type as a with the correct size + a = (T[]) Array.newInstance(a.getClass().getComponentType(), size); + } + for (int i = 0; i < size; i++) { + a[i] = (T) Integer.valueOf(from + i); + } + if (a.length > size) { + a[size] = null; + } + return a; + } + + @Override + public boolean add(Integer integer) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection c) { + for (Object o : c) { + if (!contains(o)) return false; + } + return true; + } + + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() { + return "RangeSet(from=" + from + " (inclusive), to=" + to + " (exclusive))"; + } + + /** + * Compares the specified object with this set for equality. + * Returns {@code true} if the specified object is also a set, + * the two sets have the same size, and every member of the specified + * set is contained in this set. + * + * @param o object to be compared for equality with this set + * @return {@code true} if the specified object is equal to this set + */ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof Set)) return false; + + if (o instanceof RangeSet) { + RangeSet other = (RangeSet) o; + return this.from == other.from && this.to == other.to; + } + + Set otherSet = (Set) o; + if (otherSet.size() != this.size()) return false; + + for (int i = from; i < to; i++) { + if (!otherSet.contains(i)) return false; + } + return true; + } + + @Override + public int hashCode() { + int result = from; + result = 31 * result + to; + return result; + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/MemberSubscriptionAndAssignmentImpl.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/MemberSubscriptionAndAssignmentImpl.java index 77cef2c92fc46..c9f3e3da1512d 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/MemberSubscriptionAndAssignmentImpl.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/MemberSubscriptionAndAssignmentImpl.java @@ -30,6 +30,7 @@ */ public class MemberSubscriptionAndAssignmentImpl implements MemberSubscription, MemberAssignment { private final Optional rackId; + private final Optional instanceId; private final Set subscribedTopicIds; private final Assignment memberAssignment; @@ -42,10 +43,12 @@ public class MemberSubscriptionAndAssignmentImpl implements MemberSubscription, */ public MemberSubscriptionAndAssignmentImpl( Optional rackId, + Optional instanceId, Set subscribedTopicIds, Assignment memberAssignment ) { this.rackId = Objects.requireNonNull(rackId); + this.instanceId = Objects.requireNonNull(instanceId); this.subscribedTopicIds = Objects.requireNonNull(subscribedTopicIds); this.memberAssignment = Objects.requireNonNull(memberAssignment); } @@ -55,6 +58,11 @@ public Optional rackId() { return rackId; } + @Override + public Optional instanceId() { + return instanceId; + } + @Override public Set subscribedTopicIds() { return subscribedTopicIds; @@ -71,6 +79,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; MemberSubscriptionAndAssignmentImpl that = (MemberSubscriptionAndAssignmentImpl) o; return rackId.equals(that.rackId) && + instanceId.equals(that.instanceId) && subscribedTopicIds.equals(that.subscribedTopicIds) && memberAssignment.equals(that.memberAssignment); } @@ -78,6 +87,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = rackId.hashCode(); + result = 31 * result + instanceId.hashCode(); result = 31 * result + subscribedTopicIds.hashCode(); result = 31 * result + memberAssignment.hashCode(); return result; @@ -86,6 +96,7 @@ public int hashCode() { @Override public String toString() { return "MemberSubscriptionAndAssignmentImpl(rackId=" + rackId.orElse("N/A") + + ", instanceId=" + instanceId + ", subscribedTopicIds=" + subscribedTopicIds + ", memberAssignment=" + memberAssignment + ')'; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java index 4e5d35aa592f9..fe0a0e5245aa7 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java @@ -390,6 +390,7 @@ static MemberSubscriptionAndAssignmentImpl createM ) { return new MemberSubscriptionAndAssignmentImpl( Optional.ofNullable(member.rackId()), + Optional.ofNullable(member.instanceId()), new TopicIds(member.subscribedTopicNames(), topicsImage), memberAssignment ); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java index 0d7ea3d3853c4..379d149acfa5b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java @@ -54,6 +54,7 @@ void setUp() { topicId = Uuid.randomUuid(); members.put(TEST_MEMBER, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topicId), Assignment.EMPTY @@ -101,6 +102,7 @@ void testMemberAssignment() { mkSet(0, 1) ); members.put(TEST_MEMBER, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topicId), new Assignment(topicPartitions) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java index 65537dee48a59..39299a01c5d5e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java @@ -79,6 +79,7 @@ public void testOneMemberNoTopicSubscription() { Map members = Collections.singletonMap( memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.emptySet(), Assignment.EMPTY @@ -116,6 +117,7 @@ public void testOneMemberSubscribedToNonexistentTopic() { Map members = Collections.singletonMap( memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.singleton(topic2Uuid), Assignment.EMPTY @@ -151,12 +153,14 @@ public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() { Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic3Uuid), Assignment.EMPTY )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic3Uuid), Assignment.EMPTY @@ -200,18 +204,21 @@ public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() { Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.singleton(topic3Uuid), Assignment.EMPTY )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.singleton(topic3Uuid), Assignment.EMPTY )); members.put(memberC, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.singleton(topic3Uuid), Assignment.EMPTY @@ -261,6 +268,7 @@ public void testValidityAndBalanceForLargeSampleSet() { Map members = new TreeMap<>(); for (int i = 1; i < 50; i++) { members.put("member" + i, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), topicMetadata.keySet(), Assignment.EMPTY @@ -301,6 +309,7 @@ public void testReassignmentForTwoMembersTwoTopicsGivenUnbalancedPrevAssignment( Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkOrderedAssignment( @@ -310,6 +319,7 @@ public void testReassignmentForTwoMembersTwoTopicsGivenUnbalancedPrevAssignment( )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkOrderedAssignment( @@ -364,6 +374,7 @@ public void testReassignmentWhenPartitionsAreAddedForTwoMembersTwoTopics() { Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkOrderedAssignment( @@ -373,6 +384,7 @@ public void testReassignmentWhenPartitionsAreAddedForTwoMembersTwoTopics() { )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkOrderedAssignment( @@ -426,6 +438,7 @@ public void testReassignmentWhenOneMemberAddedAfterInitialAssignmentWithTwoMembe Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkOrderedAssignment( @@ -435,6 +448,7 @@ public void testReassignmentWhenOneMemberAddedAfterInitialAssignmentWithTwoMembe )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkOrderedAssignment( @@ -445,6 +459,7 @@ public void testReassignmentWhenOneMemberAddedAfterInitialAssignmentWithTwoMembe // Add a new member to trigger a re-assignment. members.put(memberC, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), Assignment.EMPTY @@ -497,6 +512,7 @@ public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeM Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -506,6 +522,7 @@ public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeM )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -562,6 +579,7 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.singleton(topic2Uuid), new Assignment(mkAssignment( @@ -571,6 +589,7 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.singleton(topic2Uuid), new Assignment(mkAssignment( diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java index a611f33eef4a8..64b946f906ec7 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java @@ -19,9 +19,13 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.GroupSpecImpl; +import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl; import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl; import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl; import org.apache.kafka.coordinator.group.modern.TopicMetadata; @@ -42,7 +46,6 @@ import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS; import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; public class RangeAssignorTest { @@ -58,7 +61,7 @@ public class RangeAssignorTest { private final String memberC = "C"; @Test - public void testOneConsumerNoTopic() { + public void testOneMemberNoTopic() { SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( Collections.singletonMap( topic1Uuid, @@ -74,6 +77,7 @@ public void testOneConsumerNoTopic() { Map members = Collections.singletonMap( memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.emptySet(), Assignment.EMPTY @@ -91,11 +95,16 @@ public void testOneConsumerNoTopic() { subscribedTopicMetadata ); - assertEquals(Collections.emptyMap(), groupAssignment.members()); + Map expectedAssignment = Collections.singletonMap( + memberA, + new MemberAssignmentImpl(Collections.emptyMap()) + ); + + assertEquals(expectedAssignment, groupAssignment.members()); } @Test - public void testOneConsumerSubscribedToNonExistentTopic() { + public void testOneMemberSubscribedToNonExistentTopic() { SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( Collections.singletonMap( topic1Uuid, @@ -111,6 +120,7 @@ public void testOneConsumerSubscribedToNonExistentTopic() { Map members = Collections.singletonMap( memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic2Uuid), Assignment.EMPTY @@ -128,7 +138,7 @@ public void testOneConsumerSubscribedToNonExistentTopic() { } @Test - public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { + public void testFirstAssignmentTwoMembersTwoTopicsSameSubscriptions() { Map topicMetadata = new HashMap<>(); topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, @@ -146,12 +156,14 @@ public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic3Uuid), Assignment.EMPTY )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic3Uuid), Assignment.EMPTY @@ -183,7 +195,7 @@ public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { } @Test - public void testFirstAssignmentThreeConsumersThreeTopicsDifferentSubscriptions() { + public void testFirstAssignmentThreeMembersThreeTopicsDifferentSubscriptions() { Map topicMetadata = new HashMap<>(); topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, @@ -207,18 +219,21 @@ public void testFirstAssignmentThreeConsumersThreeTopicsDifferentSubscriptions() Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), Assignment.EMPTY )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic3Uuid), Assignment.EMPTY )); members.put(memberC, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic2Uuid, topic3Uuid), Assignment.EMPTY @@ -253,7 +268,7 @@ public void testFirstAssignmentThreeConsumersThreeTopicsDifferentSubscriptions() } @Test - public void testFirstAssignmentNumConsumersGreaterThanNumPartitions() { + public void testFirstAssignmentNumMembersGreaterThanNumPartitions() { Map topicMetadata = new HashMap<>(); topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, @@ -271,18 +286,21 @@ public void testFirstAssignmentNumConsumersGreaterThanNumPartitions() { Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic3Uuid), Assignment.EMPTY )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic3Uuid), Assignment.EMPTY )); members.put(memberC, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic3Uuid), Assignment.EMPTY @@ -301,7 +319,7 @@ public void testFirstAssignmentNumConsumersGreaterThanNumPartitions() { ); Map>> expectedAssignment = new HashMap<>(); - // Topic 3 has 2 partitions but three consumers subscribed to it - one of them will not get a partition. + // Topic 3 has 2 partitions but three Members subscribed to it - one of them will not get a partition. expectedAssignment.put(memberA, mkAssignment( mkTopicAssignment(topic1Uuid, 0), mkTopicAssignment(topic3Uuid, 0) @@ -318,7 +336,163 @@ public void testFirstAssignmentNumConsumersGreaterThanNumPartitions() { } @Test - public void testReassignmentNumConsumersGreaterThanNumPartitionsWhenOneConsumerAdded() { + public void testStaticMembership() throws PartitionAssignorException { + SubscribedTopicDescriber subscribedTopicMetadata = new SubscribedTopicDescriberImpl( + Collections.singletonMap( + topic1Uuid, + new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + Collections.emptyMap() + ) + ) + ); + + Map members = new TreeMap<>(); + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.of("instanceA"), + Collections.singleton(topic1Uuid), + Assignment.EMPTY + )); + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.of("instanceB"), + Collections.singleton(topic1Uuid), + Assignment.EMPTY + )); + + GroupSpec groupSpec = new GroupSpecImpl( + members, + SubscriptionType.HOMOGENEOUS, + invertedTargetAssignment(members) + ); + + GroupAssignment initialAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + + // Remove static memberA and add it back with a different member Id but same instance Id. + members.remove(memberA); + members.put("memberA1", new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.of("instanceA"), + Collections.singleton(topic1Uuid), + Assignment.EMPTY + )); + + groupSpec = new GroupSpecImpl( + members, + SubscriptionType.HOMOGENEOUS, + invertedTargetAssignment(members) + ); + + GroupAssignment reassignedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + + // Assert that the assignment did not change + assertEquals( + initialAssignment.members().get(memberA).partitions(), + reassignedAssignment.members().get("memberA1").partitions() + ); + + assertEquals( + initialAssignment.members().get(memberB).partitions(), + reassignedAssignment.members().get(memberB).partitions() + ); + } + + @Test + public void testMixedStaticMembership() throws PartitionAssignorException { + SubscribedTopicDescriber subscribedTopicMetadata = new SubscribedTopicDescriberImpl( + Collections.singletonMap( + topic1Uuid, + new TopicMetadata( + topic1Uuid, + topic1Name, + 5, + Collections.emptyMap() + ) + ) + ); + + // Initialize members with instance Ids. + Map members = new TreeMap<>(); + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.of("instanceA"), + Collections.singleton(topic1Uuid), + Assignment.EMPTY + )); + members.put(memberC, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.of("instanceC"), + Collections.singleton(topic1Uuid), + Assignment.EMPTY + )); + + // Initialize member without an instance Id. + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + Collections.singleton(topic1Uuid), + Assignment.EMPTY + )); + + GroupSpec groupSpec = new GroupSpecImpl( + members, + SubscriptionType.HOMOGENEOUS, + invertedTargetAssignment(members) + ); + + GroupAssignment initialAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + + // Remove memberA and add it back with a different member Id but same instance Id. + members.remove(memberA); + members.put("memberA1", new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.of("instanceA"), + Collections.singleton(topic1Uuid), + Assignment.EMPTY + )); + + groupSpec = new GroupSpecImpl( + members, + SubscriptionType.HOMOGENEOUS, + invertedTargetAssignment(members) + ); + + GroupAssignment reassignedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + + // Assert that the assignments did not change. + assertEquals( + initialAssignment.members().get(memberA).partitions(), + reassignedAssignment.members().get("memberA1").partitions() + ); + + assertEquals( + initialAssignment.members().get(memberB).partitions(), + reassignedAssignment.members().get(memberB).partitions() + ); + + assertEquals( + initialAssignment.members().get(memberC).partitions(), + reassignedAssignment.members().get(memberC).partitions() + ); + } + + @Test + public void testReassignmentNumMembersGreaterThanNumPartitionsWhenOneMemberAdded() { Map topicMetadata = new HashMap<>(); topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, @@ -336,6 +510,7 @@ public void testReassignmentNumConsumersGreaterThanNumPartitionsWhenOneConsumerA Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -345,6 +520,7 @@ public void testReassignmentNumConsumersGreaterThanNumPartitionsWhenOneConsumerA )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -353,8 +529,9 @@ public void testReassignmentNumConsumersGreaterThanNumPartitionsWhenOneConsumerA )) )); - // Add a new consumer to trigger a re-assignment + // Add a new Member to trigger a re-assignment members.put(memberC, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), Assignment.EMPTY @@ -381,14 +558,14 @@ public void testReassignmentNumConsumersGreaterThanNumPartitionsWhenOneConsumerA mkTopicAssignment(topic1Uuid, 1), mkTopicAssignment(topic2Uuid, 1) )); + // Member C shouldn't get any assignment. + expectedAssignment.put(memberC, Collections.emptyMap()); - // Consumer C shouldn't get any assignment, due to stickiness A, B retain their assignments - assertNull(computedAssignment.members().get(memberC)); assertAssignment(expectedAssignment, computedAssignment); } @Test - public void testReassignmentWhenOnePartitionAddedForTwoConsumersTwoTopics() { + public void testReassignmentWhenOnePartitionAddedForTwoMembersTwoTopics() { // Simulating adding a partition - originally T1 -> 3 Partitions and T2 -> 3 Partitions Map topicMetadata = new HashMap<>(); topicMetadata.put(topic1Uuid, new TopicMetadata( @@ -407,6 +584,7 @@ public void testReassignmentWhenOnePartitionAddedForTwoConsumersTwoTopics() { Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -416,6 +594,7 @@ public void testReassignmentWhenOnePartitionAddedForTwoConsumersTwoTopics() { )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -450,7 +629,7 @@ public void testReassignmentWhenOnePartitionAddedForTwoConsumersTwoTopics() { } @Test - public void testReassignmentWhenOneConsumerAddedAfterInitialAssignmentWithTwoConsumersTwoTopics() { + public void testReassignmentWhenOneMemberAddedAfterInitialAssignmentWithTwoMembersTwoTopics() { Map topicMetadata = new HashMap<>(); topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, @@ -468,6 +647,7 @@ public void testReassignmentWhenOneConsumerAddedAfterInitialAssignmentWithTwoCon Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -477,6 +657,7 @@ public void testReassignmentWhenOneConsumerAddedAfterInitialAssignmentWithTwoCon )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -485,8 +666,9 @@ public void testReassignmentWhenOneConsumerAddedAfterInitialAssignmentWithTwoCon )) )); - // Add a new consumer to trigger a re-assignment + // Add a new Member to trigger a re-assignment members.put(memberC, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), Assignment.EMPTY @@ -510,19 +692,19 @@ public void testReassignmentWhenOneConsumerAddedAfterInitialAssignmentWithTwoCon mkTopicAssignment(topic2Uuid, 0) )); expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic1Uuid, 2), - mkTopicAssignment(topic2Uuid, 2) - )); - expectedAssignment.put(memberC, mkAssignment( mkTopicAssignment(topic1Uuid, 1), mkTopicAssignment(topic2Uuid, 1) )); + expectedAssignment.put(memberC, mkAssignment( + mkTopicAssignment(topic1Uuid, 2), + mkTopicAssignment(topic2Uuid, 2) + )); assertAssignment(expectedAssignment, computedAssignment); } @Test - public void testReassignmentWhenOneConsumerAddedAndOnePartitionAfterInitialAssignmentWithTwoConsumersTwoTopics() { + public void testReassignmentWhenOneMemberAddedAndOnePartitionAfterInitialAssignmentWithTwoMembersTwoTopics() { // Add a new partition to topic 1, initially T1 -> 3 partitions Map topicMetadata = new HashMap<>(); topicMetadata.put(topic1Uuid, new TopicMetadata( @@ -541,6 +723,7 @@ public void testReassignmentWhenOneConsumerAddedAndOnePartitionAfterInitialAssig Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -550,6 +733,7 @@ public void testReassignmentWhenOneConsumerAddedAndOnePartitionAfterInitialAssig )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -558,8 +742,9 @@ public void testReassignmentWhenOneConsumerAddedAndOnePartitionAfterInitialAssig )) )); - // Add a new consumer to trigger a re-assignment + // Add a new Member to trigger a re-assignment members.put(memberC, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid), Assignment.EMPTY @@ -594,7 +779,7 @@ public void testReassignmentWhenOneConsumerAddedAndOnePartitionAfterInitialAssig } @Test - public void testReassignmentWhenOneConsumerRemovedAfterInitialAssignmentWithTwoConsumersTwoTopics() { + public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithTwoMembersTwoTopics() { Map topicMetadata = new HashMap<>(); topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, @@ -611,9 +796,10 @@ public void testReassignmentWhenOneConsumerRemovedAfterInitialAssignmentWithTwoC Map members = new TreeMap<>(); - // Consumer A was removed + // Member A was removed members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -644,7 +830,7 @@ public void testReassignmentWhenOneConsumerRemovedAfterInitialAssignmentWithTwoC } @Test - public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignmentWithThreeConsumersTwoTopics() { + public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignmentWithThreeMembersTwoTopics() { Map topicMetadata = new HashMap<>(); topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, @@ -670,6 +856,7 @@ public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignme Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid), new Assignment(mkAssignment( @@ -679,6 +866,7 @@ public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignme )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid, topic3Uuid), new Assignment(mkAssignment( @@ -687,6 +875,7 @@ public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignme )); members.put(memberC, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic2Uuid), new Assignment(mkAssignment( @@ -723,6 +912,15 @@ public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignme assertAssignment(expectedAssignment, computedAssignment); } + /** + * Asserts that the computed group assignment matches the expected assignment. + * + * @param expectedAssignment A map representing the expected assignment for each member. + * The key is the member Id and the value is another map where + * the key is the topic Uuid and the value is a set of assigned partition Ids. + * @param computedGroupAssignment The computed group assignment to be checked against the expected assignment. + * Contains the actual assignments for each member. + */ private void assertAssignment( Map>> expectedAssignment, GroupAssignment computedGroupAssignment diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeSetTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeSetTest.java new file mode 100644 index 0000000000000..23dc0c57fa8d5 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeSetTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.junit.jupiter.api.Test; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; + +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +public class RangeSetTest { + @Test + void testSize() { + RangeSet rangeSet = new RangeSet(5, 10); + assertEquals(5, rangeSet.size()); + } + + @Test + void testIsEmpty() { + RangeSet rangeSet = new RangeSet(5, 5); + assertTrue(rangeSet.isEmpty()); + } + + @Test + void testContains() { + RangeSet rangeSet = new RangeSet(5, 10); + assertTrue(rangeSet.contains(5)); + assertTrue(rangeSet.contains(9)); + assertFalse(rangeSet.contains(10)); + assertFalse(rangeSet.contains(4)); + } + + @Test + void testIterator() { + RangeSet rangeSet = new RangeSet(5, 10); + Iterator iterator = rangeSet.iterator(); + for (int i = 5; i < 10; i++) { + assertTrue(iterator.hasNext()); + assertEquals(i, iterator.next()); + } + assertFalse(iterator.hasNext()); + assertThrows(NoSuchElementException.class, iterator::next); + } + + @Test + void testUnsupportedOperations() { + RangeSet rangeSet = new RangeSet(5, 10); + assertThrows(UnsupportedOperationException.class, () -> rangeSet.add(5)); + assertThrows(UnsupportedOperationException.class, () -> rangeSet.remove(5)); + assertThrows(UnsupportedOperationException.class, () -> rangeSet.addAll(null)); + assertThrows(UnsupportedOperationException.class, () -> rangeSet.retainAll(null)); + assertThrows(UnsupportedOperationException.class, () -> rangeSet.removeAll(null)); + assertThrows(UnsupportedOperationException.class, rangeSet::clear); + } + + @Test + void testToArray() { + RangeSet rangeSet = new RangeSet(5, 10); + Object[] expectedArray = {5, 6, 7, 8, 9}; + assertArrayEquals(expectedArray, rangeSet.toArray()); + } + + @Test + void testToArrayWithArrayParameter() { + RangeSet rangeSet = new RangeSet(5, 10); + Integer[] inputArray = new Integer[5]; + Integer[] expectedArray = {5, 6, 7, 8, 9}; + assertArrayEquals(expectedArray, rangeSet.toArray(inputArray)); + } + + @Test + void testContainsAll() { + RangeSet rangeSet = new RangeSet(5, 10); + assertTrue(rangeSet.containsAll(mkSet(5, 6, 7, 8, 9))); + assertFalse(rangeSet.containsAll(mkSet(5, 6, 10))); + } + + @Test + void testToString() { + RangeSet rangeSet = new RangeSet(5, 8); + assertEquals("RangeSet(from=5 (inclusive), to=8 (exclusive))", rangeSet.toString()); + } + + @Test + void testEquals() { + RangeSet rangeSet1 = new RangeSet(5, 10); + RangeSet rangeSet2 = new RangeSet(5, 10); + RangeSet rangeSet3 = new RangeSet(6, 10); + Set set = mkSet(5, 6, 7, 8, 9); + HashSet hashSet = new HashSet<>(mkSet(6, 7, 8, 9)); + + assertEquals(rangeSet1, rangeSet2); + assertNotEquals(rangeSet1, rangeSet3); + assertEquals(rangeSet1, set); + assertEquals(rangeSet3, hashSet); + assertNotEquals(rangeSet1, new Object()); + } + + @Test + void testHashCode() { + RangeSet rangeSet1 = new RangeSet(5, 10); + RangeSet rangeSet2 = new RangeSet(5, 10); + RangeSet rangeSet3 = new RangeSet(6, 10); + + assertEquals(rangeSet1.hashCode(), rangeSet2.hashCode()); + assertNotEquals(rangeSet1.hashCode(), rangeSet3.hashCode()); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java index 70f4decf6bb08..98d0d20ef660e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java @@ -75,11 +75,13 @@ public void testTwoMembersNoTopicSubscription() { Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.emptySet(), Assignment.EMPTY )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.emptySet(), Assignment.EMPTY @@ -115,11 +117,13 @@ public void testTwoMembersSubscribedToNonexistentTopics() { Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.singleton(topic3Uuid), Assignment.EMPTY )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.singleton(topic2Uuid), Assignment.EMPTY @@ -155,12 +159,14 @@ public void testFirstAssignmentTwoMembersTwoTopics() { Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic3Uuid), Assignment.EMPTY )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.singleton(topic3Uuid), Assignment.EMPTY @@ -209,18 +215,21 @@ public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() { Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.singleton(topic3Uuid), Assignment.EMPTY )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.singleton(topic3Uuid), Assignment.EMPTY )); members.put(memberC, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.singleton(topic1Uuid), Assignment.EMPTY @@ -278,6 +287,7 @@ public void testReassignmentForTwoMembersThreeTopicsGivenUnbalancedPrevAssignmen Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.singleton(topic1Uuid), new Assignment(mkAssignment( @@ -286,6 +296,7 @@ public void testReassignmentForTwoMembersThreeTopicsGivenUnbalancedPrevAssignmen )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -295,6 +306,7 @@ public void testReassignmentForTwoMembersThreeTopicsGivenUnbalancedPrevAssignmen )); members.put(memberC, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid, topic3Uuid), new Assignment(mkAssignment( @@ -363,6 +375,7 @@ public void testReassignmentWhenPartitionsAreAddedForTwoMembers() { Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic3Uuid), new Assignment(mkAssignment( @@ -372,6 +385,7 @@ public void testReassignmentWhenPartitionsAreAddedForTwoMembers() { )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid, topic3Uuid, topic4Uuid), new Assignment(mkAssignment( @@ -425,6 +439,7 @@ public void testReassignmentWhenOneMemberAddedAndPartitionsAddedTwoMembersTwoTop Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.singleton(topic1Uuid), new Assignment(mkAssignment( @@ -434,6 +449,7 @@ public void testReassignmentWhenOneMemberAddedAndPartitionsAddedTwoMembersTwoTop )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -444,6 +460,7 @@ public void testReassignmentWhenOneMemberAddedAndPartitionsAddedTwoMembersTwoTop // Add a new member to trigger a re-assignment. members.put(memberC, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), Assignment.EMPTY @@ -501,6 +518,7 @@ public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeM Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic3Uuid), new Assignment(mkAssignment( @@ -510,6 +528,7 @@ public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeM )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.singleton(topic2Uuid), new Assignment(mkAssignment( @@ -563,6 +582,7 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.singleton(topic1Uuid), new Assignment(mkAssignment( @@ -572,6 +592,7 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -616,12 +637,14 @@ public void testFirstAssignmentWithTwoMembersIncludingOneWithoutSubscriptions() Map members = new TreeMap<>(); members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), mkSet(topic1Uuid), Assignment.EMPTY )); members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), Collections.emptySet(), Assignment.EMPTY diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java index 911e2d889e213..b77936ec80712 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java @@ -286,6 +286,7 @@ public void testCreateMemberSubscriptionSpecImpl() { assertEquals(new MemberSubscriptionAndAssignmentImpl( Optional.of("rackId"), + Optional.of("instanceId"), new TopicIds(mkSet("bar", "foo", "zar"), topicsImage), assignment ), subscriptionSpec); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java index 1ea8d71f25e62..cbd074c889631 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java @@ -237,6 +237,7 @@ private void addMemberSpec( members.put(memberId, new MemberSubscriptionAndAssignmentImpl( rackId, + Optional.empty(), subscribedTopicIds, Assignment.EMPTY )); @@ -270,6 +271,7 @@ private void simulateIncrementalRebalance() { updatedMemberSpec.put(memberId, new MemberSubscriptionAndAssignmentImpl( groupSpec.memberSubscription(memberId).rackId(), + Optional.empty(), groupSpec.memberSubscription(memberId).subscribedTopicIds(), new Assignment(Collections.unmodifiableMap(memberAssignment.partitions())) )); @@ -285,6 +287,7 @@ private void simulateIncrementalRebalance() { Optional rackId = rackId(memberCount - 1); updatedMemberSpec.put("newMember", new MemberSubscriptionAndAssignmentImpl( rackId, + Optional.empty(), subscribedTopicIdsForNewMember, Assignment.EMPTY )); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java index c660cbd8ea7e9..4734036d69394 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java @@ -196,6 +196,7 @@ private void createAssignmentSpec() { String memberId = "member" + i; members.put(memberId, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), Optional.empty(), new TopicIds(new HashSet<>(allTopicNames), topicsImage), Assignment.EMPTY