diff --git a/docs/en/concept/sink-options-placeholders.md b/docs/en/concept/sink-options-placeholders.md index 093b0113ac60..88eada299fc8 100644 --- a/docs/en/concept/sink-options-placeholders.md +++ b/docs/en/concept/sink-options-placeholders.md @@ -1,4 +1,4 @@ -# Sink Options Placeholders.md +# Sink Options Placeholders ## Introduction @@ -40,7 +40,7 @@ The placeholders are mainly controlled by the following expressions: ## Configuration -Requires: +*Requires*: - Make sure the sink connector you are using has implemented `TableSinkFactory` API ### Example 1 diff --git a/docs/zh/concept/sink-options-placeholders.md b/docs/zh/concept/sink-options-placeholders.md new file mode 100644 index 000000000000..2553feb549fc --- /dev/null +++ b/docs/zh/concept/sink-options-placeholders.md @@ -0,0 +1,110 @@ +# Sink 参数占位符 + +## 介绍 + +SeaTunnel 提供了 Sink 参数占位符自动替换功能,可让您通过占位符获取上游表元数据。 + +当您需要动态获取上游表元数据(例如多表写入)时,此功能至关重要。 + +本文档将指导您如何使用这些占位符以及如何有效地利用它们。 + +## 支持的引擎 + +> SeaTunnel Zeta
+> Flink
+> Spark
+ +## 占位符变量 + +占位符主要通过以下表达式实现: + +- `${database_name}` + - 用于获取上游表中的数据库名称 + - 也可以通过表达式指定默认值:`${database_name:default_my_db}` +- `${schema_name}` + - 用于获取上游表中的 schema 名称 + - 也可以通过表达式指定默认值:`${schema_name:default_my_schema}` +- `${table_name}` + - 用于获取上游表中的 table 名称 + - 也可以通过表达式指定默认值:`${table_name:default_my_table}` +- `${schema_full_name}` + - 用于获取上游表中的 schema 全路径名称,包含 database/schema 名称 +- `${table_full_name}` + - 用于获取上游表中的 table 全路径名称,包含 database/schema/table 名称 +- `${primary_key}` + - 用于获取上游表中的主键字段名称列表 +- `${unique_key}` + - 用于获取上游表中的唯一键字段名称列表 +- `${field_names}` + - 用于获取上游表中的所有字段名称列表 + +## 配置 + +*先决条件*: +- 确认 Sink 连接器已经支持了 `TableSinkFactory` API + +### 配置示例 1 + +```hocon +env { + // ignore... +} +source { + MySQL-CDC { + // ignore... + } +} + +transform { + // ignore... +} + +sink { + jdbc { + url = "jdbc:mysql://localhost:3306" + driver = "com.mysql.cj.jdbc.Driver" + user = "root" + password = "123456" + + database = "${database_name}_test" + table = "${table_name}_test" + primary_keys = ["${primary_key}"] + } +} +``` + +### 配置示例 2 + +```hocon +env { + // ignore... +} +source { + Oracle-CDC { + // ignore... + } +} + +transform { + // ignore... +} + +sink { + jdbc { + url = "jdbc:mysql://localhost:3306" + driver = "com.mysql.cj.jdbc.Driver" + user = "root" + password = "123456" + + database = "${schema_name}_test" + table = "${table_name}_test" + primary_keys = ["${primary_key}"] + } +} +``` + +占位符的替换将在连接器启动之前完成,确保 Sink 参数在使用前已准备就绪。 +若该占位符变量没有被替换,则可能是上游表元数据缺少该选项,例如: +- `mysql` source 连接器不包含 `${schema_name}` 元数据 +- `oracle` source 连接器不包含 `${databse_name}` 元数据 +- ... diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java index 280be7437b28..c8370e08c061 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java @@ -26,6 +26,8 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -53,10 +55,6 @@ public class TablePlaceholder { public static final String NAME_DELIMITER = "."; public static final String FIELD_DELIMITER = ","; - @Deprecated - private static final List EXCLUDE_TABLE_PLACEHOLDER_KEYS = - Arrays.asList("save_mode_create_template"); - private static String replacePlaceholders(String input, String placeholderName, String value) { return replacePlaceholders(input, placeholderName, value, null); } @@ -176,10 +174,14 @@ public static String replaceTableFieldNames(String placeholder, TableSchema sche public static ReadonlyConfig replaceTablePlaceholder( ReadonlyConfig config, CatalogTable table) { + return replaceTablePlaceholder(config, table, Collections.emptyList()); + } + + public static ReadonlyConfig replaceTablePlaceholder( + ReadonlyConfig config, CatalogTable table, Collection excludeKeys) { Map copyOnWriteData = config.copyData(); for (String key : copyOnWriteData.keySet()) { - if (EXCLUDE_TABLE_PLACEHOLDER_KEYS.contains(key)) { - // TODO: Remove this compatibility config + if (excludeKeys.contains(key)) { continue; } Object value = copyOnWriteData.get(key); diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java index da7bebdae337..668ff2a43c84 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java @@ -123,7 +123,10 @@ SeaTunnelSink createAndPrepareSi discoverFactory(classLoader, TableSinkFactory.class, factoryIdentifier); TableSinkFactoryContext context = TableSinkFactoryContext.replacePlaceholderAndCreate( - catalogTable, config, classLoader); + catalogTable, + config, + classLoader, + factory.excludeTablePlaceholderReplaceKeys()); ConfigValidator.of(context.getOptions()).validate(factory.optionRule()); LOG.info( diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java index 97fba1f256ab..5ba125854b30 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java @@ -19,6 +19,9 @@ import org.apache.seatunnel.api.table.connector.TableSink; +import java.util.Collections; +import java.util.List; + /** * This is an SPI interface, used to create {@link TableSink}. Each plugin need to have it own * implementation. @@ -41,4 +44,9 @@ default TableSink createSink( throw new UnsupportedOperationException( "The Factory has not been implemented and the deprecated Plugin will be used."); } + + @Deprecated + default List excludeTablePlaceholderReplaceKeys() { + return Collections.emptyList(); + } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java index a1eddcdaa99f..9565bad6a031 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java @@ -23,21 +23,27 @@ import lombok.Getter; +import java.util.Collection; + @Getter public class TableSinkFactoryContext extends TableFactoryContext { private final CatalogTable catalogTable; - public TableSinkFactoryContext( + protected TableSinkFactoryContext( CatalogTable catalogTable, ReadonlyConfig options, ClassLoader classLoader) { super(options, classLoader); this.catalogTable = catalogTable; } public static TableSinkFactoryContext replacePlaceholderAndCreate( - CatalogTable catalogTable, ReadonlyConfig options, ClassLoader classLoader) { + CatalogTable catalogTable, + ReadonlyConfig options, + ClassLoader classLoader, + Collection excludeTablePlaceholderReplaceKeys) { ReadonlyConfig rewriteConfig = - TablePlaceholder.replaceTablePlaceholder(options, catalogTable); + TablePlaceholder.replaceTablePlaceholder( + options, catalogTable, excludeTablePlaceholderReplaceKeys); return new TableSinkFactoryContext(catalogTable, rewriteConfig, classLoader); } } diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/TablePlaceholderTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/TablePlaceholderTest.java index 61f3f807f4b6..ac8ef1315262 100644 --- a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/TablePlaceholderTest.java +++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/TablePlaceholderTest.java @@ -92,6 +92,26 @@ public void testSinkOptionsWithNoTablePath() { Arrays.asList("f1", "f2", "f3", "f4", "f5"), newConfig.get(FIELD_NAMES_ARRAY)); } + @Test + public void testSinkOptionsWithExcludeKeys() { + ReadonlyConfig config = createConfig(); + CatalogTable table = createTestTableWithNoTablePath(); + ReadonlyConfig newConfig = + TablePlaceholder.replaceTablePlaceholder( + config, table, Arrays.asList(DATABASE.key())); + + Assertions.assertEquals("xyz_${database_name: default_db}_test", newConfig.get(DATABASE)); + Assertions.assertEquals("xyz_default_schema_test", newConfig.get(SCHEMA)); + Assertions.assertEquals("xyz_default_table_test", newConfig.get(TABLE)); + Assertions.assertEquals("f1,f2", newConfig.get(PRIMARY_KEY)); + Assertions.assertEquals("f3,f4", newConfig.get(UNIQUE_KEY)); + Assertions.assertEquals("f1,f2,f3,f4,f5", newConfig.get(FIELD_NAMES)); + Assertions.assertEquals(Arrays.asList("f1", "f2"), newConfig.get(PRIMARY_KEY_ARRAY)); + Assertions.assertEquals(Arrays.asList("f3", "f4"), newConfig.get(UNIQUE_KEY_ARRAY)); + Assertions.assertEquals( + Arrays.asList("f1", "f2", "f3", "f4", "f5"), newConfig.get(FIELD_NAMES_ARRAY)); + } + private static ReadonlyConfig createConfig() { Map configMap = new HashMap<>(); configMap.put(DATABASE.key(), "xyz_${database_name: default_db}_test"); diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java index fd404183832a..e1849c393414 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java @@ -35,6 +35,9 @@ import com.google.auto.service.AutoService; +import java.util.Arrays; +import java.util.List; + import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.NEEDS_UNSUPPORTED_TYPE_CASTING; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE; @@ -55,6 +58,11 @@ public OptionRule optionRule() { return DorisOptions.SINK_RULE.build(); } + @Override + public List excludeTablePlaceholderReplaceKeys() { + return Arrays.asList(DorisOptions.SAVE_MODE_CREATE_TEMPLATE.key()); + } + @Override public TableSink createSink( TableSinkFactoryContext context) { diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java index 3a6d1232fbf4..51f7486569be 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java @@ -34,6 +34,9 @@ import com.google.auto.service.AutoService; +import java.util.Arrays; +import java.util.List; + import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions.DATA_SAVE_MODE; @AutoService(Factory.class) @@ -70,6 +73,11 @@ public OptionRule optionRule() { .build(); } + @Override + public List excludeTablePlaceholderReplaceKeys() { + return Arrays.asList(StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()); + } + @Override public TableSink createSink(TableSinkFactoryContext context) { CatalogTable catalogTable = context.getCatalogTable(); diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index d5ae77bba7d7..6a272aadb216 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -114,7 +114,9 @@ public List execute(List upstreamDataS TableSinkFactoryContext.replacePlaceholderAndCreate( stream.getCatalogTable(), ReadonlyConfig.fromConfig(sinkConfig), - classLoader); + classLoader, + ((TableSinkFactory) factory.get()) + .excludeTablePlaceholderReplaceKeys()); ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule()); sink = ((TableSinkFactory) factory.get()).createSink(context).createSink(); sink.setJobContext(jobContext); diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index eee00d22a9ac..14247464551e 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -115,7 +115,9 @@ public List execute(List upstreamDataS TableSinkFactoryContext.replacePlaceholderAndCreate( stream.getCatalogTable(), ReadonlyConfig.fromConfig(sinkConfig), - classLoader); + classLoader, + ((TableSinkFactory) factory.get()) + .excludeTablePlaceholderReplaceKeys()); ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule()); sink = ((TableSinkFactory) factory.get()).createSink(context).createSink(); sink.setJobContext(jobContext); diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index c0b13aaeaea1..7751286b227c 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -132,7 +132,9 @@ public List execute(List upstreamDataStreams TableSinkFactoryContext.replacePlaceholderAndCreate( datasetTableInfo.getCatalogTable(), ReadonlyConfig.fromConfig(sinkConfig), - classLoader); + classLoader, + ((TableSinkFactory) factory.get()) + .excludeTablePlaceholderReplaceKeys()); ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule()); sink = ((TableSinkFactory) factory.get()).createSink(context).createSink(); sink.setJobContext(jobContext); diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index 509b648f1f90..46b3233b00e5 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -133,7 +133,9 @@ public List execute(List upstreamDataStreams TableSinkFactoryContext.replacePlaceholderAndCreate( datasetTableInfo.getCatalogTable(), ReadonlyConfig.fromConfig(sinkConfig), - classLoader); + classLoader, + ((TableSinkFactory) factory.get()) + .excludeTablePlaceholderReplaceKeys()); ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule()); sink = ((TableSinkFactory) factory.get()).createSink(context).createSink(); sink.setJobContext(jobContext); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java index ec07f02d365a..f8550a615afb 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java @@ -240,7 +240,10 @@ private CatalogTable assertCreateTable( DorisSinkFactory dorisSinkFactory = new DorisSinkFactory(); TableSinkFactoryContext context = TableSinkFactoryContext.replacePlaceholderAndCreate( - upstreamTable, config, Thread.currentThread().getContextClassLoader()); + upstreamTable, + config, + Thread.currentThread().getContextClassLoader(), + Collections.emptyList()); SupportSaveMode sink = (SupportSaveMode) dorisSinkFactory.createSink(context).createSink(); sink.getSaveModeHandler().get().handleSaveMode(); CatalogTable createdTable = catalog.getTable(TablePath.of(fullName)); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java index ac3208ea0b05..4c2ecc94e394 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java @@ -67,6 +67,7 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -480,7 +481,10 @@ void defaultSinkParametersTest() throws IOException, SQLException, ClassNotFound ReadonlyConfig config1 = ReadonlyConfig.fromMap(map1); TableSinkFactoryContext context1 = TableSinkFactoryContext.replacePlaceholderAndCreate( - catalogTable, config1, Thread.currentThread().getContextClassLoader()); + catalogTable, + config1, + Thread.currentThread().getContextClassLoader(), + Collections.emptyList()); JdbcSink jdbcSink1 = (JdbcSink) new JdbcSinkFactory().createSink(context1).createSink(); Properties connectionProperties1 = getSinkProperties(jdbcSink1); Assertions.assertEquals(connectionProperties1.get("rewriteBatchedStatements"), "true"); @@ -491,7 +495,10 @@ void defaultSinkParametersTest() throws IOException, SQLException, ClassNotFound ReadonlyConfig config2 = ReadonlyConfig.fromMap(map2); TableSinkFactoryContext context2 = TableSinkFactoryContext.replacePlaceholderAndCreate( - catalogTable, config2, Thread.currentThread().getContextClassLoader()); + catalogTable, + config2, + Thread.currentThread().getContextClassLoader(), + Collections.emptyList()); JdbcSink jdbcSink2 = (JdbcSink) new JdbcSinkFactory().createSink(context2).createSink(); Properties connectionProperties2 = getSinkProperties(jdbcSink2); Assertions.assertEquals(connectionProperties2.get("rewriteBatchedStatements"), "false"); @@ -505,7 +512,10 @@ void defaultSinkParametersTest() throws IOException, SQLException, ClassNotFound ReadonlyConfig config3 = ReadonlyConfig.fromMap(map3); TableSinkFactoryContext context3 = TableSinkFactoryContext.replacePlaceholderAndCreate( - catalogTable, config3, Thread.currentThread().getContextClassLoader()); + catalogTable, + config3, + Thread.currentThread().getContextClassLoader(), + Collections.emptyList()); JdbcSink jdbcSink3 = (JdbcSink) new JdbcSinkFactory().createSink(context3).createSink(); Properties connectionProperties3 = getSinkProperties(jdbcSink3); Assertions.assertEquals(connectionProperties3.get("rewriteBatchedStatements"), "false"); @@ -520,7 +530,10 @@ void defaultSinkParametersTest() throws IOException, SQLException, ClassNotFound ReadonlyConfig config4 = ReadonlyConfig.fromMap(map4); TableSinkFactoryContext context4 = TableSinkFactoryContext.replacePlaceholderAndCreate( - catalogTable, config4, Thread.currentThread().getContextClassLoader()); + catalogTable, + config4, + Thread.currentThread().getContextClassLoader(), + Collections.emptyList()); JdbcSink jdbcSink4 = (JdbcSink) new JdbcSinkFactory().createSink(context4).createSink(); Properties connectionProperties4 = getSinkProperties(jdbcSink4); Assertions.assertEquals(connectionProperties4.get("useSSL"), "true");