diff --git a/docs/en/connector-v2/sink/Clickhouse.md b/docs/en/connector-v2/sink/Clickhouse.md index fc4a12603eb..4f1f0e16c09 100644 --- a/docs/en/connector-v2/sink/Clickhouse.md +++ b/docs/en/connector-v2/sink/Clickhouse.md @@ -126,3 +126,9 @@ sink { ### 2.3.0-beta 2022-10-20 - [Improve] Clickhouse Support Int128,Int256 Type ([3067](https://github.com/apache/incubator-seatunnel/pull/3067)) + +### next version + +- [Improve] Clickhouse Sink support nest type and array type([3047](https://github.com/apache/incubator-seatunnel/pull/3047)) + +- [Improve] Clickhouse Sink support geo type([3141](https://github.com/apache/incubator-seatunnel/pull/3141)) diff --git a/docs/en/connector-v2/source/Clickhouse.md b/docs/en/connector-v2/source/Clickhouse.md index 0e89409ca11..1f048f1cdbc 100644 --- a/docs/en/connector-v2/source/Clickhouse.md +++ b/docs/en/connector-v2/source/Clickhouse.md @@ -93,4 +93,9 @@ source { - [Improve] Clickhouse Source random use host when config multi-host ([3108](https://github.com/apache/incubator-seatunnel/pull/3108)) +### next version + +- [Improve] Clickhouse Source support nest type and array type([3047](https://github.com/apache/incubator-seatunnel/pull/3047)) + +- [Improve] Clickhouse Source support geo type([3141](https://github.com/apache/incubator-seatunnel/pull/3141)) diff --git a/seatunnel-connectors-v2/connector-clickhouse/pom.xml b/seatunnel-connectors-v2/connector-clickhouse/pom.xml index aee8ffe4091..5253eea84dd 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/pom.xml +++ b/seatunnel-connectors-v2/connector-clickhouse/pom.xml @@ -62,6 +62,16 @@ ${clickhouse.version} + + com.fasterxml.jackson.core + jackson-core + + + + com.fasterxml.jackson.core + jackson-databind + + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java index 4728fe21630..96d1f51936b 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java @@ -17,22 +17,54 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + import java.sql.PreparedStatement; import java.sql.SQLException; public class StringInjectFunction implements ClickhouseFieldInjectFunction { + private String fieldType; + @Override public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException { - statement.setString(index, value.toString()); + ObjectMapper mapper = new ObjectMapper(); + try { + if ("Point".equals(fieldType)) { + statement.setObject(index, mapper.readValue(replace(value.toString()), double[].class)); + } else if ("Ring".equals(fieldType)) { + statement.setObject(index, mapper.readValue(replace(value.toString()), double[][].class)); + } else if ("Polygon".equals(fieldType)) { + statement.setObject(index, mapper.readValue(replace(value.toString()), double[][][].class)); + } else if ("MultiPolygon".equals(fieldType)) { + statement.setObject(index, mapper.readValue(replace(value.toString()), double[][][][].class)); + } else { + statement.setString(index, value.toString()); + } + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } } @Override public boolean isCurrentFieldType(String fieldType) { - return "String".equals(fieldType) + if ("String".equals(fieldType) || "Int128".equals(fieldType) || "UInt128".equals(fieldType) || "Int256".equals(fieldType) - || "UInt256".equals(fieldType); + || "UInt256".equals(fieldType) + || "Point".equals(fieldType) + || "Ring".equals(fieldType) + || "Polygon".equals(fieldType) + || "MultiPolygon".equals(fieldType)) { + this.fieldType = fieldType; + return true; + } + return false; + } + + private static String replace(String str) { + return str.replaceAll("\\(", "[").replaceAll("\\)", "]"); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/pom.xml index a4c69aaaaa8..b275e0eca2d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/pom.xml @@ -44,5 +44,18 @@ ${clickhouse.jdbc.version} test + + + org.apache.seatunnel + connector-clickhouse + ${project.version} + test + + + org.apache.seatunnel + seatunnel-e2e-common + ${project.version} + test + \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java index 1a5170c37fc..05ca6d35990 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java @@ -112,7 +112,7 @@ public void startUp() throws Exception { Awaitility.given() .ignoreExceptions() .await() - .atMost(180L, TimeUnit.SECONDS) + .atMost(360L, TimeUnit.SECONDS) .untilAsserted(this::initConnection); this.initializeClickhouseTable(); this.batchInsertData(); @@ -207,6 +207,12 @@ private void batchInsertData() { preparedStatement.setArray(22, toSqlArray(row.getField(21))); preparedStatement.setArray(23, toSqlArray(row.getField(22))); preparedStatement.setArray(24, toSqlArray(row.getField(23))); + preparedStatement.setObject(25, row.getField(24)); + preparedStatement.setObject(26, row.getField(25)); + preparedStatement.setObject(27, row.getField(26)); + preparedStatement.setObject(28, row.getField(27)); + preparedStatement.setObject(29, row.getField(28)); + preparedStatement.setObject(30, row.getField(29)); preparedStatement.addBatch(); } preparedStatement.executeBatch(); @@ -250,7 +256,13 @@ private static Tuple2> generateTestDataSet( "c_lowcardinality", "c_nested.int", "c_nested.double", - "c_nested.string" + "c_nested.string", + "c_int128", + "c_uint128", + "c_int256", + "c_uint256", + "c_point", + "c_ring" }, new SeaTunnelDataType[]{ BasicType.LONG_TYPE, @@ -276,7 +288,13 @@ private static Tuple2> generateTestDataSet( BasicType.STRING_TYPE, ArrayType.INT_ARRAY_TYPE, ArrayType.DOUBLE_ARRAY_TYPE, - ArrayType.STRING_ARRAY_TYPE + ArrayType.STRING_ARRAY_TYPE, + BasicType.STRING_TYPE, + BasicType.STRING_TYPE, + BasicType.STRING_TYPE, + BasicType.STRING_TYPE, + BasicType.STRING_TYPE, + BasicType.STRING_TYPE }); List rows = new ArrayList<>(); for (int i = 0; i < 100; ++i) { @@ -305,7 +323,13 @@ private static Tuple2> generateTestDataSet( "string", new Integer[]{Integer.parseInt("1")}, new Double[]{Double.parseDouble("1.1")}, - new String[]{"1"} + new String[]{"1"}, + "170141183460469231731687303715884105727", + "340282366920938463463374607431768211455", + "57896044618658097711785492504343953926634992332820282019728792003956564819967", + "115792089237316195423570985008687907853269984665640564039457584007913129639935", + new double[]{1, 2}, + new double[][]{{2, 3}, {4, 5}} }); rows.add(row); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf index ed27371b824..536096b936c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf @@ -72,7 +72,13 @@ sink { "c_lowcardinality", "c_nested.int", "c_nested.double", - "c_nested.string" + "c_nested.string", + "c_int128", + "c_uint128", + "c_int256", + "c_uint256", + "c_point", + "c_ring" ] username = "default" password = "" diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf index 108f4e5589c..78f2daa1d79 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf @@ -16,6 +16,7 @@ # source_table = """ +set allow_experimental_geo_types = 1; create table if not exists `default`.source_table( `id` Int64, `c_map` Map(String, Int32), @@ -43,7 +44,13 @@ create table if not exists `default`.source_table( `int` UInt32, `double` Float64, `string` String - ) + ), + `c_int128` Int128, + `c_uint128` UInt128, + `c_int256` Int256, + `c_uint256` UInt256, + `c_point` Point, + `c_ring` Ring )engine=Memory; """ @@ -75,7 +82,13 @@ create table if not exists `default`.sink_table( `int` UInt32, `double` Float64, `string` String - ) + ), + `c_int128` Int128, + `c_uint128` UInt128, + `c_int256` Int256, + `c_uint256` UInt256, + `c_point` Point, + `c_ring` Ring )engine=Memory; """ @@ -105,10 +118,16 @@ insert into `default`.source_table `c_lowcardinality`, `c_nested.int`, `c_nested.double`, - `c_nested.string` + `c_nested.string`, + `c_int128`, + `c_uint128`, + `c_int256`, + `c_uint256`, + `c_point`, + `c_ring` ) values -(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) +(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) """ compare_sql = """