diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/AutoMQIdentityReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/AutoMQIdentityReplicationPolicy.java new file mode 100644 index 0000000000..0c8b03dad4 --- /dev/null +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/AutoMQIdentityReplicationPolicy.java @@ -0,0 +1,59 @@ +/* + * Copyright 2024, AutoMQ CO.,LTD. + * + * Use of this software is governed by the Business Source License + * included in the file BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package org.apache.kafka.connect.mirror; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * AutoMQIdentityReplicationPolicy is a custom implementation of the ReplicationPolicy interface that allows for the + * configuration of the offset-syncs-topic, checkpoints-topic, and heartbeats-topic via environment variables. + *

+ * See more details from KIP-690. + */ +public class AutoMQIdentityReplicationPolicy extends IdentityReplicationPolicy { + private static final Logger log = LoggerFactory.getLogger(AutoMQIdentityReplicationPolicy.class); + + private static final String OFFSET_SYNC_TOPIC_ENV_KEY = "OFFSET_SYNCS_TOPIC"; + private static final String CHECKPOINTS_TOPIC_ENV_KEY = "CHECKPOINTS_TOPIC"; + private static final String HEARTBEATS_TOPIC_ENV_KEY = "HEARTBEATS_TOPIC"; + + @Override + public String offsetSyncsTopic(String clusterAlias) { + String offsetSyncsTopic = System.getenv(OFFSET_SYNC_TOPIC_ENV_KEY); + if (offsetSyncsTopic == null) { + return super.offsetSyncsTopic(clusterAlias); + } + log.info("Using offset syncs topic: {}", offsetSyncsTopic); + return offsetSyncsTopic; + } + + @Override + public String checkpointsTopic(String clusterAlias) { + String checkpointsTopic = System.getenv(CHECKPOINTS_TOPIC_ENV_KEY); + if (checkpointsTopic == null) { + return super.checkpointsTopic(clusterAlias); + } + log.info("Using checkpoints topic: {}", checkpointsTopic); + return checkpointsTopic; + } + + @Override + public String heartbeatsTopic() { + String heartbeatsTopic = System.getenv(HEARTBEATS_TOPIC_ENV_KEY); + if (heartbeatsTopic == null) { + return super.heartbeatsTopic(); + } + log.info("Using heartbeats topic: {}", heartbeatsTopic); + return heartbeatsTopic; + } +}