diff --git a/docs/en/connector-v2/sink/Clickhouse.md b/docs/en/connector-v2/sink/Clickhouse.md
index fc4a12603eb..4f1f0e16c09 100644
--- a/docs/en/connector-v2/sink/Clickhouse.md
+++ b/docs/en/connector-v2/sink/Clickhouse.md
@@ -126,3 +126,9 @@ sink {
### 2.3.0-beta 2022-10-20
- [Improve] Clickhouse Support Int128,Int256 Type ([3067](https://github.com/apache/incubator-seatunnel/pull/3067))
+
+### next version
+
+- [Improve] Clickhouse Sink support nest type and array type([3047](https://github.com/apache/incubator-seatunnel/pull/3047))
+
+- [Improve] Clickhouse Sink support geo type([3141](https://github.com/apache/incubator-seatunnel/pull/3141))
diff --git a/docs/en/connector-v2/source/Clickhouse.md b/docs/en/connector-v2/source/Clickhouse.md
index 0e89409ca11..1f048f1cdbc 100644
--- a/docs/en/connector-v2/source/Clickhouse.md
+++ b/docs/en/connector-v2/source/Clickhouse.md
@@ -93,4 +93,9 @@ source {
- [Improve] Clickhouse Source random use host when config multi-host ([3108](https://github.com/apache/incubator-seatunnel/pull/3108))
+### next version
+
+- [Improve] Clickhouse Source support nest type and array type([3047](https://github.com/apache/incubator-seatunnel/pull/3047))
+
+- [Improve] Clickhouse Source support geo type([3141](https://github.com/apache/incubator-seatunnel/pull/3141))
diff --git a/seatunnel-connectors-v2/connector-clickhouse/pom.xml b/seatunnel-connectors-v2/connector-clickhouse/pom.xml
index aee8ffe4091..5253eea84dd 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/pom.xml
+++ b/seatunnel-connectors-v2/connector-clickhouse/pom.xml
@@ -62,6 +62,16 @@
${clickhouse.version}
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java
index 4728fe21630..96d1f51936b 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java
@@ -17,22 +17,54 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class StringInjectFunction implements ClickhouseFieldInjectFunction {
+ private String fieldType;
+
@Override
public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
- statement.setString(index, value.toString());
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ if ("Point".equals(fieldType)) {
+ statement.setObject(index, mapper.readValue(replace(value.toString()), double[].class));
+ } else if ("Ring".equals(fieldType)) {
+ statement.setObject(index, mapper.readValue(replace(value.toString()), double[][].class));
+ } else if ("Polygon".equals(fieldType)) {
+ statement.setObject(index, mapper.readValue(replace(value.toString()), double[][][].class));
+ } else if ("MultiPolygon".equals(fieldType)) {
+ statement.setObject(index, mapper.readValue(replace(value.toString()), double[][][][].class));
+ } else {
+ statement.setString(index, value.toString());
+ }
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
public boolean isCurrentFieldType(String fieldType) {
- return "String".equals(fieldType)
+ if ("String".equals(fieldType)
|| "Int128".equals(fieldType)
|| "UInt128".equals(fieldType)
|| "Int256".equals(fieldType)
- || "UInt256".equals(fieldType);
+ || "UInt256".equals(fieldType)
+ || "Point".equals(fieldType)
+ || "Ring".equals(fieldType)
+ || "Polygon".equals(fieldType)
+ || "MultiPolygon".equals(fieldType)) {
+ this.fieldType = fieldType;
+ return true;
+ }
+ return false;
+ }
+
+ private static String replace(String str) {
+ return str.replaceAll("\\(", "[").replaceAll("\\)", "]");
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/pom.xml
index a4c69aaaaa8..b275e0eca2d 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/pom.xml
@@ -44,5 +44,18 @@
${clickhouse.jdbc.version}
test
+
+
+ org.apache.seatunnel
+ connector-clickhouse
+ ${project.version}
+ test
+
+
+ org.apache.seatunnel
+ seatunnel-e2e-common
+ ${project.version}
+ test
+
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
index 1a5170c37fc..05ca6d35990 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
@@ -112,7 +112,7 @@ public void startUp() throws Exception {
Awaitility.given()
.ignoreExceptions()
.await()
- .atMost(180L, TimeUnit.SECONDS)
+ .atMost(360L, TimeUnit.SECONDS)
.untilAsserted(this::initConnection);
this.initializeClickhouseTable();
this.batchInsertData();
@@ -207,6 +207,12 @@ private void batchInsertData() {
preparedStatement.setArray(22, toSqlArray(row.getField(21)));
preparedStatement.setArray(23, toSqlArray(row.getField(22)));
preparedStatement.setArray(24, toSqlArray(row.getField(23)));
+ preparedStatement.setObject(25, row.getField(24));
+ preparedStatement.setObject(26, row.getField(25));
+ preparedStatement.setObject(27, row.getField(26));
+ preparedStatement.setObject(28, row.getField(27));
+ preparedStatement.setObject(29, row.getField(28));
+ preparedStatement.setObject(30, row.getField(29));
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
@@ -250,7 +256,13 @@ private static Tuple2> generateTestDataSet(
"c_lowcardinality",
"c_nested.int",
"c_nested.double",
- "c_nested.string"
+ "c_nested.string",
+ "c_int128",
+ "c_uint128",
+ "c_int256",
+ "c_uint256",
+ "c_point",
+ "c_ring"
},
new SeaTunnelDataType[]{
BasicType.LONG_TYPE,
@@ -276,7 +288,13 @@ private static Tuple2> generateTestDataSet(
BasicType.STRING_TYPE,
ArrayType.INT_ARRAY_TYPE,
ArrayType.DOUBLE_ARRAY_TYPE,
- ArrayType.STRING_ARRAY_TYPE
+ ArrayType.STRING_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE
});
List rows = new ArrayList<>();
for (int i = 0; i < 100; ++i) {
@@ -305,7 +323,13 @@ private static Tuple2> generateTestDataSet(
"string",
new Integer[]{Integer.parseInt("1")},
new Double[]{Double.parseDouble("1.1")},
- new String[]{"1"}
+ new String[]{"1"},
+ "170141183460469231731687303715884105727",
+ "340282366920938463463374607431768211455",
+ "57896044618658097711785492504343953926634992332820282019728792003956564819967",
+ "115792089237316195423570985008687907853269984665640564039457584007913129639935",
+ new double[]{1, 2},
+ new double[][]{{2, 3}, {4, 5}}
});
rows.add(row);
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf
index ed27371b824..536096b936c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf
@@ -72,7 +72,13 @@ sink {
"c_lowcardinality",
"c_nested.int",
"c_nested.double",
- "c_nested.string"
+ "c_nested.string",
+ "c_int128",
+ "c_uint128",
+ "c_int256",
+ "c_uint256",
+ "c_point",
+ "c_ring"
]
username = "default"
password = ""
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf
index 108f4e5589c..78f2daa1d79 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf
@@ -16,6 +16,7 @@
#
source_table = """
+set allow_experimental_geo_types = 1;
create table if not exists `default`.source_table(
`id` Int64,
`c_map` Map(String, Int32),
@@ -43,7 +44,13 @@ create table if not exists `default`.source_table(
`int` UInt32,
`double` Float64,
`string` String
- )
+ ),
+ `c_int128` Int128,
+ `c_uint128` UInt128,
+ `c_int256` Int256,
+ `c_uint256` UInt256,
+ `c_point` Point,
+ `c_ring` Ring
)engine=Memory;
"""
@@ -75,7 +82,13 @@ create table if not exists `default`.sink_table(
`int` UInt32,
`double` Float64,
`string` String
- )
+ ),
+ `c_int128` Int128,
+ `c_uint128` UInt128,
+ `c_int256` Int256,
+ `c_uint256` UInt256,
+ `c_point` Point,
+ `c_ring` Ring
)engine=Memory;
"""
@@ -105,10 +118,16 @@ insert into `default`.source_table
`c_lowcardinality`,
`c_nested.int`,
`c_nested.double`,
- `c_nested.string`
+ `c_nested.string`,
+ `c_int128`,
+ `c_uint128`,
+ `c_int256`,
+ `c_uint256`,
+ `c_point`,
+ `c_ring`
)
values
-(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
+(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
"""
compare_sql = """