diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index 086f2f3ce7..3e671aad20 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -56,6 +56,7 @@ import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; +import org.apache.kafka.timeline.TimelineHashSet; import org.slf4j.Logger; @@ -292,6 +293,11 @@ boolean check() { * The real next available node id is generally one greater than this value. */ private AtomicInteger nextNodeId = new AtomicInteger(-1); + + /** + * A set of node IDs that have been unregistered and can be reused for new node assignments. + */ + private final TimelineHashSet reusableNodeIds; // AutoMQ for Kafka inject end private ClusterControlManager( @@ -323,6 +329,7 @@ private ClusterControlManager( this.brokerUncleanShutdownHandler = brokerUncleanShutdownHandler; // AutoMQ for Kafka inject start this.maxControllerId = QuorumConfig.parseVoterConnections(quorumVoters).keySet().stream().max(Integer::compareTo).orElse(0); + this.reusableNodeIds = new TimelineHashSet<>(snapshotRegistry, 0); // AutoMQ for Kafka inject end } @@ -369,11 +376,21 @@ boolean zkRegistrationAllowed() { // AutoMQ for Kafka inject start public ControllerResult getNextNodeId() { - int maxBrokerId = brokerRegistrations.keySet().stream().max(Integer::compareTo).orElse(0); - int maxNodeId = Math.max(maxBrokerId, maxControllerId); - int nextId = this.nextNodeId.accumulateAndGet(maxNodeId, (x, y) -> Math.max(x, y) + 1); - // Let the broker's nodeId start from 1000 to easily distinguish broker and controller. - nextId = Math.max(nextId, 1000); + int nextId; + if (!reusableNodeIds.isEmpty()) { + Iterator iterator = reusableNodeIds.iterator(); + nextId = iterator.next(); + // we simply remove the id from reusable id set because we're unable to determine if the id + // will finally be used. + iterator.remove(); + } else { + int maxBrokerId = brokerRegistrations.keySet().stream().max(Integer::compareTo).orElse(0); + int maxNodeId = Math.max(maxBrokerId, maxControllerId); + nextId = this.nextNodeId.accumulateAndGet(maxNodeId, (x, y) -> Math.max(x, y) + 1); + // Let the broker's nodeId start from 1000 to easily distinguish broker and controller. + nextId = Math.max(nextId, 1000); + } + UpdateNextNodeIdRecord record = new UpdateNextNodeIdRecord().setNodeId(nextId); List records = new ArrayList<>(); @@ -583,6 +600,11 @@ public void replay(RegisterBrokerRecord record, long offset) { if (prevRegistration != null) heartbeatManager.remove(brokerId); heartbeatManager.register(brokerId, record.fenced()); } + + // AutoMQ injection start + reusableNodeIds.remove(brokerId); + // AutoMQ injection end + if (prevRegistration == null) { log.info("Replayed initial RegisterBrokerRecord for broker {}: {}", record.brokerId(), record); } else if (prevRegistration.incarnationId().equals(record.incarnationId())) { @@ -608,6 +630,9 @@ public void replay(UnregisterBrokerRecord record) { if (heartbeatManager != null) heartbeatManager.remove(brokerId); updateDirectories(brokerId, registration.directories(), null); brokerRegistrations.remove(brokerId); + // AutoMQ injection start + reusableNodeIds.add(brokerId); + // AutoMQ injection end log.info("Replayed {}", record); } }