Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2] Starrocks implements multi table sink #8467

Merged
merged 23 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
d272789
[Fix] [sink elasticsearch] Fix the issue of sink-es saveMode conflict…
jw-itq Aug 19, 2024
4ebc66b
[Fix] [sink elasticsearch] Fix the issue of sink-es saveMode and es a…
jw-itq Aug 21, 2024
6810d80
[Bug] [sink elasticsearch] the savemode of sink-es conflicts with es a…
jw-itq Aug 19, 2024
ac50c3e
[Fix] [sink elasticsearch] Fix the issue of sink-es saveMode and es a…
jw-itq Aug 21, 2024
6e8ccc9
[Fix] [sink elasticsearch] Fix the issue of sink-es saveMode and es a…
jw-itq Aug 21, 2024
3936038
[Doc] Add IGNORE savemode type into document #7443
jw-itq Aug 22, 2024
3bc24c2
Merge branch 'apache:dev' into dev
jw-itq Aug 22, 2024
c26e89d
Merge branch 'apache:dev' into dev
jw-itq Aug 26, 2024
b5f5162
Merge branch 'apache:dev' into dev
jw-itq Aug 28, 2024
0347da2
Merge branch 'apache:dev' into dev
jw-itq Sep 17, 2024
4bce27f
Merge branch 'apache:dev' into dev
jw-itq Nov 11, 2024
d4993f3
Merge branch 'apache:dev' into dev
jw-itq Nov 12, 2024
5d61e76
Merge branch 'apache:dev' into dev
jw-itq Nov 19, 2024
9f85118
Merge branch 'apache:dev' into dev
jw-itq Dec 2, 2024
bc41c79
Merge branch 'apache:dev' into dev
jw-itq Dec 4, 2024
4b21c83
Merge branch 'apache:dev' into dev
jw-itq Jan 2, 2025
03286ea
starrocks-sink SupportMultiTableSink
jw-itq Jan 6, 2025
36dbbe3
add e2e
jw-itq Jan 7, 2025
f607c46
add license
jw-itq Jan 7, 2025
774af03
optimize e2e
jw-itq Jan 8, 2025
32604c3
optimize e2e
jw-itq Jan 8, 2025
188cfe6
optimize e2e
jw-itq Jan 9, 2025
412568e
optimize e2e
jw-itq Jan 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink;
import org.apache.seatunnel.api.table.catalog.Catalog;
Expand All @@ -40,7 +41,7 @@
import java.util.Optional;

public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportSaveMode, SupportSchemaEvolutionSink {
implements SupportSaveMode, SupportSchemaEvolutionSink, SupportMultiTableSink {

private final TableSchema tableSchema;
private final SinkConfig sinkConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Arrays;
import java.util.List;

import static org.apache.seatunnel.api.sink.SinkCommonOptions.MULTI_TABLE_SINK_REPLICA;
import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions.DATA_SAVE_MODE;

@AutoService(Factory.class)
Expand Down Expand Up @@ -64,6 +65,7 @@ public OptionRule optionRule() {
StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
StarRocksSinkOptions.SCHEMA_SAVE_MODE,
StarRocksSinkOptions.DATA_SAVE_MODE,
MULTI_TABLE_SINK_REPLICA,
StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS)
.conditional(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;

import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
Expand Down Expand Up @@ -46,7 +47,7 @@

@Slf4j
public class StarRocksSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportSchemaEvolutionSinkWriter {
implements SupportMultiTableSinkWriter<Void>, SupportSchemaEvolutionSinkWriter {
private StarRocksISerializer serializer;
private StarRocksSinkManager manager;
private TableSchema tableSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ VALUES (101,"scooter","Small 2-wheel scooter",3.14),
(108,"jacket","water resistent black wind breaker",0.1),
(109,"spare tire","24 inch spare tire",22.2);


drop table if exists products_on_hand;
CREATE TABLE products_on_hand (
product_id INTEGER NOT NULL PRIMARY KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private void initializeStarRocksServer() {

@TestTemplate
public void testStarRocksSinkWithSchemaEvolutionCase(TestContainer container)
throws InterruptedException, IOException {
throws InterruptedException, IOException, SQLException {
String jobId = String.valueOf(JobIdGenerator.newJobId());
String jobConfigFile = "/mysqlcdc_to_starrocks_with_schema_change.conf";
CompletableFuture.runAsync(
Expand All @@ -187,16 +187,26 @@ public void testStarRocksSinkWithSchemaEvolutionCase(TestContainer container)
}
});
TimeUnit.SECONDS.sleep(20);

// verify multi table sink
verifyDataConsistency("orders");
verifyDataConsistency("customers");

// waiting for case1 completed
assertSchemaEvolutionForAddColumns(
DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, starRocksConnection);

assertSchemaEvolutionForDropColumns(
DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, starRocksConnection);

insertNewDataIntoMySQL();
insertNewDataIntoMySQL();
// verify incremental
verifyDataConsistency("orders");

// savepoint 1
Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

insertNewDataIntoMySQL();
// case2 drop columns with cdc data at same time
shopDatabase.setTemplateName("drop_columns").createAndInitialize();

Expand Down Expand Up @@ -240,6 +250,30 @@ public void testStarRocksSinkWithSchemaEvolutionCase(TestContainer container)
// waiting for case3/case4 completed
assertTableStructureAndData(
DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, starRocksConnection);
insertNewDataIntoMySQL();
// verify restore
verifyDataConsistency("orders");
}

/**
 * Inserts one additional {@code pending} order row into the MySQL {@code orders} table so the
 * test has fresh incremental data to verify on the sink side.
 *
 * <p>The {@code id} value is passed as {@code null} in the statement, so the database is
 * expected to assign it.
 *
 * @throws SQLException if the statement cannot be created or executed
 */
private void insertNewDataIntoMySQL() throws SQLException {
    // try-with-resources: this helper is called several times per test run, and each call
    // previously left its JDBC statement open until the connection itself was closed.
    try (java.sql.Statement statement = mysqlConnection.createStatement()) {
        statement.execute(
                "INSERT INTO orders (id, customer_id, order_date, total_amount, status) "
                        + "VALUES (null, 1, '2025-01-04 13:00:00', 498.99, 'pending')");
    }
}

/**
 * Waits until the given table yields equal query results from MySQL and from StarRocks.
 *
 * <p>The {@code QUERY} template is formatted with {@code DATABASE} and {@code tableName} and run
 * against both connections; the two results are compared with
 * {@code Assertions.assertIterableEquals}. The comparison is retried for at most 10 seconds.
 *
 * @param tableName name of the table to compare on both sides
 */
private void verifyDataConsistency(String tableName) {
    await().atMost(10000, TimeUnit.MILLISECONDS)
            .untilAsserted(
                    () -> {
                        // Same statement text is used for the source and the sink lookup.
                        String sql = String.format(QUERY, DATABASE, tableName);
                        Assertions.assertIterableEquals(
                                query(sql, mysqlConnection), query(sql, starRocksConnection));
                    });
}

private void assertSchemaEvolutionForAddColumns(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,30 @@ CREATE TABLE products (
weight FLOAT
);

drop table if exists orders;

-- Orders table: auto-increment BIGINT primary key, status defaults to 'pending',
-- created_at/updated_at default to the current timestamp (updated_at refreshes on update).
CREATE TABLE orders (
    id           BIGINT AUTO_INCREMENT PRIMARY KEY,
    customer_id  BIGINT NOT NULL,
    order_date   DATETIME NOT NULL,
    total_amount DECIMAL(10, 2) NOT NULL,
    STATUS       VARCHAR(50) DEFAULT 'pending',
    created_at   DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at   DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

drop table if exists customers;

-- Customers table: BIGINT primary key supplied by the inserter (no auto-increment),
-- created_at/updated_at default to the current timestamp (updated_at refreshes on update).
CREATE TABLE customers (
    id         BIGINT PRIMARY KEY,
    NAME       VARCHAR(255) NOT NULL,
    email      VARCHAR(255) NOT NULL,
    phone      VARCHAR(50),
    address    TEXT,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
Expand All @@ -41,4 +65,16 @@ VALUES (101,"scooter","Small 2-wheel scooter",3.14),
(106,"hammer","16oz carpenter's hammer",1.0),
(107,"rocks","box of assorted rocks",5.3),
(108,"jacket","water resistent black wind breaker",0.1),
(109,"spare tire","24 inch spare tire",22.2);
(109,"spare tire","24 inch spare tire",22.2);

-- Seed rows for orders: three rows with explicit ids 1-3.
INSERT INTO orders (id, customer_id, order_date, total_amount, STATUS)
VALUES
    (1, 1, '2024-01-01 10:00:00', 299.99, 'completed'),
    (2, 2, '2024-01-02 11:00:00', 199.99, 'completed'),
    (3, 3, '2024-01-03 12:00:00', 399.99, 'processing');

-- Seed rows for customers: three rows with explicit ids 1-3.
-- NOTE(review): the email literals below look like redaction placeholders rather than real
-- addresses. Confirm against the original file before relying on them.
INSERT INTO customers ( id, NAME, email, phone, address )
VALUES
( 1, 'John Doe', '[email protected]', '123-456-7890', '123 Main St' ),
( 2, 'Jane Smith', '[email protected]', '234-567-8901', '456 Oak Ave' ),
( 3, 'Bob Johnson', '[email protected]', '345-678-9012', '789 Pine Rd' );
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@
env {
# You can set engine configuration here
job.mode = "STREAMING"
checkpoint.interval = 5000
read_limit.bytes_per_second=7000000
read_limit.rows_per_second=400
checkpoint.interval = 2000
}

source {
MySQL-CDC {
username = "st_user_source"
password = "mysqlpw"
table-names = ["shop.products"]
table-names = ["shop.products", "shop.orders", "shop.customers"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"

schema-changes.enabled = true
Expand Down
Loading