From 567aff8ac6ac76b1fdf6ef84ae7db2a8f1c70ca7 Mon Sep 17 00:00:00 2001 From: teo Date: Fri, 26 Jan 2024 03:42:38 +0800 Subject: [PATCH 1/5] support struct for iceberg --- .../iceberg/converter/ConvertUtil.java | 18 +++++++++++ .../iceberg/converter/FromIcebergType.java | 11 ++++++- .../converter/ToIcebergTypeVisitor.java | 15 +++++++++ .../lakehouse/iceberg/TestIcebergTable.java | 21 ++++++++++++- .../iceberg/converter/TestConvertUtil.java | 31 +++++++++++++++++++ 5 files changed, 94 insertions(+), 2 deletions(-) diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java index f5c6ecb5504..72b79ce90db 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java @@ -61,4 +61,22 @@ public static IcebergColumn fromNestedField(Types.NestedField nestedField) { .withType(ConvertUtil.formIcebergType(nestedField.type())) .build(); } + + /** + * Convert the Gravitino field of Iceberg to the Iceberg column. + * + * @param field Gravitino field. + * @param id + * @return + */ + public static IcebergColumn fromGravitinoField( + com.datastrato.gravitino.rel.types.Types.StructType.Field field, int id) { + return new IcebergColumn.Builder() + .withId(id) + .withName(field.name()) + .withNullable(field.nullable()) + .withComment(field.comment()) + .withType(field.type()) + .build(); + } } diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/FromIcebergType.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/FromIcebergType.java index 13da120ba03..8f75db5ad8b 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/FromIcebergType.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/FromIcebergType.java @@ -26,7 +26,16 @@ public Type schema(Schema schema, Type structType) { @Override public Type struct(Types.StructType struct, List fieldResults) { - throw new UnsupportedOperationException("Data conversion of struct type is not supported"); + return com.datastrato.gravitino.rel.types.Types.StructType.of( + struct.fields().stream() + .map( + nestedField -> + com.datastrato.gravitino.rel.types.Types.StructType.Field.of( + nestedField.name(), + fieldResults.get(struct.fields().indexOf(nestedField)), + nestedField.isOptional(), + nestedField.doc())) + .toArray(com.datastrato.gravitino.rel.types.Types.StructType.Field[]::new)); } @Override diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ToIcebergTypeVisitor.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ToIcebergTypeVisitor.java index 7cd30a7215d..37cb671b609 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ToIcebergTypeVisitor.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ToIcebergTypeVisitor.java @@ -10,7 +10,9 @@ import com.datastrato.gravitino.rel.types.Type; import com.datastrato.gravitino.rel.types.Types; import com.google.common.collect.Lists; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; /** * Type converter belonging to gravitino. @@ -52,6 +54,19 @@ public static T visit(Type type, ToIcebergTypeVisitor visitor) { } else if (type instanceof Types.ListType) { Types.ListType list = (Types.ListType) type; return visitor.array(list, visit(list.elementType(), visitor)); + } else if (type instanceof Types.StructType) { + Types.StructType struct = (Types.StructType) type; + Types.StructType.Field[] fields = struct.fields(); + List columns = + Arrays.stream(fields) + .map( + field -> { + return ConvertUtil.fromGravitinoField(field, 0); + }) + .collect(Collectors.toList()); + IcebergTable mockTable = + new IcebergTable.Builder().withColumns(columns.toArray(new IcebergColumn[0])).build(); + return visit(mockTable, visitor); } else { return visitor.atomic((Type.PrimitiveType) type); } diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java index f573a9a3648..c0a9b924d0b 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java @@ -144,7 +144,25 @@ public void testCreateIcebergTable() { .withComment(ICEBERG_COMMENT) .withNullable(false) .build(); - Column[] columns = new Column[] {col1, col2}; + Types.StructType structTypeInside = + Types.StructType.of( + Types.StructType.Field.notNullField("integer_field_inside", Types.IntegerType.get()), + Types.StructType.Field.notNullField( + "string_field_inside", Types.StringType.get(), "string field inside")); + Types.StructType structType = + Types.StructType.of( + Types.StructType.Field.notNullField("integer_field", Types.IntegerType.get()), + Types.StructType.Field.notNullField( + "string_field", Types.StringType.get(), "string field"), + Types.StructType.Field.nullableField("struct_field", structTypeInside, "struct field")); + IcebergColumn col3 = + new IcebergColumn.Builder() + .withName("col_3") + .withType(structType) + .withComment(ICEBERG_COMMENT) + .withNullable(false) + .build(); + Column[] columns = new Column[] {col1, col2, col3}; SortOrder[] sortOrders = createSortOrder(); Table table = @@ -169,6 +187,7 @@ public void testCreateIcebergTable() { Assertions.assertEquals("val2", loadedTable.properties().get("key2")); Assertions.assertTrue(loadedTable.columns()[0].nullable()); Assertions.assertFalse(loadedTable.columns()[1].nullable()); + Assertions.assertFalse(loadedTable.columns()[2].nullable()); Assertions.assertTrue(icebergCatalog.asTableCatalog().tableExists(tableIdentifier)); NameIdentifier[] tableIdents = diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/TestConvertUtil.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/TestConvertUtil.java index 0b19ebb4754..e066d7317f7 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/TestConvertUtil.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/TestConvertUtil.java @@ -252,6 +252,37 @@ public void testFormIcebergType() { Assertions.assertTrue( ((com.datastrato.gravitino.rel.types.Types.ListType) gravitinoListType).elementType() instanceof com.datastrato.gravitino.rel.types.Types.StringType); + + Types.StructType structTypeInside = + Types.StructType.of( + Types.NestedField.optional( + 2, "integer_type_inside", Types.IntegerType.get(), "integer type"), + Types.NestedField.optional( + 3, "string_type_inside", Types.StringType.get(), "string type")); + Types.StructType structType = + Types.StructType.of( + Types.NestedField.optional(0, "integer_type", Types.IntegerType.get(), "integer type"), + Types.NestedField.optional(1, "struct_type", structTypeInside, "struct type inside")); + com.datastrato.gravitino.rel.types.Type gravitinoStructType = + ConvertUtil.formIcebergType(structType); + Assertions.assertTrue( + (gravitinoStructType) instanceof com.datastrato.gravitino.rel.types.Types.StructType); + Assertions.assertTrue( + ((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType) + .fields()[0].type() + instanceof com.datastrato.gravitino.rel.types.Types.IntegerType); + Assertions.assertTrue( + ((com.datastrato.gravitino.rel.types.Types.StructType) + ((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType) + .fields()[1].type()) + .fields()[0].type() + instanceof com.datastrato.gravitino.rel.types.Types.IntegerType); + Assertions.assertTrue( + ((com.datastrato.gravitino.rel.types.Types.StructType) + ((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType) + .fields()[1].type()) + .fields()[1].type() + instanceof com.datastrato.gravitino.rel.types.Types.StringType); } @Test From 799255e3b47cea318556bad9cc317e2e669f23e9 Mon Sep 17 00:00:00 2001 From: teo Date: Tue, 30 Jan 2024 18:17:11 +0800 Subject: [PATCH 2/5] compelete CatalogIcebergIT and improve code --- .../iceberg/converter/ConvertUtil.java | 4 +- .../iceberg/converter/FromIcebergType.java | 23 ++++--- .../iceberg/converter/TestConvertUtil.java | 64 +++++++++++++++++++ .../lakehouse/iceberg/CatalogIcebergIT.java | 62 ++++++++++++++---- 4 files changed, 130 insertions(+), 23 deletions(-) diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java index 72b79ce90db..c01c416e503 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java @@ -63,10 +63,10 @@ public static IcebergColumn fromNestedField(Types.NestedField nestedField) { } /** - * Convert the Gravitino field of Iceberg to the Iceberg column. + * Convert the Gravitino field to the Iceberg column.t * * @param field Gravitino field. - * @param id + * @param id Gravitino field id. * @return */ public static IcebergColumn fromGravitinoField( diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/FromIcebergType.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/FromIcebergType.java index 8f75db5ad8b..1d256d715d5 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/FromIcebergType.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/FromIcebergType.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.catalog.lakehouse.iceberg.converter; import com.datastrato.gravitino.rel.types.Type; +import java.util.ArrayList; import java.util.List; import org.apache.iceberg.Schema; import org.apache.iceberg.types.TypeUtil; @@ -26,16 +27,20 @@ public Type schema(Schema schema, Type structType) { @Override public Type struct(Types.StructType struct, List fieldResults) { + List fieldsList = new ArrayList<>(); + List originalFields = struct.fields(); + + for (int i = 0; i < originalFields.size(); i++) { + Types.NestedField nestedField = originalFields.get(i); + fieldsList.add( + com.datastrato.gravitino.rel.types.Types.StructType.Field.of( + nestedField.name(), + fieldResults.get(i), + nestedField.isOptional(), + nestedField.doc())); + } return com.datastrato.gravitino.rel.types.Types.StructType.of( - struct.fields().stream() - .map( - nestedField -> - com.datastrato.gravitino.rel.types.Types.StructType.Field.of( - nestedField.name(), - fieldResults.get(struct.fields().indexOf(nestedField)), - nestedField.isOptional(), - nestedField.doc())) - .toArray(com.datastrato.gravitino.rel.types.Types.StructType.Field[]::new)); + fieldsList.toArray(new com.datastrato.gravitino.rel.types.Types.StructType.Field[0])); } @Override diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/TestConvertUtil.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/TestConvertUtil.java index e066d7317f7..9d1412e6019 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/TestConvertUtil.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/TestConvertUtil.java @@ -265,6 +265,7 @@ public void testFormIcebergType() { Types.NestedField.optional(1, "struct_type", structTypeInside, "struct type inside")); com.datastrato.gravitino.rel.types.Type gravitinoStructType = ConvertUtil.formIcebergType(structType); + // check for type Assertions.assertTrue( (gravitinoStructType) instanceof com.datastrato.gravitino.rel.types.Types.StructType); Assertions.assertTrue( @@ -283,6 +284,69 @@ public void testFormIcebergType() { .fields()[1].type()) .fields()[1].type() instanceof com.datastrato.gravitino.rel.types.Types.StringType); + // check for name + Assertions.assertEquals( + structType.fields().get(0).name(), + ((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType) + .fields()[0].name()); + Assertions.assertEquals( + structType.fields().get(1).name(), + ((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType) + .fields()[1].name()); + Assertions.assertEquals( + structTypeInside.fields().get(0).name(), + ((com.datastrato.gravitino.rel.types.Types.StructType) + ((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType) + .fields()[1].type()) + .fields()[0].name()); + Assertions.assertEquals( + structTypeInside.fields().get(1).name(), + ((com.datastrato.gravitino.rel.types.Types.StructType) + ((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType) + .fields()[1].type()) + .fields()[1].name()); + // check for comment + Assertions.assertEquals( + structType.fields().get(0).doc(), + ((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType) + .fields()[0].comment()); + Assertions.assertEquals( + structType.fields().get(1).doc(), + ((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType) + .fields()[1].comment()); + Assertions.assertEquals( + structTypeInside.fields().get(0).doc(), + ((com.datastrato.gravitino.rel.types.Types.StructType) + ((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType) + .fields()[1].type()) + .fields()[0].comment()); + Assertions.assertEquals( + structTypeInside.fields().get(1).doc(), + ((com.datastrato.gravitino.rel.types.Types.StructType) + ((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType) + .fields()[1].type()) + .fields()[1].comment()); + // check for nullable + Assertions.assertEquals( + structType.fields().get(0).isOptional(), + ((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType) + .fields()[0].nullable()); + Assertions.assertEquals( + structType.fields().get(1).isOptional(), + ((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType) + .fields()[1].nullable()); + Assertions.assertEquals( + structTypeInside.fields().get(0).isOptional(), + ((com.datastrato.gravitino.rel.types.Types.StructType) + ((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType) + .fields()[1].type()) + .fields()[0].nullable()); + Assertions.assertEquals( + structTypeInside.fields().get(1).isOptional(), + ((com.datastrato.gravitino.rel.types.Types.StructType) + ((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType) + .fields()[1].type()) + .fields()[1].nullable()); } @Test diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java index 5c184a7fae2..5a136d5a55d 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java @@ -87,6 +87,7 @@ public class CatalogIcebergIT extends AbstractIT { public static String ICEBERG_COL_NAME1 = "iceberg_col_name1"; public static String ICEBERG_COL_NAME2 = "iceberg_col_name2"; public static String ICEBERG_COL_NAME3 = "iceberg_col_name3"; + public static String ICEBERG_COL_NAME4 = "iceberg_col_name4"; private static final String provider = "lakehouse-iceberg"; private static final ContainerSuite containerSuite = ContainerSuite.getInstance(); @@ -233,7 +234,24 @@ private ColumnDTO[] createColumns() { .withDataType(Types.StringType.get()) .withComment("col_3_comment") .build(); - return new ColumnDTO[] {col1, col2, col3}; + Types.StructType structTypeInside = + Types.StructType.of( + Types.StructType.Field.notNullField("integer_field_inside", Types.IntegerType.get()), + Types.StructType.Field.notNullField( + "string_field_inside", Types.StringType.get(), "string field inside")); + Types.StructType structType = + Types.StructType.of( + Types.StructType.Field.notNullField("integer_field", Types.IntegerType.get()), + Types.StructType.Field.notNullField( + "string_field", Types.StringType.get(), "string field"), + Types.StructType.Field.nullableField("struct_field", structTypeInside, "struct field")); + ColumnDTO col4 = + new ColumnDTO.Builder() + .withName(ICEBERG_COL_NAME4) + .withDataType(structType) + .withComment("col_4_comment") + .build(); + return new ColumnDTO[] {col1, col2, col3, col4}; } private Map createProperties() { @@ -542,7 +560,7 @@ public void testAlterIcebergTable() { TableChange.updateComment(table_comment + "_new"), TableChange.removeProperty("key1"), TableChange.setProperty("key2", "val2_new"), - TableChange.addColumn(new String[] {"col_4"}, Types.StringType.get()), + TableChange.addColumn(new String[] {"col_5_for_add"}, Types.StringType.get()), TableChange.renameColumn(new String[] {ICEBERG_COL_NAME2}, "col_2_new"), TableChange.updateColumnComment(new String[] {ICEBERG_COL_NAME1}, "comment_new"), TableChange.updateColumnType( @@ -567,9 +585,13 @@ public void testAlterIcebergTable() { Assertions.assertEquals(Types.StringType.get(), table.columns()[2].dataType()); Assertions.assertEquals("col_3_comment", table.columns()[2].comment()); - Assertions.assertEquals("col_4", table.columns()[3].name()); - Assertions.assertEquals(Types.StringType.get(), table.columns()[3].dataType()); - Assertions.assertNull(table.columns()[3].comment()); + Assertions.assertEquals(ICEBERG_COL_NAME4, table.columns()[3].name()); + Assertions.assertEquals(columns[3].dataType(), table.columns()[3].dataType()); + Assertions.assertEquals("col_4_comment", table.columns()[3].comment()); + + Assertions.assertEquals("col_5_for_add", table.columns()[4].name()); + Assertions.assertEquals(Types.StringType.get(), table.columns()[4].dataType()); + Assertions.assertNull(table.columns()[4].comment()); Assertions.assertEquals(1, table.partitioning().length); Assertions.assertEquals( @@ -727,14 +749,22 @@ void testOperationDataIcebergTable() { TableIdentifier tableIdentifier = TableIdentifier.of(schemaName, testTableName); List values = new ArrayList<>(); for (int i = 1; i < 5; i++) { - values.add( + String structValue = String.format( - "(%s, %s, %s)", i, "date_sub(current_date(), " + i + ")", "'data" + i + "'")); + "STRUCT(%d, 'string%d', %s)", + i * 10, // integer_field + i, // string_field + String.format( + "STRUCT(%d, 'inner%d')", + i, i) // struct_field, alternating NULL and non-NULL values + ); + values.add( + String.format("(%d, date_sub(current_date(), %d), 'data%d', %s)", i, i, i, structValue)); } // insert data String insertSQL = String.format( - INSERT_BATCH_WITHOUT_PARTITION_TEMPLATE, tableIdentifier, String.join(",", values)); + INSERT_BATCH_WITHOUT_PARTITION_TEMPLATE, tableIdentifier, String.join(", ", values)); spark.sql(insertSQL); // select data @@ -746,7 +776,9 @@ void testOperationDataIcebergTable() { for (int i = 0; i < result.length; i++) { LocalDate previousDay = currentDate.minusDays(i + 1); Assertions.assertEquals( - String.format("[%s,%s,data%s]", i + 1, previousDay.format(formatter), i + 1), + String.format( + "[%s,%s,data%s,[%s,string%s,[%s,inner%s]]]", + i + 1, previousDay.format(formatter), i + 1, (i + 1) * 10, i + 1, i + 1, i + 1), result[i].toString()); } @@ -762,12 +794,16 @@ void testOperationDataIcebergTable() { if (i == result.length - 1) { LocalDate previousDay = currentDate.minusDays(1); Assertions.assertEquals( - String.format("[100,%s,data%s]", previousDay.format(formatter), 1), + String.format( + "[100,%s,data%s,[%s,string%s,[%s,inner%s]]]", + previousDay.format(formatter), 1, 10, 1, 1, 1), result[i].toString()); } else { LocalDate previousDay = currentDate.minusDays(i + 2); Assertions.assertEquals( - String.format("[%s,%s,data%s]", i + 2, previousDay.format(formatter), i + 2), + String.format( + "[%s,%s,data%s,[%s,string%s,[%s,inner%s]]]", + i + 2, previousDay.format(formatter), i + 2, (i + 2) * 10, i + 2, i + 2, i + 2), result[i].toString()); } } @@ -780,7 +816,9 @@ void testOperationDataIcebergTable() { for (int i = 0; i < result.length; i++) { LocalDate previousDay = currentDate.minusDays(i + 2); Assertions.assertEquals( - String.format("[%s,%s,data%s]", i + 2, previousDay.format(formatter), i + 2), + String.format( + "[%s,%s,data%s,[%s,string%s,[%s,inner%s]]]", + i + 2, previousDay.format(formatter), i + 2, (i + 2) * 10, i + 2, i + 2, i + 2), result[i].toString()); } } From 25e68f3a8d10fd220411e7444e39ef7d33dbafdd Mon Sep 17 00:00:00 2001 From: teo Date: Mon, 5 Feb 2024 18:59:25 +0800 Subject: [PATCH 3/5] use structType for ConvertUtil instead of icebergtable --- .../iceberg/converter/ConvertUtil.java | 44 +++++++++++-- .../iceberg/converter/ToIcebergType.java | 43 ++++++------- .../converter/ToIcebergTypeVisitor.java | 61 +++++++------------ 3 files changed, 82 insertions(+), 66 deletions(-) diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java index c01c416e503..5d5a2371953 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java @@ -6,6 +6,7 @@ import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergColumn; import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTable; +import java.util.Arrays; import org.apache.iceberg.Schema; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; @@ -17,10 +18,25 @@ public class ConvertUtil { * Convert the Iceberg Table to the corresponding schema information in the Iceberg. * * @param icebergTable Iceberg table. - * @return iceberg schema. + * @return Iceberg schema. */ public static Schema toIcebergSchema(IcebergTable icebergTable) { - Type converted = ToIcebergTypeVisitor.visit(icebergTable, new ToIcebergType(icebergTable)); + com.datastrato.gravitino.rel.types.Types.StructType gravitinoStructType = + toGravitinoStructType(icebergTable); + Type converted = + ToIcebergTypeVisitor.visit(gravitinoStructType, new ToIcebergType(gravitinoStructType)); + return new Schema(converted.asNestedType().asStructType().fields()); + } + + /** + * Convert the Gravitino StructType to the corresponding schema information in the Iceberg. + * + * @param gravitinoType Gravitino StructType + * @return Iceberg schema. + */ + public static Schema toIcebergSchema( + com.datastrato.gravitino.rel.types.Types.StructType gravitinoType) { + Type converted = ToIcebergTypeVisitor.visit(gravitinoType, new ToIcebergType(gravitinoType)); return new Schema(converted.asNestedType().asStructType().fields()); } @@ -50,7 +66,7 @@ public static com.datastrato.gravitino.rel.types.Type formIcebergType(Type type) * Convert the nested field of Iceberg to the Iceberg column. * * @param nestedField Iceberg nested field. - * @return + * @return Gravitino iceberg column */ public static IcebergColumn fromNestedField(Types.NestedField nestedField) { return new IcebergColumn.Builder() @@ -63,11 +79,11 @@ public static IcebergColumn fromNestedField(Types.NestedField nestedField) { } /** - * Convert the Gravitino field to the Iceberg column.t + * Convert the Gravitino field to the Iceberg Gravitino column * * @param field Gravitino field. * @param id Gravitino field id. - * @return + * @return Gravitino iceberg table column */ public static IcebergColumn fromGravitinoField( com.datastrato.gravitino.rel.types.Types.StructType.Field field, int id) { @@ -79,4 +95,22 @@ public static IcebergColumn fromGravitinoField( .withType(field.type()) .build(); } + + /** + * Convert the Gravitino iceberg table to the Gravitino StructType + * + * @param icebergTable Gravitino iceberg table + * @return Gravitino StructType + */ + public static com.datastrato.gravitino.rel.types.Types.StructType toGravitinoStructType( + IcebergTable icebergTable) { + com.datastrato.gravitino.rel.types.Types.StructType.Field[] fields = + Arrays.stream(icebergTable.columns()) + .map( + column -> + com.datastrato.gravitino.rel.types.Types.StructType.Field.of( + column.name(), column.dataType(), column.nullable(), column.comment())) + .toArray(com.datastrato.gravitino.rel.types.Types.StructType.Field[]::new); + return com.datastrato.gravitino.rel.types.Types.StructType.of(fields); + } } diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ToIcebergType.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ToIcebergType.java index 2a7f964f029..e04ffeede44 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ToIcebergType.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ToIcebergType.java @@ -4,12 +4,8 @@ */ package com.datastrato.gravitino.catalog.lakehouse.iceberg.converter; -import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergColumn; -import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTable; import com.google.common.collect.Lists; -import java.util.Arrays; import java.util.List; -import java.util.stream.Collectors; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -19,7 +15,7 @@ *

Referred from core/src/main/java/org/apache/iceberg/spark/SparkTypeToType.java */ public class ToIcebergType extends ToIcebergTypeVisitor { - private final IcebergTable root; + private final com.datastrato.gravitino.rel.types.Types.StructType root; private int nextId = 0; private boolean nullable; @@ -28,10 +24,10 @@ public ToIcebergType(boolean nullable) { this.nullable = nullable; } - public ToIcebergType(IcebergTable root) { + public ToIcebergType(com.datastrato.gravitino.rel.types.Types.StructType root) { this.root = root; // the root struct's fields use the first ids - this.nextId = root.columns().length; + this.nextId = root.fields().length; } private int getNextId() { @@ -39,31 +35,36 @@ private int getNextId() { } @Override - public Type struct(IcebergTable struct, List types) { - List fields = - Arrays.stream(struct.columns()) - .map(column -> (IcebergColumn) column) - .collect(Collectors.toList()); - List newFields = Lists.newArrayListWithExpectedSize(fields.size()); + public Type struct(com.datastrato.gravitino.rel.types.Types.StructType struct, List types) { + com.datastrato.gravitino.rel.types.Types.StructType.Field[] fields = struct.fields(); + List newFields = Lists.newArrayListWithExpectedSize(fields.length); boolean isRoot = root == struct; - - for (int i = 0; i < fields.size(); i += 1) { - IcebergColumn field = fields.get(i); + for (int i = 0; i < fields.length; i += 1) { + com.datastrato.gravitino.rel.types.Types.StructType.Field field = fields[i]; Type type = types.get(i); - // for new conversions, use ordinals for ids in the root struct - int id = isRoot ? i : getNextId(); + int id; + if (isRoot) { + // for new conversions, use ordinals for ids in the root struct + id = i; + } else { + id = getNextId(); + } + + String doc = field.comment(); + if (field.nullable()) { - newFields.add(Types.NestedField.optional(id, field.name(), type, field.comment())); + newFields.add(Types.NestedField.optional(id, field.name(), type, doc)); } else { - newFields.add(Types.NestedField.required(id, field.name(), type, field.comment())); + newFields.add(Types.NestedField.required(id, field.name(), type, doc)); } } return Types.StructType.of(newFields); } @Override - public Type field(IcebergColumn field, Type typeResult) { + public Type field( + com.datastrato.gravitino.rel.types.Types.StructType.Field field, Type typeResult) { return typeResult; } diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ToIcebergTypeVisitor.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ToIcebergTypeVisitor.java index 37cb671b609..70dc680e3b2 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ToIcebergTypeVisitor.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ToIcebergTypeVisitor.java @@ -4,15 +4,11 @@ */ package com.datastrato.gravitino.catalog.lakehouse.iceberg.converter; -import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergColumn; import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTable; -import com.datastrato.gravitino.rel.Column; import com.datastrato.gravitino.rel.types.Type; import com.datastrato.gravitino.rel.types.Types; import com.google.common.collect.Lists; -import java.util.Arrays; import java.util.List; -import java.util.stream.Collectors; /** * Type converter belonging to gravitino. @@ -22,51 +18,31 @@ public class ToIcebergTypeVisitor { /** - * Traverse the gravitino table and convert the fields into iceberg fields. + * Traverse the Gravitino data type and convert the fields into iceberg fields. * - * @param table iceberg table. - * @param visitor - * @param - * @return - */ - public static T visit(IcebergTable table, ToIcebergTypeVisitor visitor) { - Column[] columns = table.columns(); - List fieldResults = Lists.newArrayListWithExpectedSize(columns.length); - - for (Column field : columns) { - fieldResults.add(visitor.field((IcebergColumn) field, visit(field.dataType(), visitor))); - } - return visitor.struct(table, fieldResults); - } - - /** - * Convert the type mapping of gravitino to Iceberg. - * - * @param type TODO Abstract a data type in a gravitino. - * @param visitor - * @return - * @param + * @param type Gravitino a data type in a gravitino. + * @param visitor Visitor of iceberg type + * @param Iceberg type + * @return Iceberg type */ public static T visit(Type type, ToIcebergTypeVisitor visitor) { if (type instanceof Types.MapType) { Types.MapType map = (Types.MapType) type; return visitor.map(map, visit(map.keyType(), visitor), visit(map.valueType(), visitor)); + } else if (type instanceof Types.ListType) { Types.ListType list = (Types.ListType) type; return visitor.array(list, visit(list.elementType(), visitor)); + } else if (type instanceof Types.StructType) { - Types.StructType struct = (Types.StructType) type; - Types.StructType.Field[] fields = struct.fields(); - List columns = - Arrays.stream(fields) - .map( - field -> { - return ConvertUtil.fromGravitinoField(field, 0); - }) - .collect(Collectors.toList()); - IcebergTable mockTable = - new IcebergTable.Builder().withColumns(columns.toArray(new IcebergColumn[0])).build(); - return visit(mockTable, visitor); + Types.StructType.Field[] fields = ((Types.StructType) type).fields(); + List fieldResults = Lists.newArrayListWithExpectedSize(fields.length); + for (Types.StructType.Field field : fields) { + fieldResults.add(visitor.field(field, visit(field.type(), visitor))); + } + return visitor.struct( + (com.datastrato.gravitino.rel.types.Types.StructType) type, fieldResults); + } else { return visitor.atomic((Type.PrimitiveType) type); } @@ -76,7 +52,12 @@ public T struct(IcebergTable struct, List fieldResults) { throw new UnsupportedOperationException(); } - public T field(IcebergColumn field, T typeResult) { + public T struct( + com.datastrato.gravitino.rel.types.Types.StructType struct, List fieldResults) { + throw new UnsupportedOperationException(); + } + + public T field(Types.StructType.Field field, T typeResult) { throw new UnsupportedOperationException(); } From 0eff76e60b6fc7f4f7956d42ee9b5d339ea10c31 Mon Sep 17 00:00:00 2001 From: teo Date: Mon, 5 Feb 2024 20:18:10 +0800 Subject: [PATCH 4/5] modify the comment and drop unneccessary fun --- .../iceberg/converter/ConvertUtil.java | 34 ++----------------- .../converter/ToIcebergTypeVisitor.java | 7 ++-- 2 files changed, 4 insertions(+), 37 deletions(-) diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java index 5d5a2371953..17a3fec3931 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java @@ -17,7 +17,7 @@ public class ConvertUtil { /** * Convert the Iceberg Table to the corresponding schema information in the Iceberg. * - * @param icebergTable Iceberg table. + * @param icebergTable gravitinoTable. * @return Iceberg schema. */ public static Schema toIcebergSchema(IcebergTable icebergTable) { @@ -28,18 +28,6 @@ public static Schema toIcebergSchema(IcebergTable icebergTable) { return new Schema(converted.asNestedType().asStructType().fields()); } - /** - * Convert the Gravitino StructType to the corresponding schema information in the Iceberg. - * - * @param gravitinoType Gravitino StructType - * @return Iceberg schema. - */ - public static Schema toIcebergSchema( - com.datastrato.gravitino.rel.types.Types.StructType gravitinoType) { - Type converted = ToIcebergTypeVisitor.visit(gravitinoType, new ToIcebergType(gravitinoType)); - return new Schema(converted.asNestedType().asStructType().fields()); - } - /** * Convert the Gravitino type to the Iceberg type. * @@ -78,31 +66,13 @@ public static IcebergColumn fromNestedField(Types.NestedField nestedField) { .build(); } - /** - * Convert the Gravitino field to the Iceberg Gravitino column - * - * @param field Gravitino field. - * @param id Gravitino field id. - * @return Gravitino iceberg table column - */ - public static IcebergColumn fromGravitinoField( - com.datastrato.gravitino.rel.types.Types.StructType.Field field, int id) { - return new IcebergColumn.Builder() - .withId(id) - .withName(field.name()) - .withNullable(field.nullable()) - .withComment(field.comment()) - .withType(field.type()) - .build(); - } - /** * Convert the Gravitino iceberg table to the Gravitino StructType * * @param icebergTable Gravitino iceberg table * @return Gravitino StructType */ - public static com.datastrato.gravitino.rel.types.Types.StructType toGravitinoStructType( + private static com.datastrato.gravitino.rel.types.Types.StructType toGravitinoStructType( IcebergTable icebergTable) { com.datastrato.gravitino.rel.types.Types.StructType.Field[] fields = Arrays.stream(icebergTable.columns()) diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ToIcebergTypeVisitor.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ToIcebergTypeVisitor.java index 70dc680e3b2..d356d17d069 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ToIcebergTypeVisitor.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ToIcebergTypeVisitor.java @@ -18,10 +18,10 @@ public class ToIcebergTypeVisitor { /** - * Traverse the Gravitino data type and convert the fields into iceberg fields. + * Traverse the Gravitino data type and convert the fields into Iceberg fields. * * @param type Gravitino a data type in a gravitino. - * @param visitor Visitor of iceberg type + * @param visitor Visitor of Iceberg type * @param Iceberg type * @return Iceberg type */ @@ -29,11 +29,9 @@ public static T visit(Type type, ToIcebergTypeVisitor visitor) { if (type instanceof Types.MapType) { Types.MapType map = (Types.MapType) type; return visitor.map(map, visit(map.keyType(), visitor), visit(map.valueType(), visitor)); - } else if (type instanceof Types.ListType) { Types.ListType list = (Types.ListType) type; return visitor.array(list, visit(list.elementType(), visitor)); - } else if (type instanceof Types.StructType) { Types.StructType.Field[] fields = ((Types.StructType) type).fields(); List fieldResults = Lists.newArrayListWithExpectedSize(fields.length); @@ -42,7 +40,6 @@ public static T visit(Type type, ToIcebergTypeVisitor visitor) { } return visitor.struct( (com.datastrato.gravitino.rel.types.Types.StructType) type, fieldResults); - } else { return visitor.atomic((Type.PrimitiveType) type); } From 786d877e179064c3f815f6c1893505ed6120fcbe Mon Sep 17 00:00:00 2001 From: teo Date: Mon, 5 Feb 2024 21:23:40 +0800 Subject: [PATCH 5/5] update doc and modify comment --- .../catalog/lakehouse/iceberg/converter/ConvertUtil.java | 6 +++--- docs/lakehouse-iceberg-catalog.md | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java index 17a3fec3931..2b98532d1e8 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java @@ -17,12 +17,12 @@ public class ConvertUtil { /** * Convert the Iceberg Table to the corresponding schema information in the Iceberg. * - * @param icebergTable gravitinoTable. + * @param gravitinoTable Gravitino table of Iceberg. * @return Iceberg schema. */ - public static Schema toIcebergSchema(IcebergTable icebergTable) { + public static Schema toIcebergSchema(IcebergTable gravitinoTable) { com.datastrato.gravitino.rel.types.Types.StructType gravitinoStructType = - toGravitinoStructType(icebergTable); + toGravitinoStructType(gravitinoTable); Type converted = ToIcebergTypeVisitor.visit(gravitinoStructType, new ToIcebergType(gravitinoStructType)); return new Schema(converted.asNestedType().asStructType().fields()); diff --git a/docs/lakehouse-iceberg-catalog.md b/docs/lakehouse-iceberg-catalog.md index 2a8edc9800f..22189fba3b9 100644 --- a/docs/lakehouse-iceberg-catalog.md +++ b/docs/lakehouse-iceberg-catalog.md @@ -195,6 +195,7 @@ Apache Iceberg doesn't support Gravitino `EvenDistribution` type. | Gravitino Type | Apache Iceberg Type | |-----------------------------|-----------------------------| +| `Sturct` | `Struct` | | `Map` | `Map` | | `Array` | `Array` | | `Boolean` | `Boolean` | @@ -213,7 +214,7 @@ Apache Iceberg doesn't support Gravitino `EvenDistribution` type. | `UUID` | `UUID` | :::info -Apache Iceberg doesn't support Gravitino `Struct` `Varchar` `Fixedchar` `Byte` `Short` `Union` type. +Apache Iceberg doesn't support Gravitino `Varchar` `Fixedchar` `Byte` `Short` `Union` type. ::: ### Table properties @@ -259,7 +260,6 @@ Supports operations: :::info The default column position is `LAST` when you add a column. If you add a non nullability column, there may be compatibility issues. -Iceberg just supports updating primitive types. ::: :::caution