Skip to content

Commit

Permalink
feat(core): reuse unregistered node when requesting for next node id (#…
Browse files Browse the repository at this point in the history
…2183)

Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh committed Nov 30, 2024
1 parent d6641d7 commit 94ede7f
Showing 1 changed file with 30 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;

import org.slf4j.Logger;

Expand Down Expand Up @@ -292,6 +293,11 @@ boolean check() {
* The real next available node id is generally one greater than this value.
*/
private AtomicInteger nextNodeId = new AtomicInteger(-1);

/**
* A set of node IDs that have been unregistered and can be reused for new node assignments.
*/
private final TimelineHashSet<Integer> reusableNodeIds;
// AutoMQ for Kafka inject end

private ClusterControlManager(
Expand Down Expand Up @@ -323,6 +329,7 @@ private ClusterControlManager(
this.brokerUncleanShutdownHandler = brokerUncleanShutdownHandler;
// AutoMQ for Kafka inject start
this.maxControllerId = QuorumConfig.parseVoterConnections(quorumVoters).keySet().stream().max(Integer::compareTo).orElse(0);
this.reusableNodeIds = new TimelineHashSet<>(snapshotRegistry, 0);
// AutoMQ for Kafka inject end
}

Expand Down Expand Up @@ -369,11 +376,21 @@ boolean zkRegistrationAllowed() {

// AutoMQ for Kafka inject start
public ControllerResult<Integer> getNextNodeId() {
int maxBrokerId = brokerRegistrations.keySet().stream().max(Integer::compareTo).orElse(0);
int maxNodeId = Math.max(maxBrokerId, maxControllerId);
int nextId = this.nextNodeId.accumulateAndGet(maxNodeId, (x, y) -> Math.max(x, y) + 1);
// Let the broker's nodeId start from 1000 to easily distinguish broker and controller.
nextId = Math.max(nextId, 1000);
int nextId;
if (!reusableNodeIds.isEmpty()) {
Iterator<Integer> iterator = reusableNodeIds.iterator();
nextId = iterator.next();
// we simply remove the id from reusable id set because we're unable to determine if the id
// will finally be used.
iterator.remove();
} else {
int maxBrokerId = brokerRegistrations.keySet().stream().max(Integer::compareTo).orElse(0);
int maxNodeId = Math.max(maxBrokerId, maxControllerId);
nextId = this.nextNodeId.accumulateAndGet(maxNodeId, (x, y) -> Math.max(x, y) + 1);
// Let the broker's nodeId start from 1000 to easily distinguish broker and controller.
nextId = Math.max(nextId, 1000);
}

UpdateNextNodeIdRecord record = new UpdateNextNodeIdRecord().setNodeId(nextId);

List<ApiMessageAndVersion> records = new ArrayList<>();
Expand Down Expand Up @@ -583,6 +600,11 @@ public void replay(RegisterBrokerRecord record, long offset) {
if (prevRegistration != null) heartbeatManager.remove(brokerId);
heartbeatManager.register(brokerId, record.fenced());
}

// AutoMQ injection start
reusableNodeIds.remove(brokerId);
// AutoMQ injection end

if (prevRegistration == null) {
log.info("Replayed initial RegisterBrokerRecord for broker {}: {}", record.brokerId(), record);
} else if (prevRegistration.incarnationId().equals(record.incarnationId())) {
Expand All @@ -608,6 +630,9 @@ public void replay(UnregisterBrokerRecord record) {
if (heartbeatManager != null) heartbeatManager.remove(brokerId);
updateDirectories(brokerId, registration.directories(), null);
brokerRegistrations.remove(brokerId);
// AutoMQ injection start
reusableNodeIds.add(brokerId);
// AutoMQ injection end
log.info("Replayed {}", record);
}
}
Expand Down

0 comments on commit 94ede7f

Please sign in to comment.