diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml index f92f605a4eb..edeb4ccdef7 100644 --- a/seatunnel-connectors-v2/connector-jdbc/pom.xml +++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml @@ -44,6 +44,7 @@ 2.1.0.9 2.14.7 12.0.3-0 + 2.5.1 @@ -66,6 +67,12 @@ ${postgresql.version} provided + + net.postgis + postgis-jdbc + ${postgis.jdbc.version} + provided + com.dameng DmJdbcDriver18 @@ -149,6 +156,10 @@ org.postgresql postgresql + + net.postgis + postgis-jdbc + com.dameng DmJdbcDriver18 diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java index bfac414575c..2b7dc47a9e0 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java @@ -17,11 +17,108 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Locale; +import java.util.Optional; + public class PostgresJdbcRowConverter extends AbstractJdbcRowConverter { + + private static final String PG_GEOMETRY = "GEOMETRY"; + private static final String PG_GEOGRAPHY = "GEOGRAPHY"; + @Override public String converterName() { return null; } + + @Override + @SuppressWarnings("checkstyle:Indentation") + public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws SQLException { + Object[] fields = new Object[typeInfo.getTotalFields()]; + for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) { + SeaTunnelDataType seaTunnelDataType = typeInfo.getFieldType(fieldIndex); + int resultSetIndex = fieldIndex + 1; + String metaDataColumnType = + rs.getMetaData().getColumnTypeName(resultSetIndex).toUpperCase(Locale.ROOT); + switch (seaTunnelDataType.getSqlType()) { + case STRING: + if (metaDataColumnType.equals(PG_GEOMETRY) + || metaDataColumnType.equals(PG_GEOGRAPHY)) { + fields[fieldIndex] = + rs.getObject(resultSetIndex) == null + ? null + : rs.getObject(resultSetIndex).toString(); + } else { + fields[fieldIndex] = rs.getString(resultSetIndex); + } + break; + case BOOLEAN: + fields[fieldIndex] = rs.getBoolean(resultSetIndex); + break; + case TINYINT: + fields[fieldIndex] = rs.getByte(resultSetIndex); + break; + case SMALLINT: + fields[fieldIndex] = rs.getShort(resultSetIndex); + break; + case INT: + fields[fieldIndex] = rs.getInt(resultSetIndex); + break; + case BIGINT: + fields[fieldIndex] = rs.getLong(resultSetIndex); + break; + case FLOAT: + fields[fieldIndex] = rs.getFloat(resultSetIndex); + break; + case DOUBLE: + fields[fieldIndex] = rs.getDouble(resultSetIndex); + break; + case DECIMAL: + fields[fieldIndex] = rs.getBigDecimal(resultSetIndex); + break; + case DATE: + Date sqlDate = rs.getDate(resultSetIndex); + fields[fieldIndex] = + Optional.ofNullable(sqlDate).map(e -> e.toLocalDate()).orElse(null); + break; + case TIME: + Time sqlTime = rs.getTime(resultSetIndex); + fields[fieldIndex] = + Optional.ofNullable(sqlTime).map(e -> e.toLocalTime()).orElse(null); + break; + case TIMESTAMP: + Timestamp sqlTimestamp = rs.getTimestamp(resultSetIndex); + fields[fieldIndex] = + Optional.ofNullable(sqlTimestamp) + .map(e -> e.toLocalDateTime()) + .orElse(null); + break; + case BYTES: + fields[fieldIndex] = rs.getBytes(resultSetIndex); + break; + case NULL: + fields[fieldIndex] = null; + break; + case MAP: + case ARRAY: + case ROW: + default: + throw new JdbcConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unexpected value: " + seaTunnelDataType); + } + } + return new SeaTunnelRow(fields); + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeMapper.java index f795f300e27..25004168d80 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeMapper.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeMapper.java @@ -84,6 +84,8 @@ public class PostgresTypeMapper implements JdbcDialectTypeMapper { private static final String PG_CHARACTER_ARRAY = "_character"; private static final String PG_CHARACTER_VARYING = "varchar"; private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar"; + private static final String PG_GEOMETRY = "geometry"; + private static final String PG_GEOGRAPHY = "geography"; @SuppressWarnings("checkstyle:MagicNumber") @Override @@ -135,6 +137,8 @@ public SeaTunnelDataType mapping(ResultSetMetaData metadata, int colIndex) case PG_CHARACTER: case PG_CHARACTER_VARYING: case PG_TEXT: + case PG_GEOMETRY: + case PG_GEOGRAPHY: return BasicType.STRING_TYPE; case PG_CHAR_ARRAY: case PG_CHARACTER_ARRAY: diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index 602138cfc8e..de61efa9bf1 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -86,6 +86,7 @@ 8.0.27 42.4.3 + 2.5.1 8.1.2.141 9.2.1.jre8 5.2.5-HBase-2.x @@ -512,6 +513,12 @@ ${postgresql.version} provided + + net.postgis + postgis-jdbc + ${postgis.jdbc.version} + provided + com.dameng DmJdbcDriver18 diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java index 2f172ab39a3..6a3eb231b27 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java @@ -17,139 +17,368 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; -import org.apache.commons.lang3.tuple.Pair; - -import org.testcontainers.containers.GenericContainer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.DockerLoggerFactory; import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; - -public class JdbcPostgresIT extends AbstractJdbcIT { - - private static final String PG_IMAGE = "postgres:14-alpine"; - private static final String PG_CONTAINER_HOST = "seatunnel_e2e_pg"; - private static final String PG_DATABASE = "test"; - - private static final String PG_SCHEMA = "public"; - private static final String PG_SOURCE = "source"; - private static final String PG_SINK = "sink"; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; - private static final String PG_USERNAME = "root"; - private static final String PG_PASSWORD = "test"; - private static final int PG_CONTAINER_PORT = 5432; +import static org.awaitility.Awaitility.given; - private static final int PG_PORT = 5432; - private static final String PG_URL = "jdbc:postgresql://" + HOST + ":%s/%s"; - - private static final String DRIVER_CLASS = "org.postgresql.Driver"; - - private static final List CONFIG_FILE = +@Slf4j +public class JdbcPostgresIT extends TestSuiteBase implements TestResource { + private static final String PG_IMAGE = "postgis/postgis"; + private static final String PG_DRIVER_JAR = + "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar"; + private static final String PG_JDBC_JAR = + "https://repo1.maven.org/maven2/net/postgis/postgis-jdbc/2.5.1/postgis-jdbc-2.5.1.jar"; + private static final String PG_GEOMETRY_JAR = + "https://repo1.maven.org/maven2/net/postgis/postgis-geometry/2.5.1/postgis-geometry-2.5.1.jar"; + private static final List PG_CONFIG_FILE_LIST = Lists.newArrayList( "/jdbc_postgres_source_and_sink.conf", "/jdbc_postgres_source_and_sink_parallel.conf", "/jdbc_postgres_source_and_sink_parallel_upper_lower.conf", "/jdbc_postgres_source_and_sink_xa.conf"); + private PostgreSQLContainer POSTGRESQL_CONTAINER; + private static final String PG_SOURCE_DDL = + "CREATE TABLE IF NOT EXISTS pg_e2e_source_table (\n" + + " gid SERIAL PRIMARY KEY,\n" + + " text_col TEXT,\n" + + " varchar_col VARCHAR(255),\n" + + " char_col CHAR(10),\n" + + " boolean_col bool,\n" + + " smallint_col int2,\n" + + " integer_col int4,\n" + + " bigint_col BIGINT,\n" + + " decimal_col DECIMAL(10, 2),\n" + + " numeric_col NUMERIC(8, 4),\n" + + " real_col float4,\n" + + " double_precision_col float8,\n" + + " smallserial_col SMALLSERIAL,\n" + + " serial_col SERIAL,\n" + + " bigserial_col BIGSERIAL,\n" + + " date_col DATE,\n" + + " timestamp_col TIMESTAMP,\n" + + " bpchar_col BPCHAR(10),\n" + + " age INT NOT null,\n" + + " name VARCHAR(255) NOT null,\n" + + " point geometry(POINT, 4326),\n" + + " linestring geometry(LINESTRING, 4326),\n" + + " polygon_colums geometry(POLYGON, 4326),\n" + + " multipoint geometry(MULTIPOINT, 4326),\n" + + " multilinestring geometry(MULTILINESTRING, 4326),\n" + + " multipolygon geometry(MULTIPOLYGON, 4326),\n" + + " geometrycollection geometry(GEOMETRYCOLLECTION, 4326),\n" + + " geog geography(POINT, 4326)\n" + + ")"; + private static final String PG_SINK_DDL = + "CREATE TABLE IF NOT EXISTS pg_e2e_sink_table (\n" + + " gid SERIAL PRIMARY KEY,\n" + + " text_col TEXT,\n" + + " varchar_col VARCHAR(255),\n" + + " char_col CHAR(10),\n" + + " boolean_col bool,\n" + + " smallint_col int2,\n" + + " integer_col int4,\n" + + " bigint_col BIGINT,\n" + + " decimal_col DECIMAL(10, 2),\n" + + " numeric_col NUMERIC(8, 4),\n" + + " real_col float4,\n" + + " double_precision_col float8,\n" + + " smallserial_col SMALLSERIAL,\n" + + " serial_col SERIAL,\n" + + " bigserial_col BIGSERIAL,\n" + + " date_col DATE,\n" + + " timestamp_col TIMESTAMP,\n" + + " bpchar_col BPCHAR(10),\n" + + " age int4 NOT NULL,\n" + + " name varchar(255) NOT NULL,\n" + + " point varchar(2000) NULL,\n" + + " linestring varchar(2000) NULL,\n" + + " polygon_colums varchar(2000) NULL,\n" + + " multipoint varchar(2000) NULL,\n" + + " multilinestring varchar(2000) NULL,\n" + + " multipolygon varchar(2000) NULL,\n" + + " geometrycollection varchar(2000) NULL,\n" + + " geog varchar(2000) NULL\n" + + " )"; + private static final String SOURCE_SQL = + "select \n" + + "gid,\n" + + "text_col,\n" + + "varchar_col,\n" + + "char_col,\n" + + "boolean_col,\n" + + "smallint_col,\n" + + "integer_col,\n" + + "bigint_col,\n" + + "decimal_col,\n" + + "numeric_col,\n" + + "real_col,\n" + + "double_precision_col,\n" + + "smallserial_col,\n" + + "serial_col,\n" + + "bigserial_col,\n" + + "date_col,\n" + + "timestamp_col,\n" + + "bpchar_col,\n" + + "age,\n" + + "name,\n" + + "point,\n" + + "linestring,\n" + + "polygon_colums,\n" + + "multipoint,\n" + + "multilinestring,\n" + + "multipolygon,\n" + + "geometrycollection,\n" + + "geog\n" + + " from pg_e2e_source_table"; + private static final String SINK_SQL = + "select\n" + + " gid,\n" + + " text_col,\n" + + " varchar_col,\n" + + " char_col,\n" + + " boolean_col,\n" + + " smallint_col,\n" + + " integer_col,\n" + + " bigint_col,\n" + + " decimal_col,\n" + + " numeric_col,\n" + + " real_col,\n" + + " double_precision_col,\n" + + " smallserial_col,\n" + + " serial_col,\n" + + " bigserial_col,\n" + + " date_col,\n" + + " timestamp_col,\n" + + " bpchar_col," + + " age,\n" + + " name,\n" + + " cast(point as geometry) as point,\n" + + " cast(linestring as geometry) as linestring,\n" + + " cast(polygon_colums as geometry) as polygon_colums,\n" + + " cast(multipoint as geometry) as multipoint,\n" + + " cast(multilinestring as geometry) as multilinestring,\n" + + " cast(multipolygon as geometry) as multilinestring,\n" + + " cast(geometrycollection as geometry) as geometrycollection,\n" + + " cast(geog as geography) as geog\n" + + "from\n" + + " pg_e2e_sink_table"; - private static final String CREATE_SQL = - "CREATE TABLE %s (\n" + "age INT NOT NULL,\n" + "name VARCHAR(255) NOT NULL\n" + ")"; + @TestContainerExtension + private final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + + PG_DRIVER_JAR + + " && curl -O " + + PG_JDBC_JAR + + " && curl -O " + + PG_GEOMETRY_JAR); + Assertions.assertEquals(0, extraCommands.getExitCode()); + }; + @BeforeAll @Override - JdbcCase getJdbcCase() { - Map containerEnv = new HashMap<>(); - String jdbcUrl = String.format(PG_URL, PG_PORT, PG_DATABASE); - Pair> testDataSet = initTestData(); - String[] fieldNames = testDataSet.getKey(); - - String insertSql = insertTable(PG_SCHEMA, PG_SOURCE, fieldNames); - - return JdbcCase.builder() - .dockerImage(PG_IMAGE) - .networkAliases(PG_CONTAINER_HOST) - .containerEnv(containerEnv) - .driverClass(DRIVER_CLASS) - .host(HOST) - .port(PG_CONTAINER_PORT) - .localPort(PG_PORT) - .jdbcTemplate(PG_URL) - .jdbcUrl(jdbcUrl) - .userName(PG_USERNAME) - .password(PG_PASSWORD) - .database(PG_SCHEMA) - .sourceTable(PG_SOURCE) - .sinkTable(PG_SINK) - .createSql(CREATE_SQL) - .configFile(CONFIG_FILE) - .insertSql(insertSql) - .testData(testDataSet) - .build(); + public void startUp() throws Exception { + POSTGRESQL_CONTAINER = + new PostgreSQLContainer<>( + DockerImageName.parse(PG_IMAGE) + .asCompatibleSubstituteFor("postgres")) + .withNetwork(TestSuiteBase.NETWORK) + .withNetworkAliases("postgresql") + .withCommand("postgres -c max_prepared_transactions=100") + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE))); + Startables.deepStart(Stream.of(POSTGRESQL_CONTAINER)).join(); + log.info("PostgreSQL container started"); + Class.forName(POSTGRESQL_CONTAINER.getDriverClassName()); + given().ignoreExceptions() + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(2, TimeUnit.MINUTES) + .untilAsserted(this::initializeJdbcTable); + log.info("pg data initialization succeeded. Procedure"); } - @Override - void compareResult() throws SQLException, IOException {} - - @Override - String driverUrl() { - return "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar"; + @TestTemplate + public void testAutoGenerateSQL(TestContainer container) + throws IOException, InterruptedException { + for (String CONFIG_FILE : PG_CONFIG_FILE_LIST) { + Container.ExecResult execResult = container.executeJob(CONFIG_FILE); + Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertIterableEquals(querySql(SOURCE_SQL), querySql(SINK_SQL)); + executeSQL("truncate table pg_e2e_sink_table"); + log.info(CONFIG_FILE + " e2e test completed"); + } } - @Override - Pair> initTestData() { - String[] fieldNames = - new String[] { - "age", "name", - }; - - List rows = new ArrayList<>(); - for (int i = 0; i < 100; i++) { - SeaTunnelRow row = - new SeaTunnelRow( - new Object[] { - i, "f_" + i, - }); - rows.add(row); - } + private void initializeJdbcTable() { + try (Connection connection = getJdbcConnection()) { + Statement statement = connection.createStatement(); + statement.execute(PG_SOURCE_DDL); + statement.execute(PG_SINK_DDL); + for (int i = 1; i <= 1000; i++) { + statement.addBatch( + "INSERT INTO\n" + + " pg_e2e_source_table (gid,\n" + + " text_col,\n" + + " varchar_col,\n" + + " char_col,\n" + + " boolean_col,\n" + + " smallint_col,\n" + + " integer_col,\n" + + " bigint_col,\n" + + " decimal_col,\n" + + " numeric_col,\n" + + " real_col,\n" + + " double_precision_col,\n" + + " smallserial_col,\n" + + " serial_col,\n" + + " bigserial_col,\n" + + " date_col,\n" + + " timestamp_col,\n" + + " bpchar_col,\n" + + " age,\n" + + " name,\n" + + " point,\n" + + " linestring,\n" + + " polygon_colums,\n" + + " multipoint,\n" + + " multilinestring,\n" + + " multipolygon,\n" + + " geometrycollection,\n" + + " geog\n" + + " )\n" + + "VALUES\n" + + " (\n" + + " '" + + i + + "',\n" + + " 'Hello World',\n" + + " 'Test',\n" + + " 'Testing',\n" + + " true,\n" + + " 10,\n" + + " 100,\n" + + " 1000,\n" + + " 10.55,\n" + + " 8.8888,\n" + + " 3.14,\n" + + " 3.14159265,\n" + + " 1,\n" + + " 100,\n" + + " 10000,\n" + + " '2023-05-07',\n" + + " '2023-05-07 14:30:00',\n" + + " 'Testing',\n" + + " 21,\n" + + " 'Leblanc',\n" + + " ST_GeomFromText('POINT(-122.3452 47.5925)', 4326),\n" + + " ST_GeomFromText(\n" + + " 'LINESTRING(-122.3451 47.5924, -122.3449 47.5923)',\n" + + " 4326\n" + + " ),\n" + + " ST_GeomFromText(\n" + + " 'POLYGON((-122.3453 47.5922, -122.3453 47.5926, -122.3448 47.5926, -122.3448 47.5922, -122.3453 47.5922))',\n" + + " 4326\n" + + " ),\n" + + " ST_GeomFromText(\n" + + " 'MULTIPOINT(-122.3459 47.5927, -122.3445 47.5918)',\n" + + " 4326\n" + + " ),\n" + + " ST_GeomFromText(\n" + + " 'MULTILINESTRING((-122.3463 47.5920, -122.3461 47.5919),(-122.3459 47.5924, -122.3457 47.5923))',\n" + + " 4326\n" + + " ),\n" + + " ST_GeomFromText(\n" + + " 'MULTIPOLYGON(((-122.3458 47.5925, -122.3458 47.5928, -122.3454 47.5928, -122.3454 47.5925, -122.3458 47.5925)),((-122.3453 47.5921, -122.3453 47.5924, -122.3448 47.5924, -122.3448 47.5921, -122.3453 47.5921)))',\n" + + " 4326\n" + + " ),\n" + + " ST_GeomFromText(\n" + + " 'GEOMETRYCOLLECTION(POINT(-122.3462 47.5921), LINESTRING(-122.3460 47.5924, -122.3457 47.5924))',\n" + + " 4326\n" + + " ),\n" + + " ST_GeographyFromText('POINT(-122.3452 47.5925)')\n" + + " )"); + } - return Pair.of(fieldNames, rows); + statement.executeBatch(); + } catch (SQLException e) { + throw new RuntimeException("Initializing PostgreSql table failed!", e); + } } - @Override - GenericContainer initContainer() { - DockerImageName imageName = DockerImageName.parse(PG_IMAGE); - - GenericContainer container = - new PostgreSQLContainer<>(imageName) - .withNetwork(NETWORK) - .withNetworkAliases(PG_CONTAINER_HOST) - .withUsername(PG_USERNAME) - .withPassword(PG_PASSWORD) - .withCommand("postgres -c max_prepared_transactions=100") - .withLogConsumer( - new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE))); + private Connection getJdbcConnection() throws SQLException { + return DriverManager.getConnection( + POSTGRESQL_CONTAINER.getJdbcUrl(), + POSTGRESQL_CONTAINER.getUsername(), + POSTGRESQL_CONTAINER.getPassword()); + } - container.setPortBindings( - Lists.newArrayList(String.format("%s:%s", PG_PORT, PG_CONTAINER_PORT))); - return container; + private List> querySql(String sql) { + try (Connection connection = getJdbcConnection()) { + ResultSet resultSet = connection.createStatement().executeQuery(sql); + List> result = new ArrayList<>(); + int columnCount = resultSet.getMetaData().getColumnCount(); + while (resultSet.next()) { + ArrayList objects = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { + objects.add(resultSet.getObject(i)); + } + result.add(objects); + } + return result; + } catch (SQLException e) { + throw new RuntimeException(e); + } } - @Override - public String quoteIdentifier(String field) { - return "\"" + field + "\""; + private void executeSQL(String sql) { + try (Connection connection = getJdbcConnection()) { + Statement statement = connection.createStatement(); + statement.execute(sql); + } catch (SQLException e) { + throw new RuntimeException(e); + } } + @AfterAll @Override - public void clearTable(String schema, String table) { - // do nothing. + public void tearDown() { + if (POSTGRESQL_CONTAINER != null) { + POSTGRESQL_CONTAINER.stop(); + } } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf index ace2e19413b..1c7417f8a55 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf @@ -22,23 +22,26 @@ env { source{ jdbc{ - url = "jdbc:postgresql://seatunnel_e2e_pg:5432/test" + url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF" driver = "org.postgresql.Driver" - user = "root" + user = "test" password = "test" - query = "select * from source" + query ="""select gid, text_col, varchar_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, + smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, multipoint, + multilinestring, multipolygon, geometrycollection, geog from pg_e2e_source_table""" } } -transform { -} sink { - jdbc { - url = "jdbc:postgresql://seatunnel_e2e_pg:5432/test" - driver = "org.postgresql.Driver" - user = "root" - password = "test" - query = "insert into sink(age, name) values(?,?)" - } -} + Jdbc { + driver = org.postgresql.Driver + url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF" + user = test + password = test + generate_sink_sql = true + database = test + table = "public.pg_e2e_sink_table" + primary_keys = ["gid"] + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf index 97f41367166..25df382c4af 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf @@ -16,18 +16,20 @@ # env { - execution.parallelism = 3 + execution.parallelism = 1 job.mode = "BATCH" } source{ jdbc{ - url = "jdbc:postgresql://seatunnel_e2e_pg:5432/test" + url = "jdbc:postgresql://postgresql:5432/test" driver = "org.postgresql.Driver" - user = "root" + user = "test" password = "test" - query = "select name, age from source" - partition_column= "age" + query ="""select gid, text_col, varchar_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, + smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, multipoint, + multilinestring, multipolygon, geometrycollection, geog from pg_e2e_source_table""" + partition_column= "gid" result_table_name = "jdbc" } @@ -38,12 +40,14 @@ transform { sink { jdbc { - - url = "jdbc:postgresql://seatunnel_e2e_pg:5432/test" + url = "jdbc:postgresql://postgresql:5432/test" driver = "org.postgresql.Driver" - user = "root" + user = "test" password = "test" connection_check_timeout_sec = 100 - query = "insert into sink(name,age) values(?,?)" + query ="""INSERT INTO pg_e2e_sink_table ( gid, text_col, varchar_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, + double_precision_col, smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, bpchar_col, age, name, point, + linestring, polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection, geog ) + VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )""" } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf index e0035c523d3..46f1b43022b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf @@ -16,23 +16,25 @@ # env { - execution.parallelism = 3 + execution.parallelism = 1 job.mode = "BATCH" } source{ jdbc{ - url = "jdbc:postgresql://seatunnel_e2e_pg:5432/test" + url = "jdbc:postgresql://postgresql:5432/test" driver = "org.postgresql.Driver" - user = "root" + user = "test" password = "test" - query = "select name, age from source" - partition_column= "age" + query ="""select gid, text_col, varchar_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, + smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, multipoint, + multilinestring, multipolygon, geometrycollection, geog from pg_e2e_source_table""" + partition_column= "gid" result_table_name = "jdbc" partition_lower_bound = 1 - partition_upper_bound = 50 - partition_num = 5 + partition_upper_bound = 1000 + partition_num = 1 } } @@ -41,12 +43,15 @@ transform { sink { jdbc { - url = "jdbc:postgresql://seatunnel_e2e_pg:5432/test" + url = "jdbc:postgresql://postgresql:5432/test" driver = "org.postgresql.Driver" - user = "root" + user = "test" password = "test" connection_check_timeout_sec = 100 - query = "insert into sink(name,age) values(?,?)" + query ="""INSERT INTO pg_e2e_sink_table ( gid, text_col, varchar_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, + double_precision_col, smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, bpchar_col, age, name, point, + linestring, polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection, geog ) + VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )""" } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf index 1181e6865d7..ba32ca81bc1 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf @@ -23,11 +23,13 @@ env { source { jdbc{ - url = "jdbc:postgresql://seatunnel_e2e_pg:5432/test" + url = "jdbc:postgresql://postgresql:5432/test" driver = "org.postgresql.Driver" - user = "root" + user = "test" password = "test" - query = "select name , age from source" + query ="""select gid, text_col, varchar_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, + smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, multipoint, + multilinestring, multipolygon, geometrycollection, geog from pg_e2e_source_table""" } } @@ -36,13 +38,15 @@ transform { sink { jdbc { - url = "jdbc:postgresql://seatunnel_e2e_pg:5432/test" + url = "jdbc:postgresql://postgresql:5432/test" driver = "org.postgresql.Driver" - user = "root" + user = "test" password = "test" - max_retries = 0 - query = "insert into sink(name,age) values(?,?)" + query ="""INSERT INTO pg_e2e_sink_table ( gid, text_col, varchar_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, + double_precision_col, smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, bpchar_col, age, name, point, + linestring, polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection, geog ) + VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )""" is_exactly_once = "true"