Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Clickhouse-V2] Clickhouse Support Geo type #3141

Merged
merged 15 commits into from
Nov 2, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/en/connector-v2/sink/Clickhouse.md
Original file line number Diff line number Diff line change
@@ -126,3 +126,9 @@ sink {

### 2.3.0-beta 2022-10-20
- [Improve] Clickhouse Support Int128,Int256 Type ([3067](https://github.com/apache/incubator-seatunnel/pull/3067))

### next version

- [Improve] Clickhouse Sink support nest type and array type([3047](https://github.com/apache/incubator-seatunnel/pull/3047))

- [Improve] Clickhouse Sink support geo type([3141](https://github.com/apache/incubator-seatunnel/pull/3141))
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/Clickhouse.md
Original file line number Diff line number Diff line change
@@ -93,4 +93,9 @@ source {

- [Improve] Clickhouse Source random use host when config multi-host ([3108](https://github.com/apache/incubator-seatunnel/pull/3108))

### next version

- [Improve] Clickhouse Source support nest type and array type([3047](https://github.com/apache/incubator-seatunnel/pull/3047))

- [Improve] Clickhouse Source support geo type([3141](https://github.com/apache/incubator-seatunnel/pull/3141))

10 changes: 10 additions & 0 deletions seatunnel-connectors-v2/connector-clickhouse/pom.xml
Original file line number Diff line number Diff line change
@@ -62,6 +62,16 @@
<version>${clickhouse.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -17,22 +17,54 @@

package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.sql.PreparedStatement;
import java.sql.SQLException;

public class StringInjectFunction implements ClickhouseFieldInjectFunction {

private String fieldType;

@Override
public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException {
statement.setString(index, value.toString());
ObjectMapper mapper = new ObjectMapper();
try {
if ("Point".equals(fieldType)) {
statement.setObject(index, mapper.readValue(replace(value.toString()), double[].class));
} else if ("Ring".equals(fieldType)) {
statement.setObject(index, mapper.readValue(replace(value.toString()), double[][].class));
} else if ("Polygon".equals(fieldType)) {
statement.setObject(index, mapper.readValue(replace(value.toString()), double[][][].class));
} else if ("MultiPolygon".equals(fieldType)) {
statement.setObject(index, mapper.readValue(replace(value.toString()), double[][][][].class));
} else {
statement.setString(index, value.toString());
}
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

@Override
public boolean isCurrentFieldType(String fieldType) {
return "String".equals(fieldType)
if ("String".equals(fieldType)
|| "Int128".equals(fieldType)
|| "UInt128".equals(fieldType)
|| "Int256".equals(fieldType)
|| "UInt256".equals(fieldType);
|| "UInt256".equals(fieldType)
|| "Point".equals(fieldType)
|| "Ring".equals(fieldType)
|| "Polygon".equals(fieldType)
|| "MultiPolygon".equals(fieldType)) {
this.fieldType = fieldType;
return true;
}
return false;
}

private static String replace(String str) {
return str.replaceAll("\\(", "[").replaceAll("\\)", "]");
}
}
Original file line number Diff line number Diff line change
@@ -44,5 +44,18 @@
<version>${clickhouse.jdbc.version}</version>
<scope>test</scope>
</dependency>
<!-- connector -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-clickhouse</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-e2e-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -112,7 +112,7 @@ public void startUp() throws Exception {
Awaitility.given()
.ignoreExceptions()
.await()
.atMost(180L, TimeUnit.SECONDS)
.atMost(360L, TimeUnit.SECONDS)
.untilAsserted(this::initConnection);
this.initializeClickhouseTable();
this.batchInsertData();
@@ -207,6 +207,12 @@ private void batchInsertData() {
preparedStatement.setArray(22, toSqlArray(row.getField(21)));
preparedStatement.setArray(23, toSqlArray(row.getField(22)));
preparedStatement.setArray(24, toSqlArray(row.getField(23)));
preparedStatement.setObject(25, row.getField(24));
preparedStatement.setObject(26, row.getField(25));
preparedStatement.setObject(27, row.getField(26));
preparedStatement.setObject(28, row.getField(27));
preparedStatement.setObject(29, row.getField(28));
preparedStatement.setObject(30, row.getField(29));
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
@@ -250,7 +256,13 @@ private static Tuple2<SeaTunnelRowType, List<SeaTunnelRow>> generateTestDataSet(
"c_lowcardinality",
"c_nested.int",
"c_nested.double",
"c_nested.string"
"c_nested.string",
"c_int128",
"c_uint128",
"c_int256",
"c_uint256",
"c_point",
"c_ring"
},
new SeaTunnelDataType[]{
BasicType.LONG_TYPE,
@@ -276,7 +288,13 @@ private static Tuple2<SeaTunnelRowType, List<SeaTunnelRow>> generateTestDataSet(
BasicType.STRING_TYPE,
ArrayType.INT_ARRAY_TYPE,
ArrayType.DOUBLE_ARRAY_TYPE,
ArrayType.STRING_ARRAY_TYPE
ArrayType.STRING_ARRAY_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE
});
List<SeaTunnelRow> rows = new ArrayList<>();
for (int i = 0; i < 100; ++i) {
@@ -305,7 +323,13 @@ private static Tuple2<SeaTunnelRowType, List<SeaTunnelRow>> generateTestDataSet(
"string",
new Integer[]{Integer.parseInt("1")},
new Double[]{Double.parseDouble("1.1")},
new String[]{"1"}
new String[]{"1"},
"170141183460469231731687303715884105727",
"340282366920938463463374607431768211455",
"57896044618658097711785492504343953926634992332820282019728792003956564819967",
"115792089237316195423570985008687907853269984665640564039457584007913129639935",
new double[]{1, 2},
new double[][]{{2, 3}, {4, 5}}
});
rows.add(row);
}
Original file line number Diff line number Diff line change
@@ -72,7 +72,13 @@ sink {
"c_lowcardinality",
"c_nested.int",
"c_nested.double",
"c_nested.string"
"c_nested.string",
"c_int128",
"c_uint128",
"c_int256",
"c_uint256",
"c_point",
"c_ring"
]
username = "default"
password = ""
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
#

source_table = """
set allow_experimental_geo_types = 1;
create table if not exists `default`.source_table(
`id` Int64,
`c_map` Map(String, Int32),
@@ -43,7 +44,13 @@ create table if not exists `default`.source_table(
`int` UInt32,
`double` Float64,
`string` String
)
),
`c_int128` Int128,
`c_uint128` UInt128,
`c_int256` Int256,
`c_uint256` UInt256,
`c_point` Point,
`c_ring` Ring
)engine=Memory;
"""

@@ -75,7 +82,13 @@ create table if not exists `default`.sink_table(
`int` UInt32,
`double` Float64,
`string` String
)
),
`c_int128` Int128,
`c_uint128` UInt128,
`c_int256` Int256,
`c_uint256` UInt256,
`c_point` Point,
`c_ring` Ring
)engine=Memory;
"""

@@ -105,10 +118,16 @@ insert into `default`.source_table
`c_lowcardinality`,
`c_nested.int`,
`c_nested.double`,
`c_nested.string`
`c_nested.string`,
`c_int128`,
`c_uint128`,
`c_int256`,
`c_uint256`,
`c_point`,
`c_ring`
)
values
(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
"""

compare_sql = """