From d272789bb54d551b97c3d2a6643a0e3077ecf1f5 Mon Sep 17 00:00:00 2001
From: shiwanming <1633138551@qq.com>
Date: Mon, 19 Aug 2024 15:09:35 +0800
Subject: [PATCH 01/13] [Fix] [sink elasticsearch] Fix the issue of sink-es
saveMode conflicting with Elasticsearch's automatic index creation #7430
---
.../java/org/apache/seatunnel/api/sink/DataSaveMode.java | 5 ++++-
.../apache/seatunnel/api/sink/DefaultSaveModeHandler.java | 4 ++++
.../java/org/apache/seatunnel/api/sink/SchemaSaveMode.java | 3 +++
.../seatunnel/elasticsearch/config/SinkConfig.java | 3 ++-
4 files changed, 13 insertions(+), 2 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
index 2babe412a79..64743fefbea 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
@@ -32,5 +32,8 @@ public enum DataSaveMode {
CUSTOM_PROCESSING,
// When there exist data, an error will be reported
- ERROR_WHEN_DATA_EXISTS
+ ERROR_WHEN_DATA_EXISTS,
+
+ // Ignore
+ IGNORE
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
index 051068dba03..7f7de2ef6c9 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
@@ -71,6 +71,8 @@ public void handleSchemaSaveMode() {
case ERROR_WHEN_SCHEMA_NOT_EXIST:
errorWhenSchemaNotExist();
break;
+ case IGNORE:
+ break;
default:
throw new UnsupportedOperationException("Unsupported save mode: " + schemaSaveMode);
}
@@ -91,6 +93,8 @@ public void handleDataSaveMode() {
case ERROR_WHEN_DATA_EXISTS:
errorWhenDataExists();
break;
+ case IGNORE:
+ break;
default:
throw new UnsupportedOperationException("Unsupported save mode: " + dataSaveMode);
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SchemaSaveMode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SchemaSaveMode.java
index f3da320d742..d114b909313 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SchemaSaveMode.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SchemaSaveMode.java
@@ -27,4 +27,7 @@ public enum SchemaSaveMode {
// Error will be reported when the table does not exist
ERROR_WHEN_SCHEMA_NOT_EXIST,
+
+ // Ignore
+ IGNORE
}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
index fdb0300aab5..1c2464e273c 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
@@ -28,6 +28,7 @@
import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
+import static org.apache.seatunnel.api.sink.DataSaveMode.IGNORE;
public class SinkConfig {
@@ -80,7 +81,7 @@ public class SinkConfig {
Options.key("data_save_mode")
.singleChoice(
DataSaveMode.class,
- Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS))
+ Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS, IGNORE))
.defaultValue(APPEND_DATA)
.withDescription("data_save_mode");
}
From 4ebc66bdd3dd61228bc95d7e01258971f5b007d7 Mon Sep 17 00:00:00 2001
From: shiwanming <1633138551@qq.com>
Date: Wed, 21 Aug 2024 20:10:55 +0800
Subject: [PATCH 02/13] [Fix] [sink elasticsearch] Fix the issue of sink-es
saveMode and es automatic index creation conflict #7430
---
.../java/org/apache/seatunnel/api/sink/DataSaveMode.java | 5 +----
.../apache/seatunnel/api/sink/DefaultSaveModeHandler.java | 2 --
.../seatunnel/elasticsearch/config/SinkConfig.java | 3 +--
3 files changed, 2 insertions(+), 8 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
index 64743fefbea..2babe412a79 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
@@ -32,8 +32,5 @@ public enum DataSaveMode {
CUSTOM_PROCESSING,
// When there exist data, an error will be reported
- ERROR_WHEN_DATA_EXISTS,
-
- // Ignore
- IGNORE
+ ERROR_WHEN_DATA_EXISTS
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
index 7f7de2ef6c9..da5d0281ce7 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
@@ -93,8 +93,6 @@ public void handleDataSaveMode() {
case ERROR_WHEN_DATA_EXISTS:
errorWhenDataExists();
break;
- case IGNORE:
- break;
default:
throw new UnsupportedOperationException("Unsupported save mode: " + dataSaveMode);
}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
index 1c2464e273c..fdb0300aab5 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
@@ -28,7 +28,6 @@
import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
-import static org.apache.seatunnel.api.sink.DataSaveMode.IGNORE;
public class SinkConfig {
@@ -81,7 +80,7 @@ public class SinkConfig {
Options.key("data_save_mode")
.singleChoice(
DataSaveMode.class,
- Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS, IGNORE))
+ Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS))
.defaultValue(APPEND_DATA)
.withDescription("data_save_mode");
}
From 6810d80ad95eefdc7489fb5187f91189febda5d0 Mon Sep 17 00:00:00 2001
From: shiwanming <1633138551@qq.com>
Date: Mon, 19 Aug 2024 15:09:35 +0800
Subject: [PATCH 03/13] [Bug] [sink elasticsearch] the savemode of sink-es
conficts with es automatically creating indexes based on templates #7430
---
.../java/org/apache/seatunnel/api/sink/DataSaveMode.java | 5 ++++-
.../apache/seatunnel/api/sink/DefaultSaveModeHandler.java | 2 ++
.../seatunnel/elasticsearch/config/SinkConfig.java | 3 ++-
3 files changed, 8 insertions(+), 2 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
index 2babe412a79..64743fefbea 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
@@ -32,5 +32,8 @@ public enum DataSaveMode {
CUSTOM_PROCESSING,
// When there exist data, an error will be reported
- ERROR_WHEN_DATA_EXISTS
+ ERROR_WHEN_DATA_EXISTS,
+
+ // Ignore
+ IGNORE
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
index da5d0281ce7..7f7de2ef6c9 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
@@ -93,6 +93,8 @@ public void handleDataSaveMode() {
case ERROR_WHEN_DATA_EXISTS:
errorWhenDataExists();
break;
+ case IGNORE:
+ break;
default:
throw new UnsupportedOperationException("Unsupported save mode: " + dataSaveMode);
}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
index fdb0300aab5..1c2464e273c 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
@@ -28,6 +28,7 @@
import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
+import static org.apache.seatunnel.api.sink.DataSaveMode.IGNORE;
public class SinkConfig {
@@ -80,7 +81,7 @@ public class SinkConfig {
Options.key("data_save_mode")
.singleChoice(
DataSaveMode.class,
- Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS))
+ Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS, IGNORE))
.defaultValue(APPEND_DATA)
.withDescription("data_save_mode");
}
From ac50c3e63c59f38273c0335b1b60274d370513b9 Mon Sep 17 00:00:00 2001
From: shiwanming <1633138551@qq.com>
Date: Wed, 21 Aug 2024 21:16:22 +0800
Subject: [PATCH 04/13] [Fix] [sink elasticsearch] Fix the issue of sink-es
saveMode and es automatic index creation conflict #7430
---
.../java/org/apache/seatunnel/api/sink/DataSaveMode.java | 5 +----
.../apache/seatunnel/api/sink/DefaultSaveModeHandler.java | 2 --
.../seatunnel/elasticsearch/config/SinkConfig.java | 3 +--
3 files changed, 2 insertions(+), 8 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
index 64743fefbea..2babe412a79 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
@@ -32,8 +32,5 @@ public enum DataSaveMode {
CUSTOM_PROCESSING,
// When there exist data, an error will be reported
- ERROR_WHEN_DATA_EXISTS,
-
- // Ignore
- IGNORE
+ ERROR_WHEN_DATA_EXISTS
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
index 7f7de2ef6c9..da5d0281ce7 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
@@ -93,8 +93,6 @@ public void handleDataSaveMode() {
case ERROR_WHEN_DATA_EXISTS:
errorWhenDataExists();
break;
- case IGNORE:
- break;
default:
throw new UnsupportedOperationException("Unsupported save mode: " + dataSaveMode);
}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
index 1c2464e273c..fdb0300aab5 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
@@ -28,7 +28,6 @@
import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
-import static org.apache.seatunnel.api.sink.DataSaveMode.IGNORE;
public class SinkConfig {
@@ -81,7 +80,7 @@ public class SinkConfig {
Options.key("data_save_mode")
.singleChoice(
DataSaveMode.class,
- Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS, IGNORE))
+ Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS))
.defaultValue(APPEND_DATA)
.withDescription("data_save_mode");
}
From 6e8ccc9e8e7fd0f13fbfc7c48d3daa84141855ae Mon Sep 17 00:00:00 2001
From: shiwanming <1633138551@qq.com>
Date: Thu, 22 Aug 2024 07:55:21 +0800
Subject: [PATCH 05/13] [Fix] [sink elasticsearch] Fix the issue of sink-es
saveMode and es automatic index creation conflict #7430
---
.../main/java/org/apache/seatunnel/api/sink/SchemaSaveMode.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SchemaSaveMode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SchemaSaveMode.java
index d114b909313..cee39ca8e63 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SchemaSaveMode.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SchemaSaveMode.java
@@ -28,6 +28,6 @@ public enum SchemaSaveMode {
// Error will be reported when the table does not exist
ERROR_WHEN_SCHEMA_NOT_EXIST,
- // Ignore
+ // Ignore creation
IGNORE
}
From 39360385afb3fee6d6f1d9e024e9c5c2f223a77c Mon Sep 17 00:00:00 2001
From: shiwanming <1633138551@qq.com>
Date: Thu, 22 Aug 2024 14:04:51 +0800
Subject: [PATCH 06/13] [Doc] Add IGNORE savemode type into docment #7443
---
docs/en/connector-v2/sink/Doris.md | 3 ++-
docs/en/connector-v2/sink/Elasticsearch.md | 13 +++++++------
docs/en/connector-v2/sink/Jdbc.md | 3 ++-
docs/en/connector-v2/sink/LocalFile.md | 1 +
docs/en/connector-v2/sink/PostgreSql.md | 3 ++-
docs/en/connector-v2/sink/S3File.md | 3 ++-
docs/en/connector-v2/sink/StarRocks.md | 3 ++-
docs/zh/connector-v2/sink/Doris.md | 15 ++++++++-------
docs/zh/connector-v2/sink/Elasticsearch.md | 1 +
docs/zh/connector-v2/sink/Jdbc.md | 1 +
docs/zh/connector-v2/sink/StarRocks.md | 9 +++++----
11 files changed, 33 insertions(+), 22 deletions(-)
diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md
index 592cd8702be..18915ac7b86 100644
--- a/docs/en/connector-v2/sink/Doris.md
+++ b/docs/en/connector-v2/sink/Doris.md
@@ -57,7 +57,8 @@ Before the synchronous task is turned on, different treatment schemes are select
Option introduction:
`RECREATE_SCHEMA` :Will create when the table does not exist, delete and rebuild when the table is saved
`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, skipped when the table is saved
-`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist
+`IGNORE` :Ignore the treatment of the table
### data_save_mode[Enum]
diff --git a/docs/en/connector-v2/sink/Elasticsearch.md b/docs/en/connector-v2/sink/Elasticsearch.md
index e03a2522300..b252f574daf 100644
--- a/docs/en/connector-v2/sink/Elasticsearch.md
+++ b/docs/en/connector-v2/sink/Elasticsearch.md
@@ -109,17 +109,18 @@ Sink plugin common parameters, please refer to [Sink Common Options](../sink-com
Before the synchronous task is turned on, different treatment schemes are selected for the existing surface structure of the target side.
Option introduction:
-RECREATE_SCHEMA :Will create when the table does not exist, delete and rebuild when the table is saved
-CREATE_SCHEMA_WHEN_NOT_EXIST :Will Created when the table does not exist, skipped when the table is saved
-ERROR_WHEN_SCHEMA_NOT_EXIST :Error will be reported when the table does not exist
+`RECREATE_SCHEMA` :Will create when the table does not exist, delete and rebuild when the table is saved
+`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, skipped when the table is saved
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist
+`IGNORE` :Ignore the treatment of the table
### data_save_mode
Before the synchronous task is turned on, different processing schemes are selected for data existing data on the target side.
Option introduction:
-DROP_DATA: Preserve database structure and delete data
-APPEND_DATA:Preserve database structure, preserve data
-ERROR_WHEN_DATA_EXISTS:When there is data, an error is reported
+`DROP_DATA`: Preserve database structure and delete data
+`APPEND_DATA`:Preserve database structure, preserve data
+`ERROR_WHEN_DATA_EXISTS`:When there is data, an error is reported
## Examples
diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md
index b52c42f6310..99f06891a14 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -183,7 +183,8 @@ Before the synchronous task is turned on, different treatment schemes are select
Option introduction:
`RECREATE_SCHEMA` :Will create when the table does not exist, delete and rebuild when the table is saved
`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, skipped when the table is saved
-`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist
+`IGNORE` :Ignore the treatment of the table
### data_save_mode [Enum]
diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md
index b1746e2492e..8e773fe6dd3 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -214,6 +214,7 @@ Existing dir processing method.
- RECREATE_SCHEMA: will create when the dir does not exist, delete and recreate when the dir is exist
- CREATE_SCHEMA_WHEN_NOT_EXIST: will create when the dir does not exist, skipped when the dir is exist
- ERROR_WHEN_SCHEMA_NOT_EXIST: error will be reported when the dir does not exist
+- IGNORE :Ignore the treatment of the table
### data_save_mode [string]
diff --git a/docs/en/connector-v2/sink/PostgreSql.md b/docs/en/connector-v2/sink/PostgreSql.md
index 545f0d176e6..cde299f6734 100644
--- a/docs/en/connector-v2/sink/PostgreSql.md
+++ b/docs/en/connector-v2/sink/PostgreSql.md
@@ -118,7 +118,8 @@ Before the synchronous task is turned on, different treatment schemes are select
Option introduction:
`RECREATE_SCHEMA` :Will create when the table does not exist, delete and rebuild when the table is saved
`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, skipped when the table is saved
-`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist
+`IGNORE` :Ignore the treatment of the table
### data_save_mode[Enum]
diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md
index 90fb6636460..007c9395f7d 100644
--- a/docs/en/connector-v2/sink/S3File.md
+++ b/docs/en/connector-v2/sink/S3File.md
@@ -281,7 +281,8 @@ Before turning on the synchronous task, do different treatment of the target pat
Option introduction:
`RECREATE_SCHEMA` :Will be created when the path does not exist. If the path already exists, delete the path and recreate it.
`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the path does not exist, use the path when the path is existed.
-`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the path does not exist
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the path does not exist
+`IGNORE` :Ignore the treatment of the table
### data_save_mode[Enum]
diff --git a/docs/en/connector-v2/sink/StarRocks.md b/docs/en/connector-v2/sink/StarRocks.md
index 5fe57cd3f4e..1d143884e66 100644
--- a/docs/en/connector-v2/sink/StarRocks.md
+++ b/docs/en/connector-v2/sink/StarRocks.md
@@ -107,7 +107,8 @@ Before the synchronous task is turned on, different treatment schemes are select
Option introduction:
`RECREATE_SCHEMA` :Will create when the table does not exist, delete and rebuild when the table is saved
`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, skipped when the table is saved
-`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist
+`IGNORE` :Ignore the treatment of the table
### data_save_mode[Enum]
diff --git a/docs/zh/connector-v2/sink/Doris.md b/docs/zh/connector-v2/sink/Doris.md
index afc470326f5..b35cd63f4ef 100644
--- a/docs/zh/connector-v2/sink/Doris.md
+++ b/docs/zh/connector-v2/sink/Doris.md
@@ -53,18 +53,19 @@ Doris Sink连接器的内部实现是通过stream load批量缓存和导入的
### schema_save_mode[Enum]
在开启同步任务之前,针对现有的表结构选择不同的处理方案。
-选项介绍:
+选项介绍:
`RECREATE_SCHEMA` :表不存在时创建,表保存时删除并重建。
-`CREATE_SCHEMA_WHEN_NOT_EXIST` :表不存在时会创建,表存在时跳过。
-`ERROR_WHEN_SCHEMA_NOT_EXIST` :表不存在时会报错。
+`CREATE_SCHEMA_WHEN_NOT_EXIST` :表不存在时会创建,表存在时跳过。
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :表不存在时会报错。
+`IGNORE` :忽略对表的处理。
### data_save_mode[Enum]
在开启同步任务之前,针对目标端已有的数据选择不同的处理方案。
-选项介绍:
-`DROP_DATA`: 保留数据库结构并删除数据。
-`APPEND_DATA`:保留数据库结构,保留数据。
-`CUSTOM_PROCESSING`:用户自定义处理。
+选项介绍:
+`DROP_DATA`: 保留数据库结构并删除数据。
+`APPEND_DATA`:保留数据库结构,保留数据。
+`CUSTOM_PROCESSING`:用户自定义处理。
`ERROR_WHEN_DATA_EXISTS`:有数据时报错。
### save_mode_create_template
diff --git a/docs/zh/connector-v2/sink/Elasticsearch.md b/docs/zh/connector-v2/sink/Elasticsearch.md
index 2d614918f91..8682d262274 100644
--- a/docs/zh/connector-v2/sink/Elasticsearch.md
+++ b/docs/zh/connector-v2/sink/Elasticsearch.md
@@ -111,6 +111,7 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md)
`RECREATE_SCHEMA` :当表不存在时会创建,当表已存在时会删除并重建
`CREATE_SCHEMA_WHEN_NOT_EXIST` :当表不存在时会创建,当表已存在时则跳过创建
`ERROR_WHEN_SCHEMA_NOT_EXIST` :当表不存在时将抛出错误
+`IGNORE` :忽略对表的处理
### data_save_mode
diff --git a/docs/zh/connector-v2/sink/Jdbc.md b/docs/zh/connector-v2/sink/Jdbc.md
index f24e56f1f70..b05ecbc501c 100644
--- a/docs/zh/connector-v2/sink/Jdbc.md
+++ b/docs/zh/connector-v2/sink/Jdbc.md
@@ -178,6 +178,7 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md)
`RECREATE_SCHEMA`:当表不存在时会创建,当表已存在时会删除并重建
`CREATE_SCHEMA_WHEN_NOT_EXIST`:当表不存在时会创建,当表已存在时则跳过创建
`ERROR_WHEN_SCHEMA_NOT_EXIST`:当表不存在时将抛出错误
+`IGNORE` :忽略对表的处理
### data_save_mode [Enum]
diff --git a/docs/zh/connector-v2/sink/StarRocks.md b/docs/zh/connector-v2/sink/StarRocks.md
index 6be7ff7e8e0..f5848a3b71a 100644
--- a/docs/zh/connector-v2/sink/StarRocks.md
+++ b/docs/zh/connector-v2/sink/StarRocks.md
@@ -99,10 +99,11 @@ table选项参数可以填入一任意表名,这个名字最终会被用作目
### schema_save_mode[Enum]
-在同步任务打开之前,针对目标端已存在的表结构选择不同的处理方法。可选值有:
-`RECREATE_SCHEMA` :不存在的表会直接创建,已存在的表会删除并根据参数重新创建
-`CREATE_SCHEMA_WHEN_NOT_EXIST` :忽略已存在的表,不存在的表会直接创建
-`ERROR_WHEN_SCHEMA_NOT_EXIST` :当有不存在的表时会直接报错
+在同步任务打开之前,针对目标端已存在的表结构选择不同的处理方法。可选值有:
+`RECREATE_SCHEMA` :不存在的表会直接创建,已存在的表会删除并根据参数重新创建
+`CREATE_SCHEMA_WHEN_NOT_EXIST` :忽略已存在的表,不存在的表会直接创建
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :当有不存在的表时会直接报错
+`IGNORE` :忽略对表的处理
### data_save_mode[Enum]
From 03286ea0c919b13aa98dbabc1e78cccdc43db525 Mon Sep 17 00:00:00 2001
From: Shiwanming <1633138551@qq.com>
Date: Mon, 6 Jan 2025 22:59:07 +0800
Subject: [PATCH 07/13] starrocks-sink SupportMultiTableSink
---
.../connectors/seatunnel/starrocks/sink/StarRocksSink.java | 3 ++-
.../seatunnel/starrocks/sink/StarRocksSinkWriter.java | 3 ++-
2 files changed, 4 insertions(+), 2 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index 1f6ffcd0769..bb0259ec905 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -22,6 +22,7 @@
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink;
import org.apache.seatunnel.api.table.catalog.Catalog;
@@ -40,7 +41,7 @@
import java.util.Optional;
public class StarRocksSink extends AbstractSimpleSink
- implements SupportSaveMode, SupportSchemaEvolutionSink {
+ implements SupportSaveMode, SupportSchemaEvolutionSink, SupportMultiTableSink {
private final TableSchema tableSchema;
private final SinkConfig sinkConfig;
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
index d3664087312..612e6e2b72d 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
@@ -46,7 +47,7 @@
@Slf4j
public class StarRocksSinkWriter extends AbstractSinkWriter
- implements SupportSchemaEvolutionSinkWriter {
+ implements SupportMultiTableSinkWriter, SupportSchemaEvolutionSinkWriter {
private StarRocksISerializer serializer;
private StarRocksSinkManager manager;
private TableSchema tableSchema;
From 36dbbe3f50d945562748b4abe679102a610f2233 Mon Sep 17 00:00:00 2001
From: Shiwanming <1633138551@qq.com>
Date: Tue, 7 Jan 2025 22:09:16 +0800
Subject: [PATCH 08/13] add e2e
---
.../starrocks/sink/StarRocksSinkFactory.java | 2 +
.../starrocks/StarRocksMultiSinkIT.java | 304 ++++++++++++++++++
.../src/test/resources/ddl/store.sql | 78 +++++
..._multi_source_to_multi_sink_streaming.conf | 54 ++++
4 files changed, 438 insertions(+)
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/store.sql
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysql_multi_source_to_multi_sink_streaming.conf
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index bc851a91ed3..6057eb97af9 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -37,6 +37,7 @@
import java.util.Arrays;
import java.util.List;
+import static org.apache.seatunnel.api.sink.SinkCommonOptions.MULTI_TABLE_SINK_REPLICA;
import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions.DATA_SAVE_MODE;
@AutoService(Factory.class)
@@ -64,6 +65,7 @@ public OptionRule optionRule() {
StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
StarRocksSinkOptions.SCHEMA_SAVE_MODE,
StarRocksSinkOptions.DATA_SAVE_MODE,
+ MULTI_TABLE_SINK_REPLICA,
StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS)
.conditional(
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java
new file mode 100644
index 00000000000..33a1d081f10
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksMultiSinkIT.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.starrocks;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason =
+ "Currently SPARK do not support cdc. In addition, currently only the zeta engine supports schema evolution for pr https://github.com/apache/seatunnel/pull/5125.")
+public class StarRocksMultiSinkIT extends TestSuiteBase implements TestResource {
+ private static final String DATABASE = "store";
+ private static final String MYSQL_HOST = "mysql_cdc_e2e";
+ private static final String MYSQL_USER_NAME = "mysqluser";
+ private static final String MYSQL_USER_PASSWORD = "mysqlpw";
+
+ private static final String DOCKER_IMAGE = "starrocks/allin1-ubuntu:3.3.1";
+ private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+ private static final String HOST = "starrocks_cdc_e2e";
+ private static final int SR_PROXY_PORT = 8080;
+ private static final int QUERY_PORT = 9030;
+ private static final int HTTP_PORT = 8030;
+ private static final int BE_HTTP_PORT = 8040;
+ private static final String USERNAME = "root";
+ private static final String PASSWORD = "";
+ private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS " + DATABASE;
+ private static final String SR_DRIVER_JAR =
+ "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
+
+ private Connection starRocksConnection;
+ private Connection mysqlConnection;
+ private GenericContainer> starRocksServer;
+
+ public static final DateTimeFormatter DATE_TIME_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+ private static final String QUERY = "select * from %s.%s order by id";
+
+ private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+ private final UniqueDatabase shopDatabase =
+ new UniqueDatabase(MYSQL_CONTAINER, DATABASE, "mysqluser", "mysqlpw", DATABASE);
+
+ @TestContainerExtension
+ private final ContainerExtendedFactory extendedFactory =
+ container -> {
+ Container.ExecResult extraCommands =
+ container.execInContainer(
+ "bash",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+ + SR_DRIVER_JAR);
+ Assertions.assertEquals(0, extraCommands.getExitCode());
+ };
+
+ private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+ return new MySqlContainer(version)
+ .withConfigurationOverride("docker/server-gtids/my.cnf")
+ .withSetupSQL("docker/setup.sql")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MYSQL_HOST)
+ .withDatabaseName(DATABASE)
+ .withUsername(MYSQL_USER_NAME)
+ .withPassword(MYSQL_USER_PASSWORD)
+ .withLogConsumer(
+ new Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image")));
+ }
+
+ private void initializeJdbcConnection() throws Exception {
+ URLClassLoader urlClassLoader =
+ new URLClassLoader(
+ new URL[] {new URL(SR_DRIVER_JAR)},
+ StarRocksCDCSinkIT.class.getClassLoader());
+ Thread.currentThread().setContextClassLoader(urlClassLoader);
+ Driver driver = (Driver) urlClassLoader.loadClass(DRIVER_CLASS).newInstance();
+ Properties props = new Properties();
+ props.put("user", USERNAME);
+ props.put("password", PASSWORD);
+ starRocksConnection =
+ driver.connect(
+ String.format("jdbc:mysql://%s:%s", starRocksServer.getHost(), QUERY_PORT),
+ props);
+ }
+
+ private void initializeStarRocksServer() {
+ starRocksServer =
+ new GenericContainer<>(DOCKER_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(HOST)
+ .withLogConsumer(
+ new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
+ starRocksServer.setPortBindings(
+ Lists.newArrayList(
+ String.format("%s:%s", QUERY_PORT, QUERY_PORT),
+ String.format("%s:%s", HTTP_PORT, HTTP_PORT),
+ String.format("%s:%s", BE_HTTP_PORT, BE_HTTP_PORT)));
+ Startables.deepStart(Stream.of(starRocksServer)).join();
+ log.info("StarRocks container started");
+ // wait for starrocks fully start
+ given().ignoreExceptions()
+ .await()
+ .atMost(360, TimeUnit.SECONDS)
+ .untilAsserted(this::initializeJdbcConnection);
+ }
+
+ @TestTemplate
+ public void testStarRocksMultiTableSinkCase(TestContainer container)
+ throws InterruptedException, IOException, SQLException {
+ String jobId = String.valueOf(JobIdGenerator.newJobId());
+ String jobConfigFile = "/mysql_multi_source_to_multi_sink_streaming.conf";
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.executeJob(jobConfigFile, jobId);
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+ TimeUnit.SECONDS.sleep(30);
+
+ // verify multi table sink
+ verifyDataConsistency("orders");
+ verifyDataConsistency("customers");
+ verifyDataConsistency("products");
+
+ insertNewDataIntoMySQL();
+ insertNewDataIntoMySQL();
+ TimeUnit.SECONDS.sleep(10);
+ verifyDataConsistency("orders");
+ // savepoint 1
+ Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
+ TimeUnit.SECONDS.sleep(5);
+ insertNewDataIntoMySQL();
+ // restore 1
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.restoreJob(jobConfigFile, jobId);
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+ insertNewDataIntoMySQL();
+ TimeUnit.SECONDS.sleep(20);
+ verifyDataConsistency("orders");
+ }
+
+ private void verifyDataConsistency(String tableName) {
+ List> mysqlData =
+ query(String.format(QUERY, DATABASE, tableName), mysqlConnection);
+ List> starRocksData =
+ query(String.format(QUERY, DATABASE, tableName), starRocksConnection);
+ Assertions.assertEquals(
+ mysqlData, starRocksData, "Data consistency check failed for table: " + tableName);
+ }
+
+ private void insertNewDataIntoMySQL() throws SQLException {
+ mysqlConnection
+ .createStatement()
+ .execute(
+ "INSERT INTO orders (id, customer_id, order_date, total_amount, status) "
+ + "VALUES (null, 1, '2025-01-04 13:00:00', 498.99, 'pending')");
+ }
+
+ private Connection getMysqlJdbcConnection() throws SQLException {
+ return DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword());
+ }
+
+ @BeforeAll
+ @Override
+ public void startUp() throws SQLException {
+ initializeStarRocksServer();
+ log.info("The second stage: Starting Mysql containers...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ log.info("Mysql Containers are started");
+ shopDatabase.createAndInitialize();
+ log.info("Mysql ddl execution is complete");
+ initializeJdbcTable();
+ mysqlConnection = getMysqlJdbcConnection();
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws SQLException {
+ if (MYSQL_CONTAINER != null) {
+ MYSQL_CONTAINER.close();
+ }
+ if (starRocksServer != null) {
+ starRocksServer.close();
+ }
+ if (starRocksConnection != null) {
+ starRocksConnection.close();
+ }
+ if (mysqlConnection != null) {
+ mysqlConnection.close();
+ }
+ }
+
+ private void initializeJdbcTable() {
+ try (Statement statement = starRocksConnection.createStatement()) {
+ // create databases
+ statement.execute(CREATE_DATABASE);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing table failed!", e);
+ }
+ }
+
+ private List> query(String sql, Connection connection) {
+ try {
+ ResultSet resultSet = connection.createStatement().executeQuery(sql);
+ List> result = new ArrayList<>();
+ int columnCount = resultSet.getMetaData().getColumnCount();
+ while (resultSet.next()) {
+ ArrayList