Skip to content

Commit

Permalink
chore: merge the partition implementation of s3stream into new trunk …
Browse files Browse the repository at this point in the history
…branch of automq kafka
  • Loading branch information
daniel-y committed Mar 5, 2024
2 parents 0979327 + 4a52cc8 commit ba76df0
Show file tree
Hide file tree
Showing 191 changed files with 33,520 additions and 0 deletions.
155 changes: 155 additions & 0 deletions core/src/main/java/kafka/autobalancer/AutoBalancerListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.autobalancer;

import com.automq.stream.utils.LogContext;
import kafka.autobalancer.common.AutoBalancerConstants;
import kafka.autobalancer.detector.AnomalyDetector;
import kafka.autobalancer.listeners.BrokerStatusListener;
import kafka.autobalancer.listeners.ClusterStatusListenerRegistry;
import kafka.autobalancer.listeners.TopicPartitionStatusListener;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.SnapshotReader;
import org.slf4j.Logger;

import java.util.List;

public class AutoBalancerListener implements RaftClient.Listener<ApiMessageAndVersion> {
private final int nodeId;
private final Logger logger;
private final KafkaEventQueue queue;
private final ClusterStatusListenerRegistry registry;
private final LoadRetriever loadRetriever;
private final AnomalyDetector anomalyDetector;

public AutoBalancerListener(int nodeId, LogContext logContext, KafkaEventQueue queue, ClusterStatusListenerRegistry registry,
LoadRetriever loadRetriever, AnomalyDetector anomalyDetector) {
this.nodeId = nodeId;
this.logger = logContext.logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ);
this.queue = queue;
this.registry = registry;
this.loadRetriever = loadRetriever;
this.anomalyDetector = anomalyDetector;
}

private void handleMessageBatch(Batch<ApiMessageAndVersion> messageBatch) {
List<ApiMessageAndVersion> messages = messageBatch.records();
for (ApiMessageAndVersion apiMessage : messages) {
try {
handleMessage(apiMessage.message());
} catch (Exception e) {
logger.error("Failed to handle message", e);
}
}
}

private void handleMessage(ApiMessage message) {
MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
switch (type) {
case REGISTER_BROKER_RECORD:
for (BrokerStatusListener listener : this.registry.brokerListeners()) {
listener.onBrokerRegister((RegisterBrokerRecord) message);
}
break;
case UNREGISTER_BROKER_RECORD:
for (BrokerStatusListener listener : this.registry.brokerListeners()) {
listener.onBrokerUnregister((UnregisterBrokerRecord) message);
}
break;
case BROKER_REGISTRATION_CHANGE_RECORD:
for (BrokerStatusListener listener : this.registry.brokerListeners()) {
listener.onBrokerRegistrationChanged((BrokerRegistrationChangeRecord) message);
}
break;
case TOPIC_RECORD:
for (TopicPartitionStatusListener listener : this.registry.topicPartitionListeners()) {
listener.onTopicCreate((TopicRecord) message);
}
break;
case REMOVE_TOPIC_RECORD:
for (TopicPartitionStatusListener listener : this.registry.topicPartitionListeners()) {
listener.onTopicDelete((RemoveTopicRecord) message);
}
break;
case PARTITION_RECORD:
for (TopicPartitionStatusListener listener : this.registry.topicPartitionListeners()) {
listener.onPartitionCreate((PartitionRecord) message);
}
break;
case PARTITION_CHANGE_RECORD:
for (TopicPartitionStatusListener listener : this.registry.topicPartitionListeners()) {
listener.onPartitionChange((PartitionChangeRecord) message);
}
break;
default:
break;
}
}

@Override
public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
queue.append(() -> {
try (reader) {
while (reader.hasNext()) {
handleMessageBatch(reader.next());
}
}
});
}

@Override
public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
queue.append(() -> {
try (reader) {
while (reader.hasNext()) {
handleMessageBatch(reader.next());
}
}
});
}

@Override
public void handleLeaderChange(LeaderAndEpoch leader) {
queue.append(() -> {
if (leader.leaderId().isEmpty()) {
return;
}
boolean isLeader = leader.isLeader(nodeId);
if (isLeader) {
this.anomalyDetector.resume();
} else {
this.anomalyDetector.pause();
}
this.loadRetriever.onLeaderChanged(isLeader);
});
}

@Override
public void beginShutdown() {
RaftClient.Listener.super.beginShutdown();
}
}

106 changes: 106 additions & 0 deletions core/src/main/java/kafka/autobalancer/AutoBalancerManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.autobalancer;

import com.automq.stream.utils.LogContext;
import kafka.autobalancer.common.AutoBalancerConstants;
import kafka.autobalancer.detector.AnomalyDetector;
import kafka.autobalancer.config.AutoBalancerControllerConfig;
import kafka.autobalancer.detector.AnomalyDetectorBuilder;
import kafka.autobalancer.goals.Goal;
import kafka.autobalancer.listeners.BrokerStatusListener;
import kafka.autobalancer.listeners.ClusterStatusListenerRegistry;
import kafka.autobalancer.listeners.TopicPartitionStatusListener;
import kafka.autobalancer.executor.ControllerActionExecutorService;
import kafka.autobalancer.model.RecordClusterModel;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumController;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.KafkaRaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.slf4j.Logger;

import java.util.HashSet;
import java.util.Set;

public class AutoBalancerManager implements AutoBalancerService {
private final Logger logger;
private final LoadRetriever loadRetriever;
private final AnomalyDetector anomalyDetector;
private final KafkaEventQueue queue;

public AutoBalancerManager(Time time, KafkaConfig kafkaConfig, QuorumController quorumController, KafkaRaftClient<ApiMessageAndVersion> raftClient) {
LogContext logContext = new LogContext(String.format("[AutoBalancerManager id=%d] ", quorumController.nodeId()));
logger = logContext.logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ);
AutoBalancerControllerConfig config = new AutoBalancerControllerConfig(kafkaConfig.props(), false);
RecordClusterModel clusterModel = new RecordClusterModel(new LogContext(String.format("[ClusterModel id=%d] ", quorumController.nodeId())));
this.loadRetriever = new LoadRetriever(config, quorumController, clusterModel,
new LogContext(String.format("[LoadRetriever id=%d] ", quorumController.nodeId())));
ControllerActionExecutorService actionExecutorService = new ControllerActionExecutorService(config, quorumController,
new LogContext(String.format("[ExecutionManager id=%d] ", quorumController.nodeId())));

this.anomalyDetector = new AnomalyDetectorBuilder()
.logContext(new LogContext(String.format("[AnomalyDetector id=%d] ", quorumController.nodeId())))
.maxActionsNumPerExecution(config.getInt(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS))
.detectIntervalMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS))
.maxTolerateMetricsDelayMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS))
.coolDownIntervalPerActionMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS))
.clusterModel(clusterModel)
.executor(actionExecutorService)
.addGoals(config.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, Goal.class))
.excludedBrokers(parseExcludedBrokers(config))
.excludedTopics(config.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS))
.build();

this.queue = new KafkaEventQueue(time, new org.apache.kafka.common.utils.LogContext(), "auto-balancer-");
ClusterStatusListenerRegistry registry = new ClusterStatusListenerRegistry();
registry.register((BrokerStatusListener) clusterModel);
registry.register((TopicPartitionStatusListener) clusterModel);
registry.register(actionExecutorService);
registry.register(this.loadRetriever);
raftClient.register(new AutoBalancerListener(quorumController.nodeId(), logContext, queue, registry, this.loadRetriever, this.anomalyDetector));
}

@Override
public void start() {
loadRetriever.start();
anomalyDetector.start();
logger.info("Started");
}

@Override
public void shutdown() {
try {
anomalyDetector.shutdown();
loadRetriever.shutdown();
queue.close();
} catch (Exception e) {
logger.error("Exception in shutdown", e);
}

logger.info("Shutdown completed");
}

private Set<Integer> parseExcludedBrokers(AutoBalancerControllerConfig config) {
Set<Integer> excludedBrokers = new HashSet<>();
for (String brokerId : config.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS)) {
try {
excludedBrokers.add(Integer.parseInt(brokerId));
} catch (Exception e) {
logger.warn("Failed to parse broker id {} from config", brokerId);
}

}
return excludedBrokers;
}
}
17 changes: 17 additions & 0 deletions core/src/main/java/kafka/autobalancer/AutoBalancerService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.autobalancer;

public interface AutoBalancerService {
void start();
void shutdown();
}
Loading

0 comments on commit ba76df0

Please sign in to comment.