Skip to content

Commit

Permalink
[Hotfix][e2e] Fix e2e error (apache#6018)
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 authored and DESKTOP-GHPCOV0\dingaolong committed Dec 19, 2023
1 parent e93ed47 commit c0e7ec1
Showing 1 changed file with 108 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.seatunnel.e2e.connector.kafka;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
Expand Down Expand Up @@ -117,111 +119,115 @@ public class KafkaFormatIT extends TestSuiteBase implements TestResource {
private static final String PG_SINK_TABLE1 = "sink";
private static final String PG_SINK_TABLE2 = "sink2";

private static final Map<String, SeaTunnelRowType> sinkTableRowTypes = new HashMap<>();
private static final Map<String, CatalogTable> sinkTables = new HashMap<>();

static {
sinkTableRowTypes.put(
sinkTables.put(
PG_SINK_TABLE1,
new SeaTunnelRowType(
new String[] {"id", "name", "description", "weight"},
new SeaTunnelDataType[] {
BasicType.INT_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE
}));

sinkTableRowTypes.put(
CatalogTableUtil.getCatalogTable(
PG_SINK_TABLE1,
new SeaTunnelRowType(
new String[] {"id", "name", "description", "weight"},
new SeaTunnelDataType[] {
BasicType.INT_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE
})));

sinkTables.put(
PG_SINK_TABLE2,
new SeaTunnelRowType(
new String[] {
"id",
"f_binary",
"f_blob",
"f_long_varbinary",
"f_longblob",
"f_tinyblob",
"f_varbinary",
"f_smallint",
"f_smallint_unsigned",
"f_mediumint",
"f_mediumint_unsigned",
"f_int",
"f_int_unsigned",
"f_integer",
"f_integer_unsigned",
"f_bigint",
"f_bigint_unsigned",
"f_numeric",
"f_decimal",
"f_float",
"f_double",
"f_double_precision",
"f_longtext",
"f_mediumtext",
"f_text",
"f_tinytext",
"f_varchar",
"f_date",
"f_datetime",
"f_timestamp",
"f_bit1",
"f_bit64",
"f_char",
"f_enum",
"f_mediumblob",
"f_long_varchar",
"f_real",
"f_time",
"f_tinyint",
"f_tinyint_unsigned",
"f_json",
"f_year"
},
new SeaTunnelDataType[] {
BasicType.INT_TYPE,
PrimitiveByteArrayType.INSTANCE,
PrimitiveByteArrayType.INSTANCE,
PrimitiveByteArrayType.INSTANCE,
PrimitiveByteArrayType.INSTANCE,
PrimitiveByteArrayType.INSTANCE,
PrimitiveByteArrayType.INSTANCE,
BasicType.SHORT_TYPE,
BasicType.INT_TYPE,
BasicType.INT_TYPE,
BasicType.INT_TYPE,
BasicType.INT_TYPE,
BasicType.INT_TYPE,
BasicType.INT_TYPE,
BasicType.LONG_TYPE,
BasicType.LONG_TYPE,
new DecimalType(10, 0),
new DecimalType(10, 0),
new DecimalType(10, 0),
BasicType.FLOAT_TYPE,
BasicType.DOUBLE_TYPE,
BasicType.DOUBLE_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE,
LocalTimeType.LOCAL_DATE_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
BasicType.BOOLEAN_TYPE,
BasicType.BYTE_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE,
PrimitiveByteArrayType.INSTANCE,
BasicType.STRING_TYPE,
BasicType.DOUBLE_TYPE,
LocalTimeType.LOCAL_TIME_TYPE,
BasicType.BYTE_TYPE,
BasicType.INT_TYPE,
BasicType.STRING_TYPE,
BasicType.INT_TYPE
}));
CatalogTableUtil.getCatalogTable(
PG_SINK_TABLE2,
new SeaTunnelRowType(
new String[] {
"id",
"f_binary",
"f_blob",
"f_long_varbinary",
"f_longblob",
"f_tinyblob",
"f_varbinary",
"f_smallint",
"f_smallint_unsigned",
"f_mediumint",
"f_mediumint_unsigned",
"f_int",
"f_int_unsigned",
"f_integer",
"f_integer_unsigned",
"f_bigint",
"f_bigint_unsigned",
"f_numeric",
"f_decimal",
"f_float",
"f_double",
"f_double_precision",
"f_longtext",
"f_mediumtext",
"f_text",
"f_tinytext",
"f_varchar",
"f_date",
"f_datetime",
"f_timestamp",
"f_bit1",
"f_bit64",
"f_char",
"f_enum",
"f_mediumblob",
"f_long_varchar",
"f_real",
"f_time",
"f_tinyint",
"f_tinyint_unsigned",
"f_json",
"f_year"
},
new SeaTunnelDataType[] {
BasicType.INT_TYPE,
PrimitiveByteArrayType.INSTANCE,
PrimitiveByteArrayType.INSTANCE,
PrimitiveByteArrayType.INSTANCE,
PrimitiveByteArrayType.INSTANCE,
PrimitiveByteArrayType.INSTANCE,
PrimitiveByteArrayType.INSTANCE,
BasicType.SHORT_TYPE,
BasicType.INT_TYPE,
BasicType.INT_TYPE,
BasicType.INT_TYPE,
BasicType.INT_TYPE,
BasicType.INT_TYPE,
BasicType.INT_TYPE,
BasicType.LONG_TYPE,
BasicType.LONG_TYPE,
new DecimalType(10, 0),
new DecimalType(10, 0),
new DecimalType(10, 0),
BasicType.FLOAT_TYPE,
BasicType.DOUBLE_TYPE,
BasicType.DOUBLE_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE,
LocalTimeType.LOCAL_DATE_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
BasicType.BOOLEAN_TYPE,
BasicType.BYTE_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE,
PrimitiveByteArrayType.INSTANCE,
BasicType.STRING_TYPE,
BasicType.DOUBLE_TYPE,
LocalTimeType.LOCAL_TIME_TYPE,
BasicType.BYTE_TYPE,
BasicType.INT_TYPE,
BasicType.STRING_TYPE,
BasicType.INT_TYPE
})));
}

// Used to map local data paths to kafa topics that need to be written to kafka
Expand Down Expand Up @@ -839,7 +845,7 @@ private List<List<Object>> getPostgreSinkTableList(String tableName) {
while (resultSet.next()) {
SeaTunnelRow row =
postgresJdbcRowConverter.toInternal(
resultSet, sinkTableRowTypes.get(tableName));
resultSet, sinkTables.get(tableName).getTableSchema());
actual.add(Arrays.asList(row.getFields()));
}
}
Expand Down

0 comments on commit c0e7ec1

Please sign in to comment.