From 6e0640b254a1aa5b8ca839a8e0967fe4a4007877 Mon Sep 17 00:00:00 2001 From: wulin Date: Sun, 12 Jan 2025 14:32:34 +0800 Subject: [PATCH] [Feature] Add CDC jar client for support cdc yaml --- pom.xml | 1 + .../common/util/PropertiesUtils.scala | 6 + .../streampark-flink-cdcclient/pom.xml | 124 ++++++++++++++++++ .../apache/streampark/cdc/cli/CDCClient.java | 80 +++++++++++ .../streampark/cdc/cli/CDCExecutor.java | 69 ++++++++++ .../org/apache/streampark/package-info.java | 1 + 6 files changed, 281 insertions(+) create mode 100644 streampark-flink/streampark-flink-cdcclient/pom.xml create mode 100644 streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCClient.java create mode 100644 streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCExecutor.java create mode 100644 streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/package-info.java diff --git a/pom.xml b/pom.xml index 4a65812bae..3c4cbf74ec 100644 --- a/pom.xml +++ b/pom.xml @@ -67,6 +67,7 @@ streampark-flink streampark-spark streampark-console + streampark-flink/streampark-flink-cdcclient diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala index f7f68ae458..08eeeb6fdf 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala @@ -101,6 +101,12 @@ object PropertiesUtils extends Logger { } } + def fromYamlTextToJava(text: String): java.util.Map[String, String] = { + val scalaMap = fromYamlText(text) + val javaMap: java.util.Map[String, String] = scalaMap.asJava + javaMap + } + def fromHoconText(conf: String): Map[String, String] = { require(conf != null, s"[StreamPark] fromHoconText: Hocon content must not be null") try parseHoconByReader(new StringReader(conf)) diff --git a/streampark-flink/streampark-flink-cdcclient/pom.xml b/streampark-flink/streampark-flink-cdcclient/pom.xml new file mode 100644 index 0000000000..53a2adf714 --- /dev/null +++ b/streampark-flink/streampark-flink-cdcclient/pom.xml @@ -0,0 +1,124 @@ + + + 4.0.0 + + org.apache.streampark + streampark + 2.2.0-SNAPSHOT + + + streampark-flink-cdc-client_${scala.binary.version} + 2.2.0-SNAPSHOT + StreamPark : Flink CDC Client + + + 3.2.1 + + + + + org.apache.flink + flink-cdc-common + ${flink.cdc.version} + + + org.apache.flink + flink-cdc-runtime + ${flink.cdc.version} + + + org.apache.flink + flink-cdc-cli + ${flink.cdc.version} + + + org.apache.flink + flink-cdc-composer + ${flink.cdc.version} + + + + org.apache.flink + flink-java + ${flink.version} + provided + + + org.apache.flink + flink-runtime + ${flink.version} + provided + + + + org.apache.streampark + streampark-common_${scala.binary.version} + + + org.apache.flink + flink-yarn + 1.18.1 + provided + + + org.slf4j + slf4j-api + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-dist + + shade + + package + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + false + false + false + + + * + + + + + org.apache.commons.cli + org.apache.flink.cdc.shaded.com.apache.commons.cli + + + org.apache.calcite + org.apache.flink.cdc.shaded.org.apache.calcite + + + + + org.apache.streampark.cdc.CDCClient + + + + + + + + + + diff --git a/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCClient.java b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCClient.java new file mode 100644 index 0000000000..5d31a99187 --- /dev/null +++ b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCClient.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.cdc.cli; + +import org.apache.streampark.common.conf.ConfigKeys; +import org.apache.streampark.common.util.DeflaterUtils; +import org.apache.streampark.common.util.PropertiesUtils; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.composer.PipelineExecution; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.util.StringUtils; +import org.apache.flink.yarn.configuration.YarnConfigOptions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +/** + * cdc client + */ +public class CDCClient { + + private static final Logger LOG = LoggerFactory.getLogger(CDCClient.class); + + public static void main(String[] args) throws Exception { + ParameterTool parameter = ParameterTool.fromArgs(args); + Map configMap = new HashMap<>(); + String cdcYamlDecode = parameter.get(ConfigKeys.KEY_FLINK_SQL(null)); + String appNameDecode = parameter.get(ConfigKeys.KEY_APP_NAME(null)); + String flinkConfigDecode = parameter.get(ConfigKeys.KEY_FLINK_CONF(null)); + String parallelism = parameter.get(ConfigKeys.KEY_FLINK_PARALLELISM(null)); + if (StringUtils.isNullOrWhitespaceOnly(cdcYamlDecode) + || StringUtils.isNullOrWhitespaceOnly(appNameDecode) + || StringUtils.isNullOrWhitespaceOnly(flinkConfigDecode)) { + LOG.error("--flink.conf or --app.name must not be null."); + return; + } + + String cdcYaml = DeflaterUtils.unzipString(cdcYamlDecode); + String appName = DeflaterUtils.unzipString(appNameDecode); + String flinkConfigString = DeflaterUtils.unzipString(flinkConfigDecode); + configMap.putAll(PropertiesUtils.fromYamlTextAsJava(flinkConfigString)); + configMap.put(YarnConfigOptions.APPLICATION_NAME.key(), appName); + configMap.put(CoreOptions.DEFAULT_PARALLELISM.key(), parallelism); + Configuration flinkConfig = Configuration.fromMap(configMap); + LOG.debug("Flink cdc config {}", flinkConfig); + LOG.debug("Flink cdc yaml {}", cdcYaml); + PipelineExecution.ExecutionInfo result = + new CDCExecutor(cdcYaml, flinkConfig, new ArrayList<>(), SavepointRestoreSettings.none()).run(); + printExecutionInfo(result); + + } + + private static void printExecutionInfo(PipelineExecution.ExecutionInfo info) { + System.out.println("Pipeline has been submitted to cluster."); + System.out.printf("Job ID: %s\n", info.getId()); + System.out.printf("Job Description: %s\n", info.getDescription()); + } +} diff --git a/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCExecutor.java b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCExecutor.java new file mode 100644 index 0000000000..973b869e8b --- /dev/null +++ b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCExecutor.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.cdc.cli; + +import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser; +import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser; +import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.composer.PipelineComposer; +import org.apache.flink.cdc.composer.PipelineExecution; +import org.apache.flink.cdc.composer.definition.PipelineDef; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; + +import java.nio.file.Path; +import java.util.List; + +/** + * cdc executor + */ +public class CDCExecutor { + + private final String pipelineString; + private final Configuration configuration; + private final SavepointRestoreSettings savePointSettings; + private final List additionalJar; + + private PipelineComposer composer; + + public CDCExecutor(String pipelineString, + Configuration flinkConfig, + List additionalJar, + SavepointRestoreSettings savePointRestoreSettings) { + this.pipelineString = pipelineString; + this.configuration = flinkConfig; + this.additionalJar = additionalJar; + this.savePointSettings = savePointRestoreSettings; + } + + public PipelineExecution.ExecutionInfo run() throws Exception { + PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser(); + PipelineDef pipelineDef = pipelineDefinitionParser.parse(pipelineString, configuration); + PipelineComposer composer = getComposer(); + PipelineExecution execution = composer.compose(pipelineDef); + return execution.execute(); + } + + private PipelineComposer getComposer() throws Exception { + if (composer == null) { + return FlinkEnvironmentUtils.createComposer( + true, configuration, additionalJar, savePointSettings); + } + return composer; + } +} diff --git a/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/package-info.java b/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/package-info.java new file mode 100644 index 0000000000..0ef7971d70 --- /dev/null +++ b/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/package-info.java @@ -0,0 +1 @@ +package org.apache.streampark;