Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][JDBC] Optimized code style for getting jdbc field types #6583

Merged
merged 1 commit into from
Mar 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
[Improve][JDBC] Optimized code style for getting jdbc field types
zhilinli123 committed Mar 26, 2024
commit 34bc6655cb5328ee6cb025c8bc42e2c82272668d
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;

import lombok.extern.slf4j.Slf4j;

@@ -56,49 +56,49 @@ public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQL
int resultSetIndex = fieldIndex + 1;
switch (seaTunnelDataType.getSqlType()) {
case STRING:
fields[fieldIndex] = JdbcUtils.getString(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs, resultSetIndex);
break;
case BOOLEAN:
fields[fieldIndex] = JdbcUtils.getBoolean(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getBoolean(rs, resultSetIndex);
break;
case TINYINT:
fields[fieldIndex] = JdbcUtils.getByte(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getByte(rs, resultSetIndex);
break;
case SMALLINT:
fields[fieldIndex] = JdbcUtils.getShort(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getShort(rs, resultSetIndex);
break;
case INT:
fields[fieldIndex] = JdbcUtils.getInt(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getInt(rs, resultSetIndex);
break;
case BIGINT:
fields[fieldIndex] = JdbcUtils.getLong(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getLong(rs, resultSetIndex);
break;
case FLOAT:
fields[fieldIndex] = JdbcUtils.getFloat(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getFloat(rs, resultSetIndex);
break;
case DOUBLE:
fields[fieldIndex] = JdbcUtils.getDouble(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getDouble(rs, resultSetIndex);
break;
case DECIMAL:
fields[fieldIndex] = JdbcUtils.getBigDecimal(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getBigDecimal(rs, resultSetIndex);
break;
case DATE:
Date sqlDate = JdbcUtils.getDate(rs, resultSetIndex);
Date sqlDate = JdbcFieldTypeUtils.getDate(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlDate).map(e -> e.toLocalDate()).orElse(null);
break;
case TIME:
fields[fieldIndex] = readTime(rs, resultSetIndex);
break;
case TIMESTAMP:
Timestamp sqlTimestamp = JdbcUtils.getTimestamp(rs, resultSetIndex);
Timestamp sqlTimestamp = JdbcFieldTypeUtils.getTimestamp(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTimestamp)
.map(e -> e.toLocalDateTime())
.orElse(null);
break;
case BYTES:
fields[fieldIndex] = JdbcUtils.getBytes(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getBytes(rs, resultSetIndex);
break;
case NULL:
fields[fieldIndex] = null;
@@ -116,7 +116,7 @@ public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQL
}

protected LocalTime readTime(ResultSet rs, int resultSetIndex) throws SQLException {
Time sqlTime = JdbcUtils.getTime(rs, resultSetIndex);
Time sqlTime = JdbcFieldTypeUtils.getTime(rs, resultSetIndex);
return Optional.ofNullable(sqlTime).map(e -> e.toLocalTime()).orElse(null);
}

Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;

import java.math.BigDecimal;
import java.sql.Date;
@@ -56,51 +56,51 @@ public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQL
int resultSetIndex = fieldIndex + 1;
switch (seaTunnelDataType.getSqlType()) {
case STRING:
fields[fieldIndex] = JdbcUtils.getString(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs, resultSetIndex);
break;
case BOOLEAN:
fields[fieldIndex] = JdbcUtils.getBoolean(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getBoolean(rs, resultSetIndex);
break;
case TINYINT:
fields[fieldIndex] = JdbcUtils.getByte(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getByte(rs, resultSetIndex);
break;
case SMALLINT:
fields[fieldIndex] = JdbcUtils.getShort(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getShort(rs, resultSetIndex);
break;
case INT:
fields[fieldIndex] = JdbcUtils.getInt(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getInt(rs, resultSetIndex);
break;
case BIGINT:
fields[fieldIndex] = JdbcUtils.getLong(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getLong(rs, resultSetIndex);
break;
case FLOAT:
fields[fieldIndex] = JdbcUtils.getFloat(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getFloat(rs, resultSetIndex);
break;
case DOUBLE:
fields[fieldIndex] = JdbcUtils.getDouble(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getDouble(rs, resultSetIndex);
break;
case DECIMAL:
fields[fieldIndex] = JdbcUtils.getBigDecimal(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getBigDecimal(rs, resultSetIndex);
break;
case DATE:
Date sqlDate = JdbcUtils.getDate(rs, resultSetIndex);
Date sqlDate = JdbcFieldTypeUtils.getDate(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlDate).map(Date::toLocalDate).orElse(null);
break;
case TIME:
Time sqlTime = JdbcUtils.getTime(rs, resultSetIndex);
Time sqlTime = JdbcFieldTypeUtils.getTime(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTime).map(Time::toLocalTime).orElse(null);
break;
case TIMESTAMP:
Timestamp sqlTimestamp = JdbcUtils.getTimestamp(rs, resultSetIndex);
Timestamp sqlTimestamp = JdbcFieldTypeUtils.getTimestamp(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTimestamp)
.map(Timestamp::toLocalDateTime)
.orElse(null);
break;
case BYTES:
fields[fieldIndex] = JdbcUtils.getBytes(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getBytes(rs, resultSetIndex);
break;
case NULL:
fields[fieldIndex] = null;
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;

import java.sql.Array;
import java.sql.Date;
@@ -65,52 +65,52 @@ public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQL
? null
: rs.getObject(resultSetIndex).toString();
} else {
fields[fieldIndex] = JdbcUtils.getString(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs, resultSetIndex);
}
break;
case BOOLEAN:
fields[fieldIndex] = JdbcUtils.getBoolean(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getBoolean(rs, resultSetIndex);
break;
case TINYINT:
fields[fieldIndex] = JdbcUtils.getByte(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getByte(rs, resultSetIndex);
break;
case SMALLINT:
fields[fieldIndex] = JdbcUtils.getShort(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getShort(rs, resultSetIndex);
break;
case INT:
fields[fieldIndex] = JdbcUtils.getInt(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getInt(rs, resultSetIndex);
break;
case BIGINT:
fields[fieldIndex] = JdbcUtils.getLong(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getLong(rs, resultSetIndex);
break;
case FLOAT:
fields[fieldIndex] = JdbcUtils.getFloat(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getFloat(rs, resultSetIndex);
break;
case DOUBLE:
fields[fieldIndex] = JdbcUtils.getDouble(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getDouble(rs, resultSetIndex);
break;
case DECIMAL:
fields[fieldIndex] = JdbcUtils.getBigDecimal(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getBigDecimal(rs, resultSetIndex);
break;
case DATE:
Date sqlDate = JdbcUtils.getDate(rs, resultSetIndex);
Date sqlDate = JdbcFieldTypeUtils.getDate(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlDate).map(e -> e.toLocalDate()).orElse(null);
break;
case TIME:
Time sqlTime = JdbcUtils.getTime(rs, resultSetIndex);
Time sqlTime = JdbcFieldTypeUtils.getTime(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTime).map(e -> e.toLocalTime()).orElse(null);
break;
case TIMESTAMP:
Timestamp sqlTimestamp = JdbcUtils.getTimestamp(rs, resultSetIndex);
Timestamp sqlTimestamp = JdbcFieldTypeUtils.getTimestamp(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTimestamp)
.map(e -> e.toLocalDateTime())
.orElse(null);
break;
case BYTES:
fields[fieldIndex] = JdbcUtils.getBytes(rs, resultSetIndex);
fields[fieldIndex] = JdbcFieldTypeUtils.getBytes(rs, resultSetIndex);
break;
case NULL:
fields[fieldIndex] = null;
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;

import java.math.BigDecimal;
import java.sql.PreparedStatement;
@@ -46,7 +46,7 @@ public String converterName() {

@Override
protected LocalTime readTime(ResultSet rs, int resultSetIndex) throws SQLException {
Timestamp sqlTime = JdbcUtils.getTimestamp(rs, resultSetIndex);
Timestamp sqlTime = JdbcFieldTypeUtils.getTimestamp(rs, resultSetIndex);
return Optional.ofNullable(sqlTime)
.map(e -> e.toLocalDateTime().toLocalTime())
.orElse(null);
Original file line number Diff line number Diff line change
@@ -23,61 +23,40 @@
import java.sql.Time;
import java.sql.Timestamp;

public final class JdbcUtils {
public final class JdbcFieldTypeUtils {

private JdbcUtils() {}

public static String getString(ResultSet resultSet, int columnIndex) throws SQLException {
return resultSet.getString(columnIndex);
}
private JdbcFieldTypeUtils() {}

public static Boolean getBoolean(ResultSet resultSet, int columnIndex) throws SQLException {
if (null == resultSet.getObject(columnIndex)) {
return null;
}
return resultSet.getBoolean(columnIndex);
return getNullableValue(resultSet, columnIndex, ResultSet::getBoolean);
}

public static Byte getByte(ResultSet resultSet, int columnIndex) throws SQLException {
if (null == resultSet.getObject(columnIndex)) {
return null;
}
return resultSet.getByte(columnIndex);
return getNullableValue(resultSet, columnIndex, ResultSet::getByte);
}

public static Short getShort(ResultSet resultSet, int columnIndex) throws SQLException {
if (null == resultSet.getObject(columnIndex)) {
return null;
}
return resultSet.getShort(columnIndex);
return getNullableValue(resultSet, columnIndex, ResultSet::getShort);
}

public static Integer getInt(ResultSet resultSet, int columnIndex) throws SQLException {
if (null == resultSet.getObject(columnIndex)) {
return null;
}
return resultSet.getInt(columnIndex);
return getNullableValue(resultSet, columnIndex, ResultSet::getInt);
}

public static Long getLong(ResultSet resultSet, int columnIndex) throws SQLException {
if (null == resultSet.getObject(columnIndex)) {
return null;
}
return resultSet.getLong(columnIndex);
return getNullableValue(resultSet, columnIndex, ResultSet::getLong);
}

public static Float getFloat(ResultSet resultSet, int columnIndex) throws SQLException {
if (null == resultSet.getObject(columnIndex)) {
return null;
}
return resultSet.getFloat(columnIndex);
return getNullableValue(resultSet, columnIndex, ResultSet::getFloat);
}

public static Double getDouble(ResultSet resultSet, int columnIndex) throws SQLException {
if (null == resultSet.getObject(columnIndex)) {
return null;
}
return resultSet.getDouble(columnIndex);
return getNullableValue(resultSet, columnIndex, ResultSet::getDouble);
}

public static String getString(ResultSet resultSet, int columnIndex) throws SQLException {
return resultSet.getString(columnIndex);
}

public static BigDecimal getBigDecimal(ResultSet resultSet, int columnIndex)
@@ -98,9 +77,22 @@ public static Timestamp getTimestamp(ResultSet resultSet, int columnIndex) throw
}

public static byte[] getBytes(ResultSet resultSet, int columnIndex) throws SQLException {
if (null == resultSet.getObject(columnIndex)) {
return resultSet.getBytes(columnIndex);
}

private static <T> T getNullableValue(
ResultSet resultSet,
int columnIndex,
ThrowingFunction<ResultSet, T, SQLException> getter)
throws SQLException {
if (resultSet.getObject(columnIndex) == null) {
return null;
}
return resultSet.getBytes(columnIndex);
return getter.apply(resultSet, columnIndex);
}

@FunctionalInterface
private interface ThrowingFunction<T, R, E extends Exception> {
R apply(T t, int columnIndex) throws E;
}
}