diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java index d6e58825dab..ab0c061d2ce 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java @@ -165,7 +165,7 @@ public static Object[] skipReadAndSortSampleData( .createStatement( ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - stmt.setFetchSize(Integer.MIN_VALUE); + stmt.setFetchSize(1024); rs = stmt.executeQuery(sampleQuery); int count = 0; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java index aa7c03ccdb9..099d59d5c0c 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java @@ -314,10 +314,8 @@ default Object[] sampleDataFromColumn( quoteIdentifier(columnName), tableIdentifier(table.getTablePath())); } - try (Statement stmt = - connection.createStatement( - ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { - stmt.setFetchSize(Integer.MIN_VALUE); + try (Statement stmt = connection.createStatement()) { + stmt.setFetchSize(1024); try (ResultSet rs = stmt.executeQuery(sampleQuery)) { int count = 0; List results = new ArrayList<>(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java index b22a865a6af..f2ce15e31ee 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java @@ -33,8 +33,10 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -119,6 +121,44 @@ public TablePath parse(String tablePath) { return TablePath.of(tablePath, false); } + @Override + public Object[] sampleDataFromColumn( + Connection connection, JdbcSourceTable table, String columnName, int samplingRate) + throws SQLException { + String sampleQuery; + if (StringUtils.isNotBlank(table.getQuery())) { + sampleQuery = + String.format( + "SELECT %s FROM (%s) AS T", + quoteIdentifier(columnName), table.getQuery()); + } else { + sampleQuery = + String.format( + "SELECT %s FROM %s", + quoteIdentifier(columnName), tableIdentifier(table.getTablePath())); + } + + try (Statement stmt = + connection.createStatement( + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { + stmt.setFetchSize(Integer.MIN_VALUE); + try (ResultSet rs = stmt.executeQuery(sampleQuery)) { + int count = 0; + List results = new ArrayList<>(); + + while (rs.next()) { + count++; + if (count % samplingRate == 0) { + results.add(rs.getObject(1)); + } + } + Object[] resultsArray = results.toArray(); + Arrays.sort(resultsArray); + return resultsArray; + } + } + } + @Override public Long approximateRowCntStatement(Connection connection, JdbcSourceTable table) throws SQLException { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java index ca7bda75c2b..a1c2c30fe6c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java @@ -18,12 +18,17 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalog; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleURLParser; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable; import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.Test; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.OracleContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; @@ -35,6 +40,7 @@ import java.math.BigDecimal; import java.sql.Date; +import java.sql.SQLException; import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; @@ -95,6 +101,16 @@ public class JdbcOracleIT extends AbstractJdbcIT { "XML_TYPE_COL" }; + @Test + public void testSampleDataFromColumnSuccess() throws SQLException { + JdbcDialect dialect = new OracleDialect(); + JdbcSourceTable table = + JdbcSourceTable.builder() + .tablePath(TablePath.of(null, SCHEMA, SOURCE_TABLE)) + .build(); + dialect.sampleDataFromColumn(connection, table, "INTEGER_COL", 1); + } + @Override JdbcCase getJdbcCase() { Map containerEnv = new HashMap<>();