Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] [Connector-V2-Clickhouse] Support nest type and array #3047

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
9406476
[Improve] [Connector-V2-Clickhouse] Support nest type and array
Oct 10, 2022
6edac39
Merge branch 'apache:dev' into clickhouse_source_support_nest_type_an…
FWLamb Oct 18, 2022
c254ab3
Merge branch 'apache:dev' into clickhouse_source_support_nest_type_an…
FWLamb Oct 18, 2022
c0b21fc
[Improve] [Connector-V2-Clickhouse] Support nest type and array
Oct 18, 2022
1305c11
Merge remote-tracking branch 'origin/clickhouse_source_support_nest_t…
Oct 18, 2022
d8a3bbe
[Improve] [Connector-V2-Clickhouse] Support nest type and array
Oct 18, 2022
ccd367d
[Improve] [Connector-V2-Clickhouse] Support nest type and array
Oct 18, 2022
cb6d56e
Merge branch 'apache:dev' into clickhouse_source_support_nest_type_an…
FWLamb Oct 24, 2022
7085567
[Improve] [Connector-V2-Clickhouse] Support nest type and array
Oct 24, 2022
5842bea
Merge remote-tracking branch 'origin/clickhouse_source_support_nest_t…
Oct 24, 2022
73dafb8
[Improve] [Connector-V2-Clickhouse] Support nest type and array
Oct 24, 2022
bb85be7
[Improve] [Connector-V2-Clickhouse] Support nest type and array
Oct 24, 2022
06a9c9f
[Improve] [Connector-V2-Clickhouse] Support nest type and array
Oct 24, 2022
8bccb04
[Improve] [Connector-V2-Clickhouse] Support nest type and array
Oct 24, 2022
0f88385
[Improve] [Connector-V2-Clickhouse] Support nest type and array
Oct 24, 2022
a515d69
[Improve] [Connector-V2-Clickhouse] Support nest type and array
Oct 24, 2022
50608b9
[Improve] [Connector-V2-Clickhouse] Support nest type and array
Oct 24, 2022
249c9b5
[Improve] [Connector-V2-Clickhouse] Support nest type and array
Oct 24, 2022
b907108
[Improve] [Connector-V2-Clickhouse] Support nest type and array
Oct 25, 2022
ea7f1d6
Merge branch 'apache:dev' into clickhouse_source_support_nest_type_an…
FWLamb Oct 25, 2022
aacc76b
Update Support nest type and array
Oct 25, 2022
839e737
Merge remote-tracking branch 'upstream/dev' into pr/3047
Hisoka-X Oct 25, 2022
d90aee8
[Engine] [Test] Enable ClickhouseIT
Hisoka-X Oct 25, 2022
02108da
Merge branch 'apache:dev' into clickhouse_source_support_nest_type_an…
FWLamb Oct 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ private void addIntoBatch(SeaTunnelRow row, PreparedStatement clickHouseStatemen
}
String fieldType = option.getTableSchema().get(fieldName);
fieldInjectFunctionMap
.getOrDefault(fieldType, DEFAULT_INJECT_FUNCTION)
.injectFields(clickHouseStatement, i + 1, fieldValue);
.getOrDefault(fieldType, DEFAULT_INJECT_FUNCTION)
.injectFields(clickHouseStatement, i + 1, fieldValue);
}
clickHouseStatement.addBatch();
} catch (SQLException e) {
Expand All @@ -162,11 +162,11 @@ private Map<Shard, ClickhouseBatchStatement> initStatementMap() {
shardRouter.getShards().forEach((weight, s) -> {
try {
ClickHouseConnectionImpl clickhouseConnection = new ClickHouseConnectionImpl(s.getJdbcUrl(),
this.option.getProperties());
this.option.getProperties());
PreparedStatement preparedStatement = clickhouseConnection.prepareStatement(prepareSql);
IntHolder intHolder = new IntHolder();
ClickhouseBatchStatement batchStatement =
new ClickhouseBatchStatement(clickhouseConnection, preparedStatement, intHolder);
new ClickhouseBatchStatement(clickhouseConnection, preparedStatement, intHolder);
result.put(s, batchStatement);
} catch (SQLException e) {
throw new RuntimeException("Clickhouse prepare statement error: " + e.getMessage(), e);
Expand All @@ -180,28 +180,29 @@ private String initPrepareSQL() {
Arrays.fill(placeholder, "?");

return String.format("INSERT INTO %s (%s) VALUES (%s)",
shardRouter.getShardTable(),
String.join(",", option.getFields()),
String.join(",", placeholder));
shardRouter.getShardTable(),
String.join(",", option.getFields()),
String.join(",", placeholder));
}

private Map<String, ClickhouseFieldInjectFunction> initFieldInjectFunctionMap() {
Map<String, ClickhouseFieldInjectFunction> result = new HashMap<>(Common.COLLECTION_SIZE);
List<ClickhouseFieldInjectFunction> clickhouseFieldInjectFunctions = Lists.newArrayList(
new ArrayInjectFunction(),
new MapInjectFunction(),
new BigDecimalInjectFunction(),
new DateInjectFunction(),
new DateTimeInjectFunction(),
new LongInjectFunction(),
new DoubleInjectFunction(),
new FloatInjectFunction(),
new IntInjectFunction(),
new StringInjectFunction()
);
List<ClickhouseFieldInjectFunction> clickhouseFieldInjectFunctions;
ClickhouseFieldInjectFunction defaultFunction = new StringInjectFunction();
// get field type
for (String field : this.option.getFields()) {
clickhouseFieldInjectFunctions = Lists.newArrayList(
new ArrayInjectFunction(),
new MapInjectFunction(),
new BigDecimalInjectFunction(),
new DateInjectFunction(),
new DateTimeInjectFunction(),
new LongInjectFunction(),
new DoubleInjectFunction(),
new FloatInjectFunction(),
new IntInjectFunction(),
new StringInjectFunction()
);
ClickhouseFieldInjectFunction function = defaultFunction;
String fieldType = this.option.getTableSchema().get(field);
for (ClickhouseFieldInjectFunction clickhouseFieldInjectFunction : clickhouseFieldInjectFunctions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,72 @@

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.regex.Pattern;

/**
 * {@link ClickhouseFieldInjectFunction} for column types whose declared type string
 * starts with {@code Array}. It binds an array value to a {@link PreparedStatement}
 * parameter.
 *
 * <p>NOTE(review): this span appears to be a rendered diff in which removed and added
 * lines are shown together without +/- markers; lines flagged below look like the
 * pre-change versions of the statements that follow them. Confirm against the
 * repository before treating this text as compilable source.
 */
public class ArrayInjectFunction implements ClickhouseFieldInjectFunction {

// Matches only when the whole type string begins with "Array" (Matcher#matches is a full match).
private static final Pattern PATTERN = Pattern.compile("(Array.*)");
// Written by isCurrentFieldType(...) on a successful match and read by injectFields(...),
// so injectFields relies on isCurrentFieldType having matched on this instance first.
private String fieldType;

/**
 * Binds {@code value} as a SQL array at parameter position {@code index}.
 *
 * @param statement statement receiving the parameter
 * @param index     parameter index passed through to {@code setArray}
 * @param value     cast to {@code Object[]} and copied into a typed boxed array
 * @throws SQLException             propagated from the JDBC calls
 * @throws IllegalArgumentException if the element type name has no case in the switch
 */
@Override
public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
// NOTE(review): presumably the pre-change line of the diff (direct java.sql.Array cast) — confirm.
statement.setArray(index, (java.sql.Array) value);
String sqlType;
Object[] elements = (Object[]) value;
// Element type name = text between the first "(" and the first ")" of the stored field type.
// For a wrapped element type such as Array(Nullable(String)) this yields "Nullable(String",
// which has no case below and therefore reaches the default branch.
String type = fieldType.substring(fieldType.indexOf("(") + 1, fieldType.indexOf(")"));
// Each case picks the SQL type name handed to createArrayOf and re-types the elements via
// Arrays.copyOf, which throws ArrayStoreException if an element is not of the target class.
switch (type) {
case "String":
case "Int128":
case "UInt128":
case "Int256":
case "UInt256":
sqlType = "TEXT";
elements = Arrays.copyOf(elements, elements.length, String[].class);
break;
case "Int8":
sqlType = "TINYINT";
elements = Arrays.copyOf(elements, elements.length, Byte[].class);
break;
case "UInt8":
case "Int16":
sqlType = "SMALLINT";
elements = Arrays.copyOf(elements, elements.length, Short[].class);
break;
case "UInt16":
case "Int32":
sqlType = "INTEGER";
elements = Arrays.copyOf(elements, elements.length, Integer[].class);
break;
case "UInt32":
case "Int64":
case "UInt64":
sqlType = "BIGINT";
elements = Arrays.copyOf(elements, elements.length, Long[].class);
break;
case "Float32":
sqlType = "REAL";
elements = Arrays.copyOf(elements, elements.length, Float[].class);
break;
case "Float64":
sqlType = "DOUBLE";
elements = Arrays.copyOf(elements, elements.length, Double[].class);
break;
case "Bool":
sqlType = "BOOLEAN";
elements = Arrays.copyOf(elements, elements.length, Boolean[].class);
break;
default:
throw new IllegalArgumentException("array inject error, not supported data type: " + type);
}
// Build the java.sql.Array through the statement's own connection, then bind it.
statement.setArray(index, statement.getConnection().createArrayOf(sqlType, elements));
}

/**
 * Reports whether {@code fieldType} is an array type and, if so, stores it in the
 * {@code fieldType} field for later use by {@code injectFields}.
 *
 * @param fieldType column type string to test against {@code PATTERN}
 * @return {@code true} when the type string matches {@code PATTERN}
 */
@Override
public boolean isCurrentFieldType(String fieldType) {
// NOTE(review): presumably the pre-change line of the diff (stateless match) — confirm.
return PATTERN.matcher(fieldType).matches();
if (PATTERN.matcher(fieldType).matches()) {
this.fieldType = fieldType;
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,29 @@
public class TypeConvertUtil {

public static SeaTunnelDataType<?> convert(ClickHouseColumn column) {
if (column.isArray()) {
ClickHouseColumn subArrayDataType = column.getNestedColumns().get(0);
SeaTunnelDataType<?> dataType = convert(subArrayDataType);
if (BasicType.INT_TYPE.equals(dataType)) {
return ArrayType.INT_ARRAY_TYPE;
} else if (BasicType.STRING_TYPE.equals(dataType)) {
return ArrayType.STRING_ARRAY_TYPE;
} else if (BasicType.FLOAT_TYPE.equals(dataType)) {
return ArrayType.FLOAT_ARRAY_TYPE;
} else if (BasicType.DOUBLE_TYPE.equals(dataType)) {
return ArrayType.DOUBLE_ARRAY_TYPE;
} else if (BasicType.LONG_TYPE.equals(dataType)) {
return ArrayType.LONG_ARRAY_TYPE;
} else if (BasicType.SHORT_TYPE.equals(dataType)) {
return ArrayType.SHORT_ARRAY_TYPE;
} else if (BasicType.BOOLEAN_TYPE.equals(dataType)) {
return ArrayType.BOOLEAN_ARRAY_TYPE;
} else if (BasicType.BYTE_TYPE.equals(dataType)) {
return ArrayType.BYTE_ARRAY_TYPE;
} else {
throw new IllegalArgumentException("data type in array is not supported: " + subArrayDataType.getDataType());
}
}
Class<?> type = column.getDataType().getObjectClass();
if (Integer.class.equals(type)) {
return BasicType.INT_TYPE;
Expand Down Expand Up @@ -81,7 +104,6 @@ public static SeaTunnelDataType<?> convert(ClickHouseColumn column) {
}

public static Object valueUnwrap(SeaTunnelDataType<?> dataType, ClickHouseValue record) {

if (dataType instanceof DecimalType) {
return record.asBigDecimal();
} else if (dataType.equals(BasicType.BOOLEAN_TYPE)) {
Expand All @@ -107,7 +129,26 @@ public static Object valueUnwrap(SeaTunnelDataType<?> dataType, ClickHouseValue
} else if (dataType instanceof MapType) {
return record.asMap();
} else if (dataType instanceof ArrayType) {
return record.asObject();
Class<?> typeClass = dataType.getTypeClass();
if (String[].class.equals(typeClass)) {
return record.asArray(String.class);
} else if (Boolean[].class.equals(typeClass)) {
return record.asArray(Boolean.class);
} else if (Byte[].class.equals(typeClass)) {
return record.asArray(Byte.class);
} else if (Short[].class.equals(typeClass)) {
return record.asArray(Short.class);
} else if (Integer[].class.equals(typeClass)) {
return record.asArray(Integer.class);
} else if (Long[].class.equals(typeClass)) {
return record.asArray(Long.class);
} else if (Float[].class.equals(typeClass)) {
return record.asArray(Float.class);
} else if (Double[].class.equals(typeClass)) {
return record.asArray(Double.class);
} else {
return record.asArray();
}
} else {
// TODO support pojo
throw new IllegalArgumentException("not supported data type: " + dataType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -55,7 +54,7 @@
import java.sql.Array;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.Driver;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand All @@ -68,13 +67,13 @@
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import scala.Tuple2;

@Disabled("Temporary fast fix, reason1: Transactions are not supported. reason2: Invalid boolean value, should be true or false controlled by setting bool_true_representation and bool_false_representation")
public class ClickhouseIT extends TestSuiteBase implements TestResource {
private static final Logger LOG = LoggerFactory.getLogger(ClickhouseIT.class);
private static final String CLICKHOUSE_DOCKER_IMAGE = "yandex/clickhouse-server:latest";
Expand Down Expand Up @@ -110,7 +109,6 @@ public void startUp() throws Exception {
.withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(CLICKHOUSE_DOCKER_IMAGE)));
Startables.deepStart(Stream.of(this.container)).join();
LOG.info("Clickhouse container started");
Class.forName(DRIVER_CLASS);
Awaitility.given()
.ignoreExceptions()
.await()
Expand All @@ -122,7 +120,6 @@ public void startUp() throws Exception {

private void initializeClickhouseTable() {
try {
this.connection.setAutoCommit(false);
Statement statement = this.connection.createStatement();
statement.execute(CONFIG.getString(SOURCE_TABLE));
statement.execute(CONFIG.getString(SINK_TABLE));
Expand All @@ -131,12 +128,11 @@ private void initializeClickhouseTable() {
}
}

private void initConnection() throws SQLException {
this.connection = DriverManager.getConnection(
this.container.getJdbcUrl(),
this.container.getUsername(),
this.container.getPassword()
);
private void initConnection() throws SQLException, ClassNotFoundException, InstantiationException, IllegalAccessException {
final Properties info = new Properties();
info.put("user", this.container.getUsername());
info.put("password", this.container.getPassword());
this.connection = ((Driver) Class.forName(DRIVER_CLASS).newInstance()).connect(this.container.getJdbcUrl(), info);
}

private static Config getInitClickhouseConfig() {
Expand Down Expand Up @@ -182,9 +178,10 @@ private Array toSqlArray(Object value) throws SQLException {

private void batchInsertData() {
String sql = CONFIG.getString(INSERT_SQL);
PreparedStatement preparedStatement = null;
try {
this.connection.setAutoCommit(false);
PreparedStatement preparedStatement = this.connection.prepareStatement(sql);
this.connection.setAutoCommit(true);
preparedStatement = this.connection.prepareStatement(sql);
for (SeaTunnelRow row : TEST_DATASET._2()) {
preparedStatement.setLong(1, (Long) row.getField(0));
preparedStatement.setObject(2, row.getField(1));
Expand Down Expand Up @@ -213,9 +210,17 @@ private void batchInsertData() {
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
this.connection.commit();
preparedStatement.clearBatch();
} catch (SQLException e) {
throw new RuntimeException("Batch insert data failed!", e);
} finally {
if (preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException e) {
throw new RuntimeException("PreparedStatement close failed!", e);
}
}
}
}

Expand Down Expand Up @@ -299,7 +304,7 @@ private static Tuple2<SeaTunnelRowType, List<SeaTunnelRow>> generateTestDataSet(
i,
"string",
new Integer[]{Integer.parseInt("1")},
new Double[]{Double.parseDouble("1")},
new Double[]{Double.parseDouble("1.1")},
new String[]{"1"}
});
rows.add(row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ create table if not exists `default`.source_table(
`c_nested` Nested
(
`int` UInt32,
`double` Int64,
`double` Float64,
`string` String
)
)engine=Memory;
Expand Down Expand Up @@ -73,7 +73,7 @@ create table if not exists `default`.sink_table(
`c_nested` Nested
(
`int` UInt32,
`double` Int64,
`double` Float64,
`string` String
)
)engine=Memory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ private static Object reconvert(Object field, SeaTunnelDataType<?> dataType) {
if (field == null) {
return null;
}

switch (dataType.getSqlType()) {
case ROW:
return reconvert((InternalRow) field, (SeaTunnelRowType) dataType);
Expand All @@ -200,9 +199,7 @@ private static Object reconvert(Object field, SeaTunnelDataType<?> dataType) {
case DECIMAL:
return ((Decimal) field).toJavaBigDecimal();
case ARRAY:
ArrayData arrayData = (ArrayData) field;
BasicType<?> elementType = ((ArrayType<?, ?>) dataType).getElementType();
return arrayData.toObjectArray(TypeConverterUtils.convert(elementType));
return reconvertArray((ArrayData) field, (ArrayType<?, ?>) dataType);
default:
return field;
}
Expand All @@ -216,4 +213,16 @@ private static SeaTunnelRow reconvert(InternalRow engineRow, SeaTunnelRowType ro
}
return new SeaTunnelRow(fields);
}

/**
 * Converts an engine-side {@code ArrayData} into a plain {@code Object[]}, applying
 * {@code reconvert} to every element using the array's element type.
 *
 * @param arrayData engine array to convert; may be {@code null}
 * @param arrayType array type whose element type drives the per-element conversion
 * @return a new {@code Object[]} of converted elements, or an empty {@code Object[]}
 *         when {@code arrayData} is {@code null} or has no elements
 */
private static Object reconvertArray(ArrayData arrayData, ArrayType<?, ?> arrayType) {
    // Treat a missing array exactly like an empty one.
    final int size = arrayData == null ? 0 : arrayData.numElements();
    if (size == 0) {
        return new Object[0];
    }
    // Extract the raw elements first, then convert each one individually.
    final Object[] rawElements = arrayData.toObjectArray(TypeConverterUtils.convert(arrayType.getElementType()));
    final Object[] converted = new Object[size];
    int position = 0;
    while (position < size) {
        converted[position] = reconvert(rawElements[position], arrayType.getElementType());
        position++;
    }
    return converted;
}
}