From d272789bb54d551b97c3d2a6643a0e3077ecf1f5 Mon Sep 17 00:00:00 2001 From: shiwanming <1633138551@qq.com> Date: Mon, 19 Aug 2024 15:09:35 +0800 Subject: [PATCH 01/13] [Fix] [sink elasticsearch] Fix the issue of sink-es saveMode conflicting with Elasticsearch's automatic index creation #7430 --- .../java/org/apache/seatunnel/api/sink/DataSaveMode.java | 5 ++++- .../apache/seatunnel/api/sink/DefaultSaveModeHandler.java | 4 ++++ .../java/org/apache/seatunnel/api/sink/SchemaSaveMode.java | 3 +++ .../seatunnel/elasticsearch/config/SinkConfig.java | 3 ++- 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java index 2babe412a79..64743fefbea 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java @@ -32,5 +32,8 @@ public enum DataSaveMode { CUSTOM_PROCESSING, // When there exist data, an error will be reported - ERROR_WHEN_DATA_EXISTS + ERROR_WHEN_DATA_EXISTS, + + // Ignore + IGNORE } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java index 051068dba03..7f7de2ef6c9 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java @@ -71,6 +71,8 @@ public void handleSchemaSaveMode() { case ERROR_WHEN_SCHEMA_NOT_EXIST: errorWhenSchemaNotExist(); break; + case IGNORE: + break; default: throw new UnsupportedOperationException("Unsupported save mode: " + schemaSaveMode); } @@ -91,6 +93,8 @@ public void handleDataSaveMode() { case ERROR_WHEN_DATA_EXISTS: errorWhenDataExists(); break; + case IGNORE: + break; default: throw new UnsupportedOperationException("Unsupported save mode: " + dataSaveMode); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SchemaSaveMode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SchemaSaveMode.java index f3da320d742..d114b909313 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SchemaSaveMode.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SchemaSaveMode.java @@ -27,4 +27,7 @@ public enum SchemaSaveMode { // Error will be reported when the table does not exist ERROR_WHEN_SCHEMA_NOT_EXIST, + + // Ignore + IGNORE } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java index fdb0300aab5..1c2464e273c 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java @@ -28,6 +28,7 @@ import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA; import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA; import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS; +import static org.apache.seatunnel.api.sink.DataSaveMode.IGNORE; public class SinkConfig { @@ -80,7 +81,7 @@ public class SinkConfig { Options.key("data_save_mode") .singleChoice( DataSaveMode.class, - Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS)) + Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS, IGNORE)) .defaultValue(APPEND_DATA) .withDescription("data_save_mode"); } From 4ebc66bdd3dd61228bc95d7e01258971f5b007d7 Mon Sep 17 00:00:00 2001 From: shiwanming <1633138551@qq.com> Date: Wed, 21 Aug 2024 20:10:55 +0800 Subject: [PATCH 02/13] [Fix] [sink elasticsearch] Fix the issue of sink-es saveMode and es automatic index creation conflict #7430 --- .../java/org/apache/seatunnel/api/sink/DataSaveMode.java | 5 +---- .../apache/seatunnel/api/sink/DefaultSaveModeHandler.java | 2 -- .../seatunnel/elasticsearch/config/SinkConfig.java | 3 +-- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java index 64743fefbea..2babe412a79 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java @@ -32,8 +32,5 @@ public enum DataSaveMode { CUSTOM_PROCESSING, // When there exist data, an error will be reported - ERROR_WHEN_DATA_EXISTS, - - // Ignore - IGNORE + ERROR_WHEN_DATA_EXISTS } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java index 7f7de2ef6c9..da5d0281ce7 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java @@ -93,8 +93,6 @@ public void handleDataSaveMode() { case ERROR_WHEN_DATA_EXISTS: errorWhenDataExists(); break; - case IGNORE: - break; default: throw new UnsupportedOperationException("Unsupported save mode: " + dataSaveMode); } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java index 1c2464e273c..fdb0300aab5 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java @@ -28,7 +28,6 @@ import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA; import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA; import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS; -import static org.apache.seatunnel.api.sink.DataSaveMode.IGNORE; public class SinkConfig { @@ -81,7 +80,7 @@ public class SinkConfig { Options.key("data_save_mode") .singleChoice( DataSaveMode.class, - Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS, IGNORE)) + Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS)) .defaultValue(APPEND_DATA) .withDescription("data_save_mode"); } From 6810d80ad95eefdc7489fb5187f91189febda5d0 Mon Sep 17 00:00:00 2001 From: shiwanming <1633138551@qq.com> Date: Mon, 19 Aug 2024 15:09:35 +0800 Subject: [PATCH 03/13] [Bug] [sink elasticsearch] the savemode of sink-es conficts with es automatically creating indexes based on templates #7430 --- .../java/org/apache/seatunnel/api/sink/DataSaveMode.java | 5 ++++- .../apache/seatunnel/api/sink/DefaultSaveModeHandler.java | 2 ++ .../seatunnel/elasticsearch/config/SinkConfig.java | 3 ++- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java index 2babe412a79..64743fefbea 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java @@ -32,5 +32,8 @@ public enum DataSaveMode { CUSTOM_PROCESSING, // When there exist data, an error will be reported - ERROR_WHEN_DATA_EXISTS + ERROR_WHEN_DATA_EXISTS, + + // Ignore + IGNORE } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java index da5d0281ce7..7f7de2ef6c9 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java @@ -93,6 +93,8 @@ public void handleDataSaveMode() { case ERROR_WHEN_DATA_EXISTS: errorWhenDataExists(); break; + case IGNORE: + break; default: throw new UnsupportedOperationException("Unsupported save mode: " + dataSaveMode); } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java index fdb0300aab5..1c2464e273c 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java @@ -28,6 +28,7 @@ import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA; import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA; import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS; +import static org.apache.seatunnel.api.sink.DataSaveMode.IGNORE; public class SinkConfig { @@ -80,7 +81,7 @@ public class SinkConfig { Options.key("data_save_mode") .singleChoice( DataSaveMode.class, - Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS)) + Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS, IGNORE)) .defaultValue(APPEND_DATA) .withDescription("data_save_mode"); } From ac50c3e63c59f38273c0335b1b60274d370513b9 Mon Sep 17 00:00:00 2001 From: shiwanming <1633138551@qq.com> Date: Wed, 21 Aug 2024 21:16:22 +0800 Subject: [PATCH 04/13] [Fix] [sink elasticsearch] Fix the issue of sink-es saveMode and es automatic index creation conflict #7430 --- .../java/org/apache/seatunnel/api/sink/DataSaveMode.java | 5 +---- .../apache/seatunnel/api/sink/DefaultSaveModeHandler.java | 2 -- .../seatunnel/elasticsearch/config/SinkConfig.java | 3 +-- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java index 64743fefbea..2babe412a79 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java @@ -32,8 +32,5 @@ public enum DataSaveMode { CUSTOM_PROCESSING, // When there exist data, an error will be reported - ERROR_WHEN_DATA_EXISTS, - - // Ignore - IGNORE + ERROR_WHEN_DATA_EXISTS } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java index 7f7de2ef6c9..da5d0281ce7 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java @@ -93,8 +93,6 @@ public void handleDataSaveMode() { case ERROR_WHEN_DATA_EXISTS: errorWhenDataExists(); break; - case IGNORE: - break; default: throw new UnsupportedOperationException("Unsupported save mode: " + dataSaveMode); } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java index 1c2464e273c..fdb0300aab5 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java @@ -28,7 +28,6 @@ import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA; import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA; import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS; -import static org.apache.seatunnel.api.sink.DataSaveMode.IGNORE; public class SinkConfig { @@ -81,7 +80,7 @@ public class SinkConfig { Options.key("data_save_mode") .singleChoice( DataSaveMode.class, - Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS, IGNORE)) + Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS)) .defaultValue(APPEND_DATA) .withDescription("data_save_mode"); } From 6e8ccc9e8e7fd0f13fbfc7c48d3daa84141855ae Mon Sep 17 00:00:00 2001 From: shiwanming <1633138551@qq.com> Date: Thu, 22 Aug 2024 07:55:21 +0800 Subject: [PATCH 05/13] [Fix] [sink elasticsearch] Fix the issue of sink-es saveMode and es automatic index creation conflict #7430 --- .../main/java/org/apache/seatunnel/api/sink/SchemaSaveMode.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SchemaSaveMode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SchemaSaveMode.java index d114b909313..cee39ca8e63 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SchemaSaveMode.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SchemaSaveMode.java @@ -28,6 +28,6 @@ public enum SchemaSaveMode { // Error will be reported when the table does not exist ERROR_WHEN_SCHEMA_NOT_EXIST, - // Ignore + // Ignore creation IGNORE } From 39360385afb3fee6d6f1d9e024e9c5c2f223a77c Mon Sep 17 00:00:00 2001 From: shiwanming <1633138551@qq.com> Date: Thu, 22 Aug 2024 14:04:51 +0800 Subject: [PATCH 06/13] [Doc] Add IGNORE savemode type into docment #7443 --- docs/en/connector-v2/sink/Doris.md | 3 ++- docs/en/connector-v2/sink/Elasticsearch.md | 13 +++++++------ docs/en/connector-v2/sink/Jdbc.md | 3 ++- docs/en/connector-v2/sink/LocalFile.md | 1 + docs/en/connector-v2/sink/PostgreSql.md | 3 ++- docs/en/connector-v2/sink/S3File.md | 3 ++- docs/en/connector-v2/sink/StarRocks.md | 3 ++- docs/zh/connector-v2/sink/Doris.md | 15 ++++++++------- docs/zh/connector-v2/sink/Elasticsearch.md | 1 + docs/zh/connector-v2/sink/Jdbc.md | 1 + docs/zh/connector-v2/sink/StarRocks.md | 9 +++++---- 11 files changed, 33 insertions(+), 22 deletions(-) diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md index 592cd8702be..18915ac7b86 100644 --- a/docs/en/connector-v2/sink/Doris.md +++ b/docs/en/connector-v2/sink/Doris.md @@ -57,7 +57,8 @@ Before the synchronous task is turned on, different treatment schemes are select Option introduction: `RECREATE_SCHEMA` :Will create when the table does not exist, delete and rebuild when the table is saved `CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, skipped when the table is saved -`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist +`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist +`IGNORE` :Ignore the treatment of the table ### data_save_mode[Enum] diff --git a/docs/en/connector-v2/sink/Elasticsearch.md b/docs/en/connector-v2/sink/Elasticsearch.md index e03a2522300..b252f574daf 100644 --- a/docs/en/connector-v2/sink/Elasticsearch.md +++ b/docs/en/connector-v2/sink/Elasticsearch.md @@ -109,17 +109,18 @@ Sink plugin common parameters, please refer to [Sink Common Options](../sink-com Before the synchronous task is turned on, different treatment schemes are selected for the existing surface structure of the target side. Option introduction: -RECREATE_SCHEMA :Will create when the table does not exist, delete and rebuild when the table is saved -CREATE_SCHEMA_WHEN_NOT_EXIST :Will Created when the table does not exist, skipped when the table is saved -ERROR_WHEN_SCHEMA_NOT_EXIST :Error will be reported when the table does not exist +`RECREATE_SCHEMA` :Will create when the table does not exist, delete and rebuild when the table is saved +`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, skipped when the table is saved +`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist +`IGNORE` :Ignore the treatment of the table ### data_save_mode Before the synchronous task is turned on, different processing schemes are selected for data existing data on the target side. Option introduction: -DROP_DATA: Preserve database structure and delete data -APPEND_DATA:Preserve database structure, preserve data -ERROR_WHEN_DATA_EXISTS:When there is data, an error is reported +`DROP_DATA`: Preserve database structure and delete data +`APPEND_DATA`:Preserve database structure, preserve data +`ERROR_WHEN_DATA_EXISTS`:When there is data, an error is reported ## Examples diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index b52c42f6310..99f06891a14 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -183,7 +183,8 @@ Before the synchronous task is turned on, different treatment schemes are select Option introduction: `RECREATE_SCHEMA` :Will create when the table does not exist, delete and rebuild when the table is saved `CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, skipped when the table is saved -`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist +`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist +`IGNORE` :Ignore the treatment of the table ### data_save_mode [Enum] diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index b1746e2492e..8e773fe6dd3 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -214,6 +214,7 @@ Existing dir processing method. - RECREATE_SCHEMA: will create when the dir does not exist, delete and recreate when the dir is exist - CREATE_SCHEMA_WHEN_NOT_EXIST: will create when the dir does not exist, skipped when the dir is exist - ERROR_WHEN_SCHEMA_NOT_EXIST: error will be reported when the dir does not exist +- IGNORE :Ignore the treatment of the table ### data_save_mode [string] diff --git a/docs/en/connector-v2/sink/PostgreSql.md b/docs/en/connector-v2/sink/PostgreSql.md index 545f0d176e6..cde299f6734 100644 --- a/docs/en/connector-v2/sink/PostgreSql.md +++ b/docs/en/connector-v2/sink/PostgreSql.md @@ -118,7 +118,8 @@ Before the synchronous task is turned on, different treatment schemes are select Option introduction: `RECREATE_SCHEMA` :Will create when the table does not exist, delete and rebuild when the table is saved `CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, skipped when the table is saved -`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist +`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist +`IGNORE` :Ignore the treatment of the table ### data_save_mode[Enum] diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md index 90fb6636460..007c9395f7d 100644 --- a/docs/en/connector-v2/sink/S3File.md +++ b/docs/en/connector-v2/sink/S3File.md @@ -281,7 +281,8 @@ Before turning on the synchronous task, do different treatment of the target pat Option introduction: `RECREATE_SCHEMA` :Will be created when the path does not exist. If the path already exists, delete the path and recreate it. `CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the path does not exist, use the path when the path is existed. -`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the path does not exist +`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the path does not exist +`IGNORE` :Ignore the treatment of the table ### data_save_mode[Enum] diff --git a/docs/en/connector-v2/sink/StarRocks.md b/docs/en/connector-v2/sink/StarRocks.md index 5fe57cd3f4e..1d143884e66 100644 --- a/docs/en/connector-v2/sink/StarRocks.md +++ b/docs/en/connector-v2/sink/StarRocks.md @@ -107,7 +107,8 @@ Before the synchronous task is turned on, different treatment schemes are select Option introduction: `RECREATE_SCHEMA` :Will create when the table does not exist, delete and rebuild when the table is saved `CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, skipped when the table is saved -`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist +`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist +`IGNORE` :Ignore the treatment of the table ### data_save_mode[Enum] diff --git a/docs/zh/connector-v2/sink/Doris.md b/docs/zh/connector-v2/sink/Doris.md index afc470326f5..b35cd63f4ef 100644 --- a/docs/zh/connector-v2/sink/Doris.md +++ b/docs/zh/connector-v2/sink/Doris.md @@ -53,18 +53,19 @@ Doris Sink连接器的内部实现是通过stream load批量缓存和导入的 ### schema_save_mode[Enum] 在开启同步任务之前,针对现有的表结构选择不同的处理方案。 -选项介绍: +选项介绍: `RECREATE_SCHEMA` :表不存在时创建,表保存时删除并重建。 -`CREATE_SCHEMA_WHEN_NOT_EXIST` :表不存在时会创建,表存在时跳过。 -`ERROR_WHEN_SCHEMA_NOT_EXIST` :表不存在时会报错。 +`CREATE_SCHEMA_WHEN_NOT_EXIST` :表不存在时会创建,表存在时跳过。 +`ERROR_WHEN_SCHEMA_NOT_EXIST` :表不存在时会报错。 +`IGNORE` :忽略对表的处理。 ### data_save_mode[Enum] 在开启同步任务之前,针对目标端已有的数据选择不同的处理方案。 -选项介绍: -`DROP_DATA`: 保留数据库结构并删除数据。 -`APPEND_DATA`:保留数据库结构,保留数据。 -`CUSTOM_PROCESSING`:用户自定义处理。 +选项介绍: +`DROP_DATA`: 保留数据库结构并删除数据。 +`APPEND_DATA`:保留数据库结构,保留数据。 +`CUSTOM_PROCESSING`:用户自定义处理。 `ERROR_WHEN_DATA_EXISTS`:有数据时报错。 ### save_mode_create_template diff --git a/docs/zh/connector-v2/sink/Elasticsearch.md b/docs/zh/connector-v2/sink/Elasticsearch.md index 2d614918f91..8682d262274 100644 --- a/docs/zh/connector-v2/sink/Elasticsearch.md +++ b/docs/zh/connector-v2/sink/Elasticsearch.md @@ -111,6 +111,7 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md) `RECREATE_SCHEMA` :当表不存在时会创建,当表已存在时会删除并重建
`CREATE_SCHEMA_WHEN_NOT_EXIST` :当表不存在时会创建,当表已存在时则跳过创建
`ERROR_WHEN_SCHEMA_NOT_EXIST` :当表不存在时将抛出错误
+`IGNORE` :忽略对表的处理
### data_save_mode diff --git a/docs/zh/connector-v2/sink/Jdbc.md b/docs/zh/connector-v2/sink/Jdbc.md index f24e56f1f70..b05ecbc501c 100644 --- a/docs/zh/connector-v2/sink/Jdbc.md +++ b/docs/zh/connector-v2/sink/Jdbc.md @@ -178,6 +178,7 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md) `RECREATE_SCHEMA`:当表不存在时会创建,当表已存在时会删除并重建
`CREATE_SCHEMA_WHEN_NOT_EXIST`:当表不存在时会创建,当表已存在时则跳过创建
`ERROR_WHEN_SCHEMA_NOT_EXIST`:当表不存在时将抛出错误
+`IGNORE` :忽略对表的处理
### data_save_mode [Enum] diff --git a/docs/zh/connector-v2/sink/StarRocks.md b/docs/zh/connector-v2/sink/StarRocks.md index 6be7ff7e8e0..f5848a3b71a 100644 --- a/docs/zh/connector-v2/sink/StarRocks.md +++ b/docs/zh/connector-v2/sink/StarRocks.md @@ -99,10 +99,11 @@ table选项参数可以填入一任意表名,这个名字最终会被用作目 ### schema_save_mode[Enum] -在同步任务打开之前,针对目标端已存在的表结构选择不同的处理方法。可选值有: -`RECREATE_SCHEMA` :不存在的表会直接创建,已存在的表会删除并根据参数重新创建 -`CREATE_SCHEMA_WHEN_NOT_EXIST` :忽略已存在的表,不存在的表会直接创建 -`ERROR_WHEN_SCHEMA_NOT_EXIST` :当有不存在的表时会直接报错 +在同步任务打开之前,针对目标端已存在的表结构选择不同的处理方法。可选值有: +`RECREATE_SCHEMA` :不存在的表会直接创建,已存在的表会删除并根据参数重新创建 +`CREATE_SCHEMA_WHEN_NOT_EXIST` :忽略已存在的表,不存在的表会直接创建 +`ERROR_WHEN_SCHEMA_NOT_EXIST` :当有不存在的表时会直接报错 +`IGNORE` :忽略对表的处理 ### data_save_mode[Enum] From 03286ea0c919b13aa98dbabc1e78cccdc43db525 Mon Sep 17 00:00:00 2001 From: Shiwanming <1633138551@qq.com> Date: Mon, 6 Jan 2025 22:59:07 +0800 Subject: [PATCH 07/13] starrocks-sink SupportMultiTableSink --- .../connectors/seatunnel/starrocks/sink/StarRocksSink.java | 3 ++- .../seatunnel/starrocks/sink/StarRocksSinkWriter.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java index 1f6ffcd0769..bb0259ec905 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.api.sink.SaveModeHandler; import org.apache.seatunnel.api.sink.SchemaSaveMode; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; import org.apache.seatunnel.api.sink.SupportSaveMode; import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink; import org.apache.seatunnel.api.table.catalog.Catalog; @@ -40,7 +41,7 @@ import java.util.Optional; public class StarRocksSink extends AbstractSimpleSink - implements SupportSaveMode, SupportSchemaEvolutionSink { + implements SupportSaveMode, SupportSchemaEvolutionSink, SupportMultiTableSink { private final TableSchema tableSchema; private final SinkConfig sinkConfig; diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java index d3664087312..612e6e2b72d 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.starrocks.sink; +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; @@ -46,7 +47,7 @@ @Slf4j public class StarRocksSinkWriter extends AbstractSinkWriter - implements SupportSchemaEvolutionSinkWriter { + implements SupportMultiTableSinkWriter, SupportSchemaEvolutionSinkWriter { private StarRocksISerializer serializer; private StarRocksSinkManager manager; private TableSchema tableSchema; From 36dbbe3f50d945562748b4abe679102a610f2233 Mon Sep 17 00:00:00 2001 From: Shiwanming <1633138551@qq.com> Date: Tue, 7 Jan 2025 22:09:16 +0800 Subject: [PATCH 08/13] add e2e --- .../starrocks/sink/StarRocksSinkFactory.java | 2 + .../starrocks/StarRocksMultiSinkIT.java | 304 ++++++++++++++++++ .../src/test/resources/ddl/store.sql | 78 +++++ ..._multi_source_to_multi_sink_streaming.conf | 54 ++++ 4 files changed, 438 insertions(+) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/store.sql create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysql_multi_source_to_multi_sink_streaming.conf diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java index bc851a91ed3..6057eb97af9 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java @@ -37,6 +37,7 @@ import java.util.Arrays; import java.util.List; +import static org.apache.seatunnel.api.sink.SinkCommonOptions.MULTI_TABLE_SINK_REPLICA; import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions.DATA_SAVE_MODE; @AutoService(Factory.class) @@ -64,6 +65,7 @@ public OptionRule optionRule() { StarRocksSinkOptions.ENABLE_UPSERT_DELETE, StarRocksSinkOptions.SCHEMA_SAVE_MODE, StarRocksSinkOptions.DATA_SAVE_MODE, + MULTI_TABLE_SINK_REPLICA, StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE, StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS) .conditional( diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java new file mode 100644 index 00000000000..33a1d081f10 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.starrocks; + +import org.apache.seatunnel.shade.com.google.common.collect.Lists; + +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; +import org.apache.seatunnel.e2e.common.util.JobIdGenerator; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerLoggerFactory; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.given; + +@Slf4j +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "Currently SPARK do not support cdc. In addition, currently only the zeta engine supports schema evolution for pr https://github.com/apache/seatunnel/pull/5125.") +public class StarRocksMultiSinkIT extends TestSuiteBase implements TestResource { + private static final String DATABASE = "store"; + private static final String MYSQL_HOST = "mysql_cdc_e2e"; + private static final String MYSQL_USER_NAME = "mysqluser"; + private static final String MYSQL_USER_PASSWORD = "mysqlpw"; + + private static final String DOCKER_IMAGE = "starrocks/allin1-ubuntu:3.3.1"; + private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + private static final String HOST = "starrocks_cdc_e2e"; + private static final int SR_PROXY_PORT = 8080; + private static final int QUERY_PORT = 9030; + private static final int HTTP_PORT = 8030; + private static final int BE_HTTP_PORT = 8040; + private static final String USERNAME = "root"; + private static final String PASSWORD = ""; + private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS " + DATABASE; + private static final String SR_DRIVER_JAR = + "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar"; + + private Connection starRocksConnection; + private Connection mysqlConnection; + private GenericContainer starRocksServer; + + public static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + private static final String QUERY = "select * from %s.%s order by id"; + + private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0); + + private final UniqueDatabase shopDatabase = + new UniqueDatabase(MYSQL_CONTAINER, DATABASE, "mysqluser", "mysqlpw", DATABASE); + + @TestContainerExtension + private final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + + SR_DRIVER_JAR); + Assertions.assertEquals(0, extraCommands.getExitCode()); + }; + + private static MySqlContainer createMySqlContainer(MySqlVersion version) { + return new MySqlContainer(version) + .withConfigurationOverride("docker/server-gtids/my.cnf") + .withSetupSQL("docker/setup.sql") + .withNetwork(NETWORK) + .withNetworkAliases(MYSQL_HOST) + .withDatabaseName(DATABASE) + .withUsername(MYSQL_USER_NAME) + .withPassword(MYSQL_USER_PASSWORD) + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image"))); + } + + private void initializeJdbcConnection() throws Exception { + URLClassLoader urlClassLoader = + new URLClassLoader( + new URL[] {new URL(SR_DRIVER_JAR)}, + StarRocksCDCSinkIT.class.getClassLoader()); + Thread.currentThread().setContextClassLoader(urlClassLoader); + Driver driver = (Driver) urlClassLoader.loadClass(DRIVER_CLASS).newInstance(); + Properties props = new Properties(); + props.put("user", USERNAME); + props.put("password", PASSWORD); + starRocksConnection = + driver.connect( + String.format("jdbc:mysql://%s:%s", starRocksServer.getHost(), QUERY_PORT), + props); + } + + private void initializeStarRocksServer() { + starRocksServer = + new GenericContainer<>(DOCKER_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(HOST) + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE))); + starRocksServer.setPortBindings( + Lists.newArrayList( + String.format("%s:%s", QUERY_PORT, QUERY_PORT), + String.format("%s:%s", HTTP_PORT, HTTP_PORT), + String.format("%s:%s", BE_HTTP_PORT, BE_HTTP_PORT))); + Startables.deepStart(Stream.of(starRocksServer)).join(); + log.info("StarRocks container started"); + // wait for starrocks fully start + given().ignoreExceptions() + .await() + .atMost(360, TimeUnit.SECONDS) + .untilAsserted(this::initializeJdbcConnection); + } + + @TestTemplate + public void testStarRocksMultiTableSinkCase(TestContainer container) + throws InterruptedException, IOException, SQLException { + String jobId = String.valueOf(JobIdGenerator.newJobId()); + String jobConfigFile = "/mysql_multi_source_to_multi_sink_streaming.conf"; + CompletableFuture.runAsync( + () -> { + try { + container.executeJob(jobConfigFile, jobId); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + }); + TimeUnit.SECONDS.sleep(30); + + // verify multi table sink + verifyDataConsistency("orders"); + verifyDataConsistency("customers"); + verifyDataConsistency("products"); + + insertNewDataIntoMySQL(); + insertNewDataIntoMySQL(); + TimeUnit.SECONDS.sleep(10); + verifyDataConsistency("orders"); + // savepoint 1 + Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); + TimeUnit.SECONDS.sleep(5); + insertNewDataIntoMySQL(); + // restore 1 + CompletableFuture.supplyAsync( + () -> { + try { + container.restoreJob(jobConfigFile, jobId); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + insertNewDataIntoMySQL(); + TimeUnit.SECONDS.sleep(20); + verifyDataConsistency("orders"); + } + + private void verifyDataConsistency(String tableName) { + List> mysqlData = + query(String.format(QUERY, DATABASE, tableName), mysqlConnection); + List> starRocksData = + query(String.format(QUERY, DATABASE, tableName), starRocksConnection); + Assertions.assertEquals( + mysqlData, starRocksData, "Data consistency check failed for table: " + tableName); + } + + private void insertNewDataIntoMySQL() throws SQLException { + mysqlConnection + .createStatement() + .execute( + "INSERT INTO orders (id, customer_id, order_date, total_amount, status) " + + "VALUES (null, 1, '2025-01-04 13:00:00', 498.99, 'pending')"); + } + + private Connection getMysqlJdbcConnection() throws SQLException { + return DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword()); + } + + @BeforeAll + @Override + public void startUp() throws SQLException { + initializeStarRocksServer(); + log.info("The second stage: Starting Mysql containers..."); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + log.info("Mysql Containers are started"); + shopDatabase.createAndInitialize(); + log.info("Mysql ddl execution is complete"); + initializeJdbcTable(); + mysqlConnection = getMysqlJdbcConnection(); + } + + @AfterAll + @Override + public void tearDown() throws SQLException { + if (MYSQL_CONTAINER != null) { + MYSQL_CONTAINER.close(); + } + if (starRocksServer != null) { + starRocksServer.close(); + } + if (starRocksConnection != null) { + starRocksConnection.close(); + } + if (mysqlConnection != null) { + mysqlConnection.close(); + } + } + + private void initializeJdbcTable() { + try (Statement statement = starRocksConnection.createStatement()) { + // create databases + statement.execute(CREATE_DATABASE); + } catch (SQLException e) { + throw new RuntimeException("Initializing table failed!", e); + } + } + + private List> query(String sql, Connection connection) { + try { + ResultSet resultSet = connection.createStatement().executeQuery(sql); + List> result = new ArrayList<>(); + int columnCount = resultSet.getMetaData().getColumnCount(); + while (resultSet.next()) { + ArrayList objects = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { + if (resultSet.getObject(i) instanceof Timestamp) { + Timestamp timestamp = resultSet.getTimestamp(i); + objects.add(timestamp.toLocalDateTime().format(DATE_TIME_FORMATTER)); + break; + } + if (resultSet.getObject(i) instanceof LocalDateTime) { + LocalDateTime localDateTime = resultSet.getObject(i, LocalDateTime.class); + objects.add(localDateTime.format(DATE_TIME_FORMATTER)); + break; + } + objects.add(resultSet.getObject(i)); + } + log.debug(String.format("Print query, sql: %s, data: %s", sql, objects)); + result.add(objects); + } + return result; + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/store.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/store.sql new file mode 100644 index 00000000000..59bb6d9f4e8 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/store.sql @@ -0,0 +1,78 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: store +-- ---------------------------------------------------------------------------------------------------------------- + +CREATE DATABASE IF NOT EXISTS store; +USE store; + +drop table if exists orders; + +CREATE TABLE orders ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + customer_id BIGINT NOT NULL, + order_date DATETIME NOT NULL, + total_amount DECIMAL ( 10, 2 ) NOT NULL, + STATUS VARCHAR ( 50 ) DEFAULT 'pending', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP +); + +drop table if exists customers; + +CREATE TABLE customers ( + id BIGINT PRIMARY KEY, + NAME VARCHAR ( 255 ) NOT NULL, + email VARCHAR ( 255 ) NOT NULL, + phone VARCHAR ( 50 ), + address TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP +); + +drop table if exists products; + +CREATE TABLE products ( + id BIGINT PRIMARY KEY, + NAME VARCHAR ( 255 ) NOT NULL, + description TEXT, + price DECIMAL ( 10, 2 ) NOT NULL, + stock INT NOT NULL DEFAULT 0, + category VARCHAR ( 100 ), + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP +); + +INSERT INTO orders ( id, customer_id, order_date, total_amount, STATUS ) +VALUES + ( 1, 1, '2024-01-01 10:00:00', 299.99, 'completed' ), + ( 2, 2, '2024-01-02 11:00:00', 199.99, 'completed' ), + ( 3, 3, '2024-01-03 12:00:00', 399.99, 'processing' ); + +INSERT INTO customers ( id, NAME, email, phone, address ) +VALUES + ( 1, 'John Doe', 'john@example.com', '123-456-7890', '123 Main St' ), + ( 2, 'Jane Smith', 'jane@example.com', '234-567-8901', '456 Oak Ave' ), + ( 3, 'Bob Johnson', 'bob@example.com', '345-678-9012', '789 Pine Rd' ); + +INSERT INTO products ( id, NAME, description, price, stock, category ) +VALUES + ( 1, 'Laptop', 'High performance laptop', 999.99, 50, 'Electronics' ), + ( 2, 'Smartphone', '5G enabled smartphone', 699.99, 100, 'Electronics' ), + ( 3, 'Headphones', 'Wireless noise cancelling', 199.99, 200, 'Accessories' ); \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysql_multi_source_to_multi_sink_streaming.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysql_multi_source_to_multi_sink_streaming.conf new file mode 100644 index 00000000000..f586d980036 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysql_multi_source_to_multi_sink_streaming.conf @@ -0,0 +1,54 @@ + + +env { + parallelism = 1 + checkpoint.interval = 2000 + checkpoint.timeout=600000 + job.mode = "STREAMING" +} + +source { + MySQL-CDC { + username = "st_user_source" + password = "mysqlpw" + table-names = [ + "store.orders", + "store.products", + "store.customers" + ] + base-url = "jdbc:mysql://mysql_cdc_e2e:3306/store" + schema-changes.enabled = true + } +} + +sink { + StarRocks { + multi_table_sink_replica = 2 + # docker allin1 environment can use port 8080 8040 instead of port FE 8030 + nodeUrls = ["starrocks_cdc_e2e:8040"] + username = "root" + password = "" + database = "store" + table = "${table_name}" + base-url = "jdbc:mysql://starrocks_cdc_e2e:9030/store" + max_retries = 3 + enable_upsert_delete = true + schema_save_mode="RECREATE_SCHEMA" + data_save_mode="DROP_DATA" + save_mode_create_template = """ + CREATE TABLE IF NOT EXISTS store.`${table_name}` ( + ${rowtype_primary_key}, + ${rowtype_fields} + ) ENGINE=OLAP + PRIMARY KEY (${rowtype_primary_key}) + DISTRIBUTED BY HASH (${rowtype_primary_key}) + PROPERTIES ( + "replication_num" = "1", + "in_memory" = "false", + "enable_persistent_index" = "true", + "replicated_storage" = "true", + "compression" = "LZ4" + ) + """ + } +} \ No newline at end of file From f607c46a7bd25e9066d5ee4a2af3c88a71d20ae9 Mon Sep 17 00:00:00 2001 From: Shiwanming <1633138551@qq.com> Date: Tue, 7 Jan 2025 22:22:35 +0800 Subject: [PATCH 09/13] add license --- ...ql_multi_source_to_multi_sink_streaming.conf | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysql_multi_source_to_multi_sink_streaming.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysql_multi_source_to_multi_sink_streaming.conf index f586d980036..9fd0313a844 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysql_multi_source_to_multi_sink_streaming.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysql_multi_source_to_multi_sink_streaming.conf @@ -1,4 +1,19 @@ - +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# env { parallelism = 1 From 774af0356d225d065984a7411ebc407c6868a7c0 Mon Sep 17 00:00:00 2001 From: Shiwanming <1633138551@qq.com> Date: Wed, 8 Jan 2025 11:33:40 +0800 Subject: [PATCH 10/13] optimize e2e --- .../starrocks/StarRocksMultiSinkIT.java | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java index 33a1d081f10..0f4a57dd483 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java @@ -62,6 +62,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import static org.awaitility.Awaitility.await; import static org.awaitility.Awaitility.given; @Slf4j @@ -179,7 +180,7 @@ public void testStarRocksMultiTableSinkCase(TestContainer container) throw new RuntimeException(e); } }); - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(10); // verify multi table sink verifyDataConsistency("orders"); @@ -188,13 +189,12 @@ public void testStarRocksMultiTableSinkCase(TestContainer container) insertNewDataIntoMySQL(); insertNewDataIntoMySQL(); - TimeUnit.SECONDS.sleep(10); + // verify incremental verifyDataConsistency("orders"); - // savepoint 1 + // savepoint Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); - TimeUnit.SECONDS.sleep(5); insertNewDataIntoMySQL(); - // restore 1 + // restore CompletableFuture.supplyAsync( () -> { try { @@ -206,17 +206,21 @@ public void testStarRocksMultiTableSinkCase(TestContainer container) return null; }); insertNewDataIntoMySQL(); - TimeUnit.SECONDS.sleep(20); + // verify restore verifyDataConsistency("orders"); } private void verifyDataConsistency(String tableName) { - List> mysqlData = - query(String.format(QUERY, DATABASE, tableName), mysqlConnection); - List> starRocksData = - query(String.format(QUERY, DATABASE, tableName), starRocksConnection); - Assertions.assertEquals( - mysqlData, starRocksData, "Data consistency check failed for table: " + tableName); + await().atMost(10000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertIterableEquals( + query( + String.format(QUERY, DATABASE, tableName), + mysqlConnection), + query( + String.format(QUERY, DATABASE, tableName), + starRocksConnection))); } private void insertNewDataIntoMySQL() throws SQLException { From 32604c3b4ad5c950f4ab197201f754b54f9f2b80 Mon Sep 17 00:00:00 2001 From: Shiwanming <1633138551@qq.com> Date: Wed, 8 Jan 2025 23:27:05 +0800 Subject: [PATCH 11/13] optimize e2e --- .../starrocks/StarRocksMultiSinkIT.java | 4 +- .../src/test/resources/ddl/shop.sql | 38 ++++++++- .../src/test/resources/ddl/store.sql | 78 ------------------- ..._multi_source_to_multi_sink_streaming.conf | 69 ---------------- ...qlcdc_to_starrocks_with_schema_change.conf | 6 +- 5 files changed, 41 insertions(+), 154 deletions(-) delete mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/store.sql delete mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysql_multi_source_to_multi_sink_streaming.conf diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java index 0f4a57dd483..b799a272b51 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java @@ -72,7 +72,7 @@ disabledReason = "Currently SPARK do not support cdc. In addition, currently only the zeta engine supports schema evolution for pr https://github.com/apache/seatunnel/pull/5125.") public class StarRocksMultiSinkIT extends TestSuiteBase implements TestResource { - private static final String DATABASE = "store"; + private static final String DATABASE = "shop"; private static final String MYSQL_HOST = "mysql_cdc_e2e"; private static final String MYSQL_USER_NAME = "mysqluser"; private static final String MYSQL_USER_PASSWORD = "mysqlpw"; @@ -170,7 +170,7 @@ private void initializeStarRocksServer() { public void testStarRocksMultiTableSinkCase(TestContainer container) throws InterruptedException, IOException, SQLException { String jobId = String.valueOf(JobIdGenerator.newJobId()); - String jobConfigFile = "/mysql_multi_source_to_multi_sink_streaming.conf"; + String jobConfigFile = "/mysqlcdc_to_starrocks_with_schema_change.conf"; CompletableFuture.runAsync( () -> { try { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql index be2eaaeca9e..d554893534c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql @@ -30,6 +30,30 @@ CREATE TABLE products ( weight FLOAT ); +drop table if exists orders; + +CREATE TABLE orders ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + customer_id BIGINT NOT NULL, + order_date DATETIME NOT NULL, + total_amount DECIMAL ( 10, 2 ) NOT NULL, + STATUS VARCHAR ( 50 ) DEFAULT 'pending', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP +); + +drop table if exists customers; + +CREATE TABLE customers ( + id BIGINT PRIMARY KEY, + NAME VARCHAR ( 255 ) NOT NULL, + email VARCHAR ( 255 ) NOT NULL, + phone VARCHAR ( 50 ), + address TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP +); + ALTER TABLE products AUTO_INCREMENT = 101; INSERT INTO products @@ -41,4 +65,16 @@ VALUES (101,"scooter","Small 2-wheel scooter",3.14), (106,"hammer","16oz carpenter's hammer",1.0), (107,"rocks","box of assorted rocks",5.3), (108,"jacket","water resistent black wind breaker",0.1), - (109,"spare tire","24 inch spare tire",22.2); \ No newline at end of file + (109,"spare tire","24 inch spare tire",22.2); + +INSERT INTO orders ( id, customer_id, order_date, total_amount, STATUS ) +VALUES + ( 1, 1, '2024-01-01 10:00:00', 299.99, 'completed' ), + ( 2, 2, '2024-01-02 11:00:00', 199.99, 'completed' ), + ( 3, 3, '2024-01-03 12:00:00', 399.99, 'processing' ); + +INSERT INTO customers ( id, NAME, email, phone, address ) +VALUES + ( 1, 'John Doe', 'john@example.com', '123-456-7890', '123 Main St' ), + ( 2, 'Jane Smith', 'jane@example.com', '234-567-8901', '456 Oak Ave' ), + ( 3, 'Bob Johnson', 'bob@example.com', '345-678-9012', '789 Pine Rd' ); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/store.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/store.sql deleted file mode 100644 index 59bb6d9f4e8..00000000000 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/store.sql +++ /dev/null @@ -1,78 +0,0 @@ --- --- Licensed to the Apache Software Foundation (ASF) under one or more --- contributor license agreements. See the NOTICE file distributed with --- this work for additional information regarding copyright ownership. --- The ASF licenses this file to You under the Apache License, Version 2.0 --- (the "License"); you may not use this file except in compliance with --- the License. You may obtain a copy of the License at --- --- http://www.apache.org/licenses/LICENSE-2.0 --- --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. --- - --- ---------------------------------------------------------------------------------------------------------------- --- DATABASE: store --- ---------------------------------------------------------------------------------------------------------------- - -CREATE DATABASE IF NOT EXISTS store; -USE store; - -drop table if exists orders; - -CREATE TABLE orders ( - id BIGINT AUTO_INCREMENT PRIMARY KEY, - customer_id BIGINT NOT NULL, - order_date DATETIME NOT NULL, - total_amount DECIMAL ( 10, 2 ) NOT NULL, - STATUS VARCHAR ( 50 ) DEFAULT 'pending', - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP -); - -drop table if exists customers; - -CREATE TABLE customers ( - id BIGINT PRIMARY KEY, - NAME VARCHAR ( 255 ) NOT NULL, - email VARCHAR ( 255 ) NOT NULL, - phone VARCHAR ( 50 ), - address TEXT, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP -); - -drop table if exists products; - -CREATE TABLE products ( - id BIGINT PRIMARY KEY, - NAME VARCHAR ( 255 ) NOT NULL, - description TEXT, - price DECIMAL ( 10, 2 ) NOT NULL, - stock INT NOT NULL DEFAULT 0, - category VARCHAR ( 100 ), - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP -); - -INSERT INTO orders ( id, customer_id, order_date, total_amount, STATUS ) -VALUES - ( 1, 1, '2024-01-01 10:00:00', 299.99, 'completed' ), - ( 2, 2, '2024-01-02 11:00:00', 199.99, 'completed' ), - ( 3, 3, '2024-01-03 12:00:00', 399.99, 'processing' ); - -INSERT INTO customers ( id, NAME, email, phone, address ) -VALUES - ( 1, 'John Doe', 'john@example.com', '123-456-7890', '123 Main St' ), - ( 2, 'Jane Smith', 'jane@example.com', '234-567-8901', '456 Oak Ave' ), - ( 3, 'Bob Johnson', 'bob@example.com', '345-678-9012', '789 Pine Rd' ); - -INSERT INTO products ( id, NAME, description, price, stock, category ) -VALUES - ( 1, 'Laptop', 'High performance laptop', 999.99, 50, 'Electronics' ), - ( 2, 'Smartphone', '5G enabled smartphone', 699.99, 100, 'Electronics' ), - ( 3, 'Headphones', 'Wireless noise cancelling', 199.99, 200, 'Accessories' ); \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysql_multi_source_to_multi_sink_streaming.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysql_multi_source_to_multi_sink_streaming.conf deleted file mode 100644 index 9fd0313a844..00000000000 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysql_multi_source_to_multi_sink_streaming.conf +++ /dev/null @@ -1,69 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -env { - parallelism = 1 - checkpoint.interval = 2000 - checkpoint.timeout=600000 - job.mode = "STREAMING" -} - -source { - MySQL-CDC { - username = "st_user_source" - password = "mysqlpw" - table-names = [ - "store.orders", - "store.products", - "store.customers" - ] - base-url = "jdbc:mysql://mysql_cdc_e2e:3306/store" - schema-changes.enabled = true - } -} - -sink { - StarRocks { - multi_table_sink_replica = 2 - # docker allin1 environment can use port 8080 8040 instead of port FE 8030 - nodeUrls = ["starrocks_cdc_e2e:8040"] - username = "root" - password = "" - database = "store" - table = "${table_name}" - base-url = "jdbc:mysql://starrocks_cdc_e2e:9030/store" - max_retries = 3 - enable_upsert_delete = true - schema_save_mode="RECREATE_SCHEMA" - data_save_mode="DROP_DATA" - save_mode_create_template = """ - CREATE TABLE IF NOT EXISTS store.`${table_name}` ( - ${rowtype_primary_key}, - ${rowtype_fields} - ) ENGINE=OLAP - PRIMARY KEY (${rowtype_primary_key}) - DISTRIBUTED BY HASH (${rowtype_primary_key}) - PROPERTIES ( - "replication_num" = "1", - "in_memory" = "false", - "enable_persistent_index" = "true", - "replicated_storage" = "true", - "compression" = "LZ4" - ) - """ - } -} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf index 76d86a4e8ca..33fafcdf5fc 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf @@ -21,16 +21,14 @@ env { # You can set engine configuration here job.mode = "STREAMING" - checkpoint.interval = 5000 - read_limit.bytes_per_second=7000000 - read_limit.rows_per_second=400 + checkpoint.interval = 2000 } source { MySQL-CDC { username = "st_user_source" password = "mysqlpw" - table-names = ["shop.products"] + table-names = ["shop.products", "shop.orders", "shop.customers"] base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" schema-changes.enabled = true From 188cfe6f801a1790369a358723d3c41b12566bc8 Mon Sep 17 00:00:00 2001 From: Shiwanming <1633138551@qq.com> Date: Thu, 9 Jan 2025 09:04:41 +0800 Subject: [PATCH 12/13] optimize e2e --- .../src/test/resources/ddl/shop.sql | 36 +++++++++ .../src/test/resources/ddl/shop.sql | 80 ------------------- 2 files changed, 36 insertions(+), 80 deletions(-) delete mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql index f97d5852f3a..bd1eaaa7242 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql @@ -30,6 +30,30 @@ CREATE TABLE products ( weight FLOAT ); +drop table if exists orders; + +CREATE TABLE orders ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + customer_id BIGINT NOT NULL, + order_date DATETIME NOT NULL, + total_amount DECIMAL ( 10, 2 ) NOT NULL, + STATUS VARCHAR ( 50 ) DEFAULT 'pending', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP +); + +drop table if exists customers; + +CREATE TABLE customers ( + id BIGINT PRIMARY KEY, + NAME VARCHAR ( 255 ) NOT NULL, + email VARCHAR ( 255 ) NOT NULL, + phone VARCHAR ( 50 ), + address TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP +); + drop table if exists mysql_cdc_e2e_sink_table_with_schema_change; CREATE TABLE if not exists mysql_cdc_e2e_sink_table_with_schema_change ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, @@ -59,6 +83,18 @@ VALUES (101,"scooter","Small 2-wheel scooter",3.14), (108,"jacket","water resistent black wind breaker",0.1), (109,"spare tire","24 inch spare tire",22.2); +INSERT INTO orders ( id, customer_id, order_date, total_amount, STATUS ) +VALUES + ( 1, 1, '2024-01-01 10:00:00', 299.99, 'completed' ), + ( 2, 2, '2024-01-02 11:00:00', 199.99, 'completed' ), + ( 3, 3, '2024-01-03 12:00:00', 399.99, 'processing' ); + +INSERT INTO customers ( id, NAME, email, phone, address ) +VALUES + ( 1, 'John Doe', 'john@example.com', '123-456-7890', '123 Main St' ), + ( 2, 'Jane Smith', 'jane@example.com', '234-567-8901', '456 Oak Ave' ), + ( 3, 'Bob Johnson', 'bob@example.com', '345-678-9012', '789 Pine Rd' ); + drop table if exists products_on_hand; CREATE TABLE products_on_hand ( diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql deleted file mode 100644 index d554893534c..00000000000 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql +++ /dev/null @@ -1,80 +0,0 @@ --- --- Licensed to the Apache Software Foundation (ASF) under one or more --- contributor license agreements. See the NOTICE file distributed with --- this work for additional information regarding copyright ownership. --- The ASF licenses this file to You under the Apache License, Version 2.0 --- (the "License"); you may not use this file except in compliance with --- the License. You may obtain a copy of the License at --- --- http://www.apache.org/licenses/LICENSE-2.0 --- --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. --- - --- ---------------------------------------------------------------------------------------------------------------- --- DATABASE: shop --- ---------------------------------------------------------------------------------------------------------------- -CREATE DATABASE IF NOT EXISTS `shop`; -use shop; - -drop table if exists products; --- Create and populate our products using a single insert with many rows -CREATE TABLE products ( - id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, - name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel', - description VARCHAR(512), - weight FLOAT -); - -drop table if exists orders; - -CREATE TABLE orders ( - id BIGINT AUTO_INCREMENT PRIMARY KEY, - customer_id BIGINT NOT NULL, - order_date DATETIME NOT NULL, - total_amount DECIMAL ( 10, 2 ) NOT NULL, - STATUS VARCHAR ( 50 ) DEFAULT 'pending', - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP -); - -drop table if exists customers; - -CREATE TABLE customers ( - id BIGINT PRIMARY KEY, - NAME VARCHAR ( 255 ) NOT NULL, - email VARCHAR ( 255 ) NOT NULL, - phone VARCHAR ( 50 ), - address TEXT, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP -); - -ALTER TABLE products AUTO_INCREMENT = 101; - -INSERT INTO products -VALUES (101,"scooter","Small 2-wheel scooter",3.14), - (102,"car battery","12V car battery",8.1), - (103,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8), - (104,"hammer","12oz carpenter's hammer",0.75), - (105,"hammer","14oz carpenter's hammer",0.875), - (106,"hammer","16oz carpenter's hammer",1.0), - (107,"rocks","box of assorted rocks",5.3), - (108,"jacket","water resistent black wind breaker",0.1), - (109,"spare tire","24 inch spare tire",22.2); - -INSERT INTO orders ( id, customer_id, order_date, total_amount, STATUS ) -VALUES - ( 1, 1, '2024-01-01 10:00:00', 299.99, 'completed' ), - ( 2, 2, '2024-01-02 11:00:00', 199.99, 'completed' ), - ( 3, 3, '2024-01-03 12:00:00', 399.99, 'processing' ); - -INSERT INTO customers ( id, NAME, email, phone, address ) -VALUES - ( 1, 'John Doe', 'john@example.com', '123-456-7890', '123 Main St' ), - ( 2, 'Jane Smith', 'jane@example.com', '234-567-8901', '456 Oak Ave' ), - ( 3, 'Bob Johnson', 'bob@example.com', '345-678-9012', '789 Pine Rd' ); From 412568ede8844de66883fd6760afed41428b7f1e Mon Sep 17 00:00:00 2001 From: Shiwanming <1633138551@qq.com> Date: Thu, 9 Jan 2025 10:33:05 +0800 Subject: [PATCH 13/13] optimize e2e --- .../src/test/resources/ddl/shop.sql | 37 --- .../starrocks/StarRocksMultiSinkIT.java | 308 ------------------ .../starrocks/StarRocksSchemaChangeIT.java | 38 ++- .../src/test/resources/ddl/shop.sql | 80 +++++ 4 files changed, 116 insertions(+), 347 deletions(-) delete mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql index bd1eaaa7242..9887b4e6877 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql @@ -30,30 +30,6 @@ CREATE TABLE products ( weight FLOAT ); -drop table if exists orders; - -CREATE TABLE orders ( - id BIGINT AUTO_INCREMENT PRIMARY KEY, - customer_id BIGINT NOT NULL, - order_date DATETIME NOT NULL, - total_amount DECIMAL ( 10, 2 ) NOT NULL, - STATUS VARCHAR ( 50 ) DEFAULT 'pending', - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP -); - -drop table if exists customers; - -CREATE TABLE customers ( - id BIGINT PRIMARY KEY, - NAME VARCHAR ( 255 ) NOT NULL, - email VARCHAR ( 255 ) NOT NULL, - phone VARCHAR ( 50 ), - address TEXT, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP -); - drop table if exists mysql_cdc_e2e_sink_table_with_schema_change; CREATE TABLE if not exists mysql_cdc_e2e_sink_table_with_schema_change ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, @@ -83,19 +59,6 @@ VALUES (101,"scooter","Small 2-wheel scooter",3.14), (108,"jacket","water resistent black wind breaker",0.1), (109,"spare tire","24 inch spare tire",22.2); -INSERT INTO orders ( id, customer_id, order_date, total_amount, STATUS ) -VALUES - ( 1, 1, '2024-01-01 10:00:00', 299.99, 'completed' ), - ( 2, 2, '2024-01-02 11:00:00', 199.99, 'completed' ), - ( 3, 3, '2024-01-03 12:00:00', 399.99, 'processing' ); - -INSERT INTO customers ( id, NAME, email, phone, address ) -VALUES - ( 1, 'John Doe', 'john@example.com', '123-456-7890', '123 Main St' ), - ( 2, 'Jane Smith', 'jane@example.com', '234-567-8901', '456 Oak Ave' ), - ( 3, 'Bob Johnson', 'bob@example.com', '345-678-9012', '789 Pine Rd' ); - - drop table if exists products_on_hand; CREATE TABLE products_on_hand ( product_id INTEGER NOT NULL PRIMARY KEY, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java deleted file mode 100644 index b799a272b51..00000000000 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java +++ /dev/null @@ -1,308 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.e2e.connector.starrocks; - -import org.apache.seatunnel.shade.com.google.common.collect.Lists; - -import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer; -import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion; -import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase; -import org.apache.seatunnel.e2e.common.TestResource; -import org.apache.seatunnel.e2e.common.TestSuiteBase; -import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; -import org.apache.seatunnel.e2e.common.container.EngineType; -import org.apache.seatunnel.e2e.common.container.TestContainer; -import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; -import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; -import org.apache.seatunnel.e2e.common.util.JobIdGenerator; - -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.TestTemplate; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.DockerLoggerFactory; - -import lombok.extern.slf4j.Slf4j; - -import java.io.IOException; -import java.net.URL; -import java.net.URLClassLoader; -import java.sql.Connection; -import java.sql.Driver; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Timestamp; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - -import static org.awaitility.Awaitility.await; -import static org.awaitility.Awaitility.given; - -@Slf4j -@DisabledOnContainer( - value = {}, - type = {EngineType.SPARK, EngineType.FLINK}, - disabledReason = - "Currently SPARK do not support cdc. In addition, currently only the zeta engine supports schema evolution for pr https://github.com/apache/seatunnel/pull/5125.") -public class StarRocksMultiSinkIT extends TestSuiteBase implements TestResource { - private static final String DATABASE = "shop"; - private static final String MYSQL_HOST = "mysql_cdc_e2e"; - private static final String MYSQL_USER_NAME = "mysqluser"; - private static final String MYSQL_USER_PASSWORD = "mysqlpw"; - - private static final String DOCKER_IMAGE = "starrocks/allin1-ubuntu:3.3.1"; - private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; - private static final String HOST = "starrocks_cdc_e2e"; - private static final int SR_PROXY_PORT = 8080; - private static final int QUERY_PORT = 9030; - private static final int HTTP_PORT = 8030; - private static final int BE_HTTP_PORT = 8040; - private static final String USERNAME = "root"; - private static final String PASSWORD = ""; - private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS " + DATABASE; - private static final String SR_DRIVER_JAR = - "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar"; - - private Connection starRocksConnection; - private Connection mysqlConnection; - private GenericContainer starRocksServer; - - public static final DateTimeFormatter DATE_TIME_FORMATTER = - DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - - private static final String QUERY = "select * from %s.%s order by id"; - - private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0); - - private final UniqueDatabase shopDatabase = - new UniqueDatabase(MYSQL_CONTAINER, DATABASE, "mysqluser", "mysqlpw", DATABASE); - - @TestContainerExtension - private final ContainerExtendedFactory extendedFactory = - container -> { - Container.ExecResult extraCommands = - container.execInContainer( - "bash", - "-c", - "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " - + SR_DRIVER_JAR); - Assertions.assertEquals(0, extraCommands.getExitCode()); - }; - - private static MySqlContainer createMySqlContainer(MySqlVersion version) { - return new MySqlContainer(version) - .withConfigurationOverride("docker/server-gtids/my.cnf") - .withSetupSQL("docker/setup.sql") - .withNetwork(NETWORK) - .withNetworkAliases(MYSQL_HOST) - .withDatabaseName(DATABASE) - .withUsername(MYSQL_USER_NAME) - .withPassword(MYSQL_USER_PASSWORD) - .withLogConsumer( - new Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image"))); - } - - private void initializeJdbcConnection() throws Exception { - URLClassLoader urlClassLoader = - new URLClassLoader( - new URL[] {new URL(SR_DRIVER_JAR)}, - StarRocksCDCSinkIT.class.getClassLoader()); - Thread.currentThread().setContextClassLoader(urlClassLoader); - Driver driver = (Driver) urlClassLoader.loadClass(DRIVER_CLASS).newInstance(); - Properties props = new Properties(); - props.put("user", USERNAME); - props.put("password", PASSWORD); - starRocksConnection = - driver.connect( - String.format("jdbc:mysql://%s:%s", starRocksServer.getHost(), QUERY_PORT), - props); - } - - private void initializeStarRocksServer() { - starRocksServer = - new GenericContainer<>(DOCKER_IMAGE) - .withNetwork(NETWORK) - .withNetworkAliases(HOST) - .withLogConsumer( - new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE))); - starRocksServer.setPortBindings( - Lists.newArrayList( - String.format("%s:%s", QUERY_PORT, QUERY_PORT), - String.format("%s:%s", HTTP_PORT, HTTP_PORT), - String.format("%s:%s", BE_HTTP_PORT, BE_HTTP_PORT))); - Startables.deepStart(Stream.of(starRocksServer)).join(); - log.info("StarRocks container started"); - // wait for starrocks fully start - given().ignoreExceptions() - .await() - .atMost(360, TimeUnit.SECONDS) - .untilAsserted(this::initializeJdbcConnection); - } - - @TestTemplate - public void testStarRocksMultiTableSinkCase(TestContainer container) - throws InterruptedException, IOException, SQLException { - String jobId = String.valueOf(JobIdGenerator.newJobId()); - String jobConfigFile = "/mysqlcdc_to_starrocks_with_schema_change.conf"; - CompletableFuture.runAsync( - () -> { - try { - container.executeJob(jobConfigFile, jobId); - } catch (Exception e) { - log.error("Commit task exception :" + e.getMessage()); - throw new RuntimeException(e); - } - }); - TimeUnit.SECONDS.sleep(10); - - // verify multi table sink - verifyDataConsistency("orders"); - verifyDataConsistency("customers"); - verifyDataConsistency("products"); - - insertNewDataIntoMySQL(); - insertNewDataIntoMySQL(); - // verify incremental - verifyDataConsistency("orders"); - // savepoint - Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); - insertNewDataIntoMySQL(); - // restore - CompletableFuture.supplyAsync( - () -> { - try { - container.restoreJob(jobConfigFile, jobId); - } catch (Exception e) { - log.error("Commit task exception :" + e.getMessage()); - throw new RuntimeException(e); - } - return null; - }); - insertNewDataIntoMySQL(); - // verify restore - verifyDataConsistency("orders"); - } - - private void verifyDataConsistency(String tableName) { - await().atMost(10000, TimeUnit.MILLISECONDS) - .untilAsserted( - () -> - Assertions.assertIterableEquals( - query( - String.format(QUERY, DATABASE, tableName), - mysqlConnection), - query( - String.format(QUERY, DATABASE, tableName), - starRocksConnection))); - } - - private void insertNewDataIntoMySQL() throws SQLException { - mysqlConnection - .createStatement() - .execute( - "INSERT INTO orders (id, customer_id, order_date, total_amount, status) " - + "VALUES (null, 1, '2025-01-04 13:00:00', 498.99, 'pending')"); - } - - private Connection getMysqlJdbcConnection() throws SQLException { - return DriverManager.getConnection( - MYSQL_CONTAINER.getJdbcUrl(), - MYSQL_CONTAINER.getUsername(), - MYSQL_CONTAINER.getPassword()); - } - - @BeforeAll - @Override - public void startUp() throws SQLException { - initializeStarRocksServer(); - log.info("The second stage: Starting Mysql containers..."); - Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); - log.info("Mysql Containers are started"); - shopDatabase.createAndInitialize(); - log.info("Mysql ddl execution is complete"); - initializeJdbcTable(); - mysqlConnection = getMysqlJdbcConnection(); - } - - @AfterAll - @Override - public void tearDown() throws SQLException { - if (MYSQL_CONTAINER != null) { - MYSQL_CONTAINER.close(); - } - if (starRocksServer != null) { - starRocksServer.close(); - } - if (starRocksConnection != null) { - starRocksConnection.close(); - } - if (mysqlConnection != null) { - mysqlConnection.close(); - } - } - - private void initializeJdbcTable() { - try (Statement statement = starRocksConnection.createStatement()) { - // create databases - statement.execute(CREATE_DATABASE); - } catch (SQLException e) { - throw new RuntimeException("Initializing table failed!", e); - } - } - - private List> query(String sql, Connection connection) { - try { - ResultSet resultSet = connection.createStatement().executeQuery(sql); - List> result = new ArrayList<>(); - int columnCount = resultSet.getMetaData().getColumnCount(); - while (resultSet.next()) { - ArrayList objects = new ArrayList<>(); - for (int i = 1; i <= columnCount; i++) { - if (resultSet.getObject(i) instanceof Timestamp) { - Timestamp timestamp = resultSet.getTimestamp(i); - objects.add(timestamp.toLocalDateTime().format(DATE_TIME_FORMATTER)); - break; - } - if (resultSet.getObject(i) instanceof LocalDateTime) { - LocalDateTime localDateTime = resultSet.getObject(i, LocalDateTime.class); - objects.add(localDateTime.format(DATE_TIME_FORMATTER)); - break; - } - objects.add(resultSet.getObject(i)); - } - log.debug(String.format("Print query, sql: %s, data: %s", sql, objects)); - result.add(objects); - } - return result; - } catch (SQLException e) { - throw new RuntimeException(e); - } - } -} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java index ea7fe35fe0b..f5b7522499b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java @@ -174,7 +174,7 @@ private void initializeStarRocksServer() { @TestTemplate public void testStarRocksSinkWithSchemaEvolutionCase(TestContainer container) - throws InterruptedException, IOException { + throws InterruptedException, IOException, SQLException { String jobId = String.valueOf(JobIdGenerator.newJobId()); String jobConfigFile = "/mysqlcdc_to_starrocks_with_schema_change.conf"; CompletableFuture.runAsync( @@ -187,6 +187,11 @@ public void testStarRocksSinkWithSchemaEvolutionCase(TestContainer container) } }); TimeUnit.SECONDS.sleep(20); + + // verify multi table sink + verifyDataConsistency("orders"); + verifyDataConsistency("customers"); + // waiting for case1 completed assertSchemaEvolutionForAddColumns( DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, starRocksConnection); @@ -194,9 +199,14 @@ public void testStarRocksSinkWithSchemaEvolutionCase(TestContainer container) assertSchemaEvolutionForDropColumns( DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, starRocksConnection); + insertNewDataIntoMySQL(); + insertNewDataIntoMySQL(); + // verify incremental + verifyDataConsistency("orders"); + // savepoint 1 Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); - + insertNewDataIntoMySQL(); // case2 drop columns with cdc data at same time shopDatabase.setTemplateName("drop_columns").createAndInitialize(); @@ -240,6 +250,30 @@ public void testStarRocksSinkWithSchemaEvolutionCase(TestContainer container) // waiting for case3/case4 completed assertTableStructureAndData( DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, starRocksConnection); + insertNewDataIntoMySQL(); + // verify restore + verifyDataConsistency("orders"); + } + + private void insertNewDataIntoMySQL() throws SQLException { + mysqlConnection + .createStatement() + .execute( + "INSERT INTO orders (id, customer_id, order_date, total_amount, status) " + + "VALUES (null, 1, '2025-01-04 13:00:00', 498.99, 'pending')"); + } + + private void verifyDataConsistency(String tableName) { + await().atMost(10000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertIterableEquals( + query( + String.format(QUERY, DATABASE, tableName), + mysqlConnection), + query( + String.format(QUERY, DATABASE, tableName), + starRocksConnection))); } private void assertSchemaEvolutionForAddColumns( diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql new file mode 100644 index 00000000000..b867cd24c3c --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql @@ -0,0 +1,80 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: shop +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `shop`; +use shop; + +drop table if exists products; +-- Create and populate our products using a single insert with many rows +CREATE TABLE products ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel', + description VARCHAR(512), + weight FLOAT +); + +drop table if exists orders; + +CREATE TABLE orders ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + customer_id BIGINT NOT NULL, + order_date DATETIME NOT NULL, + total_amount DECIMAL ( 10, 2 ) NOT NULL, + STATUS VARCHAR ( 50 ) DEFAULT 'pending', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP +); + +drop table if exists customers; + +CREATE TABLE customers ( + id BIGINT PRIMARY KEY, + NAME VARCHAR ( 255 ) NOT NULL, + email VARCHAR ( 255 ) NOT NULL, + phone VARCHAR ( 50 ), + address TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP +); + +ALTER TABLE products AUTO_INCREMENT = 101; + +INSERT INTO products +VALUES (101,"scooter","Small 2-wheel scooter",3.14), + (102,"car battery","12V car battery",8.1), + (103,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8), + (104,"hammer","12oz carpenter's hammer",0.75), + (105,"hammer","14oz carpenter's hammer",0.875), + (106,"hammer","16oz carpenter's hammer",1.0), + (107,"rocks","box of assorted rocks",5.3), + (108,"jacket","water resistent black wind breaker",0.1), + (109,"spare tire","24 inch spare tire",22.2); + +INSERT INTO orders ( id, customer_id, order_date, total_amount, STATUS ) +VALUES + ( 1, 1, '2024-01-01 10:00:00', 299.99, 'completed' ), + ( 2, 2, '2024-01-02 11:00:00', 199.99, 'completed' ), + ( 3, 3, '2024-01-03 12:00:00', 399.99, 'processing' ); + +INSERT INTO customers ( id, NAME, email, phone, address ) +VALUES + ( 1, 'John Doe', 'john@example.com', '123-456-7890', '123 Main St' ), + ( 2, 'Jane Smith', 'jane@example.com', '234-567-8901', '456 Oak Ave' ), + ( 3, 'Bob Johnson', 'bob@example.com', '345-678-9012', '789 Pine Rd' );