Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 committed Jul 9, 2024
1 parent 0963b6a commit 6676ed9
Show file tree
Hide file tree
Showing 15 changed files with 210 additions and 21 deletions.
4 changes: 2 additions & 2 deletions docs/en/concept/sink-options-placeholders.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Sink Options Placeholders.md
# Sink Options Placeholders

## Introduction

Expand Down Expand Up @@ -40,7 +40,7 @@ The placeholders are mainly controlled by the following expressions:

## Configuration

Requires:
*Requires*:
- Make sure the sink connector you are using has implemented `TableSinkFactory` API

### Example 1
Expand Down
110 changes: 110 additions & 0 deletions docs/zh/concept/sink-options-placeholders.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Sink 参数占位符

## 介绍

SeaTunnel 提供了 Sink 参数占位符自动替换功能,可让您通过占位符获取上游表元数据。

当您需要动态获取上游表元数据(例如多表写入)时,此功能至关重要。

本文档将指导您如何使用这些占位符以及如何有效地利用它们。

## 支持的引擎

> SeaTunnel Zeta<br/>
> Flink<br/>
> Spark<br/>
## 占位符变量

占位符主要通过以下表达式实现:

- `${database_name}`
- 用于获取上游表中的数据库名称
- 也可以通过表达式指定默认值:`${database_name:default_my_db}`
- `${schema_name}`
- 用于获取上游表中的 schema 名称
- 也可以通过表达式指定默认值:`${schema_name:default_my_schema}`
- `${table_name}`
- 用于获取上游表中的 table 名称
- 也可以通过表达式指定默认值:`${table_name:default_my_table}`
- `${schema_full_name}`
- 用于获取上游表中的 schema 全路径名称,包含 database/schema 名称
- `${table_full_name}`
- 用于获取上游表中的 table 全路径名称,包含 database/schema/table 名称
- `${primary_key}`
- 用于获取上游表中的主键字段名称列表
- `${unique_key}`
- 用于获取上游表中的唯一键字段名称列表
- `${field_names}`
- 用于获取上游表中的所有字段名称列表

## 配置

*先决条件*:
- 确认 Sink 连接器已经支持了 `TableSinkFactory` API

### 配置示例 1

```hocon
env {
// ignore...
}
source {
MySQL-CDC {
// ignore...
}
}
transform {
// ignore...
}
sink {
jdbc {
url = "jdbc:mysql://localhost:3306"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
database = "${database_name}_test"
table = "${table_name}_test"
primary_keys = ["${primary_key}"]
}
}
```

### 配置示例 2

```hocon
env {
// ignore...
}
source {
Oracle-CDC {
// ignore...
}
}
transform {
// ignore...
}
sink {
jdbc {
url = "jdbc:mysql://localhost:3306"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
database = "${schema_name}_test"
table = "${table_name}_test"
primary_keys = ["${primary_key}"]
}
}
```

占位符的替换将在连接器启动之前完成,确保 Sink 参数在使用前已准备就绪。
若该占位符变量没有被替换,则可能是上游表元数据缺少该选项,例如:
- `mysql` source 连接器不包含 `${schema_name}` 元数据
- `oracle` source 连接器不包含 `${databse_name}` 元数据
- ...
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -53,10 +55,6 @@ public class TablePlaceholder {
public static final String NAME_DELIMITER = ".";
public static final String FIELD_DELIMITER = ",";

@Deprecated
private static final List<String> EXCLUDE_TABLE_PLACEHOLDER_KEYS =
Arrays.asList("save_mode_create_template");

private static String replacePlaceholders(String input, String placeholderName, String value) {
return replacePlaceholders(input, placeholderName, value, null);
}
Expand Down Expand Up @@ -176,10 +174,14 @@ public static String replaceTableFieldNames(String placeholder, TableSchema sche

public static ReadonlyConfig replaceTablePlaceholder(
ReadonlyConfig config, CatalogTable table) {
return replaceTablePlaceholder(config, table, Collections.emptyList());
}

public static ReadonlyConfig replaceTablePlaceholder(
ReadonlyConfig config, CatalogTable table, Collection<String> excludeKeys) {
Map<String, Object> copyOnWriteData = config.copyData();
for (String key : copyOnWriteData.keySet()) {
if (EXCLUDE_TABLE_PLACEHOLDER_KEYS.contains(key)) {
// TODO: Remove this compatibility config
if (excludeKeys.contains(key)) {
continue;
}
Object value = copyOnWriteData.get(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createAndPrepareSi
discoverFactory(classLoader, TableSinkFactory.class, factoryIdentifier);
TableSinkFactoryContext context =
TableSinkFactoryContext.replacePlaceholderAndCreate(
catalogTable, config, classLoader);
catalogTable,
config,
classLoader,
factory.excludeTablePlaceholderReplaceKeys());
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());

LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import org.apache.seatunnel.api.table.connector.TableSink;

import java.util.Collections;
import java.util.List;

/**
* This is an SPI interface, used to create {@link TableSink}. Each plugin need to have it own
* implementation.
Expand All @@ -41,4 +44,9 @@ default TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createSink(
throw new UnsupportedOperationException(
"The Factory has not been implemented and the deprecated Plugin will be used.");
}

@Deprecated
default List<String> excludeTablePlaceholderReplaceKeys() {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,27 @@

import lombok.Getter;

import java.util.Collection;

@Getter
public class TableSinkFactoryContext extends TableFactoryContext {

private final CatalogTable catalogTable;

public TableSinkFactoryContext(
protected TableSinkFactoryContext(
CatalogTable catalogTable, ReadonlyConfig options, ClassLoader classLoader) {
super(options, classLoader);
this.catalogTable = catalogTable;
}

public static TableSinkFactoryContext replacePlaceholderAndCreate(
CatalogTable catalogTable, ReadonlyConfig options, ClassLoader classLoader) {
CatalogTable catalogTable,
ReadonlyConfig options,
ClassLoader classLoader,
Collection<String> excludeTablePlaceholderReplaceKeys) {
ReadonlyConfig rewriteConfig =
TablePlaceholder.replaceTablePlaceholder(options, catalogTable);
TablePlaceholder.replaceTablePlaceholder(
options, catalogTable, excludeTablePlaceholderReplaceKeys);
return new TableSinkFactoryContext(catalogTable, rewriteConfig, classLoader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,26 @@ public void testSinkOptionsWithNoTablePath() {
Arrays.asList("f1", "f2", "f3", "f4", "f5"), newConfig.get(FIELD_NAMES_ARRAY));
}

@Test
public void testSinkOptionsWithExcludeKeys() {
ReadonlyConfig config = createConfig();
CatalogTable table = createTestTableWithNoTablePath();
ReadonlyConfig newConfig =
TablePlaceholder.replaceTablePlaceholder(
config, table, Arrays.asList(DATABASE.key()));

Assertions.assertEquals("xyz_${database_name: default_db}_test", newConfig.get(DATABASE));
Assertions.assertEquals("xyz_default_schema_test", newConfig.get(SCHEMA));
Assertions.assertEquals("xyz_default_table_test", newConfig.get(TABLE));
Assertions.assertEquals("f1,f2", newConfig.get(PRIMARY_KEY));
Assertions.assertEquals("f3,f4", newConfig.get(UNIQUE_KEY));
Assertions.assertEquals("f1,f2,f3,f4,f5", newConfig.get(FIELD_NAMES));
Assertions.assertEquals(Arrays.asList("f1", "f2"), newConfig.get(PRIMARY_KEY_ARRAY));
Assertions.assertEquals(Arrays.asList("f3", "f4"), newConfig.get(UNIQUE_KEY_ARRAY));
Assertions.assertEquals(
Arrays.asList("f1", "f2", "f3", "f4", "f5"), newConfig.get(FIELD_NAMES_ARRAY));
}

private static ReadonlyConfig createConfig() {
Map<String, Object> configMap = new HashMap<>();
configMap.put(DATABASE.key(), "xyz_${database_name: default_db}_test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@

import com.google.auto.service.AutoService;

import java.util.Arrays;
import java.util.List;

import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE;
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.NEEDS_UNSUPPORTED_TYPE_CASTING;
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE;
Expand All @@ -55,6 +58,11 @@ public OptionRule optionRule() {
return DorisOptions.SINK_RULE.build();
}

@Override
public List<String> excludeTablePlaceholderReplaceKeys() {
return Arrays.asList(DorisOptions.SAVE_MODE_CREATE_TEMPLATE.key());
}

@Override
public TableSink<SeaTunnelRow, DorisSinkState, DorisCommitInfo, DorisCommitInfo> createSink(
TableSinkFactoryContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@

import com.google.auto.service.AutoService;

import java.util.Arrays;
import java.util.List;

import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions.DATA_SAVE_MODE;

@AutoService(Factory.class)
Expand Down Expand Up @@ -70,6 +73,11 @@ public OptionRule optionRule() {
.build();
}

@Override
public List<String> excludeTablePlaceholderReplaceKeys() {
return Arrays.asList(StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
}

@Override
public TableSink createSink(TableSinkFactoryContext context) {
CatalogTable catalogTable = context.getCatalogTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
TableSinkFactoryContext.replacePlaceholderAndCreate(
stream.getCatalogTable(),
ReadonlyConfig.fromConfig(sinkConfig),
classLoader);
classLoader,
((TableSinkFactory) factory.get())
.excludeTablePlaceholderReplaceKeys());
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
sink = ((TableSinkFactory) factory.get()).createSink(context).createSink();
sink.setJobContext(jobContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
TableSinkFactoryContext.replacePlaceholderAndCreate(
stream.getCatalogTable(),
ReadonlyConfig.fromConfig(sinkConfig),
classLoader);
classLoader,
((TableSinkFactory) factory.get())
.excludeTablePlaceholderReplaceKeys());
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
sink = ((TableSinkFactory) factory.get()).createSink(context).createSink();
sink.setJobContext(jobContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
TableSinkFactoryContext.replacePlaceholderAndCreate(
datasetTableInfo.getCatalogTable(),
ReadonlyConfig.fromConfig(sinkConfig),
classLoader);
classLoader,
((TableSinkFactory) factory.get())
.excludeTablePlaceholderReplaceKeys());
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
sink = ((TableSinkFactory) factory.get()).createSink(context).createSink();
sink.setJobContext(jobContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
TableSinkFactoryContext.replacePlaceholderAndCreate(
datasetTableInfo.getCatalogTable(),
ReadonlyConfig.fromConfig(sinkConfig),
classLoader);
classLoader,
((TableSinkFactory) factory.get())
.excludeTablePlaceholderReplaceKeys());
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
sink = ((TableSinkFactory) factory.get()).createSink(context).createSink();
sink.setJobContext(jobContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,10 @@ private CatalogTable assertCreateTable(
DorisSinkFactory dorisSinkFactory = new DorisSinkFactory();
TableSinkFactoryContext context =
TableSinkFactoryContext.replacePlaceholderAndCreate(
upstreamTable, config, Thread.currentThread().getContextClassLoader());
upstreamTable,
config,
Thread.currentThread().getContextClassLoader(),
Collections.emptyList());
SupportSaveMode sink = (SupportSaveMode) dorisSinkFactory.createSink(context).createSink();
sink.getSaveModeHandler().get().handleSaveMode();
CatalogTable createdTable = catalog.getTable(TablePath.of(fullName));
Expand Down
Loading

0 comments on commit 6676ed9

Please sign in to comment.