From 1187a2d38cd212b7f42427582c4e4d4f8065f214 Mon Sep 17 00:00:00 2001 From: deng-jeffer Date: Mon, 9 Dec 2024 21:33:13 +0800 Subject: [PATCH 1/9] [Feature][Connector-V2[Doris]Support sink ddl --- docs/en/concept/schema-evolution.md | 40 ++ docs/zh/concept/schema-evolution.md | 43 +- .../exception/DorisConnectorErrorCode.java | 1 + .../exception/DorisSchemaChangeException.java | 20 + .../doris/schema/SchemaChangeManager.java | 387 ++++++++++++++++++ .../connectors/doris/sink/DorisSink.java | 15 +- .../doris/sink/writer/DorisSinkWriter.java | 34 +- .../connector/doris/DorisSchemaChangeIT.java | 356 ++++++++++++++++ .../mysqlcdc_to_doris_with_schema_change.conf | 55 +++ 9 files changed, 945 insertions(+), 6 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisSchemaChangeException.java create mode 100644 seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/schema/SchemaChangeManager.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisSchemaChangeIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/mysqlcdc_to_doris_with_schema_change.conf diff --git a/docs/en/concept/schema-evolution.md b/docs/en/concept/schema-evolution.md index a5a052042f0..8740dc04be1 100644 --- a/docs/en/concept/schema-evolution.md +++ b/docs/en/concept/schema-evolution.md @@ -13,6 +13,7 @@ Now we only support the operation about `add column`、`drop column`、`rename c [Jdbc-Mysql](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md) [Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md) [StarRocks](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/StarRocks.md) 
+[Doris](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Doris.md) Note: The schema evolution is not support the transform at now. The schema evolution of different types of databases(Oracle-CDC -> Jdbc-Mysql)is currently not supported the default value of the column in ddl. @@ -204,3 +205,42 @@ sink { } } ``` +### Mysql-CDC -> Doris +``` +env { + # You can set engine configuration here + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + MySQL-CDC { + server-id = 5652-5657 + username = "st_user_source" + password = "mysqlpw" + table-names = ["shop.products"] + base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" + debezium = { + include.schema.changes = true + } + } +} + +sink { + Doris { + fenodes = "doris_cdc_e2e:8030" + username = root + password = "" + database = "test" + table = "e2e_table_sink" + sink.label-prefix = "test-cdc" + sink.enable-2pc = "true" + sink.enable-delete = "true" + doris.config { + format = "json" + read_json_by_line = "true" + } + } +} +``` \ No newline at end of file diff --git a/docs/zh/concept/schema-evolution.md b/docs/zh/concept/schema-evolution.md index bb1a2564ef3..b5fabbacb94 100644 --- a/docs/zh/concept/schema-evolution.md +++ b/docs/zh/concept/schema-evolution.md @@ -10,8 +10,9 @@ ### 目标 [Jdbc-Mysql](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md) -[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md) +[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md) [StarRocks](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/StarRocks.md) +[Doris](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Doris.md) 注意: 目前模式演进不支持transform。不同类型数据库(Oracle-CDC -> Jdbc-Mysql)的模式演进目前不支持ddl中列的默认值。 @@ -203,3 +204,43 @@ sink { } } ``` + +### Mysql-CDC -> Doris +``` +env { + # You can set engine configuration here + parallelism = 1 + job.mode = 
"STREAMING" + checkpoint.interval = 5000 +} + +source { + MySQL-CDC { + server-id = 5652-5657 + username = "st_user_source" + password = "mysqlpw" + table-names = ["shop.products"] + base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" + debezium = { + include.schema.changes = true + } + } +} + +sink { + Doris { + fenodes = "doris_cdc_e2e:8030" + username = root + password = "" + database = "shop" + table = "products" + sink.label-prefix = "test-cdc" + sink.enable-2pc = "true" + sink.enable-delete = "true" + doris.config { + format = "json" + read_json_by_line = "true" + } + } +} +``` diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisConnectorErrorCode.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisConnectorErrorCode.java index d9b501027e7..a3df5982226 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisConnectorErrorCode.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisConnectorErrorCode.java @@ -30,6 +30,7 @@ public enum DorisConnectorErrorCode implements SeaTunnelErrorCode { SCHEMA_FAILED("Doirs-08", "get schema error"), SCAN_BATCH_FAILED("Doris-09", "scan batch error"), RESOURCE_CLOSE_FAILED("Doris-10", "resource close failed"), + SCHEMA_CHANGE_FAILED("Doris-11", "schema change failed"), SHOULD_NEVER_HAPPEN("Doris-00", "Should Never Happen !"); private final String code; diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisSchemaChangeException.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisSchemaChangeException.java new file mode 100644 index 00000000000..e43b03b213a --- /dev/null +++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisSchemaChangeException.java @@ -0,0 +1,20 @@ + package org.apache.seatunnel.connectors.doris.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class DorisSchemaChangeException extends SeaTunnelRuntimeException { + + public DorisSchemaChangeException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { + super(seaTunnelErrorCode, errorMessage); + } + + public DorisSchemaChangeException( + SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { + super(seaTunnelErrorCode, errorMessage, cause); + } + + public DorisSchemaChangeException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { + super(seaTunnelErrorCode, cause); + } +} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/schema/SchemaChangeManager.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/schema/SchemaChangeManager.java new file mode 100644 index 00000000000..147825ffbad --- /dev/null +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/schema/SchemaChangeManager.java @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.seatunnel.connectors.doris.schema; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent; +import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent; +import org.apache.seatunnel.common.utils.SeaTunnelException; +import org.apache.seatunnel.connectors.doris.config.DorisSinkConfig; +import org.apache.seatunnel.connectors.doris.datatype.DorisTypeConverterV2; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; +import org.apache.seatunnel.connectors.doris.exception.DorisSchemaChangeException; +import org.apache.seatunnel.connectors.doris.rest.RestService; + +import org.apache.commons.codec.binary.Base64; +import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.entity.StringEntity; +import 
org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +@Slf4j +public class SchemaChangeManager implements Serializable { + private static final long serialVersionUID = 1L; + + private static final String CHECK_COLUMN_EXISTS = + "SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND COLUMN_NAME = '%s'"; + private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s"; + private ObjectMapper objectMapper = new ObjectMapper(); + private DorisSinkConfig dorisSinkConfig; + private String charsetEncoding = "UTF-8"; + + public SchemaChangeManager(DorisSinkConfig dorisSinkConfig) { + this.dorisSinkConfig = dorisSinkConfig; + } + + public SchemaChangeManager(DorisSinkConfig dorisSinkConfig, String charsetEncoding) { + this.dorisSinkConfig = dorisSinkConfig; + this.charsetEncoding = charsetEncoding; + } + + /** + * Refresh physical table schema by schema change event + * + * @param event schema change event + * @param tablePath sink table path + */ + public void applySchemaChange(TablePath tablePath, SchemaChangeEvent event) throws IOException { + if (event instanceof AlterTableColumnsEvent) { + for (AlterTableColumnEvent columnEvent : ((AlterTableColumnsEvent) event).getEvents()) { + applySchemaChange(tablePath, columnEvent); + } + } else { + if (event instanceof AlterTableChangeColumnEvent) { + AlterTableChangeColumnEvent changeColumnEvent = (AlterTableChangeColumnEvent) event; + if (!changeColumnEvent + .getOldColumn() + .equals(changeColumnEvent.getColumn().getName())) { + if (!columnExists(tablePath, 
changeColumnEvent.getOldColumn()) + && columnExists(tablePath, changeColumnEvent.getColumn().getName())) { + log.warn( + "Column {} already exists in table {}. Skipping change column operation. event: {}", + changeColumnEvent.getColumn().getName(), + tablePath.getFullName(), + event); + return; + } + } + applySchemaChange(tablePath, changeColumnEvent); + } else if (event instanceof AlterTableModifyColumnEvent) { + applySchemaChange(tablePath, (AlterTableModifyColumnEvent) event); + } else if (event instanceof AlterTableAddColumnEvent) { + AlterTableAddColumnEvent addColumnEvent = (AlterTableAddColumnEvent) event; + if (columnExists(tablePath, addColumnEvent.getColumn().getName())) { + log.warn( + "Column {} already exists in table {}. Skipping add column operation. event: {}", + addColumnEvent.getColumn().getName(), + tablePath.getFullName(), + event); + return; + } + applySchemaChange(tablePath, addColumnEvent); + } else if (event instanceof AlterTableDropColumnEvent) { + AlterTableDropColumnEvent dropColumnEvent = (AlterTableDropColumnEvent) event; + if (!columnExists(tablePath, dropColumnEvent.getColumn())) { + log.warn( + "Column {} does not exist in table {}. Skipping drop column operation. 
event: {}", + dropColumnEvent.getColumn(), + tablePath.getFullName(), + event); + return; + } + applySchemaChange(tablePath, dropColumnEvent); + } else { + throw new SeaTunnelException( + "Unsupported schemaChangeEvent : " + event.getEventType()); + } + } + } + + public void applySchemaChange(TablePath tablePath, AlterTableChangeColumnEvent event) + throws IOException { + StringBuilder sqlBuilder = + new StringBuilder() + .append("ALTER TABLE") + .append(" ") + .append(tablePath.getFullName()) + .append(" ") + .append("RENAME COLUMN") + .append(" ") + .append(quoteIdentifier(event.getOldColumn())) + .append(" TO ") + .append(quoteIdentifier(event.getColumn().getName())); + if (event.getColumn().getComment() != null) { + sqlBuilder + .append(" ") + .append("COMMENT ") + .append("'") + .append(event.getColumn().getComment()) + .append("'"); + } + if (event.getAfterColumn() != null) { + sqlBuilder.append(" ").append("AFTER ").append(quoteIdentifier(event.getAfterColumn())); + } + + String changeColumnSQL = sqlBuilder.toString(); + log.info("Executing change column SQL: " + changeColumnSQL); + if (!sendHttpPostRequest(changeColumnSQL, tablePath.getDatabaseName())) { + log.warn("Failed to alter table change column, SQL:" + changeColumnSQL); + } + } + + public void applySchemaChange(TablePath tablePath, AlterTableModifyColumnEvent event) + throws IOException { + BasicTypeDefine typeDefine = DorisTypeConverterV2.INSTANCE.reconvert(event.getColumn()); + StringBuilder sqlBuilder = + new StringBuilder() + .append("ALTER TABLE") + .append(" ") + .append(tablePath.getFullName()) + .append(" ") + .append("MODIFY COLUMN") + .append(" ") + .append(quoteIdentifier(event.getColumn().getName())) + .append(" ") + .append(typeDefine.getColumnType()); + if (event.getColumn().getComment() != null) { + sqlBuilder + .append(" ") + .append("COMMENT ") + .append("'") + .append(event.getColumn().getComment()) + .append("'"); + } + if (event.getAfterColumn() != null) { + sqlBuilder.append(" 
").append("AFTER ").append(quoteIdentifier(event.getAfterColumn())); + } + + String modifyColumnSQL = sqlBuilder.toString(); + log.info("Executing modify column SQL: " + modifyColumnSQL); + if (!sendHttpPostRequest(modifyColumnSQL, tablePath.getDatabaseName())) { + log.warn("Failed to alter table modify column, SQL:" + modifyColumnSQL); + } + } + + public void applySchemaChange(TablePath tablePath, AlterTableAddColumnEvent event) + throws IOException { + BasicTypeDefine typeDefine = DorisTypeConverterV2.INSTANCE.reconvert(event.getColumn()); + StringBuilder sqlBuilder = + new StringBuilder() + .append("ALTER TABLE") + .append(" ") + .append(tablePath.getFullName()) + .append(" ") + .append("ADD COLUMN") + .append(" ") + .append(quoteIdentifier(event.getColumn().getName())) + .append(" ") + .append(typeDefine.getColumnType()); + if (event.getColumn().getDefaultValue() != null + && isSupportDefaultValue(event.getColumn())) { + sqlBuilder + .append(" DEFAULT ") + .append(quoteDefaultValue(event.getColumn().getDefaultValue())); + } + if (event.getColumn().getComment() != null) { + sqlBuilder + .append(" ") + .append("COMMENT ") + .append("'") + .append(event.getColumn().getComment()) + .append("'"); + } + if (event.getAfterColumn() != null) { + sqlBuilder.append(" ").append("AFTER ").append(quoteIdentifier(event.getAfterColumn())); + } + + String addColumnSQL = sqlBuilder.toString(); + log.info("Executing add column SQL: " + addColumnSQL); + if (!sendHttpPostRequest(addColumnSQL, tablePath.getDatabaseName())) { + log.warn("Failed to alter table add column, SQL:" + addColumnSQL); + } + } + + /** + * Support Default Value + * + * @param column + * @return + */ + // todo support more type + private boolean isSupportDefaultValue(Column column) { + switch (column.getDataType().getSqlType()) { + case STRING: + case BIGINT: + case INT: + case TIMESTAMP: + return true; + default: + return false; + } + } + + public void applySchemaChange(TablePath tablePath, 
AlterTableDropColumnEvent event) + throws IOException { + String dropColumnSQL = + String.format( + "ALTER TABLE %s DROP COLUMN %s", + tablePath.getFullName(), quoteIdentifier(event.getColumn())); + log.info("Executing drop column SQL: {}", dropColumnSQL); + if (!sendHttpPostRequest(dropColumnSQL, tablePath.getDatabaseName())) { + log.warn("Failed to alter table drop column, SQL:" + dropColumnSQL); + } + } + + /** + * Check if the column exists in the table + * + * @param tablePath + * @param column + * @return + */ + public boolean columnExists(TablePath tablePath, String column) throws IOException { + String selectColumnSQL = + buildColumnExistsQuery( + tablePath.getDatabaseName(), tablePath.getTableName(), column); + return sendHttpPostRequest(selectColumnSQL, tablePath.getDatabaseName()); + } + + public static String buildColumnExistsQuery(String database, String table, String column) { + return String.format(CHECK_COLUMN_EXISTS, database, table, column); + } + + public static String quoteIdentifier(String identifier) { + return "`" + identifier + "`"; + } + + public static String quoteDefaultValue(Object defaultValue) { + // DEFAULT current_timestamp not need quote + if (defaultValue.toString().startsWith("current_timestamp")) { + return "current_timestamp"; + } + return "'" + defaultValue + "'"; + } + + private boolean sendHttpPostRequest(String sql, String database) + throws IOException, IllegalArgumentException { + HttpPost httpPost = buildHttpPost(sql, database); + try (CloseableHttpClient httpclient = HttpClients.createDefault()) { + CloseableHttpResponse response = httpclient.execute(httpPost); + final int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == 200 && response.getEntity() != null) { + String loadResult = EntityUtils.toString(response.getEntity()); + log.info( + "http post response success. 
statusCode: {}, loadResult: {}", + statusCode, + loadResult); + JsonNode responseNode = objectMapper.readTree(loadResult); + String code = responseNode.get("code").asText("-1"); + if (code.equals("0")) { + JsonNode data = responseNode.get("data"); + if (!data.isEmpty()) { + return true; + } + } + } else { + log.warn("http post response failed. statusCode: {}", statusCode); + } + } catch (Exception e) { + log.error( + "send http post request error {}, default return false, SQL:{}", + e.getMessage(), + sql); + log.error(e.getMessage(), e); + } + return false; + } + + public HttpPost buildHttpPost(String ddl, String database) + throws IllegalArgumentException, IOException { + Map param = new HashMap<>(); + param.put("stmt", ddl); + String requestUrl = + String.format( + SCHEMA_CHANGE_API, + RestService.randomEndpoint(dorisSinkConfig.getFrontends(), log), + database); + HttpPost httpPost = new HttpPost(requestUrl); + httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); + httpPost.setHeader( + HttpHeaders.CONTENT_TYPE, + String.format("application/json;charset=%s", charsetEncoding)); + httpPost.setEntity( + new StringEntity(objectMapper.writeValueAsString(param), charsetEncoding)); + return httpPost; + } + + private String handleResponse(HttpUriRequest request) { + try (CloseableHttpClient httpclient = HttpClients.createDefault()) { + CloseableHttpResponse response = httpclient.execute(request); + final int statusCode = response.getStatusLine().getStatusCode(); + final String reasonPhrase = response.getStatusLine().getReasonPhrase(); + if (statusCode == 200 && response.getEntity() != null) { + return EntityUtils.toString(response.getEntity()); + } else { + throw new DorisSchemaChangeException( + DorisConnectorErrorCode.SCHEMA_CHANGE_FAILED, + "Failed to schemaChange, status: " + + statusCode + + ", reason: " + + reasonPhrase); + } + } catch (Exception e) { + log.error("SchemaChange request error,", e); + throw new DorisSchemaChangeException( + 
DorisConnectorErrorCode.SCHEMA_CHANGE_FAILED, + "SchemaChange request error with " + e.getMessage()); + } + } + + private String authHeader() { + return "Basic " + + new String( + Base64.encodeBase64( + (dorisSinkConfig.getUsername() + + ":" + + dorisSinkConfig.getPassword()) + .getBytes(StandardCharsets.UTF_8))); + } +} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java index deb88a51b11..66b19e39716 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java @@ -28,9 +28,11 @@ import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportMultiTableSink; import org.apache.seatunnel.api.sink.SupportSaveMode; +import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink; import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.factory.CatalogFactory; +import org.apache.seatunnel.api.table.schema.SchemaChangeType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.doris.config.DorisSinkConfig; @@ -44,6 +46,7 @@ import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -53,7 +56,8 @@ public class DorisSink implements SeaTunnelSink, SupportSaveMode, - SupportMultiTableSink { + SupportMultiTableSink, + SupportSchemaEvolutionSink { private final DorisSinkConfig dorisSinkConfig; private final ReadonlyConfig config; @@ -138,4 
+142,13 @@ public Optional getSaveModeHandler() { public Optional getWriteCatalogTable() { return Optional.of(catalogTable); } + + @Override + public List supports() { + return Arrays.asList( + SchemaChangeType.ADD_COLUMN, + SchemaChangeType.DROP_COLUMN, + SchemaChangeType.RENAME_COLUMN, + SchemaChangeType.UPDATE_COLUMN); + } } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index fa0d671e82c..f5087b6f939 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -17,18 +17,23 @@ package org.apache.seatunnel.connectors.doris.sink.writer; -import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder; - import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; +import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent; +import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.doris.config.DorisSinkConfig; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; +import 
org.apache.seatunnel.connectors.doris.exception.DorisSchemaChangeException; import org.apache.seatunnel.connectors.doris.rest.RestService; import org.apache.seatunnel.connectors.doris.rest.models.RespContent; +import org.apache.seatunnel.connectors.doris.schema.SchemaChangeManager; import org.apache.seatunnel.connectors.doris.serialize.DorisSerializer; import org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializer; import org.apache.seatunnel.connectors.doris.sink.LoadStatus; @@ -36,6 +41,7 @@ import org.apache.seatunnel.connectors.doris.util.HttpUtil; import org.apache.seatunnel.connectors.doris.util.UnsupportedTypeConverterUtils; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import java.io.IOException; @@ -49,12 +55,13 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Preconditions.checkState; @Slf4j public class DorisSinkWriter implements SinkWriter, - SupportMultiTableSinkWriter { + SupportMultiTableSinkWriter, + SupportSchemaEvolutionSinkWriter { private static final int INITIAL_DELAY = 200; private static final List DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(LoadStatus.SUCCESS, LoadStatus.PUBLISH_TIMEOUT)); @@ -68,6 +75,11 @@ public class DorisSinkWriter private final CatalogTable catalogTable; private final ScheduledExecutorService scheduledExecutorService; private volatile Exception loadException = null; + private TableSchema tableSchema; + private final TablePath sinkTablePath; + protected TableSchemaChangeEventDispatcher tableSchemaChanger = + new TableSchemaChangeEventDispatcher(); + private SchemaChangeManager schemaChangeManager; public DorisSinkWriter( SinkWriter.Context context, @@ -94,6 +106,9 @@ public DorisSinkWriter( 1, new ThreadFactoryBuilder().setNameFormat("stream-load-check").build()); 
this.serializer = createSerializer(dorisSinkConfig, catalogTable.getSeaTunnelRowType()); this.intervalTime = dorisSinkConfig.getCheckInterval(); + this.tableSchema = catalogTable.getTableSchema(); + this.sinkTablePath = catalogTable.getTablePath(); + this.schemaChangeManager = new SchemaChangeManager(dorisSinkConfig); this.initializeLoad(); } @@ -139,6 +154,17 @@ public void write(SeaTunnelRow element) throws IOException { } } + @Override + public void applySchemaChange(SchemaChangeEvent event) { + this.tableSchema = tableSchemaChanger.reset(tableSchema).apply(event); + try { + schemaChangeManager.applySchemaChange(sinkTablePath, event); + } catch (Exception e) { + throw new DorisSchemaChangeException( + DorisConnectorErrorCode.SCHEMA_CHANGE_FAILED, "Failed to schemaChange", e); + } + } + @Override public Optional prepareCommit() throws IOException { RespContent respContent = flush(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisSchemaChangeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisSchemaChangeIT.java new file mode 100644 index 00000000000..d215d889892 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisSchemaChangeIT.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.doris; + +import org.apache.seatunnel.shade.com.google.common.collect.Lists; + +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; +import org.apache.seatunnel.e2e.common.util.JobIdGenerator; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerLoggerFactory; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; 
+import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.await; + +@Slf4j +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "Currently SPARK do not support cdc. In addition, currently only the zeta engine supports schema evolution for pr https://github.com/apache/seatunnel/pull/5125.") +public class DorisSchemaChangeIT extends AbstractDorisIT implements TestResource { + private static final String DATABASE = "shop"; + private static final String SOURCE_TABLE = "products"; + private static final String MYSQL_HOST = "mysql_cdc_e2e"; + private static final String MYSQL_USER_NAME = "mysqluser"; + private static final String MYSQL_USER_PASSWORD = "mysqlpw"; + private static final String SINK_TABLE = SOURCE_TABLE; + private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS " + DATABASE; + private Connection mysqlConnection; + public static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private static final String QUERY = "select * from %s.%s order by id"; + private static final String QUERY_COLUMNS = + "SELECT COLUMN_NAME FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' ORDER by COLUMN_NAME;"; + private static final String PROJECTION_QUERY = + "select id,name,description,weight,add_column1,add_column2,add_column`3` from %s.%s order by id;"; + private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0); + private final UniqueDatabase shopDatabase = new UniqueDatabase(MYSQL_CONTAINER, DATABASE); + + @TestContainerExtension + private final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd 
/tmp/seatunnel/plugins/Jdbc/lib && curl -O " + + DRIVER_JAR); + Assertions.assertEquals(0, extraCommands.getExitCode()); + }; + + private static MySqlContainer createMySqlContainer(MySqlVersion version) { + MySqlContainer mySqlContainer = + new MySqlContainer(version) + .withConfigurationOverride("docker/server-gtids/my.cnf") + .withSetupSQL("docker/setup.sql") + .withNetwork(NETWORK) + .withNetworkAliases(MYSQL_HOST) + .withDatabaseName(DATABASE) + .withUsername(MYSQL_USER_NAME) + .withPassword(MYSQL_USER_PASSWORD) + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger("mysql-docker-image"))); + mySqlContainer.setPortBindings(Lists.newArrayList(String.format("%s:%s", 3306, 3306))); + return mySqlContainer; + } + + @TestTemplate + public void testDorisWithSchemaEvolutionCase(TestContainer container) + throws InterruptedException, IOException { + String jobId = String.valueOf(JobIdGenerator.newJobId()); + String jobConfigFile = "/mysqlcdc_to_doris_with_schema_change.conf"; + CompletableFuture.runAsync( + () -> { + try { + container.executeJob(jobConfigFile, jobId); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + }); + TimeUnit.SECONDS.sleep(20); + // waiting for case1 completed + assertSchemaEvolutionForAddColumns( + DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, jdbcConnection); + + // savepoint 1 + Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); + + // case2 drop columns with cdc data at same time + shopDatabase.setTemplateName("drop_columns").createAndInitialize(); + + // restore 1 + CompletableFuture.supplyAsync( + () -> { + try { + container.restoreJob(jobConfigFile, jobId); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + + // waiting for case2 completed + assertTableStructureAndData( + DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, 
jdbcConnection); + + // savepoint 2 + Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); + + // case3 change column name with cdc data at same time + shopDatabase.setTemplateName("change_columns").createAndInitialize(); + + // case4 modify column data type with cdc data at same time + shopDatabase.setTemplateName("modify_columns").createAndInitialize(); + + // restore 2 + CompletableFuture.supplyAsync( + () -> { + try { + container.restoreJob(jobConfigFile, jobId); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + + // waiting for case3/case4 completed + assertTableStructureAndData( + DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, jdbcConnection); + } + + private void assertSchemaEvolutionForAddColumns( + String database, + String sourceTable, + String sinkTable, + Connection sourceConnection, + Connection sinkConnection) { + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertIterableEquals( + query( + String.format(QUERY, database, sourceTable), + sourceConnection), + query( + String.format(QUERY, database, sinkTable), + sinkConnection))); + + // case1 add columns with cdc data at same time + shopDatabase.setTemplateName("add_columns").createAndInitialize(); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertIterableEquals( + query( + String.format(QUERY_COLUMNS, database, sourceTable), + sourceConnection), + query( + String.format(QUERY_COLUMNS, database, sinkTable), + sinkConnection))); + try { + TimeUnit.SECONDS.sleep(120); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query( + String.format( + QUERY.replaceAll( + "order by id", + "where id >= 128 order by id"), + database, + sourceTable), + sourceConnection), + query( + 
String.format( + QUERY.replaceAll( + "order by id", + "where id >= 128 order by id"), + database, + sinkTable), + sinkConnection)); + }); + + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query( + String.format(PROJECTION_QUERY, database, sourceTable), + sourceConnection), + query( + String.format(PROJECTION_QUERY, database, sinkTable), + sinkConnection)); + }); + } + + private void assertTableStructureAndData( + String database, + String sourceTable, + String sinkTable, + Connection sourceConnection, + Connection sinkConnection) { + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertIterableEquals( + query( + String.format(QUERY_COLUMNS, database, sourceTable), + sourceConnection), + query( + String.format(QUERY_COLUMNS, database, sinkTable), + sinkConnection))); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertIterableEquals( + query( + String.format(QUERY, database, sourceTable), + sourceConnection), + query( + String.format(QUERY, database, sinkTable), + sinkConnection))); + } + + private Connection getMysqlJdbcConnection() throws SQLException { + return DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword()); + } + + @BeforeAll + @Override + public void startUp() { + super.startUp(); + log.info("The second stage: Starting Mysql containers..."); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + log.info("Mysql Containers are started"); + shopDatabase.createAndInitialize(); + log.info("Mysql ddl execution is complete"); + initializeJdbcTable(); + try { + mysqlConnection = getMysqlJdbcConnection(); + try { + TimeUnit.DAYS.sleep(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @AfterAll + @Override + public void tearDown() throws 
SQLException { + if (MYSQL_CONTAINER != null) { + MYSQL_CONTAINER.close(); + } + if (mysqlConnection != null) { + mysqlConnection.close(); + } + } + + private void initializeJdbcTable() { + try (Statement statement = jdbcConnection.createStatement()) { + // create databases + statement.execute(CREATE_DATABASE); + } catch (SQLException e) { + throw new RuntimeException("Initializing table failed!", e); + } + } + + private List> query(String sql, Connection connection) { + try { + ResultSet resultSet = connection.createStatement().executeQuery(sql); + List> result = new ArrayList<>(); + int columnCount = resultSet.getMetaData().getColumnCount(); + while (resultSet.next()) { + ArrayList objects = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { + if (resultSet.getObject(i) instanceof Timestamp) { + Timestamp timestamp = resultSet.getTimestamp(i); + objects.add(timestamp.toLocalDateTime().format(DATE_TIME_FORMATTER)); + break; + } + if (resultSet.getObject(i) instanceof LocalDateTime) { + LocalDateTime localDateTime = resultSet.getObject(i, LocalDateTime.class); + objects.add(localDateTime.format(DATE_TIME_FORMATTER)); + break; + } + objects.add(resultSet.getObject(i)); + } + log.debug(String.format("Print query, sql: %s, data: %s", sql, objects)); + result.add(objects); + } + return result; + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/mysqlcdc_to_doris_with_schema_change.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/mysqlcdc_to_doris_with_schema_change.conf new file mode 100644 index 00000000000..795d0d144f5 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/mysqlcdc_to_doris_with_schema_change.conf @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### +env { + # You can set engine configuration here + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + MySQL-CDC { + server-id = 5652-5657 + username = "st_user_source" + password = "mysqlpw" + table-names = ["shop.products"] + base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" + debezium = { + include.schema.changes = true + } + } +} + +sink { + Doris { + fenodes = "doris_e2e:8030" + username = "root" + password = "" + database = "shop" + table = "products" + sink.label-prefix = "test-cdc" + sink.enable-2pc = "true" + sink.enable-delete = "false" + doris.config { + format = "json" + read_json_by_line = "true" + } + } +} \ No newline at end of file From 0b943a05fe279e6a199a11db4a9cc3198e929589 Mon Sep 17 00:00:00 2001 From: deng-jeffer Date: Sat, 14 Dec 2024 09:33:04 +0800 Subject: [PATCH 2/9] [Feature][Connector-V2[Doris]Support sink ddl --- docs/en/concept/schema-evolution.md | 8 +- docs/zh/concept/schema-evolution.md | 4 +- .../exception/DorisSchemaChangeException.java | 2 +- .../doris/schema/SchemaChangeManager.java | 56 ++++++++++--- .../doris/sink/writer/DorisSinkWriter.java | 5 +- .../connector/doris/DorisSchemaChangeIT.java | 12 +-- 
.../src/test/resources/ddl/add_columns.sql | 83 +++++++++++++++++++ .../src/test/resources/ddl/change_columns.sql | 36 ++++++++ .../src/test/resources/ddl/drop_columns.sql | 50 +++++++++++ .../src/test/resources/ddl/modify_columns.sql | 36 ++++++++ .../src/test/resources/ddl/shop.sql | 44 ++++++++++ .../mysqlcdc_to_doris_with_schema_change.conf | 2 +- 12 files changed, 305 insertions(+), 33 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/add_columns.sql create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/change_columns.sql create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/drop_columns.sql create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/modify_columns.sql create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/shop.sql diff --git a/docs/en/concept/schema-evolution.md b/docs/en/concept/schema-evolution.md index 8740dc04be1..a597c355c5b 100644 --- a/docs/en/concept/schema-evolution.md +++ b/docs/en/concept/schema-evolution.md @@ -229,11 +229,11 @@ source { sink { Doris { - fenodes = "doris_cdc_e2e:8030" - username = root + fenodes = "doris_e2e:8030" + username = "root" password = "" - database = "test" - table = "e2e_table_sink" + database = "shop" + table = "products" sink.label-prefix = "test-cdc" sink.enable-2pc = "true" sink.enable-delete = "true" diff --git a/docs/zh/concept/schema-evolution.md b/docs/zh/concept/schema-evolution.md index b5fabbacb94..8a75fb8d691 100644 --- a/docs/zh/concept/schema-evolution.md +++ b/docs/zh/concept/schema-evolution.md @@ -229,8 +229,8 @@ source { sink { Doris { - fenodes = "doris_cdc_e2e:8030" - username = root + fenodes = "doris_e2e:8030" + username = "root" password = "" database = "shop" table = "products" diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisSchemaChangeException.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisSchemaChangeException.java index e43b03b213a..10b7d8c86c6 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisSchemaChangeException.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisSchemaChangeException.java @@ -1,4 +1,4 @@ - package org.apache.seatunnel.connectors.doris.exception; +package org.apache.seatunnel.connectors.doris.exception; import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/schema/SchemaChangeManager.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/schema/SchemaChangeManager.java index 147825ffbad..e0bc93d72c2 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/schema/SchemaChangeManager.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/schema/SchemaChangeManager.java @@ -37,6 +37,7 @@ import org.apache.seatunnel.connectors.doris.rest.RestService; import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHeaders; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; @@ -46,6 +47,7 @@ import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import 
com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; @@ -146,7 +148,7 @@ public void applySchemaChange(TablePath tablePath, AlterTableChangeColumnEvent e .append("RENAME COLUMN") .append(" ") .append(quoteIdentifier(event.getOldColumn())) - .append(" TO ") + .append(" ") .append(quoteIdentifier(event.getColumn().getName())); if (event.getColumn().getComment() != null) { sqlBuilder @@ -161,8 +163,7 @@ public void applySchemaChange(TablePath tablePath, AlterTableChangeColumnEvent e } String changeColumnSQL = sqlBuilder.toString(); - log.info("Executing change column SQL: " + changeColumnSQL); - if (!sendHttpPostRequest(changeColumnSQL, tablePath.getDatabaseName())) { + if (!execute(changeColumnSQL, tablePath.getDatabaseName())) { log.warn("Failed to alter table change column, SQL:" + changeColumnSQL); } } @@ -194,8 +195,7 @@ public void applySchemaChange(TablePath tablePath, AlterTableModifyColumnEvent e } String modifyColumnSQL = sqlBuilder.toString(); - log.info("Executing modify column SQL: " + modifyColumnSQL); - if (!sendHttpPostRequest(modifyColumnSQL, tablePath.getDatabaseName())) { + if (!execute(modifyColumnSQL, tablePath.getDatabaseName())) { log.warn("Failed to alter table modify column, SQL:" + modifyColumnSQL); } } @@ -233,8 +233,7 @@ && isSupportDefaultValue(event.getColumn())) { } String addColumnSQL = sqlBuilder.toString(); - log.info("Executing add column SQL: " + addColumnSQL); - if (!sendHttpPostRequest(addColumnSQL, tablePath.getDatabaseName())) { + if (!execute(addColumnSQL, tablePath.getDatabaseName())) { log.warn("Failed to alter table add column, SQL:" + addColumnSQL); } } @@ -264,12 +263,38 @@ public void applySchemaChange(TablePath tablePath, AlterTableDropColumnEvent eve String.format( "ALTER TABLE %s DROP COLUMN %s", tablePath.getFullName(), quoteIdentifier(event.getColumn())); - log.info("Executing drop column SQL: {}", dropColumnSQL); - if (!sendHttpPostRequest(dropColumnSQL, tablePath.getDatabaseName())) { + if 
(!execute(dropColumnSQL, tablePath.getDatabaseName())) { log.warn("Failed to alter table drop column, SQL:" + dropColumnSQL); } } + /** execute sql in doris. */ + public boolean execute(String ddl, String database) + throws IOException, IllegalArgumentException { + String responseEntity = executeThenReturnResponse(ddl, database); + return handleSchemaChange(responseEntity); + } + + private String executeThenReturnResponse(String ddl, String database) + throws IOException, IllegalArgumentException { + if (StringUtils.isEmpty(ddl)) { + throw new IllegalArgumentException("ddl can not be null or empty string!"); + } + log.info("Execute SQL: {}", ddl); + HttpPost httpPost = buildHttpPost(ddl, database); + return handleResponse(httpPost); + } + + private boolean handleSchemaChange(String responseEntity) throws JsonProcessingException { + Map responseMap = objectMapper.readValue(responseEntity, Map.class); + String code = responseMap.getOrDefault("code", "-1").toString(); + if (code.equals("0")) { + return true; + } else { + return false; + } + } + /** * Check if the column exists in the table * @@ -281,7 +306,7 @@ public boolean columnExists(TablePath tablePath, String column) throws IOExcepti String selectColumnSQL = buildColumnExistsQuery( tablePath.getDatabaseName(), tablePath.getTableName(), column); - return sendHttpPostRequest(selectColumnSQL, tablePath.getDatabaseName()); + return sendCheckColumnHttpPostRequest(selectColumnSQL, tablePath.getDatabaseName()); } public static String buildColumnExistsQuery(String database, String table, String column) { @@ -300,7 +325,7 @@ public static String quoteDefaultValue(Object defaultValue) { return "'" + defaultValue + "'"; } - private boolean sendHttpPostRequest(String sql, String database) + private boolean sendCheckColumnHttpPostRequest(String sql, String database) throws IOException, IllegalArgumentException { HttpPost httpPost = buildHttpPost(sql, database); try (CloseableHttpClient httpclient = 
HttpClients.createDefault()) { @@ -315,7 +340,7 @@ private boolean sendHttpPostRequest(String sql, String database) JsonNode responseNode = objectMapper.readTree(loadResult); String code = responseNode.get("code").asText("-1"); if (code.equals("0")) { - JsonNode data = responseNode.get("data"); + JsonNode data = responseNode.get("data").get("data"); if (!data.isEmpty()) { return true; } @@ -358,7 +383,12 @@ private String handleResponse(HttpUriRequest request) { final int statusCode = response.getStatusLine().getStatusCode(); final String reasonPhrase = response.getStatusLine().getReasonPhrase(); if (statusCode == 200 && response.getEntity() != null) { - return EntityUtils.toString(response.getEntity()); + String loadResult = EntityUtils.toString(response.getEntity()); + log.info( + "http post response success. statusCode: {}, loadResult: {}", + statusCode, + loadResult); + return loadResult; } else { throw new DorisSchemaChangeException( DorisConnectorErrorCode.SCHEMA_CHANGE_FAILED, diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index f5087b6f939..f1e1c5f848c 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -71,7 +71,7 @@ public class DorisSinkWriter private final String labelPrefix; private final LabelGenerator labelGenerator; private final int intervalTime; - private final DorisSerializer serializer; + private DorisSerializer serializer; private final CatalogTable catalogTable; private final ScheduledExecutorService scheduledExecutorService; private volatile Exception loadException = null; @@ -157,6 +157,9 @@ public void 
write(SeaTunnelRow element) throws IOException { @Override public void applySchemaChange(SchemaChangeEvent event) { this.tableSchema = tableSchemaChanger.reset(tableSchema).apply(event); + SeaTunnelRowType seaTunnelRowType = tableSchema.toPhysicalRowDataType(); + this.serializer = createSerializer(this.dorisSinkConfig, seaTunnelRowType); + try { schemaChangeManager.applySchemaChange(sinkTablePath, event); } catch (Exception e) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisSchemaChangeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisSchemaChangeIT.java index d215d889892..4e2e5feee49 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisSchemaChangeIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisSchemaChangeIT.java @@ -79,7 +79,7 @@ public class DorisSchemaChangeIT extends AbstractDorisIT implements TestResource private static final String QUERY_COLUMNS = "SELECT COLUMN_NAME FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' ORDER by COLUMN_NAME;"; private static final String PROJECTION_QUERY = - "select id,name,description,weight,add_column1,add_column2,add_column`3` from %s.%s order by id;"; + "select id,name,description,weight,add_column1,add_column2,add_column3 from %s.%s order by id;"; private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0); private final UniqueDatabase shopDatabase = new UniqueDatabase(MYSQL_CONTAINER, DATABASE); @@ -208,11 +208,6 @@ private void assertSchemaEvolutionForAddColumns( query( String.format(QUERY_COLUMNS, database, sinkTable), sinkConnection))); - try { - TimeUnit.SECONDS.sleep(120); - } catch (InterruptedException e) { - throw new 
RuntimeException(e); - } await().atMost(60000, TimeUnit.MILLISECONDS) .untilAsserted( () -> { @@ -295,11 +290,6 @@ public void startUp() { initializeJdbcTable(); try { mysqlConnection = getMysqlJdbcConnection(); - try { - TimeUnit.DAYS.sleep(1); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } } catch (SQLException e) { throw new RuntimeException(e); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/add_columns.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/add_columns.sql new file mode 100644 index 00000000000..2a1212aa95d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/add_columns.sql @@ -0,0 +1,83 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: shop +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `shop`; +use shop; +INSERT INTO products +VALUES (110,"scooter","Small 2-wheel scooter",3.14), + (111,"car battery","12V car battery",8.1), + (112,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8), + (113,"hammer","12oz carpenter's hammer",0.75), + (114,"hammer","14oz carpenter's hammer",0.875), + (115,"hammer","16oz carpenter's hammer",1.0), + (116,"rocks","box of assorted rocks",5.3), + (117,"jacket","water resistent black wind breaker",0.1), + (118,"spare tire","24 inch spare tire",22.2); +update products set name = 'dailai' where id = 101; +delete from products where id = 102; + +alter table products ADD COLUMN add_column1 varchar(64) not null default 'yy',ADD COLUMN add_column2 int not null default 1; +update products set add_column1 = 'swm1', add_column2 = 2; + +update products set name = 'dailai' where id = 110; +insert into products +values (119,"scooter","Small 2-wheel scooter",3.14,'xx',1), + (120,"car battery","12V car battery",8.1,'xx',2), + (121,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,'xx',3), + (122,"hammer","12oz carpenter's hammer",0.75,'xx',4), + (123,"hammer","14oz carpenter's hammer",0.875,'xx',5), + (124,"hammer","16oz carpenter's hammer",1.0,'xx',6), + (125,"rocks","box of assorted rocks",5.3,'xx',7), + (126,"jacket","water resistent black wind breaker",0.1,'xx',8), + (127,"spare tire","24 inch spare tire",22.2,'xx',9); +delete from products where id = 118; + +alter table products ADD COLUMN add_column3 float not null default 1.1; +update products set add_column3 = 3.3; +alter table products ADD COLUMN add_column4 timestamp not null default current_timestamp(); +update products set 
add_column4 = current_timestamp(); + +delete from products where id = 113; +insert into products +values (128,"scooter","Small 2-wheel scooter",3.14,'xx',1,1.1,'2023-02-02 09:09:09'), + (129,"car battery","12V car battery",8.1,'xx',2,1.2,'2023-02-02 09:09:09'), + (130,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,'xx',3,1.3,'2023-02-02 09:09:09'), + (131,"hammer","12oz carpenter's hammer",0.75,'xx',4,1.4,'2023-02-02 09:09:09'), + (132,"hammer","14oz carpenter's hammer",0.875,'xx',5,1.5,'2023-02-02 09:09:09'), + (133,"hammer","16oz carpenter's hammer",1.0,'xx',6,1.6,'2023-02-02 09:09:09'), + (134,"rocks","box of assorted rocks",5.3,'xx',7,1.7,'2023-02-02 09:09:09'), + (135,"jacket","water resistent black wind breaker",0.1,'xx',8,1.8,'2023-02-02 09:09:09'), + (136,"spare tire","24 inch spare tire",22.2,'xx',9,1.9,'2023-02-02 09:09:09'); +update products set name = 'dailai' where id = 135; + +alter table products ADD COLUMN add_column6 varchar(64) not null default 'ff' after id; +update products set add_column6 = 'swm6'; + +delete from products where id = 115; +insert into products +values (173,'tt',"scooter","Small 2-wheel scooter",3.14,'xx',1,1.1,'2023-02-02 09:09:09'), + (174,'tt',"car battery","12V car battery",8.1,'xx',2,1.2,'2023-02-02 09:09:09'), + (175,'tt',"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,'xx',3,1.3,'2023-02-02 09:09:09'), + (176,'tt',"hammer","12oz carpenter's hammer",0.75,'xx',4,1.4,'2023-02-02 09:09:09'), + (177,'tt',"hammer","14oz carpenter's hammer",0.875,'xx',5,1.5,'2023-02-02 09:09:09'), + (178,'tt',"hammer","16oz carpenter's hammer",1.0,'xx',6,1.6,'2023-02-02 09:09:09'), + (179,'tt',"rocks","box of assorted rocks",5.3,'xx',7,1.7,'2023-02-02 09:09:09'), + (180,'tt',"jacket","water resistent black wind breaker",0.1,'xx',8,1.8,'2023-02-02 09:09:09'), + (181,'tt',"spare tire","24 inch spare tire",22.2,'xx',9,1.9,'2023-02-02 09:09:09'); diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/change_columns.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/change_columns.sql new file mode 100644 index 00000000000..a17f9a0a936 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/change_columns.sql @@ -0,0 +1,36 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: shop +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `shop`; +use shop; + +alter table products change add_column2 add_column int default 1 not null; +delete from products where id < 155; +insert into products +values (155,"scooter","Small 2-wheel scooter",3.14,1), + (156,"car battery","12V car battery",8.1,2), + (157,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,3), + (158,"hammer","12oz carpenter's hammer",0.75,4), + (159,"hammer","14oz carpenter's hammer",0.875,5), + (160,"hammer","16oz carpenter's hammer",1.0,6), + (161,"rocks","box of assorted rocks",5.3,7), + (162,"jacket","water resistent black wind breaker",0.1,8), + (163,"spare tire","24 inch spare tire",22.2,9); + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/drop_columns.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/drop_columns.sql new file mode 100644 index 00000000000..5c3b7d1f549 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/drop_columns.sql @@ -0,0 +1,50 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. 
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: shop +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `shop`; +use shop; + +alter table products drop column add_column4,drop column add_column6; +insert into products +values (137,"scooter","Small 2-wheel scooter",3.14,'xx',1,1.1), + (138,"car battery","12V car battery",8.1,'xx',2,1.2), + (139,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,'xx',3,1.3), + (140,"hammer","12oz carpenter's hammer",0.75,'xx',4,1.4), + (141,"hammer","14oz carpenter's hammer",0.875,'xx',5,1.5), + (142,"hammer","16oz carpenter's hammer",1.0,'xx',6,1.6), + (143,"rocks","box of assorted rocks",5.3,'xx',7,1.7), + (144,"jacket","water resistent black wind breaker",0.1,'xx',8,1.8), + (145,"spare tire","24 inch spare tire",22.2,'xx',9,1.9); +update products set name = 'dailai' where id in (140,141,142); +delete from products where id < 137; + + +alter table products drop column add_column1,drop column add_column3; +insert into products +values (146,"scooter","Small 2-wheel scooter",3.14,1), + (147,"car battery","12V car battery",8.1,2), + (148,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,3), + (149,"hammer","12oz carpenter's hammer",0.75,4), + (150,"hammer","14oz carpenter's hammer",0.875,5), + (151,"hammer","16oz carpenter's hammer",1.0,6), + (152,"rocks","box of assorted 
rocks",5.3,7), + (153,"jacket","water resistent black wind breaker",0.1,8), + (154,"spare tire","24 inch spare tire",22.2,9); +update products set name = 'dailai' where id > 143; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/modify_columns.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/modify_columns.sql new file mode 100644 index 00000000000..ab64c47567b --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/modify_columns.sql @@ -0,0 +1,36 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: shop +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `shop`; +use shop; + +alter table products modify name longtext null; +delete from products where id < 155; +insert into products +values (164,"scooter","Small 2-wheel scooter",3.14,1), + (165,"car battery","12V car battery",8.1,2), + (166,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,3), + (167,"hammer","12oz carpenter's hammer",0.75,4), + (168,"hammer","14oz carpenter's hammer",0.875,5), + (169,"hammer","16oz carpenter's hammer",1.0,6), + (170,"rocks","box of assorted rocks",5.3,7), + (171,"jacket","water resistent black wind breaker",0.1,8), + (172,"spare tire","24 inch spare tire",22.2,9); + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/shop.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/shop.sql new file mode 100644 index 00000000000..be2eaaeca9e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/shop.sql @@ -0,0 +1,44 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. 
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: shop +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `shop`; +use shop; + +drop table if exists products; +-- Create and populate our products using a single insert with many rows +CREATE TABLE products ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel', + description VARCHAR(512), + weight FLOAT +); + +ALTER TABLE products AUTO_INCREMENT = 101; + +INSERT INTO products +VALUES (101,"scooter","Small 2-wheel scooter",3.14), + (102,"car battery","12V car battery",8.1), + (103,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8), + (104,"hammer","12oz carpenter's hammer",0.75), + (105,"hammer","14oz carpenter's hammer",0.875), + (106,"hammer","16oz carpenter's hammer",1.0), + (107,"rocks","box of assorted rocks",5.3), + (108,"jacket","water resistent black wind breaker",0.1), + (109,"spare tire","24 inch spare tire",22.2); \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/mysqlcdc_to_doris_with_schema_change.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/mysqlcdc_to_doris_with_schema_change.conf index 795d0d144f5..3c0126381c6 100644 --- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/mysqlcdc_to_doris_with_schema_change.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/mysqlcdc_to_doris_with_schema_change.conf @@ -46,7 +46,7 @@ sink { table = "products" sink.label-prefix = "test-cdc" sink.enable-2pc = "true" - sink.enable-delete = "false" + sink.enable-delete = "true" doris.config { format = "json" read_json_by_line = "true" From af6cc9f16d211c063529dc61620eb1ab927576b6 Mon Sep 17 00:00:00 2001 From: deng-jeffer Date: Sat, 14 Dec 2024 09:48:45 +0800 Subject: [PATCH 3/9] [Feature][Connector-V2[Doris]fix code style --- .../exception/DorisSchemaChangeException.java | 17 +++++++++++++++++ .../doris/schema/SchemaChangeManager.java | 7 ++++--- .../doris/sink/writer/DorisSinkWriter.java | 3 ++- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisSchemaChangeException.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisSchemaChangeException.java index 10b7d8c86c6..bbe4a290d4e 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisSchemaChangeException.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisSchemaChangeException.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.seatunnel.connectors.doris.exception; import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/schema/SchemaChangeManager.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/schema/SchemaChangeManager.java index e0bc93d72c2..95e768e6f8c 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/schema/SchemaChangeManager.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/schema/SchemaChangeManager.java @@ -19,6 +19,10 @@ package org.apache.seatunnel.connectors.doris.schema; +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; + import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; @@ -47,9 +51,6 @@ import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import java.io.IOException; diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index f1e1c5f848c..532d68f075b 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.connectors.doris.sink.writer; +import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter; @@ -41,7 +43,6 @@ import org.apache.seatunnel.connectors.doris.util.HttpUtil; import org.apache.seatunnel.connectors.doris.util.UnsupportedTypeConverterUtils; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import java.io.IOException; From eadaa715aa1544485003a827c52f2cffe88860b7 Mon Sep 17 00:00:00 2001 From: deng-jeffer Date: Sat, 14 Dec 2024 09:56:34 +0800 Subject: [PATCH 4/9] [Feature][Connector-V2[Doris]fix code style again --- .../seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index 532d68f075b..c31aedb4713 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -56,7 +56,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static com.google.common.base.Preconditions.checkState; +import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkState; @Slf4j public class DorisSinkWriter From bf73b408ee55f8e9ff63df033012e4022c00d9e0 Mon Sep 17 00:00:00 2001 From: deng-jeffer Date: Sat, 14 Dec 2024 12:18:52 +0800 Subject: [PATCH 5/9] [Feature][Connector-V2[Doris]fix cdc test case --- .../seatunnel/e2e/connector/doris/DorisSchemaChangeIT.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisSchemaChangeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisSchemaChangeIT.java index 4e2e5feee49..0374327160f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisSchemaChangeIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisSchemaChangeIT.java @@ -279,9 +279,7 @@ private Connection getMysqlJdbcConnection() throws SQLException { } @BeforeAll - @Override - public void startUp() { - super.startUp(); + public void init() { log.info("The second stage: Starting Mysql containers..."); Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); log.info("Mysql Containers are started"); From 95fb77f88b8148785cfff3fa7962d66346391151 Mon Sep 17 00:00:00 2001 From: deng-jeffer Date: Mon, 16 Dec 2024 13:11:17 +0800 Subject: [PATCH 6/9] [Feature][Connector-V2[Doris]fix ci --- .../seatunnel/e2e/connector/doris/DorisSchemaChangeIT.java | 6 ++---- 1 file changed, 2 
insertions(+), 4 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisSchemaChangeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisSchemaChangeIT.java index 0374327160f..4f90f53aa31 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisSchemaChangeIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisSchemaChangeIT.java @@ -22,7 +22,6 @@ import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer; import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion; import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase; -import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; @@ -64,7 +63,7 @@ type = {EngineType.SPARK, EngineType.FLINK}, disabledReason = "Currently SPARK do not support cdc. 
In addition, currently only the zeta engine supports schema evolution for pr https://github.com/apache/seatunnel/pull/5125.") -public class DorisSchemaChangeIT extends AbstractDorisIT implements TestResource { +public class DorisSchemaChangeIT extends AbstractDorisIT { private static final String DATABASE = "shop"; private static final String SOURCE_TABLE = "products"; private static final String MYSQL_HOST = "mysql_cdc_e2e"; @@ -294,8 +293,7 @@ public void init() { } @AfterAll - @Override - public void tearDown() throws SQLException { + public void close() throws SQLException { if (MYSQL_CONTAINER != null) { MYSQL_CONTAINER.close(); } From e35763456b5c91af09ab12f842d300f09ef5e033 Mon Sep 17 00:00:00 2001 From: deng-jeffer Date: Wed, 18 Dec 2024 22:09:49 +0800 Subject: [PATCH 7/9] [Feature][Connector-V2[Doris]merge dev --- .../connectors/doris/schema/SchemaChangeManager.java | 12 ++++++------ .../doris/sink/writer/DorisSinkWriter.java | 1 - .../mysqlcdc_to_doris_with_schema_change.conf | 4 +--- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/schema/SchemaChangeManager.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/schema/SchemaChangeManager.java index 95e768e6f8c..940f78c0175 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/schema/SchemaChangeManager.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/schema/SchemaChangeManager.java @@ -38,7 +38,6 @@ import org.apache.seatunnel.connectors.doris.datatype.DorisTypeConverterV2; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; import org.apache.seatunnel.connectors.doris.exception.DorisSchemaChangeException; -import org.apache.seatunnel.connectors.doris.rest.RestService; import org.apache.commons.codec.binary.Base64; import 
org.apache.commons.lang3.StringUtils; @@ -56,7 +55,10 @@ import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; @Slf4j @@ -363,11 +365,9 @@ public HttpPost buildHttpPost(String ddl, String database) throws IllegalArgumentException, IOException { Map param = new HashMap<>(); param.put("stmt", ddl); - String requestUrl = - String.format( - SCHEMA_CHANGE_API, - RestService.randomEndpoint(dorisSinkConfig.getFrontends(), log), - database); + List feNodes = Arrays.asList(dorisSinkConfig.getFrontends().split(",")); + Collections.shuffle(feNodes); + String requestUrl = String.format(SCHEMA_CHANGE_API, feNodes.get(0), database); HttpPost httpPost = new HttpPost(requestUrl); httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); httpPost.setHeader( diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index fda0cc40aab..131012648fe 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -33,7 +33,6 @@ import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; import org.apache.seatunnel.connectors.doris.exception.DorisSchemaChangeException; -import org.apache.seatunnel.connectors.doris.rest.RestService; import org.apache.seatunnel.connectors.doris.rest.models.RespContent; import org.apache.seatunnel.connectors.doris.schema.SchemaChangeManager; import 
org.apache.seatunnel.connectors.doris.serialize.DorisSerializer; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/mysqlcdc_to_doris_with_schema_change.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/mysqlcdc_to_doris_with_schema_change.conf index 3c0126381c6..8088ff009e8 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/mysqlcdc_to_doris_with_schema_change.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/mysqlcdc_to_doris_with_schema_change.conf @@ -31,9 +31,7 @@ source { password = "mysqlpw" table-names = ["shop.products"] base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" - debezium = { - include.schema.changes = true - } + schema-changes.enabled = true } } From f295af60b995f29a4f7eb86cf87228c99ff05b52 Mon Sep 17 00:00:00 2001 From: deng-jeffer Date: Thu, 19 Dec 2024 11:17:38 +0800 Subject: [PATCH 8/9] Update docs/en/concept/schema-evolution.md Co-authored-by: hailin0 --- docs/en/concept/schema-evolution.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/en/concept/schema-evolution.md b/docs/en/concept/schema-evolution.md index 2a90f437af6..418530e713d 100644 --- a/docs/en/concept/schema-evolution.md +++ b/docs/en/concept/schema-evolution.md @@ -225,9 +225,7 @@ source { password = "mysqlpw" table-names = ["shop.products"] base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" - debezium = { - include.schema.changes = true - } + schema-changes.enabled = true } } From 712b3dcdd4cbaace60f5dbc72e95948430c1f7bf Mon Sep 17 00:00:00 2001 From: deng-jeffer Date: Thu, 19 Dec 2024 11:17:58 +0800 Subject: [PATCH 9/9] Update docs/zh/concept/schema-evolution.md Co-authored-by: hailin0 --- docs/zh/concept/schema-evolution.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/zh/concept/schema-evolution.md b/docs/zh/concept/schema-evolution.md index 
48e34154d35..4694ed1af18 100644 --- a/docs/zh/concept/schema-evolution.md +++ b/docs/zh/concept/schema-evolution.md @@ -226,9 +226,7 @@ source { password = "mysqlpw" table-names = ["shop.products"] base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" - debezium = { - include.schema.changes = true - } + schema-changes.enabled = true } }