Skip to content

Commit

Permalink
[Feature] Add CDC jar client to support CDC YAML (#4166)
Browse files Browse the repository at this point in the history
* [fix] Fix env setting not loading flink-conf.yaml; otherwise settings such as the catalog store config will not take effect

* [Feature] Add CDC jar client for support cdc yaml

* [Feature] The front-end is modified to support the cdc yaml api

* fixed package and log info

* delete unused method

* fixed module
  • Loading branch information
Mrart authored Jan 13, 2025
1 parent 422c276 commit 7019777
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import java.time.Duration;

/** A constant class to hold the constants variables. */
/**
* A constant class to hold the constants variables.
*/
public final class Constants {

private Constants() {
Expand Down Expand Up @@ -51,6 +53,8 @@ private Constants() {

public static final String STREAMPARK_SPARKSQL_CLIENT_CLASS = "org.apache.streampark.spark.cli.SqlClient";

public static final String STREAMPARK_FLINKCDC_CLIENT_CLASS = "org.apache.streampark.flink.cdc.cli.CDCClient";

public static final String PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3";

public static final String SINGLE_SLASH = "/";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class ServiceHelper {

private static String flinkSqlClientJar = null;

private static String flinkCDCClientJar = null;

private static String sparkSqlClientJar = null;

public static User getLoginUser() {
Expand All @@ -58,6 +60,28 @@ public static Long getUserId() {
return null;
}

/**
 * Locates the StreamPark Flink CDC client jar inside the local application client directory.
 * The resolved jar file name is cached in {@code flinkCDCClientJar} after the first
 * successful lookup, so subsequent calls skip the directory scan.
 *
 * @param flinkEnv the Flink environment whose Scala version selects the matching jar
 * @return the file name of the CDC client jar found in the client directory
 * @throws ApiAlertException if the client directory does not exist, no matching jar is
 *                           found, or more than one matching jar is found
 */
public static String getFlinkCDCClientJar(FlinkEnv flinkEnv) {
    if (flinkCDCClientJar == null) {
        File localClient = WebUtils.getAppClientDir();
        ApiAlertException.throwIfFalse(localClient.exists(),
            "[StreamPark]" + localClient + " no exists. please check.");
        // NOTE(review): the cdcclient module pom declares artifactId
        // "streampark-flink-cdc-client_..." while this regex expects
        // "streampark-flink-cdcclient_..." — confirm the built jar name matches this pattern.
        String regex = String.format("streampark-flink-cdcclient_%s-.*\\.jar", flinkEnv.getScalaVersion());

        List<String> jars = Arrays.stream(Objects.requireNonNull(localClient.list()))
            .filter(x -> x.matches(regex))
            .collect(Collectors.toList());
        ApiAlertException.throwIfTrue(
            jars.isEmpty(),
            "[StreamPark] can't found streampark-flink-cdcclient jar in " + localClient);

        // Typo fixed in the original message: "cdclient" -> "cdcclient".
        ApiAlertException.throwIfTrue(
            jars.size() > 1,
            "[StreamPark] found multiple streampark-flink-cdcclient jar in " + localClient);
        flinkCDCClientJar = jars.get(0);
    }
    return flinkCDCClientJar;
}

public static String getFlinkSqlClientJar(FlinkEnv flinkEnv) {
if (flinkSqlClientJar == null) {
File localClient = WebUtils.getAppClientDir();
Expand Down
1 change: 1 addition & 0 deletions streampark-flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
<module>streampark-flink-kubernetes</module>
<module>streampark-flink-catalog-store</module>
<module>streampark-flink-connector-plugin</module>
<module>streampark-flink-cdcclient</module>
</modules>

<dependencies>
Expand Down
140 changes: 140 additions & 0 deletions streampark-flink/streampark-flink-cdcclient/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.streampark</groupId>
        <artifactId>streampark-flink</artifactId>
        <version>2.2.0-SNAPSHOT</version>
    </parent>

    <!-- Renamed from "streampark-flink-cdc-client_..." so the built jar matches both the
         module directory name and the "streampark-flink-cdcclient_<scala>-*.jar" pattern
         that ServiceHelper#getFlinkCDCClientJar scans for at runtime. -->
    <artifactId>streampark-flink-cdcclient_${scala.binary.version}</artifactId>
    <version>2.2.0-SNAPSHOT</version>
    <name>StreamPark : Flink CDC Client</name>

    <properties>
        <flink.cdc.version>3.2.1</flink.cdc.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-common</artifactId>
            <version>${flink.cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-runtime</artifactId>
            <version>${flink.cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-cli</artifactId>
            <version>${flink.cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-composer</artifactId>
            <version>${flink.cdc.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.streampark</groupId>
            <artifactId>streampark-common_${scala.binary.version}</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-yarn</artifactId>
            <!-- Use the project-wide Flink version instead of the hard-coded 1.18.1 so this
                 module stays consistent with the flink-java/flink-runtime dependencies above. -->
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade-dist</id>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <phase>package</phase>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <!-- Strip signature files so the shaded jar is not rejected
                                         as tampered. -->
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <shadeTestJar>false</shadeTestJar>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <shadedArtifactAttached>false</shadedArtifactAttached>
                            <artifactSet>
                                <includes>
                                    <include>*</include>
                                </includes>
                            </artifactSet>
                            <relocations>
                                <relocation>
                                    <pattern>org.apache.commons.cli</pattern>
                                    <!-- Fixed relocation target: "com.apache" -> "org.apache",
                                         matching the calcite relocation below. -->
                                    <shadedPattern>org.apache.flink.cdc.shaded.org.apache.commons.cli</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.calcite</pattern>
                                    <shadedPattern>org.apache.flink.cdc.shaded.org.apache.calcite</shadedPattern>
                                </relocation>
                            </relocations>
                            <transformers>
                                <!-- Make the shaded jar runnable: Main-Class = CDCClient. -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.apache.streampark.flink.cdc.cli.CDCClient</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.flink.cdc.cli;

import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.PropertiesUtils;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.util.StringUtils;
import org.apache.flink.yarn.configuration.YarnConfigOptions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

/**
 * Entry point for submitting a Flink CDC YAML pipeline.
 *
 * <p>Expects the following program arguments (values are zip/deflate-compressed and are
 * decompressed with {@code DeflaterUtils.unzipString}):
 * <ul>
 *   <li>the CDC pipeline YAML ({@code ConfigKeys.KEY_FLINK_SQL})</li>
 *   <li>the application name ({@code ConfigKeys.KEY_APP_NAME})</li>
 *   <li>the Flink configuration YAML ({@code ConfigKeys.KEY_FLINK_CONF})</li>
 *   <li>optionally the default parallelism ({@code ConfigKeys.KEY_FLINK_PARALLELISM})</li>
 * </ul>
 */
public class CDCClient {

    private static final Logger LOG = LoggerFactory.getLogger(CDCClient.class);

    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        Map<String, String> configMap = new HashMap<>();
        String cdcYamlDecode = parameter.get(ConfigKeys.KEY_FLINK_SQL(null));
        String appNameDecode = parameter.get(ConfigKeys.KEY_APP_NAME(null));
        String flinkConfigDecode = parameter.get(ConfigKeys.KEY_FLINK_CONF(null));
        String parallelism = parameter.get(ConfigKeys.KEY_FLINK_PARALLELISM(null));
        if (StringUtils.isNullOrWhitespaceOnly(cdcYamlDecode)
            || StringUtils.isNullOrWhitespaceOnly(appNameDecode)
            || StringUtils.isNullOrWhitespaceOnly(flinkConfigDecode)) {
            LOG.error("--flink.conf or --app.name or `cdc yaml` must not be null.");
            return;
        }

        String cdcYaml = DeflaterUtils.unzipString(cdcYamlDecode);
        String appName = DeflaterUtils.unzipString(appNameDecode);
        String flinkConfigString = DeflaterUtils.unzipString(flinkConfigDecode);
        configMap.putAll(PropertiesUtils.fromYamlTextAsJava(flinkConfigString));
        configMap.put(YarnConfigOptions.APPLICATION_NAME.key(), appName);
        // Parallelism is optional: the original code put a possibly-null value into the
        // map, which would poison Configuration.fromMap. Only set it when provided.
        if (!StringUtils.isNullOrWhitespaceOnly(parallelism)) {
            configMap.put(CoreOptions.DEFAULT_PARALLELISM.key(), parallelism);
        }
        Configuration flinkConfig = Configuration.fromMap(configMap);
        LOG.debug("Flink cdc config {}", flinkConfig);
        LOG.debug("Flink cdc yaml {}", cdcYaml);
        PipelineExecution.ExecutionInfo result =
            new CDCExecutor(cdcYaml, flinkConfig, new ArrayList<>(), SavepointRestoreSettings.none()).run();
        printExecutionInfo(result);
    }

    /** Prints the submission result to stdout for the CLI user. */
    private static void printExecutionInfo(PipelineExecution.ExecutionInfo info) {
        System.out.println("Pipeline has been submitted to cluster.");
        // %n: platform-independent line separator (instead of a literal \n).
        System.out.printf("Job ID: %s%n", info.getId());
        System.out.printf("Job Description: %s%n", info.getDescription());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.flink.cdc.cli;

import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import java.nio.file.Path;
import java.util.List;

/**
 * Parses a Flink CDC pipeline definition (YAML text) and executes it through a
 * lazily-created {@link PipelineComposer}.
 */
public class CDCExecutor {

    /** Raw pipeline definition in YAML form. */
    private final String pipelineString;
    private final Configuration configuration;
    private final SavepointRestoreSettings savePointSettings;
    /** Extra jars to ship with the pipeline; may be empty. */
    private final List<Path> additionalJar;

    /** Lazily initialized by {@link #getComposer()}; reused on subsequent runs. */
    private PipelineComposer composer;

    public CDCExecutor(String pipelineString,
                       Configuration flinkConfig,
                       List<Path> additionalJar,
                       SavepointRestoreSettings savePointRestoreSettings) {
        this.pipelineString = pipelineString;
        this.configuration = flinkConfig;
        this.additionalJar = additionalJar;
        this.savePointSettings = savePointRestoreSettings;
    }

    /**
     * Parses the YAML pipeline definition, composes it, and executes it.
     *
     * @return the execution info (job id and description) reported by the cluster
     * @throws Exception if parsing, composition, or submission fails
     */
    public PipelineExecution.ExecutionInfo run() throws Exception {
        PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
        PipelineDef pipelineDef = pipelineDefinitionParser.parse(pipelineString, configuration);
        PipelineComposer composer = getComposer();
        PipelineExecution execution = composer.compose(pipelineDef);
        return execution.execute();
    }

    private PipelineComposer getComposer() throws Exception {
        // Bug fix: the original returned a fresh composer without storing it, so the
        // "composer" field was never populated and the lazy cache never took effect.
        if (composer == null) {
            composer = FlinkEnvironmentUtils.createComposer(
                true, configuration, additionalJar, savePointSettings);
        }
        return composer;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.flink.cdc.cli;
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
}
}

parameter.get(KEY_FLINK_CONF(), null) match {
case null | "" =>
throw new ExceptionInInitializerError(
"[StreamPark] Usage:can't find config,please set \"--flink.conf $conf \" in main arguments")
case conf => builder.withConfiguration(Configuration.fromMap(PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(conf))))
}
val buildWith =
(parameter.get(KEY_FLINK_TABLE_CATALOG), parameter.get(KEY_FLINK_TABLE_DATABASE))
buildWith match {
Expand Down

0 comments on commit 7019777

Please sign in to comment.