Commit

[Feature] Add CDC jar client to support CDC YAML pipelines
Mrart committed Jan 12, 2025
1 parent ddb6551 commit 6e0640b
Showing 6 changed files with 281 additions and 0 deletions.
1 change: 1 addition & 0 deletions pom.xml
@@ -67,6 +67,7 @@
<module>streampark-flink</module>
<module>streampark-spark</module>
<module>streampark-console</module>
<module>streampark-flink/streampark-flink-cdcclient</module>
</modules>

<scm>
6 changes: 6 additions & 0 deletions streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -101,6 +101,12 @@ object PropertiesUtils extends Logger {
}
}

def fromYamlTextToJava(text: String): java.util.Map[String, String] = {
val scalaMap = fromYamlText(text)
val javaMap: java.util.Map[String, String] = scalaMap.asJava
javaMap
}

def fromHoconText(conf: String): Map[String, String] = {
require(conf != null, s"[StreamPark] fromHoconText: Hocon content must not be null")
try parseHoconByReader(new StringReader(conf))
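
For reference, a minimal sketch of calling the new helper from Java (the YAML text here is illustrative, not part of the commit; the Scala object's static forwarder is what makes the plain static-style call possible, as CDCClient below relies on):

import org.apache.streampark.common.util.PropertiesUtils;

import java.util.Map;

public class YamlToJavaMapExample {

    public static void main(String[] args) {
        // Illustrative flat YAML config; any key/value Flink settings work here.
        String yaml = "execution.checkpointing.interval: 30s\n"
            + "taskmanager.numberOfTaskSlots: '2'";
        Map<String, String> config = PropertiesUtils.fromYamlTextToJava(yaml);
        config.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}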
124 changes: 124 additions & 0 deletions streampark-flink/streampark-flink-cdcclient/pom.xml
@@ -0,0 +1,124 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark</artifactId>
<version>2.2.0-SNAPSHOT</version>
</parent>

<artifactId>streampark-flink-cdc-client_${scala.binary.version}</artifactId>
<version>2.2.0-SNAPSHOT</version>
<name>StreamPark : Flink CDC Client</name>

<properties>
<flink.cdc.version>3.2.1</flink.cdc.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-common</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-runtime</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-cli</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-composer</artifactId>
<version>${flink.cdc.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-common_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-dist</id>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<shadeTestJar>false</shadeTestJar>
<createDependencyReducedPom>false</createDependencyReducedPom>
<shadedArtifactAttached>false</shadedArtifactAttached>
<artifactSet>
<includes>
<include>*</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>org.apache.commons.cli</pattern>
<shadedPattern>org.apache.flink.cdc.shaded.org.apache.commons.cli</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.calcite</pattern>
<shadedPattern>org.apache.flink.cdc.shaded.org.apache.calcite</shadedPattern>
</relocation>
</relocations>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.streampark.cdc.cli.CDCClient</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
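
Since the shaded jar is executed directly, a quick sanity check after `mvn package` is to read the Main-Class attribute from its manifest. A standalone sketch (the jar path argument is a placeholder you supply):

import java.util.jar.JarFile;
import java.util.jar.Manifest;

public class ShadedJarCheck {

    public static void main(String[] args) throws Exception {
        // Pass the path of the shaded jar built by this module.
        try (JarFile jar = new JarFile(args[0])) {
            Manifest manifest = jar.getManifest();
            // Expected: org.apache.streampark.cdc.cli.CDCClient, set by the ManifestResourceTransformer above.
            System.out.println("Main-Class: " + manifest.getMainAttributes().getValue("Main-Class"));
        }
    }
}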
80 changes: 80 additions & 0 deletions streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCClient.java
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.cdc.cli;

import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.PropertiesUtils;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.util.StringUtils;
import org.apache.flink.yarn.configuration.YarnConfigOptions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

/**
 * Command-line entry point that decodes the deflate-compressed arguments and
 * submits a Flink CDC YAML pipeline for execution.
 */
public class CDCClient {

private static final Logger LOG = LoggerFactory.getLogger(CDCClient.class);

public static void main(String[] args) throws Exception {
ParameterTool parameter = ParameterTool.fromArgs(args);
Map<String, String> configMap = new HashMap<>();
String cdcYamlDecode = parameter.get(ConfigKeys.KEY_FLINK_SQL(null));
String appNameDecode = parameter.get(ConfigKeys.KEY_APP_NAME(null));
String flinkConfigDecode = parameter.get(ConfigKeys.KEY_FLINK_CONF(null));
String parallelism = parameter.get(ConfigKeys.KEY_FLINK_PARALLELISM(null));
if (StringUtils.isNullOrWhitespaceOnly(cdcYamlDecode)
|| StringUtils.isNullOrWhitespaceOnly(appNameDecode)
|| StringUtils.isNullOrWhitespaceOnly(flinkConfigDecode)) {
LOG.error("The CDC pipeline YAML, --app.name and --flink.conf arguments must not be empty.");
return;
}

String cdcYaml = DeflaterUtils.unzipString(cdcYamlDecode);
String appName = DeflaterUtils.unzipString(appNameDecode);
String flinkConfigString = DeflaterUtils.unzipString(flinkConfigDecode);
configMap.putAll(PropertiesUtils.fromYamlTextToJava(flinkConfigString));
configMap.put(YarnConfigOptions.APPLICATION_NAME.key(), appName);
// Parallelism is optional; a null value would be rejected by Configuration.fromMap.
if (!StringUtils.isNullOrWhitespaceOnly(parallelism)) {
    configMap.put(CoreOptions.DEFAULT_PARALLELISM.key(), parallelism);
}
Configuration flinkConfig = Configuration.fromMap(configMap);
LOG.debug("Flink cdc config {}", flinkConfig);
LOG.debug("Flink cdc yaml {}", cdcYaml);
PipelineExecution.ExecutionInfo result =
new CDCExecutor(cdcYaml, flinkConfig, new ArrayList<>(), SavepointRestoreSettings.none()).run();
printExecutionInfo(result);

}

private static void printExecutionInfo(PipelineExecution.ExecutionInfo info) {
System.out.println("Pipeline has been submitted to the cluster.");
System.out.printf("Job ID: %s\n", info.getId());
System.out.printf("Job Description: %s\n", info.getDescription());
}
}
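
To illustrate the argument contract, a sketch of invoking the client in-process: each payload is deflate-compressed to mirror the unzipString calls above, and zipString is assumed to be the compressor paired with DeflaterUtils.unzipString elsewhere in StreamPark. The YAML bodies are illustrative only:

import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.util.DeflaterUtils;

public class CDCClientLaunchExample {

    public static void main(String[] args) throws Exception {
        // Illustrative CDC pipeline definition; real jobs need full connector options.
        String pipelineYaml = "source:\n  type: values\nsink:\n  type: values\npipeline:\n  name: demo";
        String flinkConfYaml = "execution.checkpointing.interval: 30s";

        String[] clientArgs = {
            "--" + ConfigKeys.KEY_FLINK_SQL(null), DeflaterUtils.zipString(pipelineYaml),
            "--" + ConfigKeys.KEY_APP_NAME(null), DeflaterUtils.zipString("cdc-demo-app"),
            "--" + ConfigKeys.KEY_FLINK_CONF(null), DeflaterUtils.zipString(flinkConfYaml),
            "--" + ConfigKeys.KEY_FLINK_PARALLELISM(null), "2", // read as a plain string, not compressed
        };
        CDCClient.main(clientArgs);
    }
}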
69 changes: 69 additions & 0 deletions streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCExecutor.java
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.cdc.cli;

import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import java.nio.file.Path;
import java.util.List;

/**
 * Parses a Flink CDC YAML pipeline definition, composes it, and runs the
 * resulting pipeline.
 */
public class CDCExecutor {

private final String pipelineString;
private final Configuration configuration;
private final SavepointRestoreSettings savePointSettings;
private final List<Path> additionalJar;

private PipelineComposer composer;

public CDCExecutor(String pipelineString,
Configuration flinkConfig,
List<Path> additionalJar,
SavepointRestoreSettings savePointRestoreSettings) {
this.pipelineString = pipelineString;
this.configuration = flinkConfig;
this.additionalJar = additionalJar;
this.savePointSettings = savePointRestoreSettings;
}

public PipelineExecution.ExecutionInfo run() throws Exception {
PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef = pipelineDefinitionParser.parse(pipelineString, configuration);
PipelineComposer composer = getComposer();
PipelineExecution execution = composer.compose(pipelineDef);
return execution.execute();
}

private PipelineComposer getComposer() throws Exception {
    if (composer == null) {
        // Cache the composer so repeated calls reuse the same instance.
        composer = FlinkEnvironmentUtils.createComposer(
            true, configuration, additionalJar, savePointSettings);
    }
    return composer;
}
}
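
For completeness, a minimal sketch of driving CDCExecutor directly, bypassing the argument decoding in CDCClient (connector options and config values are illustrative, not a working job):

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class CDCExecutorExample {

    public static void main(String[] args) throws Exception {
        // Illustrative pipeline definition in Flink CDC YAML form.
        String pipelineYaml =
            "source:\n"
                + "  type: mysql\n"
                + "  hostname: localhost\n"
                + "  port: 3306\n"
                + "sink:\n"
                + "  type: doris\n"
                + "pipeline:\n"
                + "  name: mysql-to-doris\n"
                + "  parallelism: 2\n";

        Map<String, String> conf = new HashMap<>();
        conf.put("parallelism.default", "2");
        Configuration flinkConfig = Configuration.fromMap(conf);

        PipelineExecution.ExecutionInfo info = new CDCExecutor(
            pipelineYaml, flinkConfig, new ArrayList<>(), SavepointRestoreSettings.none()).run();
        System.out.println("Job ID: " + info.getId());
    }
}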
@@ -0,0 +1 @@
package org.apache.streampark;
