From 34bc6655cb5328ee6cb025c8bc42e2c82272668d Mon Sep 17 00:00:00 2001 From: zhilinli Date: Tue, 26 Mar 2024 20:08:54 +0800 Subject: [PATCH] [Improve][JDBC] Optimized code style for getting jdbc field types --- .../converter/AbstractJdbcRowConverter.java | 28 ++++---- .../kingbase/KingbaseJdbcRowConverter.java | 28 ++++---- .../psql/PostgresJdbcRowConverter.java | 28 ++++---- .../sqlserver/SqlserverJdbcRowConverter.java | 4 +- ...JdbcUtils.java => JdbcFieldTypeUtils.java} | 64 ++++++++----------- 5 files changed, 72 insertions(+), 80 deletions(-) rename seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/{JdbcUtils.java => JdbcFieldTypeUtils.java} (71%) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java index 5a4a6b60d85..8ff8ac47d7c 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java @@ -23,7 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; -import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils; +import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils; import lombok.extern.slf4j.Slf4j; @@ -56,34 +56,34 @@ public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQL int resultSetIndex = fieldIndex + 1; switch (seaTunnelDataType.getSqlType()) { case STRING: - fields[fieldIndex] = JdbcUtils.getString(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs, resultSetIndex); break; case BOOLEAN: - fields[fieldIndex] = JdbcUtils.getBoolean(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getBoolean(rs, resultSetIndex); break; case TINYINT: - fields[fieldIndex] = JdbcUtils.getByte(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getByte(rs, resultSetIndex); break; case SMALLINT: - fields[fieldIndex] = JdbcUtils.getShort(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getShort(rs, resultSetIndex); break; case INT: - fields[fieldIndex] = JdbcUtils.getInt(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getInt(rs, resultSetIndex); break; case BIGINT: - fields[fieldIndex] = JdbcUtils.getLong(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getLong(rs, resultSetIndex); break; case FLOAT: - fields[fieldIndex] = JdbcUtils.getFloat(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getFloat(rs, resultSetIndex); break; case DOUBLE: - fields[fieldIndex] = JdbcUtils.getDouble(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getDouble(rs, resultSetIndex); break; case DECIMAL: - fields[fieldIndex] = JdbcUtils.getBigDecimal(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getBigDecimal(rs, resultSetIndex); break; case DATE: - Date sqlDate = JdbcUtils.getDate(rs, resultSetIndex); + Date sqlDate = JdbcFieldTypeUtils.getDate(rs, resultSetIndex); fields[fieldIndex] = Optional.ofNullable(sqlDate).map(e -> e.toLocalDate()).orElse(null); break; @@ -91,14 +91,14 @@ public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQL fields[fieldIndex] = readTime(rs, resultSetIndex); break; case TIMESTAMP: - Timestamp sqlTimestamp = JdbcUtils.getTimestamp(rs, resultSetIndex); + Timestamp sqlTimestamp = JdbcFieldTypeUtils.getTimestamp(rs, resultSetIndex); fields[fieldIndex] = Optional.ofNullable(sqlTimestamp) .map(e -> e.toLocalDateTime()) .orElse(null); break; case BYTES: - fields[fieldIndex] = JdbcUtils.getBytes(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getBytes(rs, resultSetIndex); break; case NULL: fields[fieldIndex] = null; @@ -116,7 +116,7 @@ public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQL } protected LocalTime readTime(ResultSet rs, int resultSetIndex) throws SQLException { - Time sqlTime = JdbcUtils.getTime(rs, resultSetIndex); + Time sqlTime = JdbcFieldTypeUtils.getTime(rs, resultSetIndex); return Optional.ofNullable(sqlTime).map(e -> e.toLocalTime()).orElse(null); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java index 4aa41c0f4c1..4a9411b99b5 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java @@ -25,7 +25,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; -import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils; +import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils; import java.math.BigDecimal; import java.sql.Date; @@ -56,51 +56,51 @@ public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQL int resultSetIndex = fieldIndex + 1; switch (seaTunnelDataType.getSqlType()) { case STRING: - fields[fieldIndex] = JdbcUtils.getString(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs, resultSetIndex); break; case BOOLEAN: - fields[fieldIndex] = JdbcUtils.getBoolean(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getBoolean(rs, resultSetIndex); break; case TINYINT: - fields[fieldIndex] = JdbcUtils.getByte(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getByte(rs, resultSetIndex); break; case SMALLINT: - fields[fieldIndex] = JdbcUtils.getShort(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getShort(rs, resultSetIndex); break; case INT: - fields[fieldIndex] = JdbcUtils.getInt(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getInt(rs, resultSetIndex); break; case BIGINT: - fields[fieldIndex] = JdbcUtils.getLong(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getLong(rs, resultSetIndex); break; case FLOAT: - fields[fieldIndex] = JdbcUtils.getFloat(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getFloat(rs, resultSetIndex); break; case DOUBLE: - fields[fieldIndex] = JdbcUtils.getDouble(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getDouble(rs, resultSetIndex); break; case DECIMAL: - fields[fieldIndex] = JdbcUtils.getBigDecimal(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getBigDecimal(rs, resultSetIndex); break; case DATE: - Date sqlDate = JdbcUtils.getDate(rs, resultSetIndex); + Date sqlDate = JdbcFieldTypeUtils.getDate(rs, resultSetIndex); fields[fieldIndex] = Optional.ofNullable(sqlDate).map(Date::toLocalDate).orElse(null); break; case TIME: - Time sqlTime = JdbcUtils.getTime(rs, resultSetIndex); + Time sqlTime = JdbcFieldTypeUtils.getTime(rs, resultSetIndex); fields[fieldIndex] = Optional.ofNullable(sqlTime).map(Time::toLocalTime).orElse(null); break; case TIMESTAMP: - Timestamp sqlTimestamp = JdbcUtils.getTimestamp(rs, resultSetIndex); + Timestamp sqlTimestamp = JdbcFieldTypeUtils.getTimestamp(rs, resultSetIndex); fields[fieldIndex] = Optional.ofNullable(sqlTimestamp) .map(Timestamp::toLocalDateTime) .orElse(null); break; case BYTES: - fields[fieldIndex] = JdbcUtils.getBytes(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getBytes(rs, resultSetIndex); break; case NULL: fields[fieldIndex] = null; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java index 171ab406f53..f1cd4f8ec98 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java @@ -26,7 +26,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; -import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils; +import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils; import java.sql.Array; import java.sql.Date; @@ -65,52 +65,52 @@ public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQL ? null : rs.getObject(resultSetIndex).toString(); } else { - fields[fieldIndex] = JdbcUtils.getString(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs, resultSetIndex); } break; case BOOLEAN: - fields[fieldIndex] = JdbcUtils.getBoolean(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getBoolean(rs, resultSetIndex); break; case TINYINT: - fields[fieldIndex] = JdbcUtils.getByte(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getByte(rs, resultSetIndex); break; case SMALLINT: - fields[fieldIndex] = JdbcUtils.getShort(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getShort(rs, resultSetIndex); break; case INT: - fields[fieldIndex] = JdbcUtils.getInt(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getInt(rs, resultSetIndex); break; case BIGINT: - fields[fieldIndex] = JdbcUtils.getLong(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getLong(rs, resultSetIndex); break; case FLOAT: - fields[fieldIndex] = JdbcUtils.getFloat(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getFloat(rs, resultSetIndex); break; case DOUBLE: - fields[fieldIndex] = JdbcUtils.getDouble(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getDouble(rs, resultSetIndex); break; case DECIMAL: - fields[fieldIndex] = JdbcUtils.getBigDecimal(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getBigDecimal(rs, resultSetIndex); break; case DATE: - Date sqlDate = JdbcUtils.getDate(rs, resultSetIndex); + Date sqlDate = JdbcFieldTypeUtils.getDate(rs, resultSetIndex); fields[fieldIndex] = Optional.ofNullable(sqlDate).map(e -> e.toLocalDate()).orElse(null); break; case TIME: - Time sqlTime = JdbcUtils.getTime(rs, resultSetIndex); + Time sqlTime = JdbcFieldTypeUtils.getTime(rs, resultSetIndex); fields[fieldIndex] = Optional.ofNullable(sqlTime).map(e -> e.toLocalTime()).orElse(null); break; case TIMESTAMP: - Timestamp sqlTimestamp = JdbcUtils.getTimestamp(rs, resultSetIndex); + Timestamp sqlTimestamp = JdbcFieldTypeUtils.getTimestamp(rs, resultSetIndex); fields[fieldIndex] = Optional.ofNullable(sqlTimestamp) .map(e -> e.toLocalDateTime()) .orElse(null); break; case BYTES: - fields[fieldIndex] = JdbcUtils.getBytes(rs, resultSetIndex); + fields[fieldIndex] = JdbcFieldTypeUtils.getBytes(rs, resultSetIndex); break; case NULL: fields[fieldIndex] = null; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverJdbcRowConverter.java index efb17292444..5ae0dec1afa 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverJdbcRowConverter.java @@ -25,7 +25,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; -import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils; +import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils; import java.math.BigDecimal; import java.sql.PreparedStatement; @@ -46,7 +46,7 @@ public String converterName() { @Override protected LocalTime readTime(ResultSet rs, int resultSetIndex) throws SQLException { - Timestamp sqlTime = JdbcUtils.getTimestamp(rs, resultSetIndex); + Timestamp sqlTime = JdbcFieldTypeUtils.getTimestamp(rs, resultSetIndex); return Optional.ofNullable(sqlTime) .map(e -> e.toLocalDateTime().toLocalTime()) .orElse(null); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtils.java similarity index 71% rename from seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java rename to seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtils.java index b9f7f1eac3f..ca8edb65769 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtils.java @@ -23,61 +23,40 @@ import java.sql.Time; import java.sql.Timestamp; -public final class JdbcUtils { +public final class JdbcFieldTypeUtils { - private JdbcUtils() {} - - public static String getString(ResultSet resultSet, int columnIndex) throws SQLException { - return resultSet.getString(columnIndex); - } + private JdbcFieldTypeUtils() {} public static Boolean getBoolean(ResultSet resultSet, int columnIndex) throws SQLException { - if (null == resultSet.getObject(columnIndex)) { - return null; - } - return resultSet.getBoolean(columnIndex); + return getNullableValue(resultSet, columnIndex, ResultSet::getBoolean); } public static Byte getByte(ResultSet resultSet, int columnIndex) throws SQLException { - if (null == resultSet.getObject(columnIndex)) { - return null; - } - return resultSet.getByte(columnIndex); + return getNullableValue(resultSet, columnIndex, ResultSet::getByte); } public static Short getShort(ResultSet resultSet, int columnIndex) throws SQLException { - if (null == resultSet.getObject(columnIndex)) { - return null; - } - return resultSet.getShort(columnIndex); + return getNullableValue(resultSet, columnIndex, ResultSet::getShort); } public static Integer getInt(ResultSet resultSet, int columnIndex) throws SQLException { - if (null == resultSet.getObject(columnIndex)) { - return null; - } - return resultSet.getInt(columnIndex); + return getNullableValue(resultSet, columnIndex, ResultSet::getInt); } public static Long getLong(ResultSet resultSet, int columnIndex) throws SQLException { - if (null == resultSet.getObject(columnIndex)) { - return null; - } - return resultSet.getLong(columnIndex); + return getNullableValue(resultSet, columnIndex, ResultSet::getLong); } public static Float getFloat(ResultSet resultSet, int columnIndex) throws SQLException { - if (null == resultSet.getObject(columnIndex)) { - return null; - } - return resultSet.getFloat(columnIndex); + return getNullableValue(resultSet, columnIndex, ResultSet::getFloat); } public static Double getDouble(ResultSet resultSet, int columnIndex) throws SQLException { - if (null == resultSet.getObject(columnIndex)) { - return null; - } - return resultSet.getDouble(columnIndex); + return getNullableValue(resultSet, columnIndex, ResultSet::getDouble); + } + + public static String getString(ResultSet resultSet, int columnIndex) throws SQLException { + return resultSet.getString(columnIndex); } public static BigDecimal getBigDecimal(ResultSet resultSet, int columnIndex) @@ -98,9 +77,22 @@ public static Timestamp getTimestamp(ResultSet resultSet, int columnIndex) throw } public static byte[] getBytes(ResultSet resultSet, int columnIndex) throws SQLException { - if (null == resultSet.getObject(columnIndex)) { + return resultSet.getBytes(columnIndex); + } + + private static T getNullableValue( + ResultSet resultSet, + int columnIndex, + ThrowingFunction getter) + throws SQLException { + if (resultSet.getObject(columnIndex) == null) { return null; } - return resultSet.getBytes(columnIndex); + return getter.apply(resultSet, columnIndex); + } + + @FunctionalInterface + private interface ThrowingFunction { + R apply(T t, int columnIndex) throws E; } }