From aa2cf9f3af870d963c8fa597aa73b4f5cd387697 Mon Sep 17 00:00:00 2001
From: Fokko
Date: Tue, 12 Nov 2024 23:05:21 +0100
Subject: [PATCH 1/2] Replace dropped field with `AnyType`

Fixes #4563
---
 .../org/apache/iceberg/PartitionSpec.java         |  9 ++++++++-
 .../java/org/apache/iceberg/types/Type.java       |  3 ++-
 .../java/org/apache/iceberg/types/Types.java      | 19 +++++++++++++++++++
 .../apache/iceberg/avro/AvroSchemaUtil.java       |  2 ++
 .../org/apache/iceberg/avro/TypeToSchema.java     |  4 ++++
 .../TestAlterTablePartitionFields.java            | 14 ++++++++++++++
 6 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
index 9b74893f1831..d18cd47fe03a 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
@@ -130,7 +130,14 @@ public StructType partitionType() {
 
     for (PartitionField field : fields) {
       Type sourceType = schema.findType(field.sourceId());
-      Type resultType = field.transform().getResultType(sourceType);
+
+      final Type resultType;
+      if (sourceType == null) {
+        resultType = Types.AnyType.get();
+      } else {
+        resultType = field.transform().getResultType(sourceType);
+      }
+
       structFields.add(Types.NestedField.optional(field.fieldId(), field.name(), resultType));
     }
 
diff --git a/api/src/main/java/org/apache/iceberg/types/Type.java b/api/src/main/java/org/apache/iceberg/types/Type.java
index 571bf9a14e43..ec71cb0c898d 100644
--- a/api/src/main/java/org/apache/iceberg/types/Type.java
+++ b/api/src/main/java/org/apache/iceberg/types/Type.java
@@ -45,7 +45,8 @@ enum TypeID {
   DECIMAL(BigDecimal.class),
   STRUCT(StructLike.class),
   LIST(List.class),
-  MAP(Map.class);
+  MAP(Map.class),
+  ANY(Object.class);
 
   private final Class<?> javaClass;
 
diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java
index 4bb1674f3be5..df417830dbf6 100644
--- a/api/src/main/java/org/apache/iceberg/types/Types.java
+++ b/api/src/main/java/org/apache/iceberg/types/Types.java
@@ -55,6 +55,7 @@ private Types() {}
           .put(StringType.get().toString(), StringType.get())
           .put(UUIDType.get().toString(), UUIDType.get())
           .put(BinaryType.get().toString(), BinaryType.get())
+          .put(AnyType.get().toString(), AnyType.get())
           .buildOrThrow();
 
   private static final Pattern FIXED = Pattern.compile("fixed\\[\\s*(\\d+)\\s*\\]");
@@ -412,6 +413,24 @@ public String toString() {
     }
   }
 
+  public static class AnyType extends PrimitiveType {
+    private static final AnyType INSTANCE = new AnyType();
+
+    public static AnyType get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public TypeID typeId() {
+      return TypeID.ANY;
+    }
+
+    @Override
+    public String toString() {
+      return "any";
+    }
+  }
+
   public static class DecimalType extends PrimitiveType {
     public static DecimalType of(int precision, int scale) {
       return new DecimalType(precision, scale);
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
index 032d63105dfe..3e3f8d38e46c 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
@@ -188,6 +188,8 @@ static Schema toOption(Schema schema) {
       Preconditions.checkArgument(
           isOptionSchema(schema), "Union schemas are not supported: %s", schema);
       return schema;
+    } else if (schema.getType() == Schema.Type.NULL) {
+      return schema;
     } else {
       return Schema.createUnion(NULL, schema);
     }
diff --git a/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java b/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
index 05ce4e618662..b129bbc595ca 100644
--- a/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
+++ b/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
@@ -49,6 +49,7 @@ abstract class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
   private static final Schema UUID_SCHEMA =
       LogicalTypes.uuid().addToSchema(Schema.createFixed("uuid_fixed", null, null, 16));
   private static final Schema BINARY_SCHEMA = Schema.create(Schema.Type.BYTES);
+  private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
 
   static {
     TIMESTAMP_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, false);
@@ -243,6 +244,9 @@ public Schema primitive(Type.PrimitiveType primitive) {
                     null,
                     TypeUtil.decimalRequiredBytes(decimal.precision())));
         break;
+      case ANY:
+        primitiveSchema = NULL_SCHEMA;
+        break;
       default:
         throw new UnsupportedOperationException("Unsupported type ID: " + primitive.typeId());
     }
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
index 38e5c942c9ff..4c84e46399cd 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
@@ -436,6 +436,20 @@ public void testReplacePartitionAndRename() {
         .isEqualTo(expected);
   }
 
+  @TestTemplate
+  public void testDropPartitionAndUnderlyingField() {
+    sql(
+        "CREATE TABLE %s (col0 BIGINT, col1 BIGINT, col2 BIGINT) USING ICEBERG TBLPROPERTIES ('format-version' = %d, 'write.delete.mode' = 'merge-on-read')",
+        tableName, formatVersion);
+    sql("INSERT INTO %s VALUES (1, 11, 21)", tableName);
+    sql("ALTER TABLE %s ADD PARTITION FIELD col2", tableName);
+    sql("INSERT INTO %s VALUES (2, 12, 22)", tableName);
+    sql("ALTER TABLE %s DROP PARTITION FIELD col2", tableName);
+    sql("INSERT INTO %s VALUES (3, 13, 23)", tableName);
+    sql("ALTER TABLE %s DROP COLUMN col2", tableName);
+    sql("SELECT * FROM %s", tableName);
+  }
+
   @TestTemplate
   public void testReplaceNamedPartition() {
     createTable("id bigint NOT NULL, category string, ts timestamp, data string");

From 4f356c9c0b52427651037bf3dfd5d947e9791d52 Mon Sep 17 00:00:00 2001
From: Fokko
Date: Sun, 17 Nov 2024 08:39:55 +0100
Subject: [PATCH 2/2] Add AnyType to Spark

---
 .../main/java/org/apache/iceberg/spark/TypeToSparkType.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
index dfb9b30be603..792246d033cd 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
@@ -38,6 +38,7 @@
 import org.apache.spark.sql.types.MapType$;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.MetadataBuilder;
+import org.apache.spark.sql.types.NullType$;
 import org.apache.spark.sql.types.StringType$;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType$;
@@ -124,6 +125,8 @@ public DataType primitive(Type.PrimitiveType primitive) {
       case DECIMAL:
         Types.DecimalType decimal = (Types.DecimalType) primitive;
         return DecimalType$.MODULE$.apply(decimal.precision(), decimal.scale());
+      case ANY:
+        return NullType$.MODULE$;
       default:
         throw new UnsupportedOperationException(
             "Cannot convert unknown type to Spark: " + primitive);