From 261f239850ef87afcea0cf847ac28eb801f0a4c5 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Mon, 9 Dec 2024 17:13:18 +0100 Subject: [PATCH 1/6] Remove bound partition spec --- crates/catalog/memory/src/catalog.rs | 6 +- crates/catalog/rest/src/catalog.rs | 4 +- crates/catalog/rest/src/types.rs | 4 +- crates/catalog/sql/src/catalog.rs | 7 +- crates/iceberg/src/catalog/mod.rs | 13 +- .../src/expr/visitors/expression_evaluator.rs | 134 +++---- .../visitors/inclusive_metrics_evaluator.rs | 15 +- .../src/expr/visitors/inclusive_projection.rs | 43 +-- crates/iceberg/src/spec/manifest.rs | 35 +- crates/iceberg/src/spec/partition.rs | 339 +++++------------- crates/iceberg/src/spec/table_metadata.rs | 122 +++---- .../src/spec/table_metadata_builder.rs | 73 ++-- crates/iceberg/src/transaction.rs | 6 +- .../writer/file_writer/location_generator.rs | 6 +- 14 files changed, 326 insertions(+), 481 deletions(-) diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index 3a4a4e3b4..c5a98391f 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -285,7 +285,7 @@ mod tests { use std::iter::FromIterator; use iceberg::io::FileIOBuilder; - use iceberg::spec::{BoundPartitionSpec, NestedField, PrimitiveType, Schema, SortOrder, Type}; + use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; use regex::Regex; use tempfile::TempDir; @@ -357,7 +357,7 @@ mod tests { assert_eq!(metadata.current_schema().as_ref(), expected_schema); - let expected_partition_spec = BoundPartitionSpec::builder((*expected_schema).clone()) + let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone()) .with_spec_id(0) .build() .unwrap(); @@ -367,7 +367,7 @@ mod tests { .partition_specs_iter() .map(|p| p.as_ref()) .collect_vec(), - vec![&expected_partition_spec.into_schemaless()] + vec![&expected_partition_spec] ); let expected_sorted_order = SortOrder::builder() diff --git 
a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index fce5fe2be..e60dbf31f 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -702,7 +702,7 @@ mod tests { use iceberg::spec::{ FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot, SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type, - UnboundPartitionField, UnboundPartitionSpec, + UnPartitionSpec, UnboundPartitionField, }; use iceberg::transaction::Transaction; use mockito::{Mock, Server, ServerGuard}; @@ -1488,7 +1488,7 @@ mod tests { ) .properties(HashMap::from([("owner".to_string(), "testx".to_string())])) .partition_spec( - UnboundPartitionSpec::builder() + UnPartitionSpec::builder() .add_partition_fields(vec![UnboundPartitionField::builder() .source_id(1) .transform(Transform::Truncate(3)) diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs index 11833a562..815dfddae 100644 --- a/crates/catalog/rest/src/types.rs +++ b/crates/catalog/rest/src/types.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; -use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec}; +use iceberg::spec::{Schema, SortOrder, TableMetadata, UnPartitionSpec}; use iceberg::{ Error, ErrorKind, Namespace, NamespaceIdent, TableIdent, TableRequirement, TableUpdate, }; @@ -167,7 +167,7 @@ pub(super) struct CreateTableRequest { pub(super) name: String, pub(super) location: Option, pub(super) schema: Schema, - pub(super) partition_spec: Option, + pub(super) partition_spec: Option, pub(super) write_order: Option, pub(super) stage_create: Option, pub(super) properties: Option>, diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 1556614dd..51e2904f7 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -783,7 +783,7 @@ mod tests { use std::hash::Hash; use iceberg::io::FileIOBuilder; - use 
iceberg::spec::{BoundPartitionSpec, NestedField, PrimitiveType, Schema, SortOrder, Type}; + use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; use iceberg::table::Table; use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent}; use itertools::Itertools; @@ -876,11 +876,10 @@ mod tests { assert_eq!(metadata.current_schema().as_ref(), expected_schema); - let expected_partition_spec = BoundPartitionSpec::builder(expected_schema.clone()) + let expected_partition_spec = PartitionSpec::builder(expected_schema.clone()) .with_spec_id(0) .build() - .unwrap() - .into_schemaless(); + .unwrap(); assert_eq!( metadata diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index b897d1574..5d342eb6f 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -30,8 +30,7 @@ use uuid::Uuid; use crate::spec::{ FormatVersion, Schema, SchemaId, Snapshot, SnapshotReference, SortOrder, TableMetadata, - TableMetadataBuilder, UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, - ViewVersion, + TableMetadataBuilder, UnPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion, }; use crate::table::Table; use crate::{Error, ErrorKind, Result}; @@ -244,7 +243,7 @@ pub struct TableCreation { pub schema: Schema, /// The partition spec of the table, could be None. #[builder(default, setter(strip_option(fallback = partition_spec_opt), into))] - pub partition_spec: Option, + pub partition_spec: Option, /// The sort order of the table. #[builder(default, setter(strip_option(fallback = sort_order_opt)))] pub sort_order: Option, @@ -383,7 +382,7 @@ pub enum TableUpdate { /// Add a new partition spec to the table AddSpec { /// The partition spec to add. 
- spec: UnboundPartitionSpec, + spec: UnPartitionSpec, }, /// Set table's default spec #[serde(rename_all = "kebab-case")] @@ -770,8 +769,8 @@ mod tests { FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation, Summary, TableMetadata, TableMetadataBuilder, Transform, Type, - UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations, - ViewVersion, MAIN_BRANCH, + UnPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations, ViewVersion, + MAIN_BRANCH, }; use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, TableUpdate}; @@ -1305,7 +1304,7 @@ mod tests { } "#, TableUpdate::AddSpec { - spec: UnboundPartitionSpec::builder() + spec: UnPartitionSpec::builder() .add_partition_field(4, "ts_day".to_string(), Transform::Day) .unwrap() .add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16)) diff --git a/crates/iceberg/src/expr/visitors/expression_evaluator.rs b/crates/iceberg/src/expr/visitors/expression_evaluator.rs index 2add5761f..e7f34bf97 100644 --- a/crates/iceberg/src/expr/visitors/expression_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/expression_evaluator.rs @@ -258,13 +258,13 @@ mod tests { UnaryExpression, }; use crate::spec::{ - BoundPartitionSpec, BoundPartitionSpecRef, DataContentType, DataFile, DataFileFormat, - Datum, Literal, NestedField, PrimitiveType, Schema, Struct, Transform, Type, + DataContentType, DataFile, DataFileFormat, Datum, Literal, NestedField, PartitionSpec, + PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform, Type, UnboundPartitionField, }; use crate::Result; - fn create_partition_spec(r#type: PrimitiveType) -> Result { + fn create_partition_spec(r#type: PrimitiveType) -> Result<(PartitionSpecRef, SchemaRef)> { let schema = Schema::builder() .with_fields(vec![Arc::new(NestedField::optional( 1, @@ -273,7 +273,7 @@ mod 
tests { ))]) .build()?; - let spec = BoundPartitionSpec::builder(schema.clone()) + let spec = PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_fields(vec![UnboundPartitionField::builder() .source_id(1) @@ -285,15 +285,16 @@ mod tests { .build() .unwrap(); - Ok(Arc::new(spec)) + Ok((Arc::new(spec), schema.into())) } fn create_partition_filter( - partition_spec: BoundPartitionSpecRef, + partition_spec: PartitionSpecRef, + schema: &Schema, predicate: &BoundPredicate, case_sensitive: bool, ) -> Result { - let partition_type = partition_spec.partition_type(); + let partition_type = partition_spec.partition_type(schema).unwrap(); let partition_fields = partition_type.fields().to_owned(); let partition_schema = Schema::builder() @@ -301,8 +302,7 @@ mod tests { .with_fields(partition_fields) .build()?; - let mut inclusive_projection = - InclusiveProjection::new((*partition_spec).clone().into_schemaless().into()); + let mut inclusive_projection = InclusiveProjection::new((*partition_spec).clone().into()); let partition_filter = inclusive_projection .project(predicate)? 
@@ -313,11 +313,13 @@ mod tests { } fn create_expression_evaluator( - partition_spec: BoundPartitionSpecRef, + partition_spec: PartitionSpecRef, + schema: &Schema, predicate: &BoundPredicate, case_sensitive: bool, ) -> Result { - let partition_filter = create_partition_filter(partition_spec, predicate, case_sensitive)?; + let partition_filter = + create_partition_filter(partition_spec, schema, predicate, case_sensitive)?; Ok(ExpressionEvaluator::new(partition_filter)) } @@ -371,7 +373,7 @@ mod tests { #[test] fn test_expr_or() -> Result<()> { let case_sensitive = true; - let partition_spec = create_partition_spec(PrimitiveType::Float)?; + let (partition_spec, schema) = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::LessThan, @@ -383,10 +385,10 @@ mod tests { Reference::new("a"), Datum::float(0.4), ))) - .bind(partition_spec.schema_ref().clone(), case_sensitive)?; + .bind(schema.clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &schema, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -400,7 +402,7 @@ mod tests { #[test] fn test_expr_and() -> Result<()> { let case_sensitive = true; - let partition_spec = create_partition_spec(PrimitiveType::Float)?; + let (partition_spec, schema) = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::LessThan, @@ -412,10 +414,10 @@ mod tests { Reference::new("a"), Datum::float(0.4), ))) - .bind(partition_spec.schema_ref().clone(), case_sensitive)?; + .bind(schema.clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &schema, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -429,17 +431,17 @@ mod 
tests { #[test] fn test_expr_not_in() -> Result<()> { let case_sensitive = true; - let partition_spec = create_partition_spec(PrimitiveType::Float)?; + let (partition_spec, schema) = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Set(SetExpression::new( PredicateOperator::NotIn, Reference::new("a"), FnvHashSet::from_iter([Datum::float(0.9), Datum::float(1.2), Datum::float(2.4)]), )) - .bind(partition_spec.schema_ref().clone(), case_sensitive)?; + .bind(schema.clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &schema, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -453,17 +455,17 @@ mod tests { #[test] fn test_expr_in() -> Result<()> { let case_sensitive = true; - let partition_spec = create_partition_spec(PrimitiveType::Float)?; + let (partition_spec, schema) = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Set(SetExpression::new( PredicateOperator::In, Reference::new("a"), FnvHashSet::from_iter([Datum::float(1.0), Datum::float(1.2), Datum::float(2.4)]), )) - .bind(partition_spec.schema_ref().clone(), case_sensitive)?; + .bind(schema.clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &schema, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -477,17 +479,17 @@ mod tests { #[test] fn test_expr_not_starts_with() -> Result<()> { let case_sensitive = true; - let partition_spec = create_partition_spec(PrimitiveType::String)?; + let (partition_spec, schema) = create_partition_spec(PrimitiveType::String)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::NotStartsWith, Reference::new("a"), Datum::string("not"), )) - .bind(partition_spec.schema_ref().clone(), case_sensitive)?; + 
.bind(schema.clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &schema, &predicate, case_sensitive)?; let data_file = create_data_file_string(); @@ -501,17 +503,17 @@ mod tests { #[test] fn test_expr_starts_with() -> Result<()> { let case_sensitive = true; - let partition_spec = create_partition_spec(PrimitiveType::String)?; + let (partition_spec, schema) = create_partition_spec(PrimitiveType::String)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::StartsWith, Reference::new("a"), Datum::string("test"), )) - .bind(partition_spec.schema_ref().clone(), case_sensitive)?; + .bind(schema.clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &schema, &predicate, case_sensitive)?; let data_file = create_data_file_string(); @@ -525,17 +527,17 @@ mod tests { #[test] fn test_expr_not_eq() -> Result<()> { let case_sensitive = true; - let partition_spec = create_partition_spec(PrimitiveType::Float)?; + let (partition_spec, schema) = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::NotEq, Reference::new("a"), Datum::float(0.9), )) - .bind(partition_spec.schema_ref().clone(), case_sensitive)?; + .bind(schema.clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &schema, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -549,17 +551,17 @@ mod tests { #[test] fn test_expr_eq() -> Result<()> { let case_sensitive = true; - let partition_spec = create_partition_spec(PrimitiveType::Float)?; + let (partition_spec, schema) = create_partition_spec(PrimitiveType::Float)?; let predicate = 
Predicate::Binary(BinaryExpression::new( PredicateOperator::Eq, Reference::new("a"), Datum::float(1.0), )) - .bind(partition_spec.schema_ref().clone(), case_sensitive)?; + .bind(schema.clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &schema, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -573,17 +575,17 @@ mod tests { #[test] fn test_expr_greater_than_or_eq() -> Result<()> { let case_sensitive = true; - let partition_spec = create_partition_spec(PrimitiveType::Float)?; + let (partition_spec, schema) = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::GreaterThanOrEq, Reference::new("a"), Datum::float(1.0), )) - .bind(partition_spec.schema_ref().clone(), case_sensitive)?; + .bind(schema.clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &schema, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -597,17 +599,17 @@ mod tests { #[test] fn test_expr_greater_than() -> Result<()> { let case_sensitive = true; - let partition_spec = create_partition_spec(PrimitiveType::Float)?; + let (partition_spec, schema) = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::GreaterThan, Reference::new("a"), Datum::float(0.9), )) - .bind(partition_spec.schema_ref().clone(), case_sensitive)?; + .bind(schema.clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &schema, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -621,17 +623,17 @@ mod tests { #[test] fn test_expr_less_than_or_eq() -> Result<()> { 
let case_sensitive = true; - let partition_spec = create_partition_spec(PrimitiveType::Float)?; + let (partition_spec, schema) = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::LessThanOrEq, Reference::new("a"), Datum::float(1.0), )) - .bind(partition_spec.schema_ref().clone(), case_sensitive)?; + .bind(schema.clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &schema, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -645,17 +647,17 @@ mod tests { #[test] fn test_expr_less_than() -> Result<()> { let case_sensitive = true; - let partition_spec = create_partition_spec(PrimitiveType::Float)?; + let (partition_spec, schema) = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::LessThan, Reference::new("a"), Datum::float(1.1), )) - .bind(partition_spec.schema_ref().clone(), case_sensitive)?; + .bind(schema.clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &schema, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -669,15 +671,15 @@ mod tests { #[test] fn test_expr_is_not_nan() -> Result<()> { let case_sensitive = true; - let partition_spec = create_partition_spec(PrimitiveType::Float)?; + let (partition_spec, schema) = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Unary(UnaryExpression::new( PredicateOperator::NotNan, Reference::new("a"), )) - .bind(partition_spec.schema_ref().clone(), case_sensitive)?; + .bind(schema.clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &schema, 
&predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -691,15 +693,15 @@ mod tests { #[test] fn test_expr_is_nan() -> Result<()> { let case_sensitive = true; - let partition_spec = create_partition_spec(PrimitiveType::Float)?; + let (partition_spec, schema) = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Unary(UnaryExpression::new( PredicateOperator::IsNan, Reference::new("a"), )) - .bind(partition_spec.schema_ref().clone(), case_sensitive)?; + .bind(schema.clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &schema, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -713,15 +715,15 @@ mod tests { #[test] fn test_expr_is_not_null() -> Result<()> { let case_sensitive = true; - let partition_spec = create_partition_spec(PrimitiveType::Float)?; + let (partition_spec, schema) = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Unary(UnaryExpression::new( PredicateOperator::NotNull, Reference::new("a"), )) - .bind(partition_spec.schema_ref().clone(), case_sensitive)?; + .bind(schema.clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &schema, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -735,15 +737,15 @@ mod tests { #[test] fn test_expr_is_null() -> Result<()> { let case_sensitive = true; - let partition_spec = create_partition_spec(PrimitiveType::Float)?; + let (partition_spec, schema) = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Unary(UnaryExpression::new( PredicateOperator::IsNull, Reference::new("a"), )) - .bind(partition_spec.schema_ref().clone(), case_sensitive)?; + .bind(schema.clone(), case_sensitive)?; let expression_evaluator = - 
create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &schema, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -757,12 +759,11 @@ mod tests { #[test] fn test_expr_always_false() -> Result<()> { let case_sensitive = true; - let partition_spec = create_partition_spec(PrimitiveType::Float)?; - let predicate = - Predicate::AlwaysFalse.bind(partition_spec.schema_ref().clone(), case_sensitive)?; + let (partition_spec, schema) = create_partition_spec(PrimitiveType::Float)?; + let predicate = Predicate::AlwaysFalse.bind(schema.clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &schema, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -776,12 +777,11 @@ mod tests { #[test] fn test_expr_always_true() -> Result<()> { let case_sensitive = true; - let partition_spec = create_partition_spec(PrimitiveType::Float)?; - let predicate = - Predicate::AlwaysTrue.bind(partition_spec.schema_ref().clone(), case_sensitive)?; + let (partition_spec, schema) = create_partition_spec(PrimitiveType::Float)?; + let predicate = Predicate::AlwaysTrue.bind(schema.clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &schema, &predicate, case_sensitive)?; let data_file = create_data_file_float(); diff --git a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs index 1cdc75771..291b66223 100644 --- a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs @@ -495,8 +495,9 @@ mod test { UnaryExpression, }; use crate::spec::{ - BoundPartitionSpec, DataContentType, DataFile, DataFileFormat, 
Datum, NestedField, - PrimitiveType, Schema, Struct, Transform, Type, UnboundPartitionField, + DataContentType, DataFile, DataFileFormat, Datum, NestedField, PartitionSpec, + PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform, Type, + UnboundPartitionField, }; const INT_MIN_VALUE: i32 = 30; @@ -504,10 +505,10 @@ mod test { #[test] fn test_data_file_no_partitions() { - let partition_spec_ref = create_test_partition_spec(); + let (_partition_spec_ref, schema_ref) = create_test_partition_spec(); let partition_filter = Predicate::AlwaysTrue - .bind(partition_spec_ref.schema_ref().clone(), false) + .bind(schema_ref.clone(), false) .unwrap(); let case_sensitive = false; @@ -1645,7 +1646,7 @@ mod test { assert!(result, "Should read: NotIn on no nulls column"); } - fn create_test_partition_spec() -> Arc { + fn create_test_partition_spec() -> (PartitionSpecRef, SchemaRef) { let table_schema = Schema::builder() .with_fields(vec![Arc::new(NestedField::optional( 1, @@ -1656,7 +1657,7 @@ mod test { .unwrap(); let table_schema_ref = Arc::new(table_schema); - let partition_spec = BoundPartitionSpec::builder(table_schema_ref.clone()) + let partition_spec = PartitionSpec::builder(table_schema_ref.clone()) .with_spec_id(1) .add_unbound_fields(vec![UnboundPartitionField::builder() .source_id(1) @@ -1667,7 +1668,7 @@ mod test { .unwrap() .build() .unwrap(); - Arc::new(partition_spec) + (Arc::new(partition_spec), table_schema_ref) } fn not_null(reference: &str) -> BoundPredicate { diff --git a/crates/iceberg/src/expr/visitors/inclusive_projection.rs b/crates/iceberg/src/expr/visitors/inclusive_projection.rs index 7c6e0b2d5..d075fdd81 100644 --- a/crates/iceberg/src/expr/visitors/inclusive_projection.rs +++ b/crates/iceberg/src/expr/visitors/inclusive_projection.rs @@ -21,16 +21,16 @@ use fnv::FnvHashSet; use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; use crate::expr::{BoundPredicate, BoundReference, Predicate}; -use 
crate::spec::{Datum, PartitionField, SchemalessPartitionSpecRef}; +use crate::spec::{Datum, PartitionField, PartitionSpecRef}; use crate::Error; pub(crate) struct InclusiveProjection { - partition_spec: SchemalessPartitionSpecRef, + partition_spec: PartitionSpecRef, cached_parts: HashMap>, } impl InclusiveProjection { - pub(crate) fn new(partition_spec: SchemalessPartitionSpecRef) -> Self { + pub(crate) fn new(partition_spec: PartitionSpecRef) -> Self { Self { partition_spec, cached_parts: HashMap::new(), @@ -235,7 +235,7 @@ mod tests { use crate::expr::visitors::inclusive_projection::InclusiveProjection; use crate::expr::{Bind, Predicate, Reference}; use crate::spec::{ - BoundPartitionSpec, Datum, NestedField, PrimitiveType, Schema, Transform, Type, + Datum, NestedField, PartitionSpec, PrimitiveType, Schema, Transform, Type, UnboundPartitionField, }; @@ -267,11 +267,10 @@ mod tests { let schema = build_test_schema(); let arc_schema = Arc::new(schema); - let partition_spec = BoundPartitionSpec::builder(arc_schema.clone()) + let partition_spec = PartitionSpec::builder(arc_schema.clone()) .with_spec_id(1) .build() - .unwrap() - .into_schemaless(); + .unwrap(); let arc_partition_spec = Arc::new(partition_spec); @@ -298,7 +297,7 @@ mod tests { let schema = build_test_schema(); let arc_schema = Arc::new(schema); - let partition_spec = BoundPartitionSpec::builder(arc_schema.clone()) + let partition_spec = PartitionSpec::builder(arc_schema.clone()) .with_spec_id(1) .add_unbound_field( UnboundPartitionField::builder() @@ -310,8 +309,7 @@ mod tests { ) .unwrap() .build() - .unwrap() - .into_schemaless(); + .unwrap(); let arc_partition_spec = Arc::new(partition_spec); @@ -336,7 +334,7 @@ mod tests { let schema = build_test_schema(); let arc_schema = Arc::new(schema); - let partition_spec = BoundPartitionSpec::builder(arc_schema.clone()) + let partition_spec = PartitionSpec::builder(arc_schema.clone()) .with_spec_id(1) .add_unbound_fields(vec![UnboundPartitionField { 
source_id: 2, @@ -346,8 +344,7 @@ mod tests { }]) .unwrap() .build() - .unwrap() - .into_schemaless(); + .unwrap(); let arc_partition_spec = Arc::new(partition_spec); @@ -372,7 +369,7 @@ mod tests { let schema = build_test_schema(); let arc_schema = Arc::new(schema); - let partition_spec = BoundPartitionSpec::builder(arc_schema.clone()) + let partition_spec = PartitionSpec::builder(arc_schema.clone()) .with_spec_id(1) .add_unbound_fields(vec![UnboundPartitionField { source_id: 2, @@ -382,8 +379,7 @@ mod tests { }]) .unwrap() .build() - .unwrap() - .into_schemaless(); + .unwrap(); let arc_partition_spec = Arc::new(partition_spec); @@ -408,7 +404,7 @@ mod tests { let schema = build_test_schema(); let arc_schema = Arc::new(schema); - let partition_spec = BoundPartitionSpec::builder(arc_schema.clone()) + let partition_spec = PartitionSpec::builder(arc_schema.clone()) .with_spec_id(1) .add_unbound_fields(vec![UnboundPartitionField { source_id: 2, @@ -418,8 +414,7 @@ mod tests { }]) .unwrap() .build() - .unwrap() - .into_schemaless(); + .unwrap(); let arc_partition_spec = Arc::new(partition_spec); @@ -444,7 +439,7 @@ mod tests { let schema = build_test_schema(); let arc_schema = Arc::new(schema); - let partition_spec = BoundPartitionSpec::builder(arc_schema.clone()) + let partition_spec = PartitionSpec::builder(arc_schema.clone()) .with_spec_id(1) .add_unbound_field( UnboundPartitionField::builder() @@ -456,8 +451,7 @@ mod tests { ) .unwrap() .build() - .unwrap() - .into_schemaless(); + .unwrap(); let arc_partition_spec = Arc::new(partition_spec); @@ -485,7 +479,7 @@ mod tests { let schema = build_test_schema(); let arc_schema = Arc::new(schema); - let partition_spec = BoundPartitionSpec::builder(arc_schema.clone()) + let partition_spec = PartitionSpec::builder(arc_schema.clone()) .with_spec_id(1) .add_unbound_field( UnboundPartitionField::builder() @@ -497,8 +491,7 @@ mod tests { ) .unwrap() .build() - .unwrap() - .into_schemaless(); + .unwrap(); let arc_partition_spec 
= Arc::new(partition_spec); diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index a868c7b11..e883af8eb 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -30,8 +30,8 @@ use typed_builder::TypedBuilder; use self::_const_schema::{manifest_schema_v1, manifest_schema_v2}; use super::{ - BoundPartitionSpec, Datum, FieldSummary, FormatVersion, ManifestContentType, ManifestFile, - Schema, SchemaId, SchemaRef, Struct, INITIAL_SEQUENCE_NUMBER, UNASSIGNED_SEQUENCE_NUMBER, + Datum, FieldSummary, FormatVersion, ManifestContentType, ManifestFile, PartitionSpec, Schema, + SchemaId, SchemaRef, Struct, INITIAL_SEQUENCE_NUMBER, UNASSIGNED_SEQUENCE_NUMBER, }; use crate::error::Result; use crate::io::OutputFile; @@ -55,7 +55,7 @@ impl Manifest { let metadata = ManifestMetadata::parse(meta)?; // Parse manifest entries - let partition_type = metadata.partition_spec.partition_type(); + let partition_type = metadata.partition_spec.partition_type(&metadata.schema)?; let entries = match metadata.format_version { FormatVersion::V1 => { @@ -65,7 +65,7 @@ impl Manifest { .into_iter() .map(|value| { from_value::<_serde::ManifestEntryV1>(&value?)? - .try_into(partition_type, &metadata.schema) + .try_into(&partition_type, &metadata.schema) }) .collect::>>()? } @@ -76,7 +76,7 @@ impl Manifest { .into_iter() .map(|value| { from_value::<_serde::ManifestEntryV2>(&value?)? - .try_into(partition_type, &metadata.schema) + .try_into(&partition_type, &metadata.schema) }) .collect::>>()? } @@ -206,7 +206,10 @@ impl ManifestWriter { /// Write a manifest. 
pub async fn write(mut self, manifest: Manifest) -> Result { // Create the avro writer - let partition_type = manifest.metadata.partition_spec.partition_type(); + let partition_type = manifest + .metadata + .partition_spec + .partition_type(&manifest.metadata.schema)?; let table_schema = &manifest.metadata.schema; let avro_schema = match manifest.metadata.format_version { FormatVersion::V1 => manifest_schema_v1(partition_type.clone())?, @@ -281,12 +284,12 @@ impl ManifestWriter { let value = match manifest.metadata.format_version { FormatVersion::V1 => to_value(_serde::ManifestEntryV1::try_from( (*entry).clone(), - partition_type, + &partition_type, )?)? .resolve(&avro_schema)?, FormatVersion::V2 => to_value(_serde::ManifestEntryV2::try_from( (*entry).clone(), - partition_type, + &partition_type, )?)? .resolve(&avro_schema)?, }; @@ -706,7 +709,7 @@ pub struct ManifestMetadata { /// ID of the schema used to write the manifest as a string schema_id: SchemaId, /// The partition spec used to write the manifest - partition_spec: BoundPartitionSpec, + partition_spec: PartitionSpec, /// Table format version number of the manifest as a string format_version: FormatVersion, /// Type of content files tracked by the manifest: “data” or “deletes” @@ -773,7 +776,7 @@ impl ManifestMetadata { }) .transpose()? .unwrap_or(0); - BoundPartitionSpec::builder(schema.clone()) + PartitionSpec::builder(schema.clone()) .with_spec_id(spec_id) .add_unbound_fields(fields.into_iter().map(|f| f.into_unbound()))? .build()? 
@@ -1600,7 +1603,7 @@ mod tests { metadata: ManifestMetadata { schema_id: 0, schema: schema.clone(), - partition_spec: BoundPartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), + partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), content: ManifestContentType::Data, format_version: FormatVersion::V2, }, @@ -1713,7 +1716,7 @@ mod tests { metadata: ManifestMetadata { schema_id: 0, schema: schema.clone(), - partition_spec: BoundPartitionSpec::builder(schema) + partition_spec: PartitionSpec::builder(schema) .with_spec_id(0).add_partition_field("v_int", "v_int", Transform::Identity).unwrap() .add_partition_field("v_long", "v_long", Transform::Identity).unwrap().build().unwrap(), content: ManifestContentType::Data, @@ -1824,7 +1827,7 @@ mod tests { metadata: ManifestMetadata { schema_id: 1, schema: schema.clone(), - partition_spec: BoundPartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), + partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), content: ManifestContentType::Data, format_version: FormatVersion::V1, }, @@ -1888,7 +1891,7 @@ mod tests { metadata: ManifestMetadata { schema_id: 0, schema: schema.clone(), - partition_spec: BoundPartitionSpec::builder(schema).add_partition_field("category", "category", Transform::Identity).unwrap().build().unwrap(), + partition_spec: PartitionSpec::builder(schema).add_partition_field("category", "category", Transform::Identity).unwrap().build().unwrap(), content: ManifestContentType::Data, format_version: FormatVersion::V1, }, @@ -1967,7 +1970,7 @@ mod tests { metadata: ManifestMetadata { schema_id: 0, schema: schema.clone(), - partition_spec: BoundPartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), + partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), content: ManifestContentType::Data, format_version: FormatVersion::V2, }, @@ -2039,7 +2042,7 @@ mod tests { metadata: ManifestMetadata { schema_id: 0, schema: 
schema.clone(), - partition_spec: BoundPartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), + partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), content: ManifestContentType::Data, format_version: FormatVersion::V2, }, diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index 445e7d441..429c3eaab 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -30,9 +30,6 @@ use crate::{Error, ErrorKind, Result}; pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999; pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0; -/// Reference to [`BoundPartitionSpec`]. -pub type BoundPartitionSpecRef = Arc; - /// Partition fields capture the transform from table data to partition values. #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)] #[serde(rename_all = "kebab-case")] @@ -55,69 +52,43 @@ impl PartitionField { } } -/// Partition spec that defines how to produce a tuple of partition values from a record. -/// `PartitionSpec` is bound to a specific schema. -#[derive(Debug, PartialEq, Eq, Clone)] -pub struct BoundPartitionSpec { - /// Identifier for PartitionSpec - spec_id: i32, - /// Details of the partition spec - fields: Vec, - /// The schema this partition spec is bound to - schema: SchemaRef, - /// Type of the partition spec - partition_type: StructType, -} - -/// Reference to [`SchemalessPartitionSpec`]. -pub type SchemalessPartitionSpecRef = Arc; +/// Reference to [`PartitionSpec`]. +pub type PartitionSpecRef = Arc; /// Partition spec that defines how to produce a tuple of partition values from a record. /// Schemaless partition specs are never constructed manually. They occur when a table is mutated /// and partition spec and schemas are updated. While old partition specs are retained, the bound /// schema might not be available anymore as part of the table metadata. 
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(rename_all = "kebab-case")] -pub struct SchemalessPartitionSpec { +pub struct PartitionSpec { /// Identifier for PartitionSpec spec_id: i32, /// Details of the partition spec fields: Vec, } -impl BoundPartitionSpec { - /// Create partition spec builder +impl PartitionSpec { + /// Create a new partition spec builder with the given schema. pub fn builder(schema: impl Into) -> PartitionSpecBuilder { PartitionSpecBuilder::new(schema) } - /// Get a new unpatitioned partition spec - pub fn unpartition_spec(schema: impl Into) -> Self { - Self { - spec_id: DEFAULT_PARTITION_SPEC_ID, - fields: vec![], - schema: schema.into(), - partition_type: StructType::new(vec![]), - } - } - - /// Spec id of the partition spec - pub fn spec_id(&self) -> i32 { - self.spec_id - } - /// Fields of the partition spec pub fn fields(&self) -> &[PartitionField] { &self.fields } - /// The schema this partition spec is bound to - pub fn schema(&self) -> &Schema { - &self.schema + /// Spec id of the partition spec + pub fn spec_id(&self) -> i32 { + self.spec_id } - /// The schema ref this partition spec is bound to - pub fn schema_ref(&self) -> &SchemaRef { - &self.schema + /// Get a new unpatitioned partition spec + pub fn unpartition_spec() -> Self { + Self { + spec_id: DEFAULT_PARTITION_SPEC_ID, + fields: vec![], + } } /// Returns if the partition spec is unpartitioned. @@ -127,18 +98,21 @@ impl BoundPartitionSpec { self.fields.is_empty() || self.fields.iter().all(|f| f.transform == Transform::Void) } - /// Turn this partition spec into an unbound partition spec. - /// - /// The `field_id` is retained as `partition_id` in the unbound partition spec. - pub fn into_unbound(self) -> UnboundPartitionSpec { - self.into() + /// Returns the partition type of this partition spec. 
+ pub fn partition_type(&self, schema: &Schema) -> Result { + PartitionSpecBuilder::partition_type(&self.fields, schema) } - /// Turn this partition spec into a preserved partition spec. - pub fn into_schemaless(self) -> SchemalessPartitionSpec { + /// Convert to unbound partition spec + pub fn into_unbound(self) -> UnPartitionSpec { self.into() } + /// Change the spec id of the partition spec + pub fn with_spec_id(self, spec_id: i32) -> Self { + Self { spec_id, ..self } + } + /// Check if this partition spec has sequential partition ids. /// Sequential ids start from 1000 and increment by 1 for each field. /// This is required for spec version 1 @@ -151,11 +125,6 @@ impl BoundPartitionSpec { self.fields.iter().map(|f| f.field_id).max() } - /// Returns the partition type of this partition spec. - pub fn partition_type(&self) -> &StructType { - &self.partition_type - } - /// Check if this partition spec is compatible with another partition spec. /// /// Returns true if the partition spec is equal to the other spec with partition field ids ignored and @@ -165,7 +134,7 @@ impl BoundPartitionSpec { /// * Field names /// * Source column ids /// * Transforms - pub fn is_compatible_with_schemaless(&self, other: &SchemalessPartitionSpec) -> bool { + pub fn is_compatible_with(&self, other: &PartitionSpec) -> bool { if self.fields.len() != other.fields.len() { return false; } @@ -181,50 +150,10 @@ impl BoundPartitionSpec { true } - - /// Change the spec id of the partition spec - pub fn with_spec_id(self, spec_id: i32) -> Self { - Self { spec_id, ..self } - } -} - -impl SchemalessPartitionSpec { - /// Fields of the partition spec - pub fn fields(&self) -> &[PartitionField] { - &self.fields - } - - /// Spec id of the partition spec - pub fn spec_id(&self) -> i32 { - self.spec_id - } - - /// Bind this schemaless partition spec to a schema. 
- pub fn bind(self, schema: impl Into) -> Result { - PartitionSpecBuilder::new_from_unbound(self.into_unbound(), schema)?.build() - } - - /// Get a new unpatitioned partition spec - pub fn unpartition_spec() -> Self { - Self { - spec_id: DEFAULT_PARTITION_SPEC_ID, - fields: vec![], - } - } - - /// Returns the partition type of this partition spec. - pub fn partition_type(&self, schema: &Schema) -> Result { - PartitionSpecBuilder::partition_type(&self.fields, schema) - } - - /// Convert to unbound partition spec - pub fn into_unbound(self) -> UnboundPartitionSpec { - self.into() - } } -/// Reference to [`UnboundPartitionSpec`]. -pub type UnboundPartitionSpecRef = Arc; +/// Reference to [`UnPartitionSpec`]. +pub type UnPartitionSpecRef = Arc; /// Unbound partition field can be built without a schema and later bound to a schema. #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)] #[serde(rename_all = "kebab-case")] @@ -244,21 +173,21 @@ pub struct UnboundPartitionField { /// Unbound partition spec can be built without a schema and later bound to a schema. #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)] #[serde(rename_all = "kebab-case")] -pub struct UnboundPartitionSpec { +pub struct UnPartitionSpec { /// Identifier for PartitionSpec pub(crate) spec_id: Option, /// Details of the partition spec pub(crate) fields: Vec, } -impl UnboundPartitionSpec { +impl UnPartitionSpec { /// Create unbound partition spec builder - pub fn builder() -> UnboundPartitionSpecBuilder { - UnboundPartitionSpecBuilder::default() + pub fn builder() -> UnPartitionSpecBuilder { + UnPartitionSpecBuilder::default() } /// Bind this unbound partition spec to a schema. 
- pub fn bind(self, schema: impl Into) -> Result { + pub fn bind(self, schema: impl Into) -> Result { PartitionSpecBuilder::new_from_unbound(self, schema)?.build() } @@ -307,41 +236,23 @@ impl From for UnboundPartitionField { } } -impl From for UnboundPartitionSpec { - fn from(spec: BoundPartitionSpec) -> Self { - UnboundPartitionSpec { - spec_id: Some(spec.spec_id), - fields: spec.fields.into_iter().map(Into::into).collect(), - } - } -} - -impl From for UnboundPartitionSpec { - fn from(spec: SchemalessPartitionSpec) -> Self { - UnboundPartitionSpec { +impl From for UnPartitionSpec { + fn from(spec: PartitionSpec) -> Self { + UnPartitionSpec { spec_id: Some(spec.spec_id), fields: spec.fields.into_iter().map(Into::into).collect(), } } } -impl From for SchemalessPartitionSpec { - fn from(spec: BoundPartitionSpec) -> Self { - SchemalessPartitionSpec { - spec_id: spec.spec_id, - fields: spec.fields, - } - } -} - -/// Create a new UnboundPartitionSpec +/// Create a new UnPartitionSpec #[derive(Debug, Default)] -pub struct UnboundPartitionSpecBuilder { +pub struct UnPartitionSpecBuilder { spec_id: Option, fields: Vec, } -impl UnboundPartitionSpecBuilder { +impl UnPartitionSpecBuilder { /// Create a new partition spec builder with the given schema. pub fn new() -> Self { Self { @@ -395,8 +306,8 @@ impl UnboundPartitionSpecBuilder { } /// Build the unbound partition spec. - pub fn build(self) -> UnboundPartitionSpec { - UnboundPartitionSpec { + pub fn build(self) -> UnPartitionSpec { + UnPartitionSpec { spec_id: self.spec_id, fields: self.fields, } @@ -425,7 +336,7 @@ impl PartitionSpecBuilder { /// Create a new partition spec builder from an existing unbound partition spec. pub fn new_from_unbound( - unbound: UnboundPartitionSpec, + unbound: UnPartitionSpec, schema: impl Into, ) -> Result { let mut builder = @@ -514,14 +425,11 @@ impl PartitionSpecBuilder { } /// Build a bound partition spec with the given schema. 
- pub fn build(self) -> Result { + pub fn build(self) -> Result { let fields = Self::set_field_ids(self.fields, self.last_assigned_field_id)?; - let partition_type = Self::partition_type(&fields, &self.schema)?; - Ok(BoundPartitionSpec { + Ok(PartitionSpec { spec_id: self.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID), fields, - partition_type, - schema: self.schema, }) } @@ -676,7 +584,7 @@ impl PartitionSpecBuilder { } } -/// Contains checks that are common to both PartitionSpecBuilder and UnboundPartitionSpecBuilder +/// Contains checks that are common to both PartitionSpecBuilder and UnPartitionSpecBuilder trait CorePartitionSpecValidator { /// Ensure that the partition name is unique among the partition fields and is not empty. fn check_name_set_and_unique(&self, name: &str) -> Result<()> { @@ -739,7 +647,7 @@ impl CorePartitionSpecValidator for PartitionSpecBuilder { } } -impl CorePartitionSpecValidator for UnboundPartitionSpecBuilder { +impl CorePartitionSpecValidator for UnPartitionSpecBuilder { fn fields(&self) -> &Vec { &self.fields } @@ -774,7 +682,7 @@ mod tests { } "#; - let partition_spec: SchemalessPartitionSpec = serde_json::from_str(spec).unwrap(); + let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap(); assert_eq!(4, partition_spec.fields[0].source_id); assert_eq!(1000, partition_spec.fields[0].field_id); assert_eq!("ts_day", partition_spec.fields[0].name); @@ -806,7 +714,7 @@ mod tests { ]) .build() .unwrap(); - let partition_spec = BoundPartitionSpec::builder(schema.clone()) + let partition_spec = PartitionSpec::builder(schema.clone()) .with_spec_id(1) .build() .unwrap(); @@ -815,7 +723,7 @@ mod tests { "Empty partition spec should be unpartitioned" ); - let partition_spec = BoundPartitionSpec::builder(schema.clone()) + let partition_spec = PartitionSpec::builder(schema.clone()) .add_unbound_fields(vec![ UnboundPartitionField::builder() .source_id(1) @@ -837,7 +745,7 @@ mod tests { "Partition spec with one non void transform 
should not be unpartitioned" ); - let partition_spec = BoundPartitionSpec::builder(schema.clone()) + let partition_spec = PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_fields(vec![ UnboundPartitionField::builder() @@ -884,7 +792,7 @@ mod tests { } "#; - let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap(); + let partition_spec: UnPartitionSpec = serde_json::from_str(spec).unwrap(); assert_eq!(Some(1), partition_spec.spec_id); assert_eq!(4, partition_spec.fields[0].source_id); @@ -911,7 +819,7 @@ mod tests { } ] } "#; - let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap(); + let partition_spec: UnPartitionSpec = serde_json::from_str(spec).unwrap(); assert_eq!(None, partition_spec.spec_id); assert_eq!(4, partition_spec.fields[0].source_id); @@ -935,14 +843,14 @@ mod tests { ]) .build() .unwrap(); - let partition_spec = BoundPartitionSpec::builder(schema.clone()) + let partition_spec = PartitionSpec::builder(schema.clone()) .with_spec_id(0) .build() .unwrap(); - let partition_type = partition_spec.partition_type(); + let partition_type = partition_spec.partition_type(&schema).unwrap(); assert_eq!(0, partition_type.fields().len()); - let unpartition_spec = BoundPartitionSpec::unpartition_spec(schema); + let unpartition_spec = PartitionSpec::unpartition_spec(); assert_eq!(partition_spec, unpartition_spec); } @@ -970,7 +878,7 @@ mod tests { } "#; - let partition_spec: SchemalessPartitionSpec = serde_json::from_str(spec).unwrap(); + let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap(); let schema = Schema::builder() .with_fields(vec![ NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) @@ -1046,7 +954,7 @@ mod tests { } "#; - let partition_spec: SchemalessPartitionSpec = serde_json::from_str(spec).unwrap(); + let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap(); let schema = Schema::builder() .with_fields(vec![ 
NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) @@ -1113,7 +1021,7 @@ mod tests { } "#; - let partition_spec: SchemalessPartitionSpec = serde_json::from_str(spec).unwrap(); + let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap(); let schema = Schema::builder() .with_fields(vec![ NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) @@ -1131,53 +1039,9 @@ mod tests { assert!(partition_spec.partition_type(&schema).is_err()); } - #[test] - fn test_schemaless_bind_schema_keeps_field_ids_and_spec_id() { - let schema: Schema = Schema::builder() - .with_fields(vec![ - NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) - .into(), - NestedField::required( - 2, - "name", - Type::Primitive(crate::spec::PrimitiveType::String), - ) - .into(), - ]) - .build() - .unwrap(); - - let partition_spec = BoundPartitionSpec::builder(schema.clone()) - .with_spec_id(99) - .add_unbound_field(UnboundPartitionField { - source_id: 1, - field_id: Some(1010), - name: "id".to_string(), - transform: Transform::Identity, - }) - .unwrap() - .add_unbound_field(UnboundPartitionField { - source_id: 2, - field_id: Some(1001), - name: "name_void".to_string(), - transform: Transform::Void, - }) - .unwrap() - .build() - .unwrap(); - - let schemaless_partition_spec = SchemalessPartitionSpec::from(partition_spec.clone()); - let bound_partition_spec = schemaless_partition_spec.bind(schema).unwrap(); - - assert_eq!(partition_spec, bound_partition_spec); - assert_eq!(partition_spec.fields[0].field_id, 1010); - assert_eq!(partition_spec.fields[1].field_id, 1001); - assert_eq!(bound_partition_spec.spec_id(), 99); - } - #[test] fn test_builder_disallow_duplicate_names() { - UnboundPartitionSpec::builder() + UnPartitionSpec::builder() .add_partition_field(1, "ts_day".to_string(), Transform::Day) .unwrap() .add_partition_field(2, "ts_day".to_string(), Transform::Day) @@ -1199,7 +1063,7 @@ mod tests { ]) 
.build() .unwrap(); - BoundPartitionSpec::builder(schema.clone()) + PartitionSpec::builder(schema.clone()) .add_unbound_field(UnboundPartitionField { source_id: 1, field_id: Some(1000), @@ -1237,7 +1101,7 @@ mod tests { ]) .build() .unwrap(); - let spec = BoundPartitionSpec::builder(schema.clone()) + let spec = PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1285,34 +1149,36 @@ mod tests { .build() .unwrap(); - BoundPartitionSpec::builder(schema.clone()) + PartitionSpec::builder(schema.clone()) .with_spec_id(1) .build() .unwrap(); - let spec = BoundPartitionSpec::builder(schema.clone()) + let spec = PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_partition_field("id", "id_bucket[16]", Transform::Bucket(16)) .unwrap() .build() .unwrap(); - assert_eq!(spec, BoundPartitionSpec { + assert_eq!(spec, PartitionSpec { spec_id: 1, - schema: schema.into(), fields: vec![PartitionField { source_id: 1, field_id: 1000, name: "id_bucket[16]".to_string(), transform: Transform::Bucket(16), }], - partition_type: StructType::new(vec![NestedField::optional( + }); + assert_eq!( + spec.partition_type(&schema).unwrap(), + StructType::new(vec![NestedField::optional( 1000, "id_bucket[16]", Type::Primitive(PrimitiveType::Int) ) .into()]) - }); + ) } #[test] @@ -1327,12 +1193,12 @@ mod tests { .build() .unwrap(); - BoundPartitionSpec::builder(schema.clone()) + PartitionSpec::builder(schema.clone()) .with_spec_id(1) .build() .unwrap(); - let err = BoundPartitionSpec::builder(schema) + let err = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1360,12 +1226,12 @@ mod tests { .build() .unwrap(); - BoundPartitionSpec::builder(schema.clone()) + PartitionSpec::builder(schema.clone()) .with_spec_id(1) .build() .unwrap(); - BoundPartitionSpec::builder(schema.clone()) + PartitionSpec::builder(schema.clone()) .with_spec_id(1) 
.add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1378,7 +1244,7 @@ mod tests { .unwrap(); // Not OK for different source id - BoundPartitionSpec::builder(schema) + PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 2, @@ -1412,7 +1278,7 @@ mod tests { .unwrap(); // Valid - BoundPartitionSpec::builder(schema.clone()) + PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_fields(vec![ UnboundPartitionField { @@ -1433,7 +1299,7 @@ mod tests { .unwrap(); // Invalid - BoundPartitionSpec::builder(schema) + PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_fields(vec![ UnboundPartitionField { @@ -1454,7 +1320,7 @@ mod tests { #[test] fn test_builder_disallows_redundant() { - let err = UnboundPartitionSpec::builder() + let err = UnPartitionSpec::builder() .with_spec_id(1) .add_partition_field(1, "id_bucket[16]".to_string(), Transform::Bucket(16)) .unwrap() @@ -1479,7 +1345,7 @@ mod tests { .build() .unwrap(); - BoundPartitionSpec::builder(schema) + PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1492,7 +1358,7 @@ mod tests { #[test] fn test_build_unbound_specs_without_partition_id() { - let spec = UnboundPartitionSpec::builder() + let spec = UnPartitionSpec::builder() .with_spec_id(1) .add_partition_fields(vec![UnboundPartitionField { source_id: 1, @@ -1503,7 +1369,7 @@ mod tests { .unwrap() .build(); - assert_eq!(spec, UnboundPartitionSpec { + assert_eq!(spec, UnPartitionSpec { spec_id: Some(1), fields: vec![UnboundPartitionField { source_id: 1, @@ -1530,7 +1396,7 @@ mod tests { .build() .unwrap(); - let partition_spec_1 = BoundPartitionSpec::builder(schema.clone()) + let partition_spec_1 = PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1542,7 +1408,7 @@ mod tests { .build() .unwrap(); - let partition_spec_2 = BoundPartitionSpec::builder(schema) + let 
partition_spec_2 = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1554,7 +1420,7 @@ mod tests { .build() .unwrap(); - assert!(partition_spec_1.is_compatible_with_schemaless(&partition_spec_2.into_schemaless())); + assert!(partition_spec_1.is_compatible_with(&partition_spec_2)); } #[test] @@ -1569,7 +1435,7 @@ mod tests { .build() .unwrap(); - let partition_spec_1 = BoundPartitionSpec::builder(schema.clone()) + let partition_spec_1 = PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1581,7 +1447,7 @@ mod tests { .build() .unwrap(); - let partition_spec_2 = BoundPartitionSpec::builder(schema) + let partition_spec_2 = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1593,9 +1459,7 @@ mod tests { .build() .unwrap(); - assert!( - !partition_spec_1.is_compatible_with_schemaless(&partition_spec_2.into_schemaless()) - ); + assert!(!partition_spec_1.is_compatible_with(&partition_spec_2)); } #[test] @@ -1614,7 +1478,7 @@ mod tests { .build() .unwrap(); - let partition_spec_1 = BoundPartitionSpec::builder(schema.clone()) + let partition_spec_1 = PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1626,7 +1490,7 @@ mod tests { .build() .unwrap(); - let partition_spec_2 = BoundPartitionSpec::builder(schema) + let partition_spec_2 = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 2, @@ -1638,9 +1502,7 @@ mod tests { .build() .unwrap(); - assert!( - !partition_spec_1.is_compatible_with_schemaless(&partition_spec_2.into_schemaless()) - ); + assert!(!partition_spec_1.is_compatible_with(&partition_spec_2)); } #[test] @@ -1659,7 +1521,7 @@ mod tests { .build() .unwrap(); - let partition_spec_1 = BoundPartitionSpec::builder(schema.clone()) + let partition_spec_1 = 
PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1678,7 +1540,7 @@ mod tests { .build() .unwrap(); - let partition_spec_2 = BoundPartitionSpec::builder(schema) + let partition_spec_2 = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 2, @@ -1697,18 +1559,15 @@ mod tests { .build() .unwrap(); - assert!( - !partition_spec_1.is_compatible_with_schemaless(&partition_spec_2.into_schemaless()) - ); + assert!(!partition_spec_1.is_compatible_with(&partition_spec_2)); } #[test] fn test_highest_field_id_unpartitioned() { - let spec = - BoundPartitionSpec::builder(Schema::builder().with_fields(vec![]).build().unwrap()) - .with_spec_id(1) - .build() - .unwrap(); + let spec = PartitionSpec::builder(Schema::builder().with_fields(vec![]).build().unwrap()) + .with_spec_id(1) + .build() + .unwrap(); assert!(spec.highest_field_id().is_none()); } @@ -1729,7 +1588,7 @@ mod tests { .build() .unwrap(); - let spec = BoundPartitionSpec::builder(schema) + let spec = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1767,7 +1626,7 @@ mod tests { .build() .unwrap(); - let spec = BoundPartitionSpec::builder(schema) + let spec = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1807,7 +1666,7 @@ mod tests { .build() .unwrap(); - let spec = BoundPartitionSpec::builder(schema) + let spec = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1847,7 +1706,7 @@ mod tests { .build() .unwrap(); - let spec = BoundPartitionSpec::builder(schema) + let spec = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index bc1fe17c1..244216d5b 100644 --- 
a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -32,8 +32,8 @@ use uuid::Uuid; use super::snapshot::SnapshotReference; pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder}; use super::{ - BoundPartitionSpecRef, SchemaId, SchemaRef, SchemalessPartitionSpecRef, Snapshot, SnapshotRef, - SnapshotRetention, SortOrder, SortOrderRef, DEFAULT_PARTITION_SPEC_ID, + PartitionSpecRef, SchemaId, SchemaRef, Snapshot, SnapshotRef, SnapshotRetention, SortOrder, + SortOrderRef, StructType, DEFAULT_PARTITION_SPEC_ID, }; use crate::error::{timestamp_ms_to_utc, Result}; use crate::{Error, ErrorKind}; @@ -119,9 +119,11 @@ pub struct TableMetadata { /// ID of the table’s current schema. pub(crate) current_schema_id: i32, /// A list of partition specs, stored as full partition spec objects. - pub(crate) partition_specs: HashMap, + pub(crate) partition_specs: HashMap, /// ID of the “current” spec that writers should use by default. - pub(crate) default_spec: BoundPartitionSpecRef, + pub(crate) default_spec: PartitionSpecRef, + /// Partition type of the default partition spec. + pub(crate) default_partition_type: StructType, /// An integer; the highest assigned partition field ID across all partition specs for the table. pub(crate) last_partition_id: i32, ///A string to string map of table properties. This is used to control settings that @@ -252,24 +254,28 @@ impl TableMetadata { /// Returns all partition specs. #[inline] - pub fn partition_specs_iter( - &self, - ) -> impl ExactSizeIterator { + pub fn partition_specs_iter(&self) -> impl ExactSizeIterator { self.partition_specs.values() } /// Lookup partition spec by id. 
#[inline] - pub fn partition_spec_by_id(&self, spec_id: i32) -> Option<&SchemalessPartitionSpecRef> { + pub fn partition_spec_by_id(&self, spec_id: i32) -> Option<&PartitionSpecRef> { self.partition_specs.get(&spec_id) } /// Get default partition spec #[inline] - pub fn default_partition_spec(&self) -> &BoundPartitionSpecRef { + pub fn default_partition_spec(&self) -> &PartitionSpecRef { &self.default_spec } + /// Return the partition type of the default partition spec. + #[inline] + pub fn default_partition_type(&self) -> &StructType { + &self.default_partition_type + } + #[inline] /// Returns spec id of the "current" partition spec. pub fn default_partition_spec_id(&self) -> i32 { @@ -413,7 +419,7 @@ impl TableMetadata { { self.partition_specs.insert( self.default_spec.spec_id(), - Arc::new(Arc::unwrap_or_clone(self.default_spec.clone()).into_schemaless()), + Arc::new(Arc::unwrap_or_clone(self.default_spec.clone())), ); } @@ -610,7 +616,7 @@ pub(super) mod _serde { use crate::spec::schema::_serde::{SchemaV1, SchemaV2}; use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2}; use crate::spec::{ - BoundPartitionSpec, PartitionField, Schema, SchemaRef, SchemalessPartitionSpec, Snapshot, + PartitionField, PartitionSpec, PartitionSpecRef, Schema, SchemaRef, Snapshot, SnapshotReference, SnapshotRetention, SortOrder, }; use crate::{Error, ErrorKind}; @@ -634,7 +640,7 @@ pub(super) mod _serde { pub last_column_id: i32, pub schemas: Vec, pub current_schema_id: i32, - pub partition_specs: Vec, + pub partition_specs: Vec, pub default_spec_id: i32, pub last_partition_id: i32, #[serde(skip_serializing_if = "Option::is_none")] @@ -670,7 +676,7 @@ pub(super) mod _serde { pub current_schema_id: Option, pub partition_spec: Vec, #[serde(skip_serializing_if = "Option::is_none")] - pub partition_specs: Option>, + pub partition_specs: Option>, #[serde(skip_serializing_if = "Option::is_none")] pub default_spec_id: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ 
-776,17 +782,12 @@ pub(super) mod _serde { .map(|x| (x.spec_id(), Arc::new(x))), ); let default_spec_id = value.default_spec_id; - let default_spec = partition_specs + let default_spec: PartitionSpecRef = partition_specs .get(&value.default_spec_id) - .map(|schemaless_spec| { - (*schemaless_spec.clone()) - .clone() - .bind(current_schema.clone()) - }) - .transpose()? + .map(|schemaless_spec| (**schemaless_spec).clone()) .or_else(|| { (DEFAULT_PARTITION_SPEC_ID == default_spec_id) - .then(|| BoundPartitionSpec::unpartition_spec(current_schema.clone())) + .then(PartitionSpec::unpartition_spec) }) .ok_or_else(|| { Error::new( @@ -795,6 +796,7 @@ pub(super) mod _serde { ) })? .into(); + let default_partition_type = default_spec.partition_type(current_schema)?; let mut metadata = TableMetadata { format_version: FormatVersion::V2, @@ -806,6 +808,7 @@ pub(super) mod _serde { current_schema_id: value.current_schema_id, schemas, partition_specs, + default_partition_type, default_spec, last_partition_id: value.last_partition_id, properties: value.properties.unwrap_or_default(), @@ -901,11 +904,10 @@ pub(super) mod _serde { let partition_specs = match value.partition_specs { Some(partition_specs) => partition_specs, - None => vec![BoundPartitionSpec::builder(current_schema.clone()) + None => vec![PartitionSpec::builder(current_schema.clone()) .with_spec_id(DEFAULT_PARTITION_SPEC_ID) .add_unbound_fields(value.partition_spec.into_iter().map(|f| f.into_unbound()))? - .build()? - .into_schemaless()], + .build()?], } .into_iter() .map(|x| (x.spec_id(), Arc::new(x))) @@ -914,10 +916,9 @@ pub(super) mod _serde { let default_spec_id = value .default_spec_id .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()); - let default_spec = partition_specs + let default_spec: PartitionSpecRef = partition_specs .get(&default_spec_id) - .map(|x| Arc::unwrap_or_clone(x.clone()).bind(current_schema.clone())) - .transpose()? 
+ .map(|x| Arc::unwrap_or_clone(x.clone())) .ok_or_else(|| { Error::new( ErrorKind::DataInvalid, @@ -925,6 +926,7 @@ pub(super) mod _serde { ) })? .into(); + let default_partition_type = default_spec.partition_type(&current_schema)?; let mut metadata = TableMetadata { format_version: FormatVersion::V1, @@ -935,6 +937,7 @@ pub(super) mod _serde { last_column_id: value.last_column_id, current_schema_id, default_spec, + default_partition_type, last_partition_id: value .last_partition_id .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()), @@ -1205,7 +1208,7 @@ mod tests { use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder}; use crate::spec::table_metadata::TableMetadata; use crate::spec::{ - BoundPartitionSpec, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot, + NestedField, NullOrder, Operation, PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary, Transform, Type, UnboundPartitionField, }; @@ -1311,7 +1314,7 @@ mod tests { .build() .unwrap(); - let partition_spec = BoundPartitionSpec::builder(schema.clone()) + let partition_spec = PartitionSpec::builder(schema.clone()) .with_spec_id(0) .add_unbound_field(UnboundPartitionField { name: "ts_day".to_string(), @@ -1323,6 +1326,7 @@ mod tests { .build() .unwrap(); + let default_partition_type = partition_spec.partition_type(&schema).unwrap(); let expected = TableMetadata { format_version: FormatVersion::V2, table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(), @@ -1331,10 +1335,8 @@ mod tests { last_column_id: 1, schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]), current_schema_id: 1, - partition_specs: HashMap::from_iter(vec![( - 0, - partition_spec.clone().into_schemaless().into(), - )]), + partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]), + default_partition_type, default_spec: partition_spec.into(), last_partition_id:
1000, default_sort_order_id: 0, @@ -1479,7 +1481,7 @@ mod tests { .unwrap(); let schema = Arc::new(schema); - let partition_spec = BoundPartitionSpec::builder(schema.clone()) + let partition_spec = PartitionSpec::builder(schema.clone()) .with_spec_id(0) .add_partition_field("vendor_id", "vendor_id", Transform::Identity) .unwrap() @@ -1500,6 +1502,7 @@ mod tests { .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) }) .build(); + let default_partition_type = partition_spec.partition_type(&schema).unwrap(); let expected = TableMetadata { format_version: FormatVersion::V1, table_uuid: Uuid::parse_str("df838b92-0b32-465d-a44e-d39936e538b7").unwrap(), @@ -1508,7 +1511,8 @@ mod tests { last_column_id: 5, schemas: HashMap::from_iter(vec![(0, schema)]), current_schema_id: 0, - partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into_schemaless().into())]), + partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]), + default_partition_type, default_spec: Arc::new(partition_spec), last_partition_id: 1000, default_sort_order_id: 0, @@ -1588,11 +1592,12 @@ mod tests { .build() .unwrap(); - let partition_spec = BoundPartitionSpec::builder(schema.clone()) + let partition_spec = PartitionSpec::builder(schema.clone()) .with_spec_id(0) .build() .unwrap(); + let default_partition_type = partition_spec.partition_type(&schema).unwrap(); let expected = TableMetadata { format_version: FormatVersion::V2, table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(), @@ -1601,10 +1606,8 @@ mod tests { last_column_id: 1, schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]), current_schema_id: 1, - partition_specs: HashMap::from_iter(vec![( - 0, - 
partition_spec.clone().into_schemaless().into(), - )]), + partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]), + default_partition_type, default_spec: partition_spec.into(), last_partition_id: 1000, default_sort_order_id: 0, @@ -1980,7 +1983,7 @@ mod tests { .build() .unwrap(); - let partition_spec = BoundPartitionSpec::builder(schema2.clone()) + let partition_spec = PartitionSpec::builder(schema2.clone()) .with_spec_id(0) .add_unbound_field(UnboundPartitionField { name: "x".to_string(), @@ -2033,6 +2036,7 @@ mod tests { }) .build(); + let default_partition_type = partition_spec.partition_type(&schema2).unwrap(); let expected = TableMetadata { format_version: FormatVersion::V2, table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(), @@ -2041,11 +2045,9 @@ mod tests { last_column_id: 3, schemas: HashMap::from_iter(vec![(0, Arc::new(schema1)), (1, Arc::new(schema2))]), current_schema_id: 1, - partition_specs: HashMap::from_iter(vec![( - 0, - partition_spec.clone().into_schemaless().into(), - )]), + partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]), default_spec: Arc::new(partition_spec), + default_partition_type, last_partition_id: 1000, default_sort_order_id: 3, sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]), @@ -2106,7 +2108,7 @@ mod tests { .build() .unwrap(); - let partition_spec = BoundPartitionSpec::builder(schema.clone()) + let partition_spec = PartitionSpec::builder(schema.clone()) .with_spec_id(0) .add_unbound_field(UnboundPartitionField { name: "x".to_string(), @@ -2135,6 +2137,7 @@ mod tests { .build_unbound() .unwrap(); + let default_partition_type = partition_spec.partition_type(&schema).unwrap(); let expected = TableMetadata { format_version: FormatVersion::V2, table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(), @@ -2143,10 +2146,8 @@ mod tests { last_column_id: 3, schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]), 
current_schema_id: 0, - partition_specs: HashMap::from_iter(vec![( - 0, - partition_spec.clone().into_schemaless().into(), - )]), + partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]), + default_partition_type, default_spec: Arc::new(partition_spec), last_partition_id: 1000, default_sort_order_id: 3, @@ -2189,7 +2190,7 @@ mod tests { .build() .unwrap(); - let partition_spec = BoundPartitionSpec::builder(schema.clone()) + let partition_spec = PartitionSpec::builder(schema.clone()) .with_spec_id(0) .add_unbound_field(UnboundPartitionField { name: "x".to_string(), @@ -2201,6 +2202,7 @@ mod tests { .build() .unwrap(); + let default_partition_type = partition_spec.partition_type(&schema).unwrap(); let expected = TableMetadata { format_version: FormatVersion::V1, table_uuid: Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap(), @@ -2209,11 +2211,9 @@ mod tests { last_column_id: 3, schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]), current_schema_id: 0, - partition_specs: HashMap::from_iter(vec![( - 0, - partition_spec.clone().into_schemaless().into(), - )]), + partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]), default_spec: Arc::new(partition_spec), + default_partition_type, last_partition_id: 0, default_sort_order_id: 0, // Sort order is added during deserialization for V2 compatibility @@ -2326,17 +2326,14 @@ mod tests { fn test_default_partition_spec() { let default_spec_id = 1234; let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json"); - let partition_spec = - BoundPartitionSpec::unpartition_spec(table_meta_data.current_schema().clone()); + let partition_spec = PartitionSpec::unpartition_spec(); table_meta_data.default_spec = partition_spec.clone().into(); table_meta_data .partition_specs - .insert(default_spec_id, Arc::new(partition_spec.into_schemaless())); + .insert(default_spec_id, Arc::new(partition_spec)); assert_eq!( - 
(*table_meta_data.default_partition_spec().clone()) - .clone() - .into_schemaless(), + (*table_meta_data.default_partition_spec().clone()).clone(), (*table_meta_data .partition_spec_by_id(default_spec_id) .unwrap() @@ -2393,11 +2390,10 @@ mod tests { HashMap::from([( 0, Arc::new( - BoundPartitionSpec::builder(table_metadata.schemas.get(&0).unwrap().clone()) + PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap().clone()) .with_spec_id(0) .build() .unwrap() - .into_schemaless() ) )]) ); diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs index 966abb156..d621b0705 100644 --- a/crates/iceberg/src/spec/table_metadata_builder.rs +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -21,9 +21,9 @@ use std::sync::Arc; use uuid::Uuid; use super::{ - BoundPartitionSpec, FormatVersion, MetadataLog, PartitionSpecBuilder, Schema, SchemaRef, - Snapshot, SnapshotLog, SnapshotReference, SnapshotRetention, SortOrder, SortOrderRef, - TableMetadata, UnboundPartitionSpec, DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, MAIN_BRANCH, + FormatVersion, MetadataLog, PartitionSpec, PartitionSpecBuilder, Schema, SchemaRef, Snapshot, + SnapshotLog, SnapshotReference, SnapshotRetention, SortOrder, SortOrderRef, StructType, + TableMetadata, UnPartitionSpec, DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, MAIN_BRANCH, ONE_MINUTE_MS, PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX, PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT, RESERVED_PROPERTIES, UNPARTITIONED_LAST_ASSIGNED_ID, @@ -77,7 +77,7 @@ impl TableMetadataBuilder { /// spec.id. It should only be used to create new table metadata from scratch. pub fn new( schema: Schema, - spec: impl Into, + spec: impl Into, sort_order: SortOrder, location: String, format_version: FormatVersion, @@ -105,8 +105,9 @@ impl TableMetadataBuilder { // also unpartitioned. // The `default_spec` value is always replaced at the end of this method by he `add_default_partition_spec` // method. 
- BoundPartitionSpec::unpartition_spec(fresh_schema.clone()).with_spec_id(-1), + PartitionSpec::unpartition_spec().with_spec_id(-1), ), // Overwritten immediately by add_default_partition_spec + default_partition_type: StructType::new(vec![]), last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID, properties: HashMap::new(), current_snapshot_id: None, @@ -173,7 +174,7 @@ impl TableMetadataBuilder { "Can't create table without location", ) })?; - let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec { + let partition_spec = partition_spec.unwrap_or(UnPartitionSpec { spec_id: None, fields: vec![], }); @@ -636,7 +637,7 @@ impl TableMetadataBuilder { /// # Errors /// - The partition spec cannot be bound to the current schema. /// - The partition spec has non-sequential field ids and the table format version is 1. - pub fn add_partition_spec(mut self, unbound_spec: UnboundPartitionSpec) -> Result { + pub fn add_partition_spec(mut self, unbound_spec: UnPartitionSpec) -> Result { let schema = self.get_current_schema()?.clone(); let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)? .with_last_assigned_field_id(self.metadata.last_partition_id) @@ -669,7 +670,7 @@ impl TableMetadataBuilder { .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID); self.metadata .partition_specs - .insert(new_spec_id, Arc::new(spec.into())); + .insert(new_spec_id, Arc::new(spec)); self.changes .push(TableUpdate::AddSpec { spec: unbound_spec }); @@ -717,9 +718,10 @@ impl TableMetadataBuilder { ) })? 
.clone(); - let spec = - Arc::unwrap_or_clone(schemaless_spec).bind(self.get_current_schema()?.clone())?; + let spec = Arc::unwrap_or_clone(schemaless_spec); + let spec_type = spec.partition_type(self.get_current_schema()?)?; self.metadata.default_spec = Arc::new(spec); + self.metadata.default_partition_type = spec_type; if self.last_added_spec_id == Some(spec_id) { self.changes.push(TableUpdate::SetDefaultSpec { @@ -733,7 +735,7 @@ impl TableMetadataBuilder { } /// Add a partition spec and set it as the default - pub fn add_default_partition_spec(self, unbound_spec: UnboundPartitionSpec) -> Result { + pub fn add_default_partition_spec(self, unbound_spec: UnPartitionSpec) -> Result { self.add_partition_spec(unbound_spec)? .set_default_partition_spec(Self::LAST_ADDED) } @@ -850,6 +852,8 @@ impl TableMetadataBuilder { .into_unbound() .bind(schema.clone())?, ); + self.metadata.default_partition_type = + self.metadata.default_spec.partition_type(&schema)?; SortOrder::builder() .with_fields(sort_order.fields) .build(&schema)?; @@ -976,9 +980,9 @@ impl TableMetadataBuilder { fn reassign_ids( schema: Schema, - spec: UnboundPartitionSpec, + spec: UnPartitionSpec, sort_order: SortOrder, - ) -> Result<(Schema, BoundPartitionSpec, SortOrder)> { + ) -> Result<(Schema, PartitionSpec, SortOrder)> { // Re-assign field ids and schema ids for a new table. let previous_id_to_name = schema.field_id_to_name_map().clone(); let fresh_schema = schema @@ -1084,15 +1088,11 @@ impl TableMetadataBuilder { } /// If a compatible spec already exists, use the same ID. Otherwise, use 1 more than the highest ID. 
- fn reuse_or_create_new_spec_id(&self, new_spec: &BoundPartitionSpec) -> i32 { + fn reuse_or_create_new_spec_id(&self, new_spec: &PartitionSpec) -> i32 { self.metadata .partition_specs .iter() - .find_map(|(id, old_spec)| { - new_spec - .is_compatible_with_schemaless(old_spec) - .then_some(*id) - }) + .find_map(|(id, old_spec)| new_spec.is_compatible_with(old_spec).then_some(*id)) .unwrap_or_else(|| { self.get_highest_spec_id() .map(|id| id + 1) @@ -1138,9 +1138,8 @@ impl From for TableMetadata { mod tests { use super::*; use crate::spec::{ - NestedField, NullOrder, Operation, PrimitiveType, Schema, SchemalessPartitionSpec, - SnapshotRetention, SortDirection, SortField, StructType, Summary, Transform, Type, - UnboundPartitionField, + NestedField, NullOrder, Operation, PartitionSpec, PrimitiveType, Schema, SnapshotRetention, + SortDirection, SortField, StructType, Summary, Transform, Type, UnboundPartitionField, }; const TEST_LOCATION: &str = "s3://bucket/test/location"; @@ -1171,8 +1170,8 @@ mod tests { .unwrap() } - fn partition_spec() -> UnboundPartitionSpec { - UnboundPartitionSpec::builder() + fn partition_spec() -> UnPartitionSpec { + UnPartitionSpec::builder() .with_spec_id(0) .add_partition_field(2, "y", Transform::Identity) .unwrap() @@ -1250,7 +1249,7 @@ mod tests { let schema = Schema::builder().build().unwrap(); let metadata = TableMetadataBuilder::new( schema.clone(), - SchemalessPartitionSpec::unpartition_spec(), + PartitionSpec::unpartition_spec(), SortOrder::unsorted_order(), TEST_LOCATION.to_string(), FormatVersion::V2, @@ -1298,7 +1297,7 @@ mod tests { ]) .build() .unwrap(); - let spec = BoundPartitionSpec::builder(schema.clone()) + let spec = PartitionSpec::builder(schema.clone()) .with_spec_id(20) .add_partition_field("a", "a", Transform::Identity) .unwrap() @@ -1340,7 +1339,7 @@ mod tests { .build() .unwrap(); - let expected_spec = BoundPartitionSpec::builder(expected_schema.clone()) + let expected_spec = 
PartitionSpec::builder(expected_schema.clone()) .with_spec_id(0) .add_partition_field("a", "a", Transform::Identity) .unwrap() @@ -1413,7 +1412,7 @@ mod tests { TableUpdate::AddSpec { // Because this is a new tables, field-ids are assigned // partition_spec() has None set for field-id - spec: BoundPartitionSpec::builder(schema()) + spec: PartitionSpec::builder(schema()) .with_spec_id(0) .add_unbound_field(UnboundPartitionField { name: "y".to_string(), @@ -1445,7 +1444,7 @@ mod tests { let schema = Schema::builder().build().unwrap(); let changes = TableMetadataBuilder::new( schema.clone(), - SchemalessPartitionSpec::unpartition_spec().into_unbound(), + PartitionSpec::unpartition_spec().into_unbound(), SortOrder::unsorted_order(), TEST_LOCATION.to_string(), FormatVersion::V1, @@ -1468,7 +1467,7 @@ mod tests { TableUpdate::AddSpec { // Because this is a new tables, field-ids are assigned // partition_spec() has None set for field-id - spec: BoundPartitionSpec::builder(schema) + spec: PartitionSpec::builder(schema) .with_spec_id(0) .build() .unwrap() @@ -1486,7 +1485,7 @@ mod tests { fn test_add_partition_spec() { let builder = builder_without_changes(FormatVersion::V2); - let added_spec = UnboundPartitionSpec::builder() + let added_spec = UnPartitionSpec::builder() .with_spec_id(10) .add_partition_fields(vec![ UnboundPartitionField { @@ -1515,7 +1514,7 @@ mod tests { // Spec id should be re-assigned let expected_change = added_spec.with_spec_id(1); - let expected_spec = BoundPartitionSpec::builder(schema()) + let expected_spec = PartitionSpec::builder(schema()) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { name: "y".to_string(), @@ -1537,7 +1536,7 @@ mod tests { assert_eq!(build_result.changes.len(), 1); assert_eq!( build_result.metadata.partition_spec_by_id(1), - Some(&Arc::new(expected_spec.into_schemaless())) + Some(&Arc::new(expected_spec)) ); assert_eq!(build_result.metadata.default_spec.spec_id(), 0); 
assert_eq!(build_result.metadata.last_partition_id, 1001); @@ -1550,7 +1549,7 @@ mod tests { fn test_set_default_partition_spec() { let builder = builder_without_changes(FormatVersion::V2); let schema = builder.get_current_schema().unwrap().clone(); - let added_spec = UnboundPartitionSpec::builder() + let added_spec = UnPartitionSpec::builder() .with_spec_id(10) .add_partition_field(1, "y_bucket[2]", Transform::Bucket(2)) .unwrap() @@ -1564,7 +1563,7 @@ mod tests { .build() .unwrap(); - let expected_spec = BoundPartitionSpec::builder(schema) + let expected_spec = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { name: "y_bucket[2]".to_string(), @@ -1591,7 +1590,7 @@ mod tests { fn test_set_existing_default_partition_spec() { let builder = builder_without_changes(FormatVersion::V2); // Add and set an unbound spec as current - let unbound_spec = UnboundPartitionSpec::builder().with_spec_id(1).build(); + let unbound_spec = UnPartitionSpec::builder().with_spec_id(1).build(); let build_result = builder .add_partition_spec(unbound_spec.clone()) .unwrap() @@ -2037,7 +2036,7 @@ mod tests { fn test_add_partition_spec_for_v1_requires_sequential_ids() { let builder = builder_without_changes(FormatVersion::V1); - let added_spec = UnboundPartitionSpec::builder() + let added_spec = UnPartitionSpec::builder() .with_spec_id(10) .add_partition_fields(vec![ UnboundPartitionField { diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index edf1a8596..382e4b124 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -356,11 +356,7 @@ impl<'a> SnapshotProduceAction<'a> { } Self::validate_partition_value( data_file.partition(), - self.tx - .table - .metadata() - .default_partition_spec() - .partition_type(), + self.tx.table.metadata().default_partition_type(), )?; } self.added_data_files.extend(data_files); diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs 
b/crates/iceberg/src/writer/file_writer/location_generator.rs index def18b580..a997c839b 100644 --- a/crates/iceberg/src/writer/file_writer/location_generator.rs +++ b/crates/iceberg/src/writer/file_writer/location_generator.rs @@ -132,7 +132,7 @@ pub(crate) mod test { use uuid::Uuid; use super::LocationGenerator; - use crate::spec::{BoundPartitionSpec, FormatVersion, TableMetadata}; + use crate::spec::{FormatVersion, PartitionSpec, StructType, TableMetadata}; use crate::writer::file_writer::location_generator::{ FileNameGenerator, WRITE_DATA_LOCATION, WRITE_FOLDER_STORAGE_LOCATION, }; @@ -156,7 +156,6 @@ pub(crate) mod test { #[test] fn test_default_location_generate() { - let schema = crate::spec::Schema::builder().build().unwrap(); let mut table_metadata = TableMetadata { format_version: FormatVersion::V2, table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(), @@ -166,7 +165,8 @@ pub(crate) mod test { schemas: HashMap::new(), current_schema_id: 1, partition_specs: HashMap::new(), - default_spec: BoundPartitionSpec::unpartition_spec(schema).into(), + default_spec: PartitionSpec::unpartition_spec().into(), + default_partition_type: StructType::new(vec![]), last_partition_id: 1000, default_sort_order_id: 0, sort_orders: HashMap::from_iter(vec![]), From 85fc497339b068008464fa2138dd213c88f53d80 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Tue, 10 Dec 2024 09:41:51 +0100 Subject: [PATCH 2/6] Fix UnboundPartitionSpec name --- crates/catalog/rest/src/catalog.rs | 4 +- crates/catalog/rest/src/types.rs | 4 +- crates/iceberg/src/catalog/mod.rs | 13 +++--- crates/iceberg/src/spec/partition.rs | 46 +++++++++---------- .../src/spec/table_metadata_builder.rs | 24 +++++----- 5 files changed, 46 insertions(+), 45 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index e60dbf31f..fce5fe2be 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -702,7 +702,7 @@ mod 
tests { use iceberg::spec::{ FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot, SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type, - UnPartitionSpec, UnboundPartitionField, + UnboundPartitionField, UnboundPartitionSpec, }; use iceberg::transaction::Transaction; use mockito::{Mock, Server, ServerGuard}; @@ -1488,7 +1488,7 @@ mod tests { ) .properties(HashMap::from([("owner".to_string(), "testx".to_string())])) .partition_spec( - UnPartitionSpec::builder() + UnboundPartitionSpec::builder() .add_partition_fields(vec![UnboundPartitionField::builder() .source_id(1) .transform(Transform::Truncate(3)) diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs index 815dfddae..11833a562 100644 --- a/crates/catalog/rest/src/types.rs +++ b/crates/catalog/rest/src/types.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; -use iceberg::spec::{Schema, SortOrder, TableMetadata, UnPartitionSpec}; +use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec}; use iceberg::{ Error, ErrorKind, Namespace, NamespaceIdent, TableIdent, TableRequirement, TableUpdate, }; @@ -167,7 +167,7 @@ pub(super) struct CreateTableRequest { pub(super) name: String, pub(super) location: Option, pub(super) schema: Schema, - pub(super) partition_spec: Option, + pub(super) partition_spec: Option, pub(super) write_order: Option, pub(super) stage_create: Option, pub(super) properties: Option>, diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 5d342eb6f..b897d1574 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -30,7 +30,8 @@ use uuid::Uuid; use crate::spec::{ FormatVersion, Schema, SchemaId, Snapshot, SnapshotReference, SortOrder, TableMetadata, - TableMetadataBuilder, UnPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion, + TableMetadataBuilder, UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, + 
ViewVersion, }; use crate::table::Table; use crate::{Error, ErrorKind, Result}; @@ -243,7 +244,7 @@ pub struct TableCreation { pub schema: Schema, /// The partition spec of the table, could be None. #[builder(default, setter(strip_option(fallback = partition_spec_opt), into))] - pub partition_spec: Option, + pub partition_spec: Option, /// The sort order of the table. #[builder(default, setter(strip_option(fallback = sort_order_opt)))] pub sort_order: Option, @@ -382,7 +383,7 @@ pub enum TableUpdate { /// Add a new partition spec to the table AddSpec { /// The partition spec to add. - spec: UnPartitionSpec, + spec: UnboundPartitionSpec, }, /// Set table's default spec #[serde(rename_all = "kebab-case")] @@ -769,8 +770,8 @@ mod tests { FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation, Summary, TableMetadata, TableMetadataBuilder, Transform, Type, - UnPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations, ViewVersion, - MAIN_BRANCH, + UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations, + ViewVersion, MAIN_BRANCH, }; use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, TableUpdate}; @@ -1304,7 +1305,7 @@ mod tests { } "#, TableUpdate::AddSpec { - spec: UnPartitionSpec::builder() + spec: UnboundPartitionSpec::builder() .add_partition_field(4, "ts_day".to_string(), Transform::Day) .unwrap() .add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16)) diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index 429c3eaab..8a28d4843 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -104,7 +104,7 @@ impl PartitionSpec { } /// Convert to unbound partition spec - pub fn into_unbound(self) -> UnPartitionSpec { + pub fn into_unbound(self) -> UnboundPartitionSpec { self.into() } @@ -152,8 
+152,8 @@ impl PartitionSpec { } } -/// Reference to [`UnPartitionSpec`]. -pub type UnPartitionSpecRef = Arc; +/// Reference to [`UnboundPartitionSpec`]. +pub type UnboundPartitionSpecRef = Arc; /// Unbound partition field can be built without a schema and later bound to a schema. #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)] #[serde(rename_all = "kebab-case")] @@ -173,17 +173,17 @@ pub struct UnboundPartitionField { /// Unbound partition spec can be built without a schema and later bound to a schema. #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)] #[serde(rename_all = "kebab-case")] -pub struct UnPartitionSpec { +pub struct UnboundPartitionSpec { /// Identifier for PartitionSpec pub(crate) spec_id: Option, /// Details of the partition spec pub(crate) fields: Vec, } -impl UnPartitionSpec { +impl UnboundPartitionSpec { /// Create unbound partition spec builder - pub fn builder() -> UnPartitionSpecBuilder { - UnPartitionSpecBuilder::default() + pub fn builder() -> UnboundPartitionSpecBuilder { + UnboundPartitionSpecBuilder::default() } /// Bind this unbound partition spec to a schema. @@ -236,23 +236,23 @@ impl From for UnboundPartitionField { } } -impl From for UnPartitionSpec { +impl From for UnboundPartitionSpec { fn from(spec: PartitionSpec) -> Self { - UnPartitionSpec { + UnboundPartitionSpec { spec_id: Some(spec.spec_id), fields: spec.fields.into_iter().map(Into::into).collect(), } } } -/// Create a new UnPartitionSpec +/// Create a new UnboundPartitionSpec #[derive(Debug, Default)] -pub struct UnPartitionSpecBuilder { +pub struct UnboundPartitionSpecBuilder { spec_id: Option, fields: Vec, } -impl UnPartitionSpecBuilder { +impl UnboundPartitionSpecBuilder { /// Create a new partition spec builder with the given schema. pub fn new() -> Self { Self { @@ -306,8 +306,8 @@ impl UnPartitionSpecBuilder { } /// Build the unbound partition spec. 
- pub fn build(self) -> UnPartitionSpec { - UnPartitionSpec { + pub fn build(self) -> UnboundPartitionSpec { + UnboundPartitionSpec { spec_id: self.spec_id, fields: self.fields, } @@ -336,7 +336,7 @@ impl PartitionSpecBuilder { /// Create a new partition spec builder from an existing unbound partition spec. pub fn new_from_unbound( - unbound: UnPartitionSpec, + unbound: UnboundPartitionSpec, schema: impl Into, ) -> Result { let mut builder = @@ -584,7 +584,7 @@ impl PartitionSpecBuilder { } } -/// Contains checks that are common to both PartitionSpecBuilder and UnPartitionSpecBuilder +/// Contains checks that are common to both PartitionSpecBuilder and UnboundPartitionSpecBuilder trait CorePartitionSpecValidator { /// Ensure that the partition name is unique among the partition fields and is not empty. fn check_name_set_and_unique(&self, name: &str) -> Result<()> { @@ -647,7 +647,7 @@ impl CorePartitionSpecValidator for PartitionSpecBuilder { } } -impl CorePartitionSpecValidator for UnPartitionSpecBuilder { +impl CorePartitionSpecValidator for UnboundPartitionSpecBuilder { fn fields(&self) -> &Vec { &self.fields } @@ -792,7 +792,7 @@ mod tests { } "#; - let partition_spec: UnPartitionSpec = serde_json::from_str(spec).unwrap(); + let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap(); assert_eq!(Some(1), partition_spec.spec_id); assert_eq!(4, partition_spec.fields[0].source_id); @@ -819,7 +819,7 @@ mod tests { } ] } "#; - let partition_spec: UnPartitionSpec = serde_json::from_str(spec).unwrap(); + let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap(); assert_eq!(None, partition_spec.spec_id); assert_eq!(4, partition_spec.fields[0].source_id); @@ -1041,7 +1041,7 @@ mod tests { #[test] fn test_builder_disallow_duplicate_names() { - UnPartitionSpec::builder() + UnboundPartitionSpec::builder() .add_partition_field(1, "ts_day".to_string(), Transform::Day) .unwrap() .add_partition_field(2, "ts_day".to_string(), 
Transform::Day) @@ -1320,7 +1320,7 @@ mod tests { #[test] fn test_builder_disallows_redundant() { - let err = UnPartitionSpec::builder() + let err = UnboundPartitionSpec::builder() .with_spec_id(1) .add_partition_field(1, "id_bucket[16]".to_string(), Transform::Bucket(16)) .unwrap() @@ -1358,7 +1358,7 @@ mod tests { #[test] fn test_build_unbound_specs_without_partition_id() { - let spec = UnPartitionSpec::builder() + let spec = UnboundPartitionSpec::builder() .with_spec_id(1) .add_partition_fields(vec![UnboundPartitionField { source_id: 1, @@ -1369,7 +1369,7 @@ mod tests { .unwrap() .build(); - assert_eq!(spec, UnPartitionSpec { + assert_eq!(spec, UnboundPartitionSpec { spec_id: Some(1), fields: vec![UnboundPartitionField { source_id: 1, diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs index d621b0705..d6ea2c91f 100644 --- a/crates/iceberg/src/spec/table_metadata_builder.rs +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -23,7 +23,7 @@ use uuid::Uuid; use super::{ FormatVersion, MetadataLog, PartitionSpec, PartitionSpecBuilder, Schema, SchemaRef, Snapshot, SnapshotLog, SnapshotReference, SnapshotRetention, SortOrder, SortOrderRef, StructType, - TableMetadata, UnPartitionSpec, DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, MAIN_BRANCH, + TableMetadata, UnboundPartitionSpec, DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, MAIN_BRANCH, ONE_MINUTE_MS, PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX, PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT, RESERVED_PROPERTIES, UNPARTITIONED_LAST_ASSIGNED_ID, @@ -77,7 +77,7 @@ impl TableMetadataBuilder { /// spec.id. It should only be used to create new table metadata from scratch. 
pub fn new( schema: Schema, - spec: impl Into, + spec: impl Into, sort_order: SortOrder, location: String, format_version: FormatVersion, @@ -174,7 +174,7 @@ impl TableMetadataBuilder { "Can't create table without location", ) })?; - let partition_spec = partition_spec.unwrap_or(UnPartitionSpec { + let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec { spec_id: None, fields: vec![], }); @@ -637,7 +637,7 @@ impl TableMetadataBuilder { /// # Errors /// - The partition spec cannot be bound to the current schema. /// - The partition spec has non-sequential field ids and the table format version is 1. - pub fn add_partition_spec(mut self, unbound_spec: UnPartitionSpec) -> Result { + pub fn add_partition_spec(mut self, unbound_spec: UnboundPartitionSpec) -> Result { let schema = self.get_current_schema()?.clone(); let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)? .with_last_assigned_field_id(self.metadata.last_partition_id) @@ -735,7 +735,7 @@ impl TableMetadataBuilder { } /// Add a partition spec and set it as the default - pub fn add_default_partition_spec(self, unbound_spec: UnPartitionSpec) -> Result { + pub fn add_default_partition_spec(self, unbound_spec: UnboundPartitionSpec) -> Result { self.add_partition_spec(unbound_spec)? .set_default_partition_spec(Self::LAST_ADDED) } @@ -980,7 +980,7 @@ impl TableMetadataBuilder { fn reassign_ids( schema: Schema, - spec: UnPartitionSpec, + spec: UnboundPartitionSpec, sort_order: SortOrder, ) -> Result<(Schema, PartitionSpec, SortOrder)> { // Re-assign field ids and schema ids for a new table. 
@@ -1170,8 +1170,8 @@ mod tests { .unwrap() } - fn partition_spec() -> UnPartitionSpec { - UnPartitionSpec::builder() + fn partition_spec() -> UnboundPartitionSpec { + UnboundPartitionSpec::builder() .with_spec_id(0) .add_partition_field(2, "y", Transform::Identity) .unwrap() @@ -1485,7 +1485,7 @@ mod tests { fn test_add_partition_spec() { let builder = builder_without_changes(FormatVersion::V2); - let added_spec = UnPartitionSpec::builder() + let added_spec = UnboundPartitionSpec::builder() .with_spec_id(10) .add_partition_fields(vec![ UnboundPartitionField { @@ -1549,7 +1549,7 @@ mod tests { fn test_set_default_partition_spec() { let builder = builder_without_changes(FormatVersion::V2); let schema = builder.get_current_schema().unwrap().clone(); - let added_spec = UnPartitionSpec::builder() + let added_spec = UnboundPartitionSpec::builder() .with_spec_id(10) .add_partition_field(1, "y_bucket[2]", Transform::Bucket(2)) .unwrap() @@ -1590,7 +1590,7 @@ mod tests { fn test_set_existing_default_partition_spec() { let builder = builder_without_changes(FormatVersion::V2); // Add and set an unbound spec as current - let unbound_spec = UnPartitionSpec::builder().with_spec_id(1).build(); + let unbound_spec = UnboundPartitionSpec::builder().with_spec_id(1).build(); let build_result = builder .add_partition_spec(unbound_spec.clone()) .unwrap() @@ -2036,7 +2036,7 @@ mod tests { fn test_add_partition_spec_for_v1_requires_sequential_ids() { let builder = builder_without_changes(FormatVersion::V1); - let added_spec = UnPartitionSpec::builder() + let added_spec = UnboundPartitionSpec::builder() .with_spec_id(10) .add_partition_fields(vec![ UnboundPartitionField { From d8dc45f1686f63e992661f81addc9d16b6e7daac Mon Sep 17 00:00:00 2001 From: Christian Date: Tue, 10 Dec 2024 17:05:21 +0100 Subject: [PATCH 3/6] Update crates/iceberg/src/expr/visitors/expression_evaluator.rs Co-authored-by: Fokko Driesprong --- crates/iceberg/src/expr/visitors/expression_evaluator.rs | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/expr/visitors/expression_evaluator.rs b/crates/iceberg/src/expr/visitors/expression_evaluator.rs index e7f34bf97..7ff1035d5 100644 --- a/crates/iceberg/src/expr/visitors/expression_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/expression_evaluator.rs @@ -275,7 +275,7 @@ mod tests { let spec = PartitionSpec::builder(schema.clone()) .with_spec_id(1) - .add_unbound_fields(vec![UnboundPartitionField::builder() + .add_unbound_field(UnboundPartitionField::builder() .source_id(1) .name("a".to_string()) .field_id(1) From 3ff8c12cabb8d923edb8e5b2e73d94717e1d35eb Mon Sep 17 00:00:00 2001 From: Christian Date: Tue, 10 Dec 2024 17:07:02 +0100 Subject: [PATCH 4/6] Update crates/iceberg/src/spec/table_metadata.rs Co-authored-by: Fokko Driesprong --- crates/iceberg/src/spec/table_metadata.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 244216d5b..bb1cc917c 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -784,7 +784,7 @@ pub(super) mod _serde { let default_spec_id = value.default_spec_id; let default_spec: PartitionSpecRef = partition_specs .get(&value.default_spec_id) - .map(|schemaless_spec| (**schemaless_spec).clone()) + .map(|spec| (**spec).clone()) .or_else(|| { (DEFAULT_PARTITION_SPEC_ID == default_spec_id) .then(PartitionSpec::unpartition_spec) From 75756c3d10a85eb9d05cbe7cfb58e6eb377e3d24 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Tue, 10 Dec 2024 17:07:40 +0100 Subject: [PATCH 5/6] Fix syntax --- .../src/expr/visitors/expression_evaluator.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/crates/iceberg/src/expr/visitors/expression_evaluator.rs b/crates/iceberg/src/expr/visitors/expression_evaluator.rs index 7ff1035d5..6f03a6ee6 100644 --- 
a/crates/iceberg/src/expr/visitors/expression_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/expression_evaluator.rs @@ -275,12 +275,14 @@ mod tests { let spec = PartitionSpec::builder(schema.clone()) .with_spec_id(1) - .add_unbound_field(UnboundPartitionField::builder() - .source_id(1) - .name("a".to_string()) - .field_id(1) - .transform(Transform::Identity) - .build()]) + .add_unbound_field( + UnboundPartitionField::builder() + .source_id(1) + .name("a".to_string()) + .field_id(1) + .transform(Transform::Identity) + .build(), + ) .unwrap() .build() .unwrap(); From d351b3a98e62bacc16091f7553f5455b28c21d48 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Sun, 15 Dec 2024 11:49:48 +0100 Subject: [PATCH 6/6] Address comments --- crates/iceberg/src/spec/partition.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index 8a28d4843..e6405be4c 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -55,9 +55,11 @@ impl PartitionField { /// Reference to [`PartitionSpec`]. pub type PartitionSpecRef = Arc; /// Partition spec that defines how to produce a tuple of partition values from a record. -/// Schemaless partition specs are never constructed manually. They occur when a table is mutated -/// and partition spec and schemas are updated. While old partition specs are retained, the bound -/// schema might not be available anymore as part of the table metadata. +/// +/// A [`PartitionSpec`] is originally obtained by binding an [`UnboundPartitionSpec`] to a schema and is +/// only guaranteed to be valid for that schema. The main difference between [`PartitionSpec`] and +/// [`UnboundPartitionSpec`] is that the former has field ids assigned, +/// while field ids are optional for [`UnboundPartitionSpec`]. 
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(rename_all = "kebab-case")] pub struct PartitionSpec { @@ -83,7 +85,7 @@ impl PartitionSpec { self.spec_id } - /// Get a new unpatitioned partition spec + /// Get a new unpartitioned partition spec pub fn unpartition_spec() -> Self { Self { spec_id: DEFAULT_PARTITION_SPEC_ID, @@ -171,6 +173,8 @@ pub struct UnboundPartitionField { } /// Unbound partition spec can be built without a schema and later bound to a schema. +/// They are used to transport schema information as part of the REST specification. +/// The main difference to [`PartitionSpec`] is that the field ids are optional. #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)] #[serde(rename_all = "kebab-case")] pub struct UnboundPartitionSpec {