Skip to content

Commit

Permalink
[Feature][PostgreSQL-jdbc] Supports GEOMETRY data type for PostgreSQL… (
Browse files Browse the repository at this point in the history
#4673)



---------

Co-authored-by: zhilinli <[email protected]>
  • Loading branch information
zhilinli123 and zhilinli authored May 15, 2023
1 parent 8f66ce9 commit a5af4d9
Show file tree
Hide file tree
Showing 9 changed files with 502 additions and 138 deletions.
11 changes: 11 additions & 0 deletions seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
<redshift.version>2.1.0.9</redshift.version>
<saphana.version>2.14.7</saphana.version>
<vertica.version>12.0.3-0</vertica.version>
<postgis.jdbc.version>2.5.1</postgis.jdbc.version>
</properties>

<dependencyManagement>
Expand All @@ -66,6 +67,12 @@
<version>${postgresql.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.postgis</groupId>
<artifactId>postgis-jdbc</artifactId>
<version>${postgis.jdbc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dameng</groupId>
<artifactId>DmJdbcDriver18</artifactId>
Expand Down Expand Up @@ -149,6 +156,10 @@
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>net.postgis</groupId>
<artifactId>postgis-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.dameng</groupId>
<artifactId>DmJdbcDriver18</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,108 @@

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;

import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;

import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Locale;
import java.util.Optional;

public class PostgresJdbcRowConverter extends AbstractJdbcRowConverter {

private static final String PG_GEOMETRY = "GEOMETRY";
private static final String PG_GEOGRAPHY = "GEOGRAPHY";

@Override
public String converterName() {
return null;
}

@Override
@SuppressWarnings("checkstyle:Indentation")
public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws SQLException {
Object[] fields = new Object[typeInfo.getTotalFields()];
for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) {
SeaTunnelDataType<?> seaTunnelDataType = typeInfo.getFieldType(fieldIndex);
int resultSetIndex = fieldIndex + 1;
String metaDataColumnType =
rs.getMetaData().getColumnTypeName(resultSetIndex).toUpperCase(Locale.ROOT);
switch (seaTunnelDataType.getSqlType()) {
case STRING:
if (metaDataColumnType.equals(PG_GEOMETRY)
|| metaDataColumnType.equals(PG_GEOGRAPHY)) {
fields[fieldIndex] =
rs.getObject(resultSetIndex) == null
? null
: rs.getObject(resultSetIndex).toString();
} else {
fields[fieldIndex] = rs.getString(resultSetIndex);
}
break;
case BOOLEAN:
fields[fieldIndex] = rs.getBoolean(resultSetIndex);
break;
case TINYINT:
fields[fieldIndex] = rs.getByte(resultSetIndex);
break;
case SMALLINT:
fields[fieldIndex] = rs.getShort(resultSetIndex);
break;
case INT:
fields[fieldIndex] = rs.getInt(resultSetIndex);
break;
case BIGINT:
fields[fieldIndex] = rs.getLong(resultSetIndex);
break;
case FLOAT:
fields[fieldIndex] = rs.getFloat(resultSetIndex);
break;
case DOUBLE:
fields[fieldIndex] = rs.getDouble(resultSetIndex);
break;
case DECIMAL:
fields[fieldIndex] = rs.getBigDecimal(resultSetIndex);
break;
case DATE:
Date sqlDate = rs.getDate(resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlDate).map(e -> e.toLocalDate()).orElse(null);
break;
case TIME:
Time sqlTime = rs.getTime(resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTime).map(e -> e.toLocalTime()).orElse(null);
break;
case TIMESTAMP:
Timestamp sqlTimestamp = rs.getTimestamp(resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTimestamp)
.map(e -> e.toLocalDateTime())
.orElse(null);
break;
case BYTES:
fields[fieldIndex] = rs.getBytes(resultSetIndex);
break;
case NULL:
fields[fieldIndex] = null;
break;
case MAP:
case ARRAY:
case ROW:
default:
throw new JdbcConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unexpected value: " + seaTunnelDataType);
}
}
return new SeaTunnelRow(fields);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public class PostgresTypeMapper implements JdbcDialectTypeMapper {
private static final String PG_CHARACTER_ARRAY = "_character";
private static final String PG_CHARACTER_VARYING = "varchar";
private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
private static final String PG_GEOMETRY = "geometry";
private static final String PG_GEOGRAPHY = "geography";

@SuppressWarnings("checkstyle:MagicNumber")
@Override
Expand Down Expand Up @@ -135,6 +137,8 @@ public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex)
case PG_CHARACTER:
case PG_CHARACTER_VARYING:
case PG_TEXT:
case PG_GEOMETRY:
case PG_GEOGRAPHY:
return BasicType.STRING_TYPE;
case PG_CHAR_ARRAY:
case PG_CHARACTER_ARRAY:
Expand Down
7 changes: 7 additions & 0 deletions seatunnel-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
<properties>
<mysql.version>8.0.27</mysql.version>
<postgresql.version>42.4.3</postgresql.version>
<postgis.jdbc.version>2.5.1</postgis.jdbc.version>
<dm-jdbc.version>8.1.2.141</dm-jdbc.version>
<sqlserver.version>9.2.1.jre8</sqlserver.version>
<phoenix.version>5.2.5-HBase-2.x</phoenix.version>
Expand Down Expand Up @@ -512,6 +513,12 @@
<version>${postgresql.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.postgis</groupId>
<artifactId>postgis-jdbc</artifactId>
<version>${postgis.jdbc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dameng</groupId>
<artifactId>DmJdbcDriver18</artifactId>
Expand Down
Loading

0 comments on commit a5af4d9

Please sign in to comment.