Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector] add starrocks save_mode #6029

Merged
merged 41 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
bb690ff
[feature] add_starrocks_save_mode
chl-wxp Dec 19, 2023
03056da
[feature] add_starrocks_save_mode2
chl-wxp Dec 19, 2023
655fca3
[feature] add_starrocks_save_mode3
chl-wxp Dec 19, 2023
7755581
[feature] add_starrocks_save_mode4
chl-wxp Dec 19, 2023
ee44f04
[feature] add_starrocks_save_mode4
chl-wxp Dec 19, 2023
84ad25d
[feature] add_starrocks_save_mode5
chl-wxp Dec 20, 2023
b9d3295
[feature] add_starrocks_save_mode6
chl-wxp Dec 20, 2023
53544dc
[feature] add_starrocks_save_mode7
chl-wxp Dec 20, 2023
534d9d9
[feature] add_starrocks_save_mode8
chl-wxp Dec 20, 2023
f5bb0a5
[feature] add_starrocks_save_mode8
chl-wxp Dec 20, 2023
a52494e
[feature] add_starrocks_save_mode8
chl-wxp Dec 20, 2023
d58c687
[feature] add_starrocks_save_mode8
chl-wxp Dec 20, 2023
a1f5920
[feature] add_starrocks_save_mode8
chl-wxp Dec 20, 2023
b5e27e5
[feature] add_starrocks_save_mode8
chl-wxp Dec 20, 2023
e9e4cc6
[feature] add_starrocks_save_mode9
chl-wxp Dec 22, 2023
7631921
Merge remote-tracking branch 'seatunnel/dev' into feature/add_starroc…
chl-wxp Dec 25, 2023
8110e02
[feature] update config
chl-wxp Dec 25, 2023
43462f5
Merge remote-tracking branch 'origin/feature/add_starrocks_save_mode'…
chl-wxp Dec 25, 2023
d706f99
[feature] add starrocks catalog for save mode
chl-wxp Dec 27, 2023
ad7b00e
update
Hisoka-X Dec 28, 2023
fa63457
update
Hisoka-X Dec 28, 2023
7a73567
update
Hisoka-X Dec 28, 2023
434053f
Merge remote-tracking branch 'seatunnel/dev' into feature/add_starroc…
chl-wxp Jan 5, 2024
4a04cc9
[feature] add starrocks save mode doc
chl-wxp Jan 5, 2024
74b532f
[feature] update doc and test
chl-wxp Jan 5, 2024
055d4ef
[feature] update doc and test2
chl-wxp Jan 8, 2024
03499c3
[feature] update doc and test3
chl-wxp Jan 8, 2024
5da9afe
[update] remove sink ddl
chl-wxp Jan 9, 2024
bec91e3
[update] remove sink ddl2
chl-wxp Jan 9, 2024
facd413
[update] remove sink ddl3
chl-wxp Jan 10, 2024
87efa3f
[update] remove sink ddl4
chl-wxp Jan 10, 2024
b841edf
[update] remove sink ddl5
chl-wxp Jan 10, 2024
40d2bb3
[update] remove sink ddl6
chl-wxp Jan 10, 2024
139aec5
[update] remove sink ddl7
chl-wxp Jan 10, 2024
add8a6f
[update] remove sink ddl8
chl-wxp Jan 10, 2024
9c69453
[update] remove sink ddl9
chl-wxp Jan 10, 2024
4ecadb3
[update] remove sink ddl10
chl-wxp Jan 10, 2024
d7d2c46
[update] update row value
chl-wxp Jan 10, 2024
5639f04
[update] update row type
chl-wxp Jan 10, 2024
70c21f1
[update] update catalog create table
chl-wxp Jan 10, 2024
3419ace
[update] delete code
chl-wxp Jan 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import org.apache.seatunnel.connectors.seatunnel.starrocks.sink.StarRocksSaveModeUtil;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -72,6 +73,7 @@ public class StarRocksCatalog implements Catalog {
protected final String baseUrl;
protected String defaultUrl;
private final JdbcUrlUtil.UrlInfo urlInfo;
private final String template;

private static final Set<String> SYS_DATABASES = new HashSet<>();
private static final Logger LOG = LoggerFactory.getLogger(StarRocksCatalog.class);
Expand All @@ -81,10 +83,10 @@ public class StarRocksCatalog implements Catalog {
SYS_DATABASES.add("_statistics_");
}

public StarRocksCatalog(String catalogName, String username, String pwd, String defaultUrl) {
public StarRocksCatalog(
String catalogName, String username, String pwd, String defaultUrl, String template) {

checkArgument(StringUtils.isNotBlank(username));
checkArgument(StringUtils.isNotBlank(pwd));
checkArgument(StringUtils.isNotBlank(defaultUrl));
urlInfo = JdbcUrlUtil.getUrlInfo(defaultUrl);
this.baseUrl = urlInfo.getUrlWithoutDatabase();
Expand All @@ -95,6 +97,7 @@ public StarRocksCatalog(String catalogName, String username, String pwd, String
this.catalogName = catalogName;
this.username = username;
this.pwd = pwd;
this.template = template;
}

@Override
Expand Down Expand Up @@ -208,13 +211,70 @@ public CatalogTable getTable(TablePath tablePath)
/**
 * Creates a table from the save-mode create-table template held by this catalog.
 *
 * <p>The template is rendered by {@code StarRocksSaveModeUtil.fillingCreateSql} with the database
 * name, table name and schema taken from {@code table}, and the result is handed to the
 * single-argument {@code createTable} overload of this class.
 *
 * <p>NOTE(review): {@code tablePath} and {@code ignoreIfExists} are not consulted here; the
 * target names come from {@code table.getTableId()} and any "if not exists" behaviour has to be
 * expressed in the template itself - confirm this is intended.
 *
 * @param tablePath path of the table to create (currently unused)
 * @param table catalog table providing the identifier and schema used to fill the template
 * @param ignoreIfExists currently unused
 */
@Override
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
        throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
    // The former "throw new UnsupportedOperationException()" placeholder is gone: it made the
    // implementation below unreachable.
    this.createTable(
            StarRocksSaveModeUtil.fillingCreateSql(
                    template,
                    table.getTableId().getDatabaseName(),
                    table.getTableId().getTableName(),
                    table.getTableSchema()));
}

/**
 * Drops the table identified by {@code tablePath}.
 *
 * @param tablePath table to drop; {@code tablePath.getFullName()} is placed in the statement
 * @param ignoreIfNotExists when {@code true} the statement is {@code DROP TABLE IF EXISTS ...},
 *     otherwise a plain {@code DROP TABLE ...}
 * @throws CatalogException if opening the connection or executing the statement fails
 */
@Override
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
        throws TableNotExistException, CatalogException {
    String sql =
            ignoreIfNotExists
                    ? "DROP TABLE IF EXISTS " + tablePath.getFullName()
                    : String.format("DROP TABLE %s", tablePath.getFullName());
    // Both the connection and the statement are closed by try-with-resources. The statement
    // type is fully qualified so no additional import is required.
    try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
            java.sql.Statement stmt = conn.createStatement()) {
        stmt.execute(sql);
    } catch (Exception e) {
        // The previous message ("Failed listing database ...") was copied from another method
        // and did not describe this operation.
        throw new CatalogException(
                String.format(
                        "Failed dropping table %s in catalog %s",
                        tablePath.getFullName(), catalogName),
                e);
    }
}

/**
 * Removes all rows from the table identified by {@code tablePath} with {@code TRUNCATE TABLE}.
 *
 * <p>The statement is now executed regardless of {@code ignoreIfNotExists}. Previously it ran
 * only when the flag was {@code true}, so a call with {@code false} silently did nothing.
 *
 * @param tablePath table to truncate; {@code tablePath.getFullName()} is placed in the statement
 * @param ignoreIfNotExists does not alter the statement; any failure (including, presumably, a
 *     missing table) is reported as a {@link CatalogException}
 * @throws CatalogException if opening the connection or executing the statement fails
 */
public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
        throws TableNotExistException, CatalogException {
    String sql = String.format("TRUNCATE TABLE %s", tablePath.getFullName());
    // Connection and statement are both released by try-with-resources. The statement type is
    // fully qualified so no additional import is required.
    try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
            java.sql.Statement stmt = conn.createStatement()) {
        stmt.execute(sql);
    } catch (Exception e) {
        throw new CatalogException(
                String.format("Failed TRUNCATE TABLE in catalog %s", tablePath.getFullName()),
                e);
    }
}

/**
 * Executes an arbitrary SQL statement over a fresh connection to {@code defaultUrl}.
 *
 * <p>Any failure, whether connecting, preparing or executing, is reported as a single
 * {@link CatalogException} whose cause is the original exception. The earlier nested try/catch
 * wrapped a failed statement twice, because the inner {@code CatalogException} was caught and
 * re-wrapped by the outer handler.
 *
 * @param tablePath not used by this method
 * @param sql statement text to prepare and execute as-is
 * @throws CatalogException if the connection cannot be opened or the statement fails
 */
public void executeSql(TablePath tablePath, String sql) {
    try (Connection connection = DriverManager.getConnection(defaultUrl, username, pwd);
            PreparedStatement ps = connection.prepareStatement(sql)) {
        ps.execute();
    } catch (Exception e) {
        throw new CatalogException(String.format("Failed EXECUTE SQL in catalog %s", sql), e);
    }
}

public boolean isExistsData(TablePath tablePath) {
try (Connection connection = DriverManager.getConnection(defaultUrl, username, pwd)) {
String sql = String.format("select * from %s limit 1", tablePath.getFullName());
PreparedStatement ps = connection.prepareStatement(sql);
chl-wxp marked this conversation as resolved.
Show resolved Hide resolved
ResultSet resultSet = ps.executeQuery();
if (resultSet == null) {
return false;
}
return resultSet.next();
} catch (SQLException e) {
throw new CatalogException(
String.format("Failed Connection JDBC error %s", tablePath.getTableName()), e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;

import com.google.auto.service.AutoService;

Expand All @@ -36,7 +37,8 @@ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
catalogName,
options.get(StarRocksOptions.USERNAME),
options.get(StarRocksOptions.PASSWORD),
options.get(StarRocksOptions.BASE_URL));
options.get(StarRocksOptions.BASE_URL),
options.get(StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;

import lombok.Getter;
import lombok.Setter;
Expand Down Expand Up @@ -58,7 +59,9 @@ public enum StreamLoadFormat {

private String saveModeCreateTemplate;

private SchemaSaveMode schemaSaveMode;
private DataSaveMode dataSaveMode;
private String customSql;

private int httpSocketTimeout;

Expand Down Expand Up @@ -91,7 +94,9 @@ public static SinkConfig of(ReadonlyConfig config) {
config.getOptional(StarRocksSinkOptions.COLUMN_SEPARATOR)
.ifPresent(sinkConfig::setColumnSeparator);
sinkConfig.setLoadFormat(config.get(StarRocksSinkOptions.LOAD_FORMAT));
sinkConfig.setDataSaveMode(config.get(StarRocksSinkOptions.SAVE_MODE));
sinkConfig.setSchemaSaveMode(config.get(StarRocksSinkOptions.SCHEMA_SAVE_MODE));
sinkConfig.setDataSaveMode(config.get(StarRocksSinkOptions.DATA_SAVE_MODE));
sinkConfig.setCustomSql(config.get(StarRocksSinkOptions.CUSTOM_SQL));
sinkConfig.setHttpSocketTimeout(config.get(StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS));
return sinkConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.SingleChoiceOption;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.StreamLoadFormat;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -131,10 +129,15 @@ public interface StarRocksSinkOptions {
.enumType(StreamLoadFormat.class)
.defaultValue(StreamLoadFormat.JSON)
.withDescription("");

SingleChoiceOption<DataSaveMode> SAVE_MODE =
Options.key(SupportSaveMode.DATA_SAVE_MODE_KEY)
.singleChoice(DataSaveMode.class, Arrays.asList(DataSaveMode.APPEND_DATA))
/**
 * Sink option {@code schema_save_mode}: a {@link SchemaSaveMode} value that defaults to
 * {@code CREATE_SCHEMA_WHEN_NOT_EXIST}.
 */
Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
Options.key("schema_save_mode")
.enumType(SchemaSaveMode.class)
.defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
.withDescription("schema_save_mode");
chl-wxp marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Sink option {@code data_save_mode}: a {@link DataSaveMode} value that defaults to
 * {@code APPEND_DATA}.
 */
Option<DataSaveMode> DATA_SAVE_MODE =
Options.key("data_save_mode")
.enumType(DataSaveMode.class)
.defaultValue(DataSaveMode.APPEND_DATA)
.withDescription(
"Table structure and data processing methods that already exist on the target end");
Expand All @@ -144,4 +147,7 @@ public interface StarRocksSinkOptions {
.intType()
.defaultValue(3 * 60 * 1000)
.withDescription("Set http socket timeout, default is 3 minutes.");

/**
 * Sink option {@code custom_sql}: a string with no default value.
 *
 * <p>NOTE(review): the sink factory's option rule appears to tie this option to
 * {@code data_save_mode = CUSTOM_PROCESSING} - confirm against the factory.
 */
Option<String> CUSTOM_SQL =
Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql");
chl-wxp marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
Expand All @@ -38,62 +42,51 @@ public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>

private SeaTunnelRowType seaTunnelRowType;
private final SinkConfig sinkConfig;
private final DataSaveMode dataSaveMode;

private DataSaveMode dataSaveMode;
private SchemaSaveMode schemaSaveMode;
private final CatalogTable catalogTable;

public StarRocksSink(SinkConfig sinkConfig, CatalogTable catalogTable) {
public StarRocksSink(
SinkConfig sinkConfig, CatalogTable catalogTable, ReadonlyConfig readonlyConfig) {
this.sinkConfig = sinkConfig;
this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
this.catalogTable = catalogTable;
this.dataSaveMode = sinkConfig.getDataSaveMode();
this.schemaSaveMode = sinkConfig.getSchemaSaveMode();
}

@Override
public String getPluginName() {
return StarRocksCatalogFactory.IDENTIFIER;
}

private void autoCreateTable(String template) {
StarRocksCatalog starRocksCatalog =
new StarRocksCatalog(
"StarRocks",
sinkConfig.getUsername(),
sinkConfig.getPassword(),
sinkConfig.getJdbcUrl());
if (!starRocksCatalog.databaseExists(sinkConfig.getDatabase())) {
starRocksCatalog.createDatabase(TablePath.of(sinkConfig.getDatabase(), ""), true);
}
if (!starRocksCatalog.tableExists(
TablePath.of(sinkConfig.getDatabase(), sinkConfig.getTable()))) {
starRocksCatalog.createTable(
StarRocksSaveModeUtil.fillingCreateSql(
template,
sinkConfig.getDatabase(),
sinkConfig.getTable(),
catalogTable.getTableSchema()));
}
}

@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) {
return new StarRocksSinkWriter(sinkConfig, seaTunnelRowType);
}

@Override
public Optional<SaveModeHandler> getSaveModeHandler() {
return Optional.empty();
}

/*@Override
public DataSaveMode getUserConfigSaveMode() {
return dataSaveMode;
TablePath tablePath =
TablePath.of(
catalogTable.getTableId().getDatabaseName(),
catalogTable.getTableId().getSchemaName(),
catalogTable.getTableId().getTableName());
Catalog catalog =
new StarRocksCatalog(
"StarRocks",
sinkConfig.getUsername(),
sinkConfig.getPassword(),
sinkConfig.getJdbcUrl(),
sinkConfig.getSaveModeCreateTemplate());
catalog.open();
return Optional.of(
new DefaultSaveModeHandler(
schemaSaveMode,
dataSaveMode,
catalog,
tablePath,
catalogTable,
sinkConfig.getCustomSql()));
}

@Override
public void handleSaveMode(DataSaveMode saveMode) {
if (catalogTable != null) {
autoCreateTable(sinkConfig.getSaveModeCreateTemplate());
}
}*/
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
Expand All @@ -31,6 +33,11 @@

import com.google.auto.service.AutoService;

import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY;
import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY;
import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY;
import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions.DATA_SAVE_MODE;

@AutoService(Factory.class)
public class StarRocksSinkFactory implements TableSinkFactory {
@Override
Expand All @@ -54,9 +61,14 @@ public OptionRule optionRule() {
StarRocksSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS,
StarRocksSinkOptions.STARROCKS_CONFIG,
StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
StarRocksSinkOptions.SAVE_MODE,
StarRocksSinkOptions.SCHEMA_SAVE_MODE,
StarRocksSinkOptions.DATA_SAVE_MODE,
StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS)
.conditional(
DATA_SAVE_MODE,
DataSaveMode.CUSTOM_PROCESSING,
StarRocksSinkOptions.CUSTOM_SQL)
.build();
}

Expand All @@ -67,6 +79,47 @@ public TableSink createSink(TableSinkFactoryContext context) {
if (StringUtils.isBlank(sinkConfig.getTable())) {
sinkConfig.setTable(catalogTable.getTableId().getTableName());
}
return () -> new StarRocksSink(sinkConfig, catalogTable);
// get source table relevant information
TableIdentifier tableId = catalogTable.getTableId();
String sourceDatabaseName = tableId.getDatabaseName();
String sourceSchemaName = tableId.getSchemaName();
String sourceTableName = tableId.getTableName();
// get sink table relevant information
String sinkDatabaseName = sinkConfig.getDatabase();
String sinkTableName = sinkConfig.getTable();
// to replace
String finalDatabaseName =
sinkDatabaseName.replace(REPLACE_DATABASE_NAME_KEY, sourceDatabaseName);
String finalTableName = this.replaceFullTableName(sinkTableName, tableId);
// rebuild TableIdentifier and catalogTable
TableIdentifier newTableId =
TableIdentifier.of(
tableId.getCatalogName(), finalDatabaseName, null, finalTableName);
catalogTable =
CatalogTable.of(
newTableId,
catalogTable.getTableSchema(),
catalogTable.getOptions(),
catalogTable.getPartitionKeys(),
catalogTable.getCatalogName());

CatalogTable finalCatalogTable = catalogTable;
// reset
sinkConfig.setTable(finalTableName);
sinkConfig.setDatabase(finalDatabaseName);
return () -> new StarRocksSink(sinkConfig, finalCatalogTable, context.getOptions());
}

/**
 * Substitutes the database, schema and table name placeholders found in {@code original} with
 * the corresponding parts of {@code tableId}.
 *
 * <p>Each placeholder is only substituted when the matching identifier part is not blank;
 * otherwise that placeholder is left in the text untouched.
 *
 * @param original configured table name that may contain placeholders
 * @param tableId identifier supplying the replacement values
 * @return {@code original} with every applicable placeholder replaced
 */
private String replaceFullTableName(String original, TableIdentifier tableId) {
    String resolved = original;

    final String databaseName = tableId.getDatabaseName();
    if (!StringUtils.isBlank(databaseName)) {
        resolved = resolved.replace(REPLACE_DATABASE_NAME_KEY, databaseName);
    }

    final String schemaName = tableId.getSchemaName();
    if (!StringUtils.isBlank(schemaName)) {
        resolved = resolved.replace(REPLACE_SCHEMA_NAME_KEY, schemaName);
    }

    final String tableName = tableId.getTableName();
    if (!StringUtils.isBlank(tableName)) {
        resolved = resolved.replace(REPLACE_TABLE_NAME_KEY, tableName);
    }

    return resolved;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class StarRocksCatalogTest {
public void testCatalog() {
StarRocksCatalog catalog =
new StarRocksCatalog(
"starrocks", "root", "123456", "jdbc:mysql://47.108.65.163:9030/");
"starrocks", "root", "123456", "jdbc:mysql://47.108.65.163:9030/", "");
List<String> databases = catalog.listDatabases();
LOGGER.info("find databases: " + databases);

Expand Down
Loading