diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java index 17608392ff1..bf00298a742 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java @@ -40,4 +40,5 @@ public class DatabaseIdentifier { public static final String TIDB = "TiDB"; public static final String XUGU = "XUGU"; public static final String IRIS = "IRIS"; + public static final String INCEPTOR = "Inceptor"; } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialectFactory.java index 56bd81b7f83..3ddf3bfab86 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialectFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialectFactory.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.inceptor.InceptorDialect; import com.google.auto.service.AutoService; @@ -33,6 +34,15 @@ public boolean acceptsURL(String url) { @Override public JdbcDialect create() { + throw new UnsupportedOperationException( + "Can't create JdbcDialect without compatible mode for Hive"); + } + + @Override + public JdbcDialect create(String compatibleMode, String fieldId) { + if ("inceptor".equals(compatibleMode)) { + return new InceptorDialect(); + } return new HiveDialect(); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorDialect.java new file mode 100644 index 00000000000..9770fb63bdf --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorDialect.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.inceptor; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive.HiveDialect; + +public class InceptorDialect extends HiveDialect { + + @Override + public String dialectName() { + return DatabaseIdentifier.INCEPTOR; + } + + @Override + public JdbcRowConverter getRowConverter() { + return new InceptorJdbcRowConverter(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java new file mode 100644 index 00000000000..806788b30eb --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.inceptor; + +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive.HiveJdbcRowConverter; + +import org.apache.commons.lang3.StringUtils; + +import java.math.BigDecimal; +import java.sql.PreparedStatement; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; + +public class InceptorJdbcRowConverter extends HiveJdbcRowConverter { + + @Override + public String converterName() { + return DatabaseIdentifier.INCEPTOR; + } + + @Override + public PreparedStatement toExternal( + TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement) { + SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType(); + for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) { + try { + SeaTunnelDataType seaTunnelDataType = rowType.getFieldType(fieldIndex); + int statementIndex = fieldIndex + 1; + Object fieldValue = row.getField(fieldIndex); + if (fieldValue == null) { + statement.setObject(statementIndex, StringUtils.EMPTY); + continue; + } + switch (seaTunnelDataType.getSqlType()) { + case STRING: + statement.setString(statementIndex, (String) row.getField(fieldIndex)); + break; + case BOOLEAN: + statement.setBoolean(statementIndex, (Boolean) row.getField(fieldIndex)); + break; + case TINYINT: + statement.setByte(statementIndex, (Byte) row.getField(fieldIndex)); + break; + case SMALLINT: + statement.setShort(statementIndex, (Short) row.getField(fieldIndex)); + break; + case INT: + statement.setInt(statementIndex, (Integer) row.getField(fieldIndex)); + break; + case BIGINT: + statement.setLong(statementIndex, (Long) row.getField(fieldIndex)); + break; + case FLOAT: + statement.setFloat(statementIndex, (Float) row.getField(fieldIndex)); + break; + case DOUBLE: + statement.setDouble(statementIndex, (Double) row.getField(fieldIndex)); + break; + case DECIMAL: + statement.setBigDecimal( + statementIndex, (BigDecimal) row.getField(fieldIndex)); + break; + case DATE: + LocalDate localDate = (LocalDate) row.getField(fieldIndex); + statement.setDate(statementIndex, java.sql.Date.valueOf(localDate)); + break; + case TIME: + writeTime(statement, statementIndex, (LocalTime) row.getField(fieldIndex)); + break; + case TIMESTAMP: + LocalDateTime localDateTime = (LocalDateTime) row.getField(fieldIndex); + statement.setTimestamp( + statementIndex, java.sql.Timestamp.valueOf(localDateTime)); + break; + case BYTES: + statement.setBytes(statementIndex, (byte[]) row.getField(fieldIndex)); + break; + case NULL: + statement.setNull(statementIndex, java.sql.Types.NULL); + break; + case ARRAY: + SeaTunnelDataType elementType = + ((ArrayType) seaTunnelDataType).getElementType(); + Object[] array = (Object[]) row.getField(fieldIndex); + if (array == null) { + statement.setNull(statementIndex, java.sql.Types.ARRAY); + break; + } + if (SqlType.TINYINT.equals(elementType.getSqlType())) { + Short[] shortArray = new Short[array.length]; + for (int i = 0; i < array.length; i++) { + shortArray[i] = Short.valueOf(array[i].toString()); + } + statement.setObject(statementIndex, shortArray); + } else { + statement.setObject(statementIndex, array); + } + break; + case MAP: + case ROW: + default: + throw new JdbcConnectorException( + CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, + "Unexpected value: " + seaTunnelDataType); + } + } catch (Exception e) { + throw new JdbcConnectorException( + JdbcConnectorErrorCode.DATA_TYPE_CAST_FAILED, + "error field:" + rowType.getFieldNames()[fieldIndex], + e); + } + } + return statement; + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialectFactoryTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialectFactoryTest.java new file mode 100644 index 00000000000..169f51b6aea --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialectFactoryTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.inceptor.InceptorDialect; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class HiveDialectFactoryTest { + + @Test + public void testWithCompatibleMode() { + HiveDialectFactory hiveDialectFactory = new HiveDialectFactory(); + JdbcDialect inceptorDialect = hiveDialectFactory.create("inceptor", ""); + Assertions.assertTrue(inceptorDialect instanceof InceptorDialect); + JdbcDialect hiveDialect = hiveDialectFactory.create("", ""); + Assertions.assertTrue(hiveDialect instanceof HiveDialect); + } +}