From 70197772df8650d824bc083db89041a675f6c114 Mon Sep 17 00:00:00 2001 From: ouyangwulin Date: Mon, 13 Jan 2025 23:12:53 +0800 Subject: [PATCH] [Feature] Add CDC jar client for support cdc yaml (#4166) * [fixed] fixed envsetting unload flink-conf.yaml,if not like catalogstore conf will not work * [Feature] Add CDC jar client for support cdc yaml * [Feature] The front-end is modified to support the cdc yaml api * fixed package and log info * delete unused method * fixed module --- .../common/constants/Constants.java | 6 +- .../console/core/util/ServiceHelper.java | 24 +++ streampark-flink/pom.xml | 1 + .../streampark-flink-cdcclient/pom.xml | 140 ++++++++++++++++++ .../streampark/flink/cdc/cli/CDCClient.java | 80 ++++++++++ .../streampark/flink/cdc/cli/CDCExecutor.java | 69 +++++++++ .../flink/cdc/cli/package-info.java | 18 +++ .../flink/core/FlinkTableInitializer.scala | 6 + 8 files changed, 343 insertions(+), 1 deletion(-) create mode 100644 streampark-flink/streampark-flink-cdcclient/pom.xml create mode 100644 streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCClient.java create mode 100644 streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCExecutor.java create mode 100644 streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/flink/cdc/cli/package-info.java diff --git a/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java b/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java index b6d84e2727..c8f27bcb5a 100644 --- a/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java +++ b/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java @@ -19,7 +19,9 @@ import java.time.Duration; -/** A constant class to hold the constants variables. */ +/** + * A constant class to hold the constants variables. + */ public final class Constants { private Constants() { @@ -51,6 +53,8 @@ private Constants() { public static final String STREAMPARK_SPARKSQL_CLIENT_CLASS = "org.apache.streampark.spark.cli.SqlClient"; + public static final String STREAMPARK_FLINKCDC_CLIENT_CLASS = "org.apache.streampark.flink.cdc.cli.CDCClient"; + public static final String PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3"; public static final String SINGLE_SLASH = "/"; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/ServiceHelper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/ServiceHelper.java index 252b7364e1..02a6e8ab39 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/ServiceHelper.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/ServiceHelper.java @@ -39,6 +39,8 @@ public class ServiceHelper { private static String flinkSqlClientJar = null; + private static String flinkCDCClientJar = null; + private static String sparkSqlClientJar = null; public static User getLoginUser() { @@ -58,6 +60,28 @@ public static Long getUserId() { return null; } + public static String getFlinkCDCClientJar(FlinkEnv flinkEnv) { + if (flinkCDCClientJar == null) { + File localClient = WebUtils.getAppClientDir(); + ApiAlertException.throwIfFalse(localClient.exists(), + "[StreamPark]" + localClient + " no exists. 
please check."); + String regex = String.format("streampark-flink-cdcclient_%s-.*\\.jar", flinkEnv.getScalaVersion()); + + List jars = Arrays.stream(Objects.requireNonNull(localClient.list())) + .filter(x -> x.matches(regex)) + .collect(Collectors.toList()); + ApiAlertException.throwIfTrue( + jars.isEmpty(), + "[StreamPark] can't found streampark-flink-cdcclient jar in " + localClient); + + ApiAlertException.throwIfTrue( + jars.size() > 1, + "[StreamPark] found multiple streampark-flink-cdclient jar in " + localClient); + flinkCDCClientJar = jars.get(0); + } + return flinkCDCClientJar; + } + public static String getFlinkSqlClientJar(FlinkEnv flinkEnv) { if (flinkSqlClientJar == null) { File localClient = WebUtils.getAppClientDir(); diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml index 7976177a70..1543627153 100644 --- a/streampark-flink/pom.xml +++ b/streampark-flink/pom.xml @@ -39,6 +39,7 @@ streampark-flink-kubernetes streampark-flink-catalog-store streampark-flink-connector-plugin + streampark-flink-cdcclient diff --git a/streampark-flink/streampark-flink-cdcclient/pom.xml b/streampark-flink/streampark-flink-cdcclient/pom.xml new file mode 100644 index 0000000000..ca972a301b --- /dev/null +++ b/streampark-flink/streampark-flink-cdcclient/pom.xml @@ -0,0 +1,140 @@ + + + + 4.0.0 + + org.apache.streampark + streampark-flink + 2.2.0-SNAPSHOT + + + streampark-flink-cdc-client_${scala.binary.version} + 2.2.0-SNAPSHOT + StreamPark : Flink CDC Client + + + 3.2.1 + + + + + org.apache.flink + flink-cdc-common + ${flink.cdc.version} + + + org.apache.flink + flink-cdc-runtime + ${flink.cdc.version} + + + org.apache.flink + flink-cdc-cli + ${flink.cdc.version} + + + org.apache.flink + flink-cdc-composer + ${flink.cdc.version} + + + + org.apache.flink + flink-java + ${flink.version} + provided + + + org.apache.flink + flink-runtime + ${flink.version} + provided + + + + org.apache.streampark + streampark-common_${scala.binary.version} + + + org.apache.flink + flink-yarn + 1.18.1 + provided + + + org.slf4j + slf4j-api + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-dist + + shade + + package + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + false + false + false + + + * + + + + + org.apache.commons.cli + org.apache.flink.cdc.shaded.com.apache.commons.cli + + + org.apache.calcite + org.apache.flink.cdc.shaded.org.apache.calcite + + + + + org.apache.streampark.flink.cdc.cli.CDCClient + + + + + + + + + + diff --git a/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCClient.java b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCClient.java new file mode 100644 index 0000000000..3793595483 --- /dev/null +++ b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCClient.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.cdc.cli;
+
+import org.apache.streampark.common.conf.ConfigKeys;
+import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.common.util.PropertiesUtils;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * CDC client: the main class submitted by StreamPark to run a Flink CDC YAML pipeline.
+ */
+public class CDCClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CDCClient.class);
+
+    public static void main(String[] args) throws Exception {
+        ParameterTool parameter = ParameterTool.fromArgs(args);
+        Map<String, String> configMap = new HashMap<>();
+        String cdcYamlDecode = parameter.get(ConfigKeys.KEY_FLINK_SQL(null));
+        String appNameDecode = parameter.get(ConfigKeys.KEY_APP_NAME(null));
+        String flinkConfigDecode = parameter.get(ConfigKeys.KEY_FLINK_CONF(null));
+        String parallelism = parameter.get(ConfigKeys.KEY_FLINK_PARALLELISM(null));
+        if (StringUtils.isNullOrWhitespaceOnly(cdcYamlDecode)
+            || StringUtils.isNullOrWhitespaceOnly(appNameDecode)
+            || StringUtils.isNullOrWhitespaceOnly(flinkConfigDecode)) {
+            LOG.error("--flink.conf, --app.name and the CDC YAML must not be null.");
+            return;
+        }
+
+        String cdcYaml = DeflaterUtils.unzipString(cdcYamlDecode);
+        String appName = DeflaterUtils.unzipString(appNameDecode);
+        String flinkConfigString = DeflaterUtils.unzipString(flinkConfigDecode);
+        configMap.putAll(PropertiesUtils.fromYamlTextAsJava(flinkConfigString));
+        configMap.put(YarnConfigOptions.APPLICATION_NAME.key(), appName);
+        configMap.put(CoreOptions.DEFAULT_PARALLELISM.key(), parallelism);
+        Configuration flinkConfig = Configuration.fromMap(configMap);
+        LOG.debug("Flink cdc config {}", flinkConfig);
+        LOG.debug("Flink cdc yaml {}", cdcYaml);
+        PipelineExecution.ExecutionInfo result =
+            new CDCExecutor(cdcYaml, flinkConfig, new ArrayList<>(), SavepointRestoreSettings.none()).run();
+        printExecutionInfo(result);
+    }
+
+    private static void printExecutionInfo(PipelineExecution.ExecutionInfo info) {
+        System.out.println("Pipeline has been submitted to cluster.");
+        System.out.printf("Job ID: %s\n", info.getId());
+        System.out.printf("Job Description: %s\n", info.getDescription());
+    }
+}
diff --git a/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCExecutor.java b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCExecutor.java
new file mode 100644
index 0000000000..78a46d7a84
--- /dev/null
+++ b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCExecutor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.cdc.cli;
+
+import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
+import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
+import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.composer.PipelineComposer;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+
+import java.nio.file.Path;
+import java.util.List;
+
+/**
+ * CDC executor: parses a CDC pipeline YAML definition and runs it through the Flink CDC composer.
+ */
+public class CDCExecutor {
+
+    private final String pipelineString;
+    private final Configuration configuration;
+    private final SavepointRestoreSettings savePointSettings;
+    private final List<Path> additionalJar;
+
+    private PipelineComposer composer;
+
+    public CDCExecutor(String pipelineString,
+                       Configuration flinkConfig,
+                       List<Path> additionalJar,
+                       SavepointRestoreSettings savePointRestoreSettings) {
+        this.pipelineString = pipelineString;
+        this.configuration = flinkConfig;
+        this.additionalJar = additionalJar;
+        this.savePointSettings = savePointRestoreSettings;
+    }
+
+    public PipelineExecution.ExecutionInfo run() throws Exception {
+        PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
+        PipelineDef pipelineDef = pipelineDefinitionParser.parse(pipelineString, configuration);
+        PipelineExecution execution = getComposer().compose(pipelineDef);
+        return execution.execute();
+    }
+
+    private PipelineComposer getComposer() throws Exception {
+        if (composer == null) {
+            composer = FlinkEnvironmentUtils.createComposer(
+                true, configuration, additionalJar, savePointSettings);
+        }
+        return composer;
+    }
+}
diff --git a/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/flink/cdc/cli/package-info.java b/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/flink/cdc/cli/package-info.java
new file mode 100644
index 0000000000..b85a06e754
--- /dev/null
+++ b/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/flink/cdc/cli/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.cdc.cli;
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 7cb463ed75..8e1e19b7b5 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -118,6 +118,12 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
       }
     }
 
+    parameter.get(KEY_FLINK_CONF(), null) match {
+      case null | "" =>
+        throw new ExceptionInInitializerError(
+          "[StreamPark] Usage: can't find config, please set \"--flink.conf $conf\" in main arguments")
+      case conf => builder.withConfiguration(Configuration.fromMap(PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(conf))))
+    }
     val buildWith =
       (parameter.get(KEY_FLINK_TABLE_CATALOG), parameter.get(KEY_FLINK_TABLE_DATABASE))
     buildWith match {
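
Review note (not part of the patch): the patch wires CDCClient into StreamPark's submission path, but the core flow it introduces (YAML definition -> PipelineDef -> composer -> execution) can also be exercised directly through the new CDCExecutor. Below is a minimal, illustrative sketch of that flow. The class name CDCExecutorSketch, the MySQL/Doris connector options, hostnames, credentials and table names are placeholders (not taken from the patch), and running it assumes the corresponding Flink CDC connector jars and a Flink runtime are available on the classpath.

import org.apache.streampark.flink.cdc.cli.CDCExecutor;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class CDCExecutorSketch {

    public static void main(String[] args) throws Exception {
        // A pipeline definition in the Flink CDC YAML format that CDCExecutor parses.
        // Connector options and credentials are illustrative placeholders.
        String pipelineYaml =
            "source:\n"
                + "  type: mysql\n"
                + "  hostname: localhost\n"
                + "  port: 3306\n"
                + "  username: cdc_user\n"
                + "  password: cdc_pass\n"
                + "  tables: app_db.\\.*\n"
                + "sink:\n"
                + "  type: doris\n"
                + "  fenodes: 127.0.0.1:8030\n"
                + "  username: root\n"
                + "  password: \"\"\n"
                + "pipeline:\n"
                + "  name: mysql-to-doris\n"
                + "  parallelism: 2\n";

        // Flink configuration for the composed job. CDCClient builds the same kind of map
        // from the decompressed --flink.conf argument plus the app name and parallelism.
        Map<String, String> flinkConf = new HashMap<>();
        flinkConf.put(CoreOptions.DEFAULT_PARALLELISM.key(), "2");
        Configuration configuration = Configuration.fromMap(flinkConf);

        // No additional jars and no savepoint: mirrors what CDCClient.main passes today.
        PipelineExecution.ExecutionInfo info =
            new CDCExecutor(pipelineYaml, configuration, new ArrayList<>(), SavepointRestoreSettings.none())
                .run();
        System.out.printf("Submitted pipeline %s: %s%n", info.getId(), info.getDescription());
    }
}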