Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2] Starrocks implements multi table sink #8467

Merged
merged 23 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
d272789
[Fix] [sink elasticsearch] Fix the issue of sink-es saveMode conflict…
jw-itq Aug 19, 2024
4ebc66b
[Fix] [sink elasticsearch] Fix the issue of sink-es saveMode and es a…
jw-itq Aug 21, 2024
6810d80
[Bug] [sink elasticsearch] the savemode of sink-es conficts with es a…
jw-itq Aug 19, 2024
ac50c3e
[Fix] [sink elasticsearch] Fix the issue of sink-es saveMode and es a…
jw-itq Aug 21, 2024
6e8ccc9
[Fix] [sink elasticsearch] Fix the issue of sink-es saveMode and es a…
jw-itq Aug 21, 2024
3936038
[Doc] Add IGNORE savemode type into docment #7443
jw-itq Aug 22, 2024
3bc24c2
Merge branch 'apache:dev' into dev
jw-itq Aug 22, 2024
c26e89d
Merge branch 'apache:dev' into dev
jw-itq Aug 26, 2024
b5f5162
Merge branch 'apache:dev' into dev
jw-itq Aug 28, 2024
0347da2
Merge branch 'apache:dev' into dev
jw-itq Sep 17, 2024
4bce27f
Merge branch 'apache:dev' into dev
jw-itq Nov 11, 2024
d4993f3
Merge branch 'apache:dev' into dev
jw-itq Nov 12, 2024
5d61e76
Merge branch 'apache:dev' into dev
jw-itq Nov 19, 2024
9f85118
Merge branch 'apache:dev' into dev
jw-itq Dec 2, 2024
bc41c79
Merge branch 'apache:dev' into dev
jw-itq Dec 4, 2024
4b21c83
Merge branch 'apache:dev' into dev
jw-itq Jan 2, 2025
03286ea
starrocks-sink SupportMultiTableSink
jw-itq Jan 6, 2025
36dbbe3
add e2e
jw-itq Jan 7, 2025
f607c46
add license
jw-itq Jan 7, 2025
774af03
optimize e2e
jw-itq Jan 8, 2025
32604c3
optimize e2e
jw-itq Jan 8, 2025
188cfe6
optimize e2e
jw-itq Jan 9, 2025
412568e
optimize e2e
jw-itq Jan 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink;
import org.apache.seatunnel.api.table.catalog.Catalog;
Expand All @@ -40,7 +41,7 @@
import java.util.Optional;

public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportSaveMode, SupportSchemaEvolutionSink {
implements SupportSaveMode, SupportSchemaEvolutionSink, SupportMultiTableSink {

private final TableSchema tableSchema;
private final SinkConfig sinkConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Arrays;
import java.util.List;

import static org.apache.seatunnel.api.sink.SinkCommonOptions.MULTI_TABLE_SINK_REPLICA;
import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions.DATA_SAVE_MODE;

@AutoService(Factory.class)
Expand Down Expand Up @@ -64,6 +65,7 @@ public OptionRule optionRule() {
StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
StarRocksSinkOptions.SCHEMA_SAVE_MODE,
StarRocksSinkOptions.DATA_SAVE_MODE,
MULTI_TABLE_SINK_REPLICA,
StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS)
.conditional(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;

import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
Expand Down Expand Up @@ -46,7 +47,7 @@

@Slf4j
public class StarRocksSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportSchemaEvolutionSinkWriter {
implements SupportMultiTableSinkWriter<Void>, SupportSchemaEvolutionSinkWriter {
private StarRocksISerializer serializer;
private StarRocksSinkManager manager;
private TableSchema tableSchema;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,308 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.e2e.connector.starrocks;

import org.apache.seatunnel.shade.com.google.common.collect.Lists;

import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.given;

@Slf4j
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
disabledReason =
"Currently SPARK do not support cdc. In addition, currently only the zeta engine supports schema evolution for pr https://github.com/apache/seatunnel/pull/5125.")
public class StarRocksMultiSinkIT extends TestSuiteBase implements TestResource {
private static final String DATABASE = "store";
private static final String MYSQL_HOST = "mysql_cdc_e2e";
private static final String MYSQL_USER_NAME = "mysqluser";
private static final String MYSQL_USER_PASSWORD = "mysqlpw";

private static final String DOCKER_IMAGE = "starrocks/allin1-ubuntu:3.3.1";
private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
private static final String HOST = "starrocks_cdc_e2e";
private static final int SR_PROXY_PORT = 8080;
private static final int QUERY_PORT = 9030;
private static final int HTTP_PORT = 8030;
private static final int BE_HTTP_PORT = 8040;
private static final String USERNAME = "root";
private static final String PASSWORD = "";
private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS " + DATABASE;
private static final String SR_DRIVER_JAR =
"https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";

private Connection starRocksConnection;
private Connection mysqlConnection;
private GenericContainer<?> starRocksServer;

public static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

private static final String QUERY = "select * from %s.%s order by id";

private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);

private final UniqueDatabase shopDatabase =
new UniqueDatabase(MYSQL_CONTAINER, DATABASE, "mysqluser", "mysqlpw", DATABASE);

@TestContainerExtension
private final ContainerExtendedFactory extendedFactory =
container -> {
Container.ExecResult extraCommands =
container.execInContainer(
"bash",
"-c",
"mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+ SR_DRIVER_JAR);
Assertions.assertEquals(0, extraCommands.getExitCode());
};

private static MySqlContainer createMySqlContainer(MySqlVersion version) {
return new MySqlContainer(version)
.withConfigurationOverride("docker/server-gtids/my.cnf")
.withSetupSQL("docker/setup.sql")
.withNetwork(NETWORK)
.withNetworkAliases(MYSQL_HOST)
.withDatabaseName(DATABASE)
.withUsername(MYSQL_USER_NAME)
.withPassword(MYSQL_USER_PASSWORD)
.withLogConsumer(
new Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image")));
}

private void initializeJdbcConnection() throws Exception {
URLClassLoader urlClassLoader =
new URLClassLoader(
new URL[] {new URL(SR_DRIVER_JAR)},
StarRocksCDCSinkIT.class.getClassLoader());
Thread.currentThread().setContextClassLoader(urlClassLoader);
Driver driver = (Driver) urlClassLoader.loadClass(DRIVER_CLASS).newInstance();
Properties props = new Properties();
props.put("user", USERNAME);
props.put("password", PASSWORD);
starRocksConnection =
driver.connect(
String.format("jdbc:mysql://%s:%s", starRocksServer.getHost(), QUERY_PORT),
props);
}

private void initializeStarRocksServer() {
starRocksServer =
new GenericContainer<>(DOCKER_IMAGE)
.withNetwork(NETWORK)
.withNetworkAliases(HOST)
.withLogConsumer(
new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
starRocksServer.setPortBindings(
Lists.newArrayList(
String.format("%s:%s", QUERY_PORT, QUERY_PORT),
String.format("%s:%s", HTTP_PORT, HTTP_PORT),
String.format("%s:%s", BE_HTTP_PORT, BE_HTTP_PORT)));
Startables.deepStart(Stream.of(starRocksServer)).join();
log.info("StarRocks container started");
// wait for starrocks fully start
given().ignoreExceptions()
.await()
.atMost(360, TimeUnit.SECONDS)
.untilAsserted(this::initializeJdbcConnection);
}

@TestTemplate
public void testStarRocksMultiTableSinkCase(TestContainer container)
throws InterruptedException, IOException, SQLException {
String jobId = String.valueOf(JobIdGenerator.newJobId());
String jobConfigFile = "/mysql_multi_source_to_multi_sink_streaming.conf";
CompletableFuture.runAsync(
() -> {
try {
container.executeJob(jobConfigFile, jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
});
TimeUnit.SECONDS.sleep(10);

// verify multi table sink
verifyDataConsistency("orders");
verifyDataConsistency("customers");
verifyDataConsistency("products");

insertNewDataIntoMySQL();
insertNewDataIntoMySQL();
// verify incremental
verifyDataConsistency("orders");
// savepoint
Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
insertNewDataIntoMySQL();
// restore
CompletableFuture.supplyAsync(
() -> {
try {
container.restoreJob(jobConfigFile, jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
return null;
});
insertNewDataIntoMySQL();
// verify restore
verifyDataConsistency("orders");
}

private void verifyDataConsistency(String tableName) {
await().atMost(10000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertIterableEquals(
query(
String.format(QUERY, DATABASE, tableName),
mysqlConnection),
query(
String.format(QUERY, DATABASE, tableName),
starRocksConnection)));
}

private void insertNewDataIntoMySQL() throws SQLException {
mysqlConnection
.createStatement()
.execute(
"INSERT INTO orders (id, customer_id, order_date, total_amount, status) "
+ "VALUES (null, 1, '2025-01-04 13:00:00', 498.99, 'pending')");
}

private Connection getMysqlJdbcConnection() throws SQLException {
return DriverManager.getConnection(
MYSQL_CONTAINER.getJdbcUrl(),
MYSQL_CONTAINER.getUsername(),
MYSQL_CONTAINER.getPassword());
}

@BeforeAll
@Override
public void startUp() throws SQLException {
initializeStarRocksServer();
log.info("The second stage: Starting Mysql containers...");
Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
log.info("Mysql Containers are started");
shopDatabase.createAndInitialize();
log.info("Mysql ddl execution is complete");
initializeJdbcTable();
mysqlConnection = getMysqlJdbcConnection();
}

@AfterAll
@Override
public void tearDown() throws SQLException {
if (MYSQL_CONTAINER != null) {
MYSQL_CONTAINER.close();
}
if (starRocksServer != null) {
starRocksServer.close();
}
if (starRocksConnection != null) {
starRocksConnection.close();
}
if (mysqlConnection != null) {
mysqlConnection.close();
}
}

private void initializeJdbcTable() {
try (Statement statement = starRocksConnection.createStatement()) {
// create databases
statement.execute(CREATE_DATABASE);
} catch (SQLException e) {
throw new RuntimeException("Initializing table failed!", e);
}
}

private List<List<Object>> query(String sql, Connection connection) {
try {
ResultSet resultSet = connection.createStatement().executeQuery(sql);
List<List<Object>> result = new ArrayList<>();
int columnCount = resultSet.getMetaData().getColumnCount();
while (resultSet.next()) {
ArrayList<Object> objects = new ArrayList<>();
for (int i = 1; i <= columnCount; i++) {
if (resultSet.getObject(i) instanceof Timestamp) {
Timestamp timestamp = resultSet.getTimestamp(i);
objects.add(timestamp.toLocalDateTime().format(DATE_TIME_FORMATTER));
break;
}
if (resultSet.getObject(i) instanceof LocalDateTime) {
LocalDateTime localDateTime = resultSet.getObject(i, LocalDateTime.class);
objects.add(localDateTime.format(DATE_TIME_FORMATTER));
break;
}
objects.add(resultSet.getObject(i));
}
log.debug(String.format("Print query, sql: %s, data: %s", sql, objects));
result.add(objects);
}
return result;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
Loading
Loading