From f5a3d252490e8b6219eff640ec01a3a87eeddcb8 Mon Sep 17 00:00:00 2001 From: "cheney.yin" Date: Thu, 27 Jul 2023 12:36:59 +0800 Subject: [PATCH] [Bug][Spark] Fix SeaTunnelRowConverter failing to convert when schema contains row type. --- .../execution/TransformExecuteProcessor.java | 1 - .../src/test/resources/copy_transform.conf | 6 ++++ .../filter_row_kind_exclude_delete.conf | 5 ++++ .../filter_row_kind_exclude_insert.conf | 5 ++++ .../filter_row_kind_include_insert.conf | 5 ++++ .../src/test/resources/filter_transform.conf | 7 ++++- .../src/test/resources/split_transform.conf | 5 ++++ .../resources/field_mapper_transform.conf | 6 ++++ .../src/test/resources/sql_transform.conf | 7 ++++- .../serialization/SeaTunnelRowConverter.java | 29 ++++++++++++++++--- 10 files changed, 69 insertions(+), 7 deletions(-) diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java index 179598b3a612..fc9be559257f 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java @@ -185,7 +185,6 @@ public Row next() { return null; } seaTunnelRow = outputRowConverter.convert(seaTunnelRow); - return new GenericRowWithSchema(seaTunnelRow.getFields(), structType); } catch (Exception e) { throw new TaskExecuteException("Row convert failed, caused: " + e.getMessage(), e); diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf index 25ca4ce5f9ae..b937b0a8cbe8 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf @@ -30,6 +30,11 @@ source { fields { id = "int" name = "string" + c_row = { + c_row = { + c_int = int + } + } } } } @@ -49,6 +54,7 @@ transform { id_1 = "id" name2 = "name" name3 = "name" + c_row_1 = "c_row" } } } diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf index f7fc0f6e0e11..8fdf195b037c 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf @@ -31,6 +31,11 @@ source { id = "int" name = "string" age = "int" + c_row = { + c_row = { + c_int = int + } + } } } } diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf index cc36417788ba..9fc0e577cb8f 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf @@ -31,6 +31,11 @@ source { id = "int" name = "string" age = "int" + c_row 
= { + c_row = { + c_int = int + } + } } } } diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf index d1fbf79bea21..72d1e38cd442 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf @@ -31,6 +31,11 @@ source { id = "int" name = "string" age = "int" + c_row = { + c_row = { + c_int = int + } + } } } } diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf index 56439b4414f6..c869c70a77b7 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf @@ -31,6 +31,11 @@ source { id = "int" name = "string" age = "int" + c_row = { + c_row = { + c_int = int + } + } } } } @@ -40,7 +45,7 @@ transform { Filter { source_table_name = "fake" result_table_name = "fake1" - fields = ["age", "name"] + fields = ["age", "name", "c_row"] } } diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf index 61e10f694ac7..7ad9fbf8f4af 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf +++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf @@ -31,6 +31,11 @@ source { id = "int" name = "string" age = "int" + c_row = { + c_row = { + c_int = int + } + } } } } diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_mapper_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_mapper_transform.conf index c2d1f225f2b9..59d19f3ee74e 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_mapper_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_mapper_transform.conf @@ -34,6 +34,11 @@ source { string1 = "string" int1 = "int" c_bigint = "bigint" + c_row = { + c_row = { + c_int = int + } + } } } } @@ -48,6 +53,7 @@ transform { age = age_as int1 = int1_as name = name + c_row = c_row } } } diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf index c5f7c4047e74..78e21280f0de 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf @@ -36,6 +36,11 @@ source { c_map = "map" c_array = "array" c_decimal = "decimal(30, 8)" + c_row = { + c_row = { + c_int = int + } + } } } } @@ -46,7 +51,7 @@ transform { source_table_name = "fake" result_table_name = "fake1" # the query table name must same as field 'source_table_name' - query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age, pi() as pi, c_timestamp, c_date, c_map, c_array, c_decimal from fake" + query = 
"select id, regexp_replace(name, '.+', 'b') as name, age+1 as age, pi() as pi, c_timestamp, c_date, c_map, c_array, c_decimal, c_row from fake" } # The SQL transform support base function and criteria operation # But the complex SQL unsupported yet, include: multi source table/rows JOIN and AGGREGATE operation and the like diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java index 51d5c7308bd5..15357204cd3e 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java @@ -24,7 +24,10 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.translation.serialization.RowConverter; +import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; +import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; import scala.Tuple2; @@ -51,7 +54,11 @@ public SeaTunnelRowConverter(SeaTunnelDataType dataType) { @Override public SeaTunnelRow convert(SeaTunnelRow seaTunnelRow) throws IOException { validate(seaTunnelRow); - return (SeaTunnelRow) convert(seaTunnelRow, dataType); + GenericRowWithSchema rowWithSchema = (GenericRowWithSchema) convert(seaTunnelRow, dataType); + SeaTunnelRow newRow = new SeaTunnelRow(rowWithSchema.values()); + 
newRow.setRowKind(seaTunnelRow.getRowKind()); + newRow.setTableId(seaTunnelRow.getTableId()); + return newRow; } private Object convert(Object field, SeaTunnelDataType dataType) { @@ -62,7 +69,7 @@ private Object convert(Object field, SeaTunnelDataType dataType) { case ROW: SeaTunnelRow seaTunnelRow = (SeaTunnelRow) field; SeaTunnelRowType rowType = (SeaTunnelRowType) dataType; - return convert(seaTunnelRow, rowType); + return convertRow(seaTunnelRow, rowType); case DATE: return Date.valueOf((LocalDate) field); case TIMESTAMP: @@ -94,16 +101,17 @@ private Object convert(Object field, SeaTunnelDataType dataType) { } } - private SeaTunnelRow convert(SeaTunnelRow seaTunnelRow, SeaTunnelRowType rowType) { + private GenericRowWithSchema convertRow(SeaTunnelRow seaTunnelRow, SeaTunnelRowType rowType) { int arity = rowType.getTotalFields(); Object[] values = new Object[arity]; + StructType schema = (StructType) TypeConverterUtils.convert(rowType); for (int i = 0; i < arity; i++) { Object fieldValue = convert(seaTunnelRow.getField(i), rowType.getFieldType(i)); if (fieldValue != null) { values[i] = fieldValue; } } - return new SeaTunnelRow(values); + return new GenericRowWithSchema(values, schema); } private scala.collection.immutable.HashMap convertMap( @@ -148,6 +156,10 @@ private Object reconvert(Object field, SeaTunnelDataType dataType) { } switch (dataType.getSqlType()) { case ROW: + if (field instanceof GenericRowWithSchema) { + return createFromGenericRow( + (GenericRowWithSchema) field, (SeaTunnelRowType) dataType); + } return reconvert((SeaTunnelRow) field, (SeaTunnelRowType) dataType); case DATE: return ((Date) field).toLocalDate(); @@ -166,6 +178,15 @@ private Object reconvert(Object field, SeaTunnelDataType dataType) { } } + private SeaTunnelRow createFromGenericRow(GenericRowWithSchema row, SeaTunnelRowType type) { + Object[] fields = row.values(); + Object[] newFields = new Object[fields.length]; + for (int idx = 0; idx < fields.length; idx++) { + 
newFields[idx] = reconvert(fields[idx], type.getFieldType(idx)); + } + return new SeaTunnelRow(newFields); + } + private SeaTunnelRow reconvert(SeaTunnelRow engineRow, SeaTunnelRowType rowType) { int num = engineRow.getFields().length; Object[] fields = new Object[num];