Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1313][#1471] feat(iceberg): Support struct column for iceberg #1721

Merged
merged 6 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergColumn;
import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTable;
import java.util.Arrays;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
Expand All @@ -16,11 +17,14 @@ public class ConvertUtil {
/**
* Convert the Iceberg Table to the corresponding schema information in the Iceberg.
*
* @param icebergTable Iceberg table.
* @return iceberg schema.
* @param gravitinoTable Gravitino table of Iceberg.
* @return Iceberg schema.
*/
public static Schema toIcebergSchema(IcebergTable icebergTable) {
Type converted = ToIcebergTypeVisitor.visit(icebergTable, new ToIcebergType(icebergTable));
public static Schema toIcebergSchema(IcebergTable gravitinoTable) {
com.datastrato.gravitino.rel.types.Types.StructType gravitinoStructType =
toGravitinoStructType(gravitinoTable);
Type converted =
ToIcebergTypeVisitor.visit(gravitinoStructType, new ToIcebergType(gravitinoStructType));
return new Schema(converted.asNestedType().asStructType().fields());
}

Expand Down Expand Up @@ -50,7 +54,7 @@ public static com.datastrato.gravitino.rel.types.Type formIcebergType(Type type)
* Convert the nested field of Iceberg to the Iceberg column.
*
* @param nestedField Iceberg nested field.
* @return
* @return Gravitino iceberg column
*/
public static IcebergColumn fromNestedField(Types.NestedField nestedField) {
return new IcebergColumn.Builder()
Expand All @@ -61,4 +65,22 @@ public static IcebergColumn fromNestedField(Types.NestedField nestedField) {
.withType(ConvertUtil.formIcebergType(nestedField.type()))
.build();
}

/**
* Convert the Gravitino iceberg table to the Gravitino StructType
*
* @param icebergTable Gravitino iceberg table
* @return Gravitino StructType
*/
private static com.datastrato.gravitino.rel.types.Types.StructType toGravitinoStructType(
IcebergTable icebergTable) {
com.datastrato.gravitino.rel.types.Types.StructType.Field[] fields =
Arrays.stream(icebergTable.columns())
.map(
column ->
com.datastrato.gravitino.rel.types.Types.StructType.Field.of(
column.name(), column.dataType(), column.nullable(), column.comment()))
.toArray(com.datastrato.gravitino.rel.types.Types.StructType.Field[]::new);
return com.datastrato.gravitino.rel.types.Types.StructType.of(fields);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.datastrato.gravitino.catalog.lakehouse.iceberg.converter;

import com.datastrato.gravitino.rel.types.Type;
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
Expand All @@ -26,7 +27,20 @@ public Type schema(Schema schema, Type structType) {

@Override
public Type struct(Types.StructType struct, List<Type> fieldResults) {
throw new UnsupportedOperationException("Data conversion of struct type is not supported");
List<com.datastrato.gravitino.rel.types.Types.StructType.Field> fieldsList = new ArrayList<>();
List<Types.NestedField> originalFields = struct.fields();

for (int i = 0; i < originalFields.size(); i++) {
Types.NestedField nestedField = originalFields.get(i);
fieldsList.add(
com.datastrato.gravitino.rel.types.Types.StructType.Field.of(
nestedField.name(),
fieldResults.get(i),
nestedField.isOptional(),
nestedField.doc()));
}
return com.datastrato.gravitino.rel.types.Types.StructType.of(
fieldsList.toArray(new com.datastrato.gravitino.rel.types.Types.StructType.Field[0]));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,8 @@
*/
package com.datastrato.gravitino.catalog.lakehouse.iceberg.converter;

import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergColumn;
import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTable;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

Expand All @@ -19,7 +15,7 @@
* <p>Referred from core/src/main/java/org/apache/iceberg/spark/SparkTypeToType.java
*/
public class ToIcebergType extends ToIcebergTypeVisitor<Type> {
private final IcebergTable root;
private final com.datastrato.gravitino.rel.types.Types.StructType root;
private int nextId = 0;
private boolean nullable;

Expand All @@ -28,42 +24,47 @@ public ToIcebergType(boolean nullable) {
this.nullable = nullable;
}

public ToIcebergType(IcebergTable root) {
public ToIcebergType(com.datastrato.gravitino.rel.types.Types.StructType root) {
this.root = root;
// the root struct's fields use the first ids
this.nextId = root.columns().length;
this.nextId = root.fields().length;
}

private int getNextId() {
return nextId++;
}

@Override
public Type struct(IcebergTable struct, List<Type> types) {
List<IcebergColumn> fields =
Arrays.stream(struct.columns())
.map(column -> (IcebergColumn) column)
.collect(Collectors.toList());
List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(fields.size());
public Type struct(com.datastrato.gravitino.rel.types.Types.StructType struct, List<Type> types) {
com.datastrato.gravitino.rel.types.Types.StructType.Field[] fields = struct.fields();
List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(fields.length);
boolean isRoot = root == struct;

for (int i = 0; i < fields.size(); i += 1) {
IcebergColumn field = fields.get(i);
for (int i = 0; i < fields.length; i += 1) {
com.datastrato.gravitino.rel.types.Types.StructType.Field field = fields[i];
Type type = types.get(i);

// for new conversions, use ordinals for ids in the root struct
int id = isRoot ? i : getNextId();
int id;
if (isRoot) {
// for new conversions, use ordinals for ids in the root struct
id = i;
} else {
id = getNextId();
}

String doc = field.comment();

if (field.nullable()) {
newFields.add(Types.NestedField.optional(id, field.name(), type, field.comment()));
newFields.add(Types.NestedField.optional(id, field.name(), type, doc));
} else {
newFields.add(Types.NestedField.required(id, field.name(), type, field.comment()));
newFields.add(Types.NestedField.required(id, field.name(), type, doc));
}
}
return Types.StructType.of(newFields);
}

@Override
public Type field(IcebergColumn field, Type typeResult) {
public Type field(
com.datastrato.gravitino.rel.types.Types.StructType.Field field, Type typeResult) {
return typeResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
*/
package com.datastrato.gravitino.catalog.lakehouse.iceberg.converter;

import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergColumn;
import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTable;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.types.Type;
import com.datastrato.gravitino.rel.types.Types;
import com.google.common.collect.Lists;
Expand All @@ -20,30 +18,12 @@
public class ToIcebergTypeVisitor<T> {

/**
* Traverse the gravitino table and convert the fields into iceberg fields.
* Traverse the Gravitino data type and convert the fields into Iceberg fields.
*
* @param table iceberg table.
* @param visitor
* @param <T>
* @return
*/
public static <T> T visit(IcebergTable table, ToIcebergTypeVisitor<T> visitor) {
Column[] columns = table.columns();
List<T> fieldResults = Lists.newArrayListWithExpectedSize(columns.length);

for (Column field : columns) {
fieldResults.add(visitor.field((IcebergColumn) field, visit(field.dataType(), visitor)));
}
return visitor.struct(table, fieldResults);
}

/**
* Convert the type mapping of gravitino to Iceberg.
*
* @param type TODO Abstract a data type in a gravitino.
* @param visitor
* @return
* @param <T>
* @param type Gravitino a data type in a gravitino.
* @param visitor Visitor of Iceberg type
* @param <T> Iceberg type
* @return Iceberg type
*/
public static <T> T visit(Type type, ToIcebergTypeVisitor<T> visitor) {
if (type instanceof Types.MapType) {
Expand All @@ -52,6 +32,14 @@ public static <T> T visit(Type type, ToIcebergTypeVisitor<T> visitor) {
} else if (type instanceof Types.ListType) {
Types.ListType list = (Types.ListType) type;
return visitor.array(list, visit(list.elementType(), visitor));
} else if (type instanceof Types.StructType) {
Types.StructType.Field[] fields = ((Types.StructType) type).fields();
List<T> fieldResults = Lists.newArrayListWithExpectedSize(fields.length);
for (Types.StructType.Field field : fields) {
fieldResults.add(visitor.field(field, visit(field.type(), visitor)));
}
return visitor.struct(
(com.datastrato.gravitino.rel.types.Types.StructType) type, fieldResults);
} else {
return visitor.atomic((Type.PrimitiveType) type);
}
Expand All @@ -61,7 +49,12 @@ public T struct(IcebergTable struct, List<T> fieldResults) {
throw new UnsupportedOperationException();
}

public T field(IcebergColumn field, T typeResult) {
public T struct(
com.datastrato.gravitino.rel.types.Types.StructType struct, List<T> fieldResults) {
throw new UnsupportedOperationException();
}

public T field(Types.StructType.Field field, T typeResult) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,25 @@ public void testCreateIcebergTable() {
.withComment(ICEBERG_COMMENT)
.withNullable(false)
.build();
Column[] columns = new Column[] {col1, col2};
Types.StructType structTypeInside =
Types.StructType.of(
Types.StructType.Field.notNullField("integer_field_inside", Types.IntegerType.get()),
Types.StructType.Field.notNullField(
"string_field_inside", Types.StringType.get(), "string field inside"));
Types.StructType structType =
Types.StructType.of(
Types.StructType.Field.notNullField("integer_field", Types.IntegerType.get()),
Types.StructType.Field.notNullField(
"string_field", Types.StringType.get(), "string field"),
Types.StructType.Field.nullableField("struct_field", structTypeInside, "struct field"));
IcebergColumn col3 =
new IcebergColumn.Builder()
.withName("col_3")
.withType(structType)
.withComment(ICEBERG_COMMENT)
.withNullable(false)
.build();
Column[] columns = new Column[] {col1, col2, col3};

SortOrder[] sortOrders = createSortOrder();
Table table =
Expand All @@ -166,6 +184,7 @@ public void testCreateIcebergTable() {
Assertions.assertEquals("val2", loadedTable.properties().get("key2"));
Assertions.assertTrue(loadedTable.columns()[0].nullable());
Assertions.assertFalse(loadedTable.columns()[1].nullable());
Assertions.assertFalse(loadedTable.columns()[2].nullable());

Assertions.assertTrue(icebergCatalog.asTableCatalog().tableExists(tableIdentifier));
NameIdentifier[] tableIdents =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,101 @@ public void testFormIcebergType() {
Assertions.assertTrue(
((com.datastrato.gravitino.rel.types.Types.ListType) gravitinoListType).elementType()
instanceof com.datastrato.gravitino.rel.types.Types.StringType);

Types.StructType structTypeInside =
Types.StructType.of(
Types.NestedField.optional(
2, "integer_type_inside", Types.IntegerType.get(), "integer type"),
Types.NestedField.optional(
3, "string_type_inside", Types.StringType.get(), "string type"));
Types.StructType structType =
Types.StructType.of(
Types.NestedField.optional(0, "integer_type", Types.IntegerType.get(), "integer type"),
Types.NestedField.optional(1, "struct_type", structTypeInside, "struct type inside"));
com.datastrato.gravitino.rel.types.Type gravitinoStructType =
ConvertUtil.formIcebergType(structType);
// check for type
Assertions.assertTrue(
(gravitinoStructType) instanceof com.datastrato.gravitino.rel.types.Types.StructType);
Assertions.assertTrue(
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[0].type()
instanceof com.datastrato.gravitino.rel.types.Types.IntegerType);
Assertions.assertTrue(
((com.datastrato.gravitino.rel.types.Types.StructType)
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].type())
.fields()[0].type()
instanceof com.datastrato.gravitino.rel.types.Types.IntegerType);
Assertions.assertTrue(
((com.datastrato.gravitino.rel.types.Types.StructType)
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].type())
.fields()[1].type()
instanceof com.datastrato.gravitino.rel.types.Types.StringType);
// check for name
Assertions.assertEquals(
structType.fields().get(0).name(),
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[0].name());
Assertions.assertEquals(
structType.fields().get(1).name(),
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].name());
Assertions.assertEquals(
structTypeInside.fields().get(0).name(),
((com.datastrato.gravitino.rel.types.Types.StructType)
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].type())
.fields()[0].name());
Assertions.assertEquals(
structTypeInside.fields().get(1).name(),
((com.datastrato.gravitino.rel.types.Types.StructType)
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].type())
.fields()[1].name());
// check for comment
Assertions.assertEquals(
structType.fields().get(0).doc(),
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[0].comment());
Assertions.assertEquals(
structType.fields().get(1).doc(),
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].comment());
Assertions.assertEquals(
structTypeInside.fields().get(0).doc(),
((com.datastrato.gravitino.rel.types.Types.StructType)
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].type())
.fields()[0].comment());
Assertions.assertEquals(
structTypeInside.fields().get(1).doc(),
((com.datastrato.gravitino.rel.types.Types.StructType)
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].type())
.fields()[1].comment());
// check for nullable
Assertions.assertEquals(
structType.fields().get(0).isOptional(),
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[0].nullable());
Assertions.assertEquals(
structType.fields().get(1).isOptional(),
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].nullable());
Assertions.assertEquals(
structTypeInside.fields().get(0).isOptional(),
((com.datastrato.gravitino.rel.types.Types.StructType)
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].type())
.fields()[0].nullable());
Assertions.assertEquals(
structTypeInside.fields().get(1).isOptional(),
((com.datastrato.gravitino.rel.types.Types.StructType)
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].type())
.fields()[1].nullable());
}

@Test
Expand Down
Loading
Loading