From 198db94f18ad6caefeb278050703067cc098c987 Mon Sep 17 00:00:00 2001 From: castorqin Date: Mon, 13 May 2024 12:44:13 +0800 Subject: [PATCH 1/3] [Manager] Kafka sink supports automatic allocation of sort standalone cluster --- .../resource/sink/kafka/KafkaResourceOperator.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java index 0e89b9867ba..875c0b2d7be 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java @@ -17,6 +17,9 @@ package org.apache.inlong.manager.service.resource.sink.kafka; +import java.util.Collections; +import java.util.Optional; +import java.util.Properties; import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.SinkStatus; @@ -24,9 +27,8 @@ import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.pojo.sink.SinkInfo; import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkDTO; -import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator; +import org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator; import org.apache.inlong.manager.service.sink.StreamSinkService; - import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.CreateTopicsResult; @@ -39,15 +41,11 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.util.Collections; -import java.util.Optional; -import java.util.Properties; - /** * Kafka resource operator for creating Kafka topic */ @Service -public class KafkaResourceOperator implements SinkResourceOperator { +public class KafkaResourceOperator extends AbstractStandaloneSinkResourceOperator { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaResourceOperator.class); @@ -76,7 +74,7 @@ public void createSinkResource(SinkInfo sinkInfo) { new NewTopic(topicName, Optional.of(partitionNum), Optional.empty()))); result.values().get(topicName).get(); } - + this.assignCluster(sinkInfo); sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), "create kafka topic success"); LOGGER.info("success to create kafka topic [{}] for sinkInfo={}", topicName, sinkInfo); From aee1015ceab33d3ec339354dada16b831ef0cb59 Mon Sep 17 00:00:00 2001 From: castorqin Date: Mon, 13 May 2024 14:09:39 +0800 Subject: [PATCH 2/3] [Manager] Kafka sink supports automatic allocation of sort standalone cluster --- .../resource/sink/kafka/KafkaResourceOperator.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java index 875c0b2d7be..e5805c61c82 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java @@ -17,9 +17,6 @@ package org.apache.inlong.manager.service.resource.sink.kafka; -import java.util.Collections; -import java.util.Optional; -import java.util.Properties; import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.SinkStatus; @@ -29,6 +26,7 @@ import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkDTO; import org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator; import org.apache.inlong.manager.service.sink.StreamSinkService; + import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.CreateTopicsResult; @@ -41,6 +39,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.Collections; +import java.util.Optional; +import java.util.Properties; + /** * Kafka resource operator for creating Kafka topic */ From d1301a57b2f49b29a8465f57129d59490b5322dc Mon Sep 17 00:00:00 2001 From: castorqin Date: Mon, 13 May 2024 14:57:33 +0800 Subject: [PATCH 3/3] [Manager] Kafka sink supports automatic allocation of sort standalone cluster --- .../service/resource/sink/kafka/KafkaResourceOperator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java index e5805c61c82..dac4a22efeb 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java @@ -76,7 +76,6 @@ public void createSinkResource(SinkInfo sinkInfo) { new NewTopic(topicName, Optional.of(partitionNum), Optional.empty()))); result.values().get(topicName).get(); } - this.assignCluster(sinkInfo); sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), "create kafka topic success"); LOGGER.info("success to create kafka topic [{}] for sinkInfo={}", topicName, sinkInfo); @@ -85,6 +84,7 @@ public void createSinkResource(SinkInfo sinkInfo) { sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), e.getMessage()); throw new WorkflowException("create kafka topic failed, reason: " + e.getMessage()); } + this.assignCluster(sinkInfo); } /**