diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java index 27359ee32c2..79411ed5f9b 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java @@ -140,8 +140,8 @@ private void addIntoBatch(SeaTunnelRow row, PreparedStatement clickHouseStatemen } String fieldType = option.getTableSchema().get(fieldName); fieldInjectFunctionMap - .getOrDefault(fieldType, DEFAULT_INJECT_FUNCTION) - .injectFields(clickHouseStatement, i + 1, fieldValue); + .getOrDefault(fieldType, DEFAULT_INJECT_FUNCTION) + .injectFields(clickHouseStatement, i + 1, fieldValue); } clickHouseStatement.addBatch(); } catch (SQLException e) { @@ -162,11 +162,11 @@ private Map initStatementMap() { shardRouter.getShards().forEach((weight, s) -> { try { ClickHouseConnectionImpl clickhouseConnection = new ClickHouseConnectionImpl(s.getJdbcUrl(), - this.option.getProperties()); + this.option.getProperties()); PreparedStatement preparedStatement = clickhouseConnection.prepareStatement(prepareSql); IntHolder intHolder = new IntHolder(); ClickhouseBatchStatement batchStatement = - new ClickhouseBatchStatement(clickhouseConnection, preparedStatement, intHolder); + new ClickhouseBatchStatement(clickhouseConnection, preparedStatement, intHolder); result.put(s, batchStatement); } catch (SQLException e) { throw new RuntimeException("Clickhouse prepare statement error: " + e.getMessage(), e); @@ -180,28 +180,29 @@ private String initPrepareSQL() { Arrays.fill(placeholder, "?"); return String.format("INSERT INTO %s (%s) VALUES (%s)", - shardRouter.getShardTable(), - String.join(",", option.getFields()), - String.join(",", placeholder)); + shardRouter.getShardTable(), + String.join(",", option.getFields()), + String.join(",", placeholder)); } private Map initFieldInjectFunctionMap() { Map result = new HashMap<>(Common.COLLECTION_SIZE); - List clickhouseFieldInjectFunctions = Lists.newArrayList( - new ArrayInjectFunction(), - new MapInjectFunction(), - new BigDecimalInjectFunction(), - new DateInjectFunction(), - new DateTimeInjectFunction(), - new LongInjectFunction(), - new DoubleInjectFunction(), - new FloatInjectFunction(), - new IntInjectFunction(), - new StringInjectFunction() - ); + List clickhouseFieldInjectFunctions; ClickhouseFieldInjectFunction defaultFunction = new StringInjectFunction(); // get field type for (String field : this.option.getFields()) { + clickhouseFieldInjectFunctions = Lists.newArrayList( + new ArrayInjectFunction(), + new MapInjectFunction(), + new BigDecimalInjectFunction(), + new DateInjectFunction(), + new DateTimeInjectFunction(), + new LongInjectFunction(), + new DoubleInjectFunction(), + new FloatInjectFunction(), + new IntInjectFunction(), + new StringInjectFunction() + ); ClickhouseFieldInjectFunction function = defaultFunction; String fieldType = this.option.getTableSchema().get(field); for (ClickhouseFieldInjectFunction clickhouseFieldInjectFunction : clickhouseFieldInjectFunctions) { diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java index c564e5501d1..644e6199465 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java @@ -19,19 +19,72 @@ import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.Arrays; import java.util.regex.Pattern; public class ArrayInjectFunction implements ClickhouseFieldInjectFunction { private static final Pattern PATTERN = Pattern.compile("(Array.*)"); + private String fieldType; @Override public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException { - statement.setArray(index, (java.sql.Array) value); + String sqlType; + Object[] elements = (Object[]) value; + String type = fieldType.substring(fieldType.indexOf("(") + 1, fieldType.indexOf(")")); + switch (type) { + case "String": + case "Int128": + case "UInt128": + case "Int256": + case "UInt256": + sqlType = "TEXT"; + elements = Arrays.copyOf(elements, elements.length, String[].class); + break; + case "Int8": + sqlType = "TINYINT"; + elements = Arrays.copyOf(elements, elements.length, Byte[].class); + break; + case "UInt8": + case "Int16": + sqlType = "SMALLINT"; + elements = Arrays.copyOf(elements, elements.length, Short[].class); + break; + case "UInt16": + case "Int32": + sqlType = "INTEGER"; + elements = Arrays.copyOf(elements, elements.length, Integer[].class); + break; + case "UInt32": + case "Int64": + case "UInt64": + sqlType = "BIGINT"; + elements = Arrays.copyOf(elements, elements.length, Long[].class); + break; + case "Float32": + sqlType = "REAL"; + elements = Arrays.copyOf(elements, elements.length, Float[].class); + break; + case "Float64": + sqlType = "DOUBLE"; + elements = Arrays.copyOf(elements, elements.length, Double[].class); + break; + case "Bool": + sqlType = "BOOLEAN"; + elements = Arrays.copyOf(elements, elements.length, Boolean[].class); + break; + default: + throw new IllegalArgumentException("array inject error, not supported data type: " + type); + } + statement.setArray(index, statement.getConnection().createArrayOf(sqlType, elements)); } @Override public boolean isCurrentFieldType(String fieldType) { - return PATTERN.matcher(fieldType).matches(); + if (PATTERN.matcher(fieldType).matches()) { + this.fieldType = fieldType; + return true; + } + return false; } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java index 0ccad51c9b7..abfe9680957 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java @@ -39,6 +39,29 @@ public class TypeConvertUtil { public static SeaTunnelDataType convert(ClickHouseColumn column) { + if (column.isArray()) { + ClickHouseColumn subArrayDataType = column.getNestedColumns().get(0); + SeaTunnelDataType dataType = convert(subArrayDataType); + if (BasicType.INT_TYPE.equals(dataType)) { + return ArrayType.INT_ARRAY_TYPE; + } else if (BasicType.STRING_TYPE.equals(dataType)) { + return ArrayType.STRING_ARRAY_TYPE; + } else if (BasicType.FLOAT_TYPE.equals(dataType)) { + return ArrayType.FLOAT_ARRAY_TYPE; + } else if (BasicType.DOUBLE_TYPE.equals(dataType)) { + return ArrayType.DOUBLE_ARRAY_TYPE; + } else if (BasicType.LONG_TYPE.equals(dataType)) { + return ArrayType.LONG_ARRAY_TYPE; + } else if (BasicType.SHORT_TYPE.equals(dataType)) { + return ArrayType.SHORT_ARRAY_TYPE; + } else if (BasicType.BOOLEAN_TYPE.equals(dataType)) { + return ArrayType.BOOLEAN_ARRAY_TYPE; + } else if (BasicType.BYTE_TYPE.equals(dataType)) { + return ArrayType.BYTE_ARRAY_TYPE; + } else { + throw new IllegalArgumentException("data type in array is not supported: " + subArrayDataType.getDataType()); + } + } Class type = column.getDataType().getObjectClass(); if (Integer.class.equals(type)) { return BasicType.INT_TYPE; @@ -81,7 +104,6 @@ public static SeaTunnelDataType convert(ClickHouseColumn column) { } public static Object valueUnwrap(SeaTunnelDataType dataType, ClickHouseValue record) { - if (dataType instanceof DecimalType) { return record.asBigDecimal(); } else if (dataType.equals(BasicType.BOOLEAN_TYPE)) { @@ -107,7 +129,26 @@ public static Object valueUnwrap(SeaTunnelDataType dataType, ClickHouseValue } else if (dataType instanceof MapType) { return record.asMap(); } else if (dataType instanceof ArrayType) { - return record.asObject(); + Class typeClass = dataType.getTypeClass(); + if (String[].class.equals(typeClass)) { + return record.asArray(String.class); + } else if (Boolean[].class.equals(typeClass)) { + return record.asArray(Boolean.class); + } else if (Byte[].class.equals(typeClass)) { + return record.asArray(Byte.class); + } else if (Short[].class.equals(typeClass)) { + return record.asArray(Short.class); + } else if (Integer[].class.equals(typeClass)) { + return record.asArray(Integer.class); + } else if (Long[].class.equals(typeClass)) { + return record.asArray(Long.class); + } else if (Float[].class.equals(typeClass)) { + return record.asArray(Float.class); + } else if (Double[].class.equals(typeClass)) { + return record.asArray(Double.class); + } else { + return record.asArray(); + } } else { // TODO support pojo throw new IllegalArgumentException("not supported data type: " + dataType); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java index f02c35c05cb..1a5170c37fc 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java @@ -36,7 +36,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.TestTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +54,7 @@ import java.sql.Array; import java.sql.Connection; import java.sql.Date; -import java.sql.DriverManager; +import java.sql.Driver; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -68,13 +67,13 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; import scala.Tuple2; -@Disabled("Temporary fast fix, reason1: Transactions are not supported. reason2: Invalid boolean value, should be true or false controlled by setting bool_true_representation and bool_false_representation") public class ClickhouseIT extends TestSuiteBase implements TestResource { private static final Logger LOG = LoggerFactory.getLogger(ClickhouseIT.class); private static final String CLICKHOUSE_DOCKER_IMAGE = "yandex/clickhouse-server:latest"; @@ -110,7 +109,6 @@ public void startUp() throws Exception { .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(CLICKHOUSE_DOCKER_IMAGE))); Startables.deepStart(Stream.of(this.container)).join(); LOG.info("Clickhouse container started"); - Class.forName(DRIVER_CLASS); Awaitility.given() .ignoreExceptions() .await() @@ -122,7 +120,6 @@ public void startUp() throws Exception { private void initializeClickhouseTable() { try { - this.connection.setAutoCommit(false); Statement statement = this.connection.createStatement(); statement.execute(CONFIG.getString(SOURCE_TABLE)); statement.execute(CONFIG.getString(SINK_TABLE)); @@ -131,12 +128,11 @@ private void initializeClickhouseTable() { } } - private void initConnection() throws SQLException { - this.connection = DriverManager.getConnection( - this.container.getJdbcUrl(), - this.container.getUsername(), - this.container.getPassword() - ); + private void initConnection() throws SQLException, ClassNotFoundException, InstantiationException, IllegalAccessException { + final Properties info = new Properties(); + info.put("user", this.container.getUsername()); + info.put("password", this.container.getPassword()); + this.connection = ((Driver) Class.forName(DRIVER_CLASS).newInstance()).connect(this.container.getJdbcUrl(), info); } private static Config getInitClickhouseConfig() { @@ -182,9 +178,10 @@ private Array toSqlArray(Object value) throws SQLException { private void batchInsertData() { String sql = CONFIG.getString(INSERT_SQL); + PreparedStatement preparedStatement = null; try { - this.connection.setAutoCommit(false); - PreparedStatement preparedStatement = this.connection.prepareStatement(sql); + this.connection.setAutoCommit(true); + preparedStatement = this.connection.prepareStatement(sql); for (SeaTunnelRow row : TEST_DATASET._2()) { preparedStatement.setLong(1, (Long) row.getField(0)); preparedStatement.setObject(2, row.getField(1)); @@ -213,9 +210,17 @@ private void batchInsertData() { preparedStatement.addBatch(); } preparedStatement.executeBatch(); - this.connection.commit(); + preparedStatement.clearBatch(); } catch (SQLException e) { throw new RuntimeException("Batch insert data failed!", e); + } finally { + if (preparedStatement != null) { + try { + preparedStatement.close(); + } catch (SQLException e) { + throw new RuntimeException("PreparedStatement close failed!", e); + } + } } } @@ -299,7 +304,7 @@ private static Tuple2> generateTestDataSet( i, "string", new Integer[]{Integer.parseInt("1")}, - new Double[]{Double.parseDouble("1")}, + new Double[]{Double.parseDouble("1.1")}, new String[]{"1"} }); rows.add(row); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf index fbd57541187..108f4e5589c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf @@ -41,7 +41,7 @@ create table if not exists `default`.source_table( `c_nested` Nested ( `int` UInt32, - `double` Int64, + `double` Float64, `string` String ) )engine=Memory; @@ -73,7 +73,7 @@ create table if not exists `default`.sink_table( `c_nested` Nested ( `int` UInt32, - `double` Int64, + `double` Float64, `string` String ) )engine=Memory; diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java index 23fb88aaae1..7544b0acf6e 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java @@ -182,7 +182,6 @@ private static Object reconvert(Object field, SeaTunnelDataType dataType) { if (field == null) { return null; } - switch (dataType.getSqlType()) { case ROW: return reconvert((InternalRow) field, (SeaTunnelRowType) dataType); @@ -200,9 +199,7 @@ private static Object reconvert(Object field, SeaTunnelDataType dataType) { case DECIMAL: return ((Decimal) field).toJavaBigDecimal(); case ARRAY: - ArrayData arrayData = (ArrayData) field; - BasicType elementType = ((ArrayType) dataType).getElementType(); - return arrayData.toObjectArray(TypeConverterUtils.convert(elementType)); + return reconvertArray((ArrayData) field, (ArrayType) dataType); default: return field; } @@ -216,4 +213,16 @@ private static SeaTunnelRow reconvert(InternalRow engineRow, SeaTunnelRowType ro } return new SeaTunnelRow(fields); } + + private static Object reconvertArray(ArrayData arrayData, ArrayType arrayType) { + if (arrayData == null || arrayData.numElements() == 0) { + return Collections.emptyList().toArray(); + } + Object[] newArray = new Object[arrayData.numElements()]; + Object[] values = arrayData.toObjectArray(TypeConverterUtils.convert(arrayType.getElementType())); + for (int i = 0; i < arrayData.numElements(); i++) { + newArray[i] = reconvert(values[i], arrayType.getElementType()); + } + return newArray; + } }