Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature][connector-jdbc]Add Save Mode function and Connector-JDBC (MySQL) connector has been realized #5663

Merged
merged 27 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9f97bed
[feature] Add Save Mode function and Connector-JDBC (MySQL) connector…
chl-wxp Oct 19, 2023
fe53452
[feature] update MysqlCreateTableSqlBuilder
chl-wxp Oct 19, 2023
d60d4e6
Merge remote-tracking branch 'origin/dev' into feature/save_mode_jdbc…
chl-wxp Oct 19, 2023
8fe5b80
[update] Solving the conflict of consolidation
chl-wxp Oct 19, 2023
a4309ed
[update] update jdbc-e2e-part-7
chl-wxp Oct 19, 2023
aea950a
[fix] fix get table bug
chl-wxp Oct 19, 2023
09e503b
[fix] fix mysql_container bug
chl-wxp Oct 19, 2023
e681a19
[fix] fix mysql_container bug2
chl-wxp Oct 19, 2023
783628e
[fix] Delete some unreasonable logic
chl-wxp Oct 20, 2023
0d1cbd5
Merge remote-tracking branch 'origin/dev' into feature/save_mode_jdbc…
chl-wxp Oct 20, 2023
3f31cf9
[update] Solving the conflict of consolidation
chl-wxp Oct 20, 2023
cc8e509
[update] update DataSaveMode enum
chl-wxp Oct 23, 2023
4087977
[update] update jdbcSinkFactory NPE bug
chl-wxp Oct 23, 2023
b17e4af
[update] update jdbcSinkFactory NPE bug 2
chl-wxp Oct 23, 2023
2d92bae
[update] update jdbcSinkFactory NPE bug 3
chl-wxp Oct 23, 2023
a16e051
[update] add time for e2e
chl-wxp Oct 23, 2023
91b3946
Merge remote-tracking branch 'origin/dev' into feature/save_mode_jdbc…
chl-wxp Oct 25, 2023
7c6a692
Merge remote-tracking branch 'origin/dev' into feature/save_mode_jdbc…
chl-wxp Oct 25, 2023
f06c920
[update] update create mysql table bit length is 0 bug
chl-wxp Oct 25, 2023
f19b46b
[update] update dameng bug
chl-wxp Oct 26, 2023
c06d38b
[update] update dameng bug2
chl-wxp Oct 26, 2023
05f1fdf
[update] update dameng port for e2e
chl-wxp Oct 26, 2023
fb640db
[update] add fink and spark e2e for saveMode handler
chl-wxp Oct 27, 2023
ed97b4f
[update] update jdbcSinkFactory
chl-wxp Oct 27, 2023
61700da
Merge remote-tracking branch 'origin/dev' into feature/save_mode_jdbc…
chl-wxp Oct 27, 2023
aee4d78
[update] Solve conflict
chl-wxp Oct 27, 2023
fc5d110
[update] Solve conflict2
chl-wxp Oct 27, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,30 @@ jobs:
env:
MAVEN_OPTS: -Xmx4096m

jdbc-connectors-it-part-7:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true'
runs-on: ${{ matrix.os }}
strategy:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 90
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v3
with:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: run jdbc connectors integration test (part-6)
if: needs.changes.outputs.api == 'true'
run: |
./mvnw -B -T 1C verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-7 -am -Pci
env:
MAVEN_OPTS: -Xmx4096m


kafka-connector-it:
needs: [ changes, sanity-check ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public enum SeaTunnelAPIErrorCode implements SeaTunnelErrorCode {
DATABASE_ALREADY_EXISTED("API-07", "Database already existed"),
TABLE_ALREADY_EXISTED("API-08", "Table already existed"),
HANDLE_SAVE_MODE_FAILED("API-09", "Handle save mode failed"),
;
SOURCE_ALREADY_HAS_DATA("API-10", "The target data source already has data"),
SINK_TABLE_NOT_EXIST("API-11", "The sink table not exist");

private final String code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,16 @@
* The SaveMode for the Sink connectors that use table or other table structures to organize data
*/
public enum DataSaveMode {
// Will drop table in MySQL, Will drop path for File Connector.
DROP_SCHEMA,

// Only drop the data in MySQL, Only drop the files in the path for File Connector.
KEEP_SCHEMA_DROP_DATA,
// Preserve database structure and delete data
DROP_DATA,

// Keep the table and data and continue to write data to the existing table for MySQL. Keep the
// path and files in the path, create new files in the path.
KEEP_SCHEMA_AND_DATA,
// Preserve database structure, preserve data
APPEND_DATA,

// The connector provides custom processing methods, such as running user provided SQL or shell
// scripts, etc
// User defined processing
CUSTOM_PROCESSING,

// Throw error when table is exists for MySQL. Throw error when path is exists.
ERROR_WHEN_EXISTS
// When there exist data, an error will be reported
ERROR_WHEN_DATA_EXISTS
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.sink;

import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

import lombok.AllArgsConstructor;

import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.SINK_TABLE_NOT_EXIST;
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.SOURCE_ALREADY_HAS_DATA;

@AllArgsConstructor
public class DefaultSaveModeHandler implements SaveModeHandler {

public SchemaSaveMode schemaSaveMode;
public DataSaveMode dataSaveMode;
public Catalog catalog;
public TablePath tablePath;
public CatalogTable catalogTable;
public String customSql;

public DefaultSaveModeHandler(
SchemaSaveMode schemaSaveMode,
DataSaveMode dataSaveMode,
Catalog catalog,
CatalogTable catalogTable,
String customSql) {
this(
schemaSaveMode,
dataSaveMode,
catalog,
catalogTable.getTableId().toTablePath(),
catalogTable,
customSql);
}

@Override
public void handleSchemaSaveMode() {
switch (schemaSaveMode) {
case RECREATE_SCHEMA:
recreateSchema();
break;
case CREATE_SCHEMA_WHEN_NOT_EXIST:
createSchemaWhenNotExist();
break;
case ERROR_WHEN_SCHEMA_NOT_EXIST:
errorWhenSchemaNotExist();
break;
default:
throw new UnsupportedOperationException("Unsupported save mode: " + schemaSaveMode);
}
}

@Override
public void handleDataSaveMode() {
switch (dataSaveMode) {
case DROP_DATA:
keepSchemaDropData();
break;
case APPEND_DATA:
keepSchemaAndData();
break;
case CUSTOM_PROCESSING:
customProcessing();
break;
case ERROR_WHEN_DATA_EXISTS:
errorWhenDataExists();
break;
default:
throw new UnsupportedOperationException("Unsupported save mode: " + dataSaveMode);
}
}

protected void recreateSchema() {
if (tableExists()) {
dropTable();
}
createTable();
}

protected void createSchemaWhenNotExist() {
if (!tableExists()) {
createTable();
}
}

protected void errorWhenSchemaNotExist() {
if (!tableExists()) {
throw new SeaTunnelRuntimeException(SINK_TABLE_NOT_EXIST, "The sink table not exist");
}
}

protected void keepSchemaDropData() {
if (tableExists()) {
truncateTable();
}
}

protected void keepSchemaAndData() {}

protected void customProcessing() {
executeCustomSql();
}

protected void errorWhenDataExists() {
if (dataExists()) {
throw new SeaTunnelRuntimeException(
SOURCE_ALREADY_HAS_DATA, "The target data source already has data");
}
}

protected boolean tableExists() {
return catalog.tableExists(tablePath);
}

protected void dropTable() {
catalog.dropTable(tablePath, true);
}

protected void createTable() {
catalog.createTable(tablePath, catalogTable, true);
}

protected void truncateTable() {
catalog.truncateTable(tablePath, true);
}

protected boolean dataExists() {
return catalog.isExistsData(tablePath);
}

protected void executeCustomSql() {
catalog.executeSql(tablePath, customSql);
}

@Override
public void close() throws Exception {
catalog.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@

package org.apache.seatunnel.api.sink;

/** The Sink Connectors which support data SaveMode should implement this interface */
public interface SupportDataSaveMode {
String SAVE_MODE_KEY = "savemode";
/**
* Return the value of DataSaveMode configured by user in the job config file.
*
* @return
*/
DataSaveMode getUserConfigSaveMode();
public interface SaveModeHandler extends AutoCloseable {

/** The implementation of specific logic according to different {@link DataSaveMode} */
void handleSaveMode(DataSaveMode userConfigSaveMode);
void handleSchemaSaveMode();

void handleDataSaveMode();

default void handleSaveMode() {
handleSchemaSaveMode();
handleDataSaveMode();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.sink;

public enum SchemaSaveMode {

// Will create when the table does not exist, delete and rebuild when the table is saved
RECREATE_SCHEMA,

// Will Created when the table does not exist, skipped when the table is saved
CREATE_SCHEMA_WHEN_NOT_EXIST,

// Error will be reported when the table does not exist
ERROR_WHEN_SCHEMA_NOT_EXIST,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.sink;

public final class SinkReplaceNameConstant {

public static final String REPLACE_TABLE_NAME_KEY = "${table_name}";

public static final String REPLACE_SCHEMA_NAME_KEY = "${schema_name}";

public static final String REPLACE_DATABASE_NAME_KEY = "${database_name}";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.sink;

import java.util.Optional;

/** The Sink Connectors which support schema and data SaveMode should implement this interface */
public interface SupportSaveMode {

String DATA_SAVE_MODE_KEY = "data_save_mode";

String SCHEMA_SAVE_MODE_KEY = "schema_save_mode";

// This method defines the return of a specific save_mode handler
Optional<SaveModeHandler> getSaveModeHandler();
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,25 @@ void createDatabase(TablePath tablePath, boolean ignoreIfExists)
void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException;

/**
* Truncate an existing table data in this catalog.
*
* @param tablePath Path of the table
* @param ignoreIfNotExists Flag to specify behavior when a table with the given name doesn't
* exist
* @throws TableNotExistException thrown if the table doesn't exist in the catalog and
* ignoreIfNotExists is false
* @throws CatalogException in case of any runtime exception
*/
default void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {}

default boolean isExistsData(TablePath tablePath) {
return false;
}

default void executeSql(TablePath tablePath, String sql) {}

// todo: Support for update table metadata

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ public void test() {
Options.key("save_mode")
.singleChoice(
DataSaveMode.class,
Arrays.asList(
DataSaveMode.DROP_SCHEMA,
DataSaveMode.KEEP_SCHEMA_DROP_DATA))
.defaultValue(DataSaveMode.DROP_SCHEMA)
Arrays.asList(DataSaveMode.APPEND_DATA, DataSaveMode.DROP_DATA))
.defaultValue(DataSaveMode.APPEND_DATA)
.withDescription("save mode test");

OptionRule build = OptionRule.builder().optional(stringOption, saveModeOption).build();
Expand All @@ -58,6 +56,6 @@ public void test() {
option = optionalOptions.get(1);
singleChoiceOption = (SingleChoiceOption) option;
Assertions.assertEquals(2, singleChoiceOption.getOptionValues().size());
Assertions.assertEquals(DataSaveMode.DROP_SCHEMA, singleChoiceOption.defaultValue());
Assertions.assertEquals(DataSaveMode.APPEND_DATA, singleChoiceOption.defaultValue());
}
}
Loading