From 2c9d7e4855ad6ab7269b2ff5b3b3b180f7dcbd2d Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Wed, 2 Oct 2024 07:51:50 +0200 Subject: [PATCH] New builder --- crates/catalog/glue/src/catalog.rs | 4 +- crates/catalog/glue/src/schema.rs | 4 +- crates/catalog/glue/src/utils.rs | 4 +- crates/catalog/hms/src/catalog.rs | 4 +- crates/catalog/memory/src/catalog.rs | 4 +- crates/iceberg/src/catalog/mod.rs | 53 +- crates/iceberg/src/spec/mod.rs | 1 + crates/iceberg/src/spec/partition.rs | 5 + crates/iceberg/src/spec/schema.rs | 18 + crates/iceberg/src/spec/table_metadata.rs | 111 +- .../src/spec/table_metadata_builder.rs | 1912 +++++++++++++++++ 11 files changed, 2014 insertions(+), 106 deletions(-) create mode 100644 crates/iceberg/src/spec/table_metadata_builder.rs diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 18e30f3d0..ad216a3a7 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -355,7 +355,9 @@ impl Catalog for GlueCatalog { } }; - let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?; + let metadata = TableMetadataBuilder::from_table_creation(creation)? + .build()? + .metadata; let metadata_location = create_metadata_location(&location, 0)?; self.file_io diff --git a/crates/catalog/glue/src/schema.rs b/crates/catalog/glue/src/schema.rs index bb676e36e..1b490d13d 100644 --- a/crates/catalog/glue/src/schema.rs +++ b/crates/catalog/glue/src/schema.rs @@ -198,7 +198,9 @@ mod tests { .location("my_location".to_string()) .schema(schema) .build(); - let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?; + let metadata = TableMetadataBuilder::from_table_creation(table_creation)? + .build()? 
+ .metadata; Ok(metadata) } diff --git a/crates/catalog/glue/src/utils.rs b/crates/catalog/glue/src/utils.rs index a99fb19c7..c43af500b 100644 --- a/crates/catalog/glue/src/utils.rs +++ b/crates/catalog/glue/src/utils.rs @@ -299,7 +299,9 @@ mod tests { .location("my_location".to_string()) .schema(schema) .build(); - let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?; + let metadata = TableMetadataBuilder::from_table_creation(table_creation)? + .build()? + .metadata; Ok(metadata) } diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 6e5db1968..e19cf4ce5 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -346,7 +346,9 @@ impl Catalog for HmsCatalog { } }; - let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?; + let metadata = TableMetadataBuilder::from_table_creation(creation)? + .build()? + .metadata; let metadata_location = create_metadata_location(&location, 0)?; self.file_io diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index eebce36ff..eaae0e615 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -194,7 +194,9 @@ impl Catalog for MemoryCatalog { } }; - let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?; + let metadata = TableMetadataBuilder::from_table_creation(table_creation)? + .build()? + .metadata; let metadata_location = format!( "{}/metadata/{}-{}.metadata.json", &location, diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 54abe8083..de5b03675 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -445,8 +445,46 @@ impl TableUpdate { /// Applies the update to the table metadata builder. 
pub fn apply(self, builder: TableMetadataBuilder) -> Result { match self { - TableUpdate::AssignUuid { uuid } => builder.assign_uuid(uuid), - _ => unimplemented!(), + TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)), + TableUpdate::AddSchema { + schema, + last_column_id, + } => { + if let Some(last_column_id) = last_column_id { + if last_column_id < builder.last_column_id() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid last column ID: {last_column_id} < {} (previous last column ID)", + builder.last_column_id() + ), + )); + } + }; + Ok(builder.add_schema(schema)) + } + TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id), + TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec), + TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id), + TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order), + TableUpdate::SetDefaultSortOrder { sort_order_id } => { + builder.set_default_sort_order(sort_order_id) + } + TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot), + TableUpdate::SetSnapshotRef { + ref_name, + reference, + } => builder.set_ref(&ref_name, reference), + TableUpdate::RemoveSnapshots { snapshot_ids } => { + Ok(builder.remove_snapshots(&snapshot_ids)) + } + TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)), + TableUpdate::SetLocation { location } => Ok(builder.set_location(location)), + TableUpdate::SetProperties { updates } => builder.set_properties(updates), + TableUpdate::RemoveProperties { removals } => Ok(builder.remove_properties(&removals)), + TableUpdate::UpgradeFormatVersion { format_version } => { + builder.upgrade_format_version(format_version) + } } } } @@ -1125,8 +1163,12 @@ mod tests { let table_metadata = TableMetadataBuilder::from_table_creation(table_creation) .unwrap() .build() - .unwrap(); - let table_metadata_builder = 
TableMetadataBuilder::new(table_metadata); + .unwrap() + .metadata; + let table_metadata_builder = TableMetadataBuilder::new_from_metadata( + table_metadata, + "s3://db/table/metadata/metadata1.gz.json", + ); let uuid = uuid::Uuid::new_v4(); let update = TableUpdate::AssignUuid { uuid }; @@ -1134,7 +1176,8 @@ mod tests { .apply(table_metadata_builder) .unwrap() .build() - .unwrap(); + .unwrap() + .metadata; assert_eq!(updated_metadata.uuid(), uuid); } } diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs index 793f00d34..9b91d5443 100644 --- a/crates/iceberg/src/spec/mod.rs +++ b/crates/iceberg/src/spec/mod.rs @@ -25,6 +25,7 @@ mod schema; mod snapshot; mod sort; mod table_metadata; +mod table_metadata_builder; mod transform; mod values; mod view_metadata; diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index 72e398768..81bf9ecb6 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -171,6 +171,11 @@ impl PartitionSpec { pub fn partition_type(&self) -> &StructType { &self.partition_type } + + /// Set the spec id for the partition spec. + pub(crate) fn with_spec_id(self, spec_id: i32) -> Self { + Self { spec_id, ..self } + } } impl SchemalessPartitionSpec { diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index cf86874dc..d78a2545a 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -376,6 +376,24 @@ impl Schema { pub fn accessor_by_field_id(&self, field_id: i32) -> Option> { self.field_id_to_accessor.get(&field_id).cloned() } + + /// Check if this schema is identical to another schema semantically - excluding schema id. + pub(crate) fn is_same_schema(&self, other: &SchemaRef) -> bool { + self.as_struct().eq(other.as_struct()) + && self.identifier_field_ids().eq(other.identifier_field_ids()) + } + + /// Change the schema id of this schema. 
+ // This is redundant with the `with_schema_id` method on the builder, but useful + // as it is infallible in contrast to the builder `build()` method. + pub(crate) fn with_schema_id(self, schema_id: SchemaId) -> Self { + Self { schema_id, ..self } + } + + /// Return A HashMap matching field ids to field names. + pub(crate) fn field_id_to_name_map(&self) -> &HashMap { + &self.id_to_name + } } impl Display for Schema { diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 7707b13b1..c23a09ecc 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -30,12 +30,13 @@ use serde_repr::{Deserialize_repr, Serialize_repr}; use uuid::Uuid; use super::snapshot::SnapshotReference; +pub use super::table_metadata_builder::TableMetadataBuilder; use super::{ - PartitionSpec, PartitionSpecRef, SchemaId, SchemaRef, SchemalessPartitionSpecRef, Snapshot, - SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, DEFAULT_PARTITION_SPEC_ID, + PartitionSpecRef, SchemaId, SchemaRef, SchemalessPartitionSpecRef, Snapshot, SnapshotRef, + SnapshotRetention, SortOrder, SortOrderRef, DEFAULT_PARTITION_SPEC_ID, }; use crate::error::{timestamp_ms_to_utc, Result}; -use crate::{Error, ErrorKind, TableCreation}; +use crate::{Error, ErrorKind}; static MAIN_BRANCH: &str = "main"; pub(crate) static ONE_MINUTE_MS: i64 = 60_000; @@ -165,6 +166,15 @@ pub struct TableMetadata { } impl TableMetadata { + /// Convert this Table Metadata into a builder for modification. + /// + /// `current_file_location` is the location where the current version + /// of the metadata file is stored. This is used to update the metadata log. + #[must_use] + pub fn into_builder(self, current_file_location: impl Into) -> TableMetadataBuilder { + TableMetadataBuilder::new_from_metadata(self, current_file_location) + } + /// Returns format version of this metadata. 
#[inline] pub fn format_version(&self) -> FormatVersion { @@ -539,98 +549,6 @@ impl TableMetadata { } } -/// Manipulating table metadata. -pub struct TableMetadataBuilder(TableMetadata); - -impl TableMetadataBuilder { - /// Creates a new table metadata builder from the given table metadata. - pub fn new(origin: TableMetadata) -> Self { - Self(origin) - } - - /// Creates a new table metadata builder from the given table creation. - pub fn from_table_creation(table_creation: TableCreation) -> Result { - let TableCreation { - name: _, - location, - schema, - partition_spec, - sort_order, - properties, - } = table_creation; - - let schema: Arc = Arc::new(schema); - let unpartition_spec = PartitionSpec::unpartition_spec(schema.clone()); - let partition_specs = match partition_spec { - Some(_) => { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - "Can't create table with partition spec now", - )) - } - None => HashMap::from([( - unpartition_spec.spec_id(), - Arc::new(unpartition_spec.clone().into_schemaless()), - )]), - }; - - let sort_orders = match sort_order { - Some(_) => { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - "Can't create table with sort order now", - )) - } - None => HashMap::from([( - SortOrder::UNSORTED_ORDER_ID, - Arc::new(SortOrder::unsorted_order()), - )]), - }; - - let mut table_metadata = TableMetadata { - format_version: FormatVersion::V2, - table_uuid: Uuid::now_v7(), - location: location.ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - "Can't create table without location", - ) - })?, - last_sequence_number: 0, - last_updated_ms: Utc::now().timestamp_millis(), - last_column_id: schema.highest_field_id(), - current_schema_id: schema.schema_id(), - schemas: HashMap::from([(schema.schema_id(), schema)]), - partition_specs, - default_spec: PartitionSpecRef::new(unpartition_spec), - last_partition_id: 0, - properties, - current_snapshot_id: None, - snapshots: Default::default(), - snapshot_log: vec![], - sort_orders, 
- metadata_log: vec![], - default_sort_order_id: SortOrder::UNSORTED_ORDER_ID, - refs: Default::default(), - }; - - table_metadata.try_normalize()?; - - Ok(Self(table_metadata)) - } - - /// Changes uuid of table metadata. - pub fn assign_uuid(mut self, uuid: Uuid) -> Result { - self.0.table_uuid = uuid; - Ok(self) - } - - /// Returns the new table metadata after changes. - pub fn build(self) -> Result { - Ok(self.0) - } -} - pub(super) mod _serde { use std::borrow::BorrowMut; /// This is a helper module that defines types to help with serialization/deserialization. @@ -2308,7 +2226,8 @@ mod tests { let table_metadata = TableMetadataBuilder::from_table_creation(table_creation) .unwrap() .build() - .unwrap(); + .unwrap() + .metadata; assert_eq!(table_metadata.location, "s3://db/table"); assert_eq!(table_metadata.schemas.len(), 1); assert_eq!( diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs new file mode 100644 index 000000000..b7e951481 --- /dev/null +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -0,0 +1,1912 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use uuid::Uuid; + +use super::{ + FormatVersion, MetadataLog, PartitionSpec, PartitionSpecBuilder, Schema, SchemaRef, Snapshot, + SnapshotLog, SnapshotReference, SortOrder, SortOrderRef, TableMetadata, UnboundPartitionSpec, + UnboundPartitionSpecInterface as _, DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, MAIN_BRANCH, + ONE_MINUTE_MS, PROPERTY_FORMAT_VERSION, PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT, + RESERVED_PROPERTIES, UNPARTITIONED_LAST_ASSIGNED_ID, +}; +use crate::error::{Error, ErrorKind, Result}; +use crate::{TableCreation, TableUpdate}; + +/// Manipulating table metadata. +/// +/// For this builder the order of called functions matters. Functions are applied in-order. +/// All operations applied to the `TableMetadata` are tracked in `changes` as a chronologically +/// ordered vec of `TableUpdate`. +/// If an operation does not lead to a change of the `TableMetadata`, the corresponding update +/// is omitted from `changes`. +/// +/// Unlike a typical builder pattern, the order of function calls matters. +/// Some basic rules: +/// - `add_schema` must be called before `set_current_schema`. +/// - If a new partition spec and schema are added, the schema should be added first. +#[derive(Debug, Clone)] +pub struct TableMetadataBuilder { + metadata: TableMetadata, + changes: Vec, + last_added_schema_id: Option, + last_added_spec_id: Option, + last_added_order_id: Option, + // None if this is a new table (from_metadata) method not used + previous_history_entry: Option, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct TableMetadataBuildResult { + pub metadata: TableMetadata, + pub changes: Vec, + pub expired_metadata_logs: Vec, +} + +impl TableMetadataBuilder { + const LAST_ADDED: i32 = -1; + + /// Create a `TableMetadata` object from scratch. + /// + /// This method re-assign ids of fields in the schema, schema.id, sort_order.id and + /// spec.id. 
It should only be used to create new table metadata from scratch. + pub fn new( + schema: Schema, + spec: impl Into, + sort_order: SortOrder, + location: String, + format_version: FormatVersion, + properties: HashMap, + ) -> Result { + // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new table. + let (fresh_schema, fresh_spec, fresh_sort_order) = + Self::reassign_ids(schema, spec.into(), sort_order)?; + let schema_id = fresh_schema.schema_id(); + + let builder = Self { + metadata: TableMetadata { + format_version, + table_uuid: Uuid::now_v7(), + location: "".to_string(), // Overwritten immediately by set_location + last_sequence_number: 0, + last_updated_ms: 0, // Overwritten by build() if not set before + last_column_id: -1, // Overwritten immediately by add_current_schema + current_schema_id: -1, // Overwritten immediately by add_current_schema + schemas: HashMap::new(), + partition_specs: HashMap::new(), + default_spec: Arc::new( + PartitionSpec::unpartition_spec(fresh_schema.clone()).with_spec_id(-1), + ), // Overwritten immediately by add_default_partition_spec + last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID, + properties: HashMap::new(), + current_snapshot_id: None, + snapshots: HashMap::new(), + snapshot_log: vec![], + sort_orders: HashMap::new(), + metadata_log: vec![], + default_sort_order_id: -1, // Overwritten immediately by add_default_sort_order + refs: HashMap::default(), + }, + changes: vec![], + last_added_schema_id: Some(schema_id), + last_added_spec_id: None, + last_added_order_id: None, + previous_history_entry: None, + }; + + builder + .set_location(location) + .add_current_schema(fresh_schema)? + .add_default_partition_spec(fresh_spec.into_unbound())? + .add_default_sort_order(fresh_sort_order)? + .set_properties(properties) + } + + /// Creates a new table metadata builder from the given metadata to modify it. + /// Previous file location is used to populate the Metadata Log. 
+ #[must_use] + pub fn new_from_metadata( + previous: TableMetadata, + previous_file_location: impl Into, + ) -> Self { + Self { + previous_history_entry: Some(MetadataLog { + metadata_file: previous_file_location.into(), + timestamp_ms: previous.last_updated_ms, + }), + metadata: previous, + changes: Vec::default(), + last_added_schema_id: None, + last_added_spec_id: None, + last_added_order_id: None, + } + } + + /// Creates a new table metadata builder from the given table creation. + pub fn from_table_creation(table_creation: TableCreation) -> Result { + let TableCreation { + name: _, + location, + schema, + partition_spec, + sort_order, + properties, + } = table_creation; + + let location = location.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Can't create table without location", + ) + })?; + let partition_spec = partition_spec.unwrap_or( + UnboundPartitionSpec { + spec_id: None, + fields: vec![], + } + .into(), + ); + + Self::new( + schema, + partition_spec, + sort_order.unwrap_or(SortOrder::unsorted_order()), + location, + FormatVersion::V1, + properties, + ) + } + + /// Get the current schema with all changes applied up to this point. + #[inline] + pub fn current_schema(&self) -> &SchemaRef { + self.metadata.current_schema() + } + + /// Get the current last column id + #[inline] + pub fn last_column_id(&self) -> i32 { + self.metadata.last_column_id + } + + /// Get the current last updated timestamp + #[inline] + pub fn last_updated_ms(&self) -> i64 { + self.metadata.last_updated_ms + } + + /// Changes uuid of table metadata. + pub fn assign_uuid(mut self, uuid: Uuid) -> Self { + if self.metadata.table_uuid != uuid { + self.metadata.table_uuid = uuid; + self.changes.push(TableUpdate::AssignUuid { uuid }); + } + + self + } + + /// Upgrade `FormatVersion`. Downgrades are not allowed. + /// + /// # Errors + /// - Cannot downgrade to older format versions. 
+ pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> Result { + if format_version < self.metadata.format_version { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot downgrade FormatVersion from {} to {}", + self.metadata.format_version, format_version + ), + )); + } + + if format_version != self.metadata.format_version { + self.metadata.format_version = format_version; + self.changes + .push(TableUpdate::UpgradeFormatVersion { format_version }); + } + + Ok(self) + } + + /// Set properties. If a property already exists, it will be overwritten. + /// + /// Reserved properties are rejected: if any key is contained in `RESERVED_PROPERTIES`, + /// no property is set and an error listing the offending keys is returned. + /// An empty map is a no-op and does not record a change. + /// + /// # Errors + /// - If `properties` contains one or more reserved property keys. + pub fn set_properties(mut self, properties: HashMap) -> Result { + // Collect the specified properties that are RESERVED; they must not be persisted. + let reserved_properties = properties + .keys() + .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str())) + .map(ToString::to_string) + .collect::>(); + + if !reserved_properties.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Table properties should not contain reserved properties, but got: [{}]", + reserved_properties.join(", ") + ), + )); + } + + if properties.is_empty() { + return Ok(self); + } + + self.metadata.properties.extend(properties.clone()); + self.changes.push(TableUpdate::SetProperties { + updates: properties, + }); + + Ok(self) + } + + /// Remove properties from the table metadata. + /// Does nothing if the key is not present. 
+ pub fn remove_properties(mut self, properties: &[String]) -> Self { + for property in properties { + self.metadata.properties.remove(property); + } + + if !properties.is_empty() { + self.changes.push(TableUpdate::RemoveProperties { + removals: properties.to_vec(), + }); + } + + self + } + + /// Set the location of the table metadata, stripping any trailing slashes. + pub fn set_location(mut self, location: String) -> Self { + let location = location.trim_end_matches('/').to_string(); + if self.metadata.location != location { + self.changes.push(TableUpdate::SetLocation { + location: location.clone(), + }); + self.metadata.location = location; + } + + self + } + + /// Add a snapshot to the table metadata. + /// + /// # Errors + /// - The snapshot timestamp is more than one minute before the last snapshot log entry or the last updated timestamp. + /// - No partition spec has been added to the table metadata. + /// - No sort order has been added to the table metadata. + /// - Snapshot id already exists. + /// - For format version > 1: the snapshot has a parent and its sequence number is not higher than the last sequence number. 
+ pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result { + if self.metadata.partition_specs.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Attempting to add a snapshot before a partition spec is added", + )); + } + + if self.metadata.sort_orders.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Attempting to add a snapshot before a sort order is added", + )); + } + + if self + .metadata + .snapshots + .contains_key(&snapshot.snapshot_id()) + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Snapshot already exists for: '{}'", snapshot.snapshot_id()), + )); + } + + if self.metadata.format_version != FormatVersion::V1 + && snapshot.sequence_number() <= self.metadata.last_sequence_number + && snapshot.parent_snapshot_id().is_some() + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add snapshot with sequence number {} older than last sequence number {}", + snapshot.sequence_number(), + self.metadata.last_sequence_number + ) + )); + } + + if let Some(last) = self.metadata.snapshot_log.last() { + // commits can happen concurrently from different machines. 
+ // A tolerance helps us avoid failure for small clock skew + if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid snapshot timestamp {}: before last snapshot timestamp {}", + snapshot.timestamp_ms(), + last.timestamp_ms + ), + )); + } + } + + if snapshot.timestamp_ms() - self.metadata.last_updated_ms < -ONE_MINUTE_MS { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid snapshot timestamp {}: before last updated timestamp {}", + snapshot.timestamp_ms(), + self.metadata.last_updated_ms + ), + )); + } + + // Mutation happens in next line - must be infallible from here + self.changes.push(TableUpdate::AddSnapshot { + snapshot: snapshot.clone(), + }); + + self.metadata.last_updated_ms = snapshot.timestamp_ms(); + self.metadata.last_sequence_number = snapshot.sequence_number(); + self.metadata + .snapshots + .insert(snapshot.snapshot_id(), snapshot.into()); + + Ok(self) + } + + /// Append a snapshot to the specified branch. + /// If branch is not specified, the snapshot is appended to the main branch. + /// The `ref` must already exist. Retention settings from the `ref` are re-used. + /// + /// # Errors + /// - The ref is unknown. + /// - Any of the preconditions of `self.add_snapshot` are not met. + pub fn append_snapshot(self, snapshot: Snapshot, ref_name: Option<&str>) -> Result { + let ref_name = ref_name.unwrap_or(MAIN_BRANCH); + let mut reference = self + .metadata + .refs + .get(ref_name) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Cannot append snapshot to unknown ref: '{}'", ref_name), + ) + })? + .clone(); + + reference.snapshot_id = snapshot.snapshot_id(); + + self.add_snapshot(snapshot)?.set_ref(ref_name, reference) + } + + /// Remove snapshots by its ids from the table metadata. + /// Does nothing if a snapshot id is not present. + /// Keeps as changes only the snapshots that were actually removed. 
+ pub fn remove_snapshots(mut self, snapshot_ids: &[i64]) -> Self { + let mut removed_snapshots = Vec::with_capacity(snapshot_ids.len()); + + self.metadata.snapshots.retain(|k, _| { + if snapshot_ids.contains(k) { + removed_snapshots.push(*k); + false + } else { + true + } + }); + + if !removed_snapshots.is_empty() { + self.changes.push(TableUpdate::RemoveSnapshots { + snapshot_ids: removed_snapshots, + }); + } + + // Remove refs that are no longer valid + self.metadata + .refs + .retain(|_, v| self.metadata.snapshots.contains_key(&v.snapshot_id)); + + self + } + + /// Set a reference to a snapshot. + /// + /// # Errors + /// - The snapshot id is unknown. + pub fn set_ref(mut self, ref_name: &str, reference: SnapshotReference) -> Result { + if self + .metadata + .refs + .get(ref_name) + .is_some_and(|snap_ref| snap_ref.eq(&reference)) + { + return Ok(self); + } + + let Some(snapshot) = self.metadata.snapshots.get(&reference.snapshot_id) else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot set '{ref_name}' to unknown snapshot: '{}'", + reference.snapshot_id + ), + )); + }; + + // Update last_updated_ms to the exact timestamp of the snapshot if it was added in this commit + let is_added_snapshot = self.changes.iter().any(|update| { + matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if snap.snapshot_id() == snapshot.snapshot_id()) + }); + if is_added_snapshot { + self.metadata.last_updated_ms = snapshot.timestamp_ms(); + } + + // Current snapshot id is set only for the main branch + if ref_name == MAIN_BRANCH { + self.metadata.current_snapshot_id = Some(snapshot.snapshot_id()); + if self.metadata.last_updated_ms == i64::default() { + self.metadata.last_updated_ms = chrono::Utc::now().timestamp_millis(); + }; + + self.metadata.snapshot_log.push(SnapshotLog { + snapshot_id: snapshot.snapshot_id(), + timestamp_ms: self.metadata.last_updated_ms, + }); + } + + self.changes.push(TableUpdate::SetSnapshotRef { + ref_name: 
ref_name.to_string(), + reference: reference.clone(), + }); + self.metadata.refs.insert(ref_name.to_string(), reference); + + Ok(self) + } + + /// Remove a reference. + /// + /// If `ref_name='main'` the current snapshot id is set to -1 and the snapshot log is cleared. + pub fn remove_ref(mut self, ref_name: &str) -> Self { + if ref_name == MAIN_BRANCH { + self.metadata.current_snapshot_id = Some(i64::from(Self::LAST_ADDED)); + self.metadata.snapshot_log.clear(); + } + + if self.metadata.refs.remove(ref_name).is_some() || ref_name == MAIN_BRANCH { + self.changes.push(TableUpdate::RemoveSnapshotRef { + ref_name: ref_name.to_string(), + }); + } + + self + } + + /// Add a schema to the table metadata. + /// + /// The provided `schema.schema_id` may not be used. + + // ToDo Discuss: Should we add `new_last_column_id` argument? + // TLDR; I believe not as it acts as an assertion and its purpose (and source) is not clear. We shouldn't add it. + // + // Schemas can contain only old columns or a mix of old and new columns. + // In Java, if `new_last_column_id` is set but too low, the function would fail, basically hinting at + // the schema having been built for an older metadata version. `new_last_column_id` is typically obtained + // in the schema building process. + // + // This assertion is not required if the user controls the flow - they know for which + // metadata they created a schema. If asserting the `new_last_column_id` was semantically important, it should be part of the schema and + // not be passed around alongside it. + // + // Specifying `new_last_column_id` in Java also allows setting `metadata.last_column_id` to any arbitrary value + // even if it's not present as a column. I believe this to be undesired behavior. This is not possible with the current Rust interface. 
+ // + // If the schema is built out of sync with the TableMetadata, for example in a REST Catalog setting, the assertion of + // the provided `last_column_id` as part of the `TableUpdate::AddSchema` is still done in its `.apply` method. + pub fn add_schema(mut self, schema: Schema) -> Self { + // ToDo Discuss: this fn currently returns `Self`; it may need to return a `Result` if we decide to check + // field-id <-> type compatibility for field-ids that are still present in the metadata. This is not done in the Java code. + let new_schema_id = self.reuse_or_create_new_schema_id(&schema); + let schema_found = self.metadata.schemas.contains_key(&new_schema_id); + + if schema_found { + // ToDo Discuss: The Java code is a bit convoluted and I think it might be wrong for an edge case. + // Why is it wrong: The baseline is that if something changes the state of the builder, it has an effect on it and + // must be recorded in the changes. + // The Java code might or might not change `lastAddedSchemaId`, and does not record this change in `changes`. + // Thus, replaying the changes would lead to a different result if a schema is added twice in unfavorable + // conditions. + // Here we do it differently, but a check from a Java maintainer would be nice. + if self.last_added_schema_id != Some(new_schema_id) { + self.changes.push(TableUpdate::AddSchema { + last_column_id: Some(self.metadata.last_column_id), + schema: schema.clone(), + }); + self.last_added_schema_id = Some(new_schema_id); + } + + return self; + } + + // New schemas might contain only old columns. In this case last_column_id should not be + // reduced. 
+ self.metadata.last_column_id = + std::cmp::max(self.metadata.last_column_id, schema.highest_field_id()); + + // Set schema-id + let schema = match new_schema_id == schema.schema_id() { + true => schema, + false => schema.with_schema_id(new_schema_id), + }; + + self.metadata + .schemas + .insert(new_schema_id, schema.clone().into()); + + self.changes.push(TableUpdate::AddSchema { + schema, + last_column_id: Some(self.metadata.last_column_id), + }); + + self.last_added_schema_id = Some(new_schema_id); + + self + } + + /// Set the current schema id. + /// + /// Errors: + /// - provided `schema_id` is -1 but no schema has been added via `add_schema`. + /// - No schema with the provided `schema_id` exists. + pub fn set_current_schema(mut self, mut schema_id: i32) -> Result { + if schema_id == Self::LAST_ADDED { + schema_id = self.last_added_schema_id.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Cannot set current schema to last added schema: no schema has been added.", + ) + })?; + }; + let schema_id = schema_id; // Make immutable + + if schema_id == self.metadata.current_schema_id { + return Ok(self); + } + + let _schema = self.metadata.schemas.get(&schema_id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot set current schema to unknown schema with id: '{}'", + schema_id + ), + ) + })?; + + // Old partition specs and sort-orders should be preserved even if they are not compatible with the new schema, + // so that older metadata can still be interpreted. + // Default partition spec and sort order are checked in the build() method + // which allows other default partition specs and sort orders to be set before the build. 
+ + self.metadata.current_schema_id = schema_id; + + if self.last_added_schema_id == Some(schema_id) { + self.changes.push(TableUpdate::SetCurrentSchema { + schema_id: Self::LAST_ADDED, + }); + } else { + self.changes + .push(TableUpdate::SetCurrentSchema { schema_id }); + } + + Ok(self) + } + + /// Add a schema and set it as the current schema. + pub fn add_current_schema(self, schema: Schema) -> Result { + self.add_schema(schema).set_current_schema(Self::LAST_ADDED) + } + + /// Add a partition spec to the table metadata. + /// + /// The spec is bound eagerly to the current schema. + /// If a schema is added in the same set of changes, the schema should be added first. + /// + /// Even if `unbound_spec.spec_id` is provided as `Some`, it may not be used. + /// + /// # Errors + /// - The partition spec cannot be bound to the current schema. + /// - The partition spec has non-sequential field ids and the table format version is 1. + pub fn add_partition_spec(mut self, unbound_spec: UnboundPartitionSpec) -> Result { + let new_spec_id = self.reuse_or_create_new_spec_id(&unbound_spec); + let spec_found = self.metadata.partition_specs.contains_key(&new_spec_id); + let unbound_spec = unbound_spec.with_spec_id(new_spec_id); + + if spec_found { + if self.last_added_spec_id != Some(new_spec_id) { + self.changes + .push(TableUpdate::AddSpec { spec: unbound_spec }); + self.last_added_spec_id = Some(new_spec_id); + } + + return Ok(self); + } + + let schema = self.get_current_schema()?.clone(); + let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)? 
+ .with_last_assigned_field_id(self.metadata.last_partition_id) + .with_spec_id(new_spec_id) + .build()?; + + if self.metadata.format_version <= FormatVersion::V1 && !spec.has_sequential_ids() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Cannot add partition spec with non-sequential field ids to format version 1 table", + )); + } + + let highest_field_id = spec.highest_field_id(); + self.metadata + .partition_specs + .insert(new_spec_id, Arc::new(spec.into())); + self.changes + .push(TableUpdate::AddSpec { spec: unbound_spec }); + + self.last_added_spec_id = Some(new_spec_id); + self.metadata.last_partition_id = + std::cmp::max(self.metadata.last_partition_id, highest_field_id); + + Ok(self) + } + + /// Set the default partition spec. + /// + /// # Errors + /// - spec_id is -1 but no spec has been added via `add_partition_spec`. + /// - No partition spec with the provided `spec_id` exists. + pub fn set_default_partition_spec(mut self, mut spec_id: i32) -> Result { + if spec_id == Self::LAST_ADDED { + spec_id = self.last_added_spec_id.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Cannot set default partition spec to last added spec: no spec has been added.", + ) + })?; + } + + if self.metadata.default_spec.spec_id() == spec_id { + return Ok(self); + } + + if !self.metadata.partition_specs.contains_key(&spec_id) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Cannot set default partition spec to unknown spec with id: '{spec_id}'",), + )); + } + + let schemaless_spec = + self.metadata + .partition_specs + .get(&spec_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Cannot set default partition spec to unknown spec with id: '{spec_id}'",), + ) + })? 
+ .clone(); + let spec = + Arc::unwrap_or_clone(schemaless_spec).bind(self.get_current_schema()?.clone())?; + self.metadata.default_spec = Arc::new(spec); + + if self.last_added_spec_id == Some(spec_id) { + self.changes.push(TableUpdate::SetDefaultSpec { + spec_id: Self::LAST_ADDED, + }); + } else { + self.changes.push(TableUpdate::SetDefaultSpec { spec_id }); + } + + Ok(self) + } + + /// Add a partition spec and set it as the default + pub fn add_default_partition_spec(self, unbound_spec: UnboundPartitionSpec) -> Result { + self.add_partition_spec(unbound_spec)? + .set_default_partition_spec(Self::LAST_ADDED) + } + + /// Add a sort order to the table metadata. + /// + /// The spec is bound eagerly to the current schema and must be valid for it. + /// If a schema is added in the same set of changes, the schema should be added first. + /// + /// Even if `sort_order.order_id` is provided, it may not be used. + /// + /// # Errors + /// - Sort order id to add already exists. + /// - Sort order is incompatible with the current schema. + pub fn add_sort_order(mut self, sort_order: SortOrder) -> Result { + let new_order_id = self.reuse_or_create_new_sort_id(&sort_order); + let sort_order_found = self.metadata.sort_orders.contains_key(&new_order_id); + + if sort_order_found { + if self.last_added_order_id != Some(new_order_id) { + self.changes.push(TableUpdate::AddSortOrder { + sort_order: sort_order.clone(), + }); + self.last_added_order_id = Some(new_order_id); + } + + return Ok(self); + } + + // ToDo Discuss: Java builds a fresh spec here: + // https://github.com/apache/iceberg/blob/64b36999d7ff716ae2534fb0972fcc10d22a64c2/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1613 + // For rust we make the assumption + // that the order that is added refers to the current schema and check compatibility with it. 
+ + let schema = self.get_current_schema()?.clone().as_ref().clone(); + let sort_order = SortOrder::builder() + .with_order_id(new_order_id) + .with_fields(sort_order.fields) + .build(&schema) + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Sort order to add is incompatible with current schema: {e}"), + ) + .with_source(e) + })?; + + self.last_added_order_id = Some(new_order_id); + self.metadata + .sort_orders + .insert(new_order_id, sort_order.clone().into()); + self.changes.push(TableUpdate::AddSortOrder { sort_order }); + + Ok(self) + } + + /// Set the default sort order. If `sort_order_id` is -1, the last added sort order is set as default. + /// + /// # Errors + /// - sort_order_id is -1 but no sort order has been added via `add_sort_order`. + /// - No sort order with the provided `sort_order_id` exists. + pub fn set_default_sort_order(mut self, mut sort_order_id: i64) -> Result { + if sort_order_id == Self::LAST_ADDED as i64 { + sort_order_id = self.last_added_order_id.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Cannot set default sort order to last added order: no order has been added.", + ) + })?; + } + + if self.metadata.default_sort_order_id == sort_order_id { + return Ok(self); + } + + if !self.metadata.sort_orders.contains_key(&sort_order_id) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot set default sort order to unknown order with id: '{sort_order_id}'" + ), + )); + } + + self.metadata.default_sort_order_id = sort_order_id; + + if self.last_added_order_id == Some(sort_order_id) { + self.changes.push(TableUpdate::SetDefaultSortOrder { + sort_order_id: Self::LAST_ADDED as i64, + }); + } else { + self.changes + .push(TableUpdate::SetDefaultSortOrder { sort_order_id }); + } + + Ok(self) + } + + /// Add a sort order and set it as the default + fn add_default_sort_order(self, sort_order: SortOrder) -> Result { + self.add_sort_order(sort_order)? 
+ .set_default_sort_order(Self::LAST_ADDED as i64) + } + + /// Build the table metadata. + pub fn build(mut self) -> Result { + if self.metadata.last_updated_ms == i64::default() { + self.metadata.last_updated_ms = chrono::Utc::now().timestamp_millis(); + } + + // Check compatibility of the current schema to the default partition spec and sort order. + // We use the `get_xxx` methods from the builder to avoid using the panicking + // `TableMetadata.default_partition_spec` etc. methods. + let schema = self.get_current_schema()?.clone(); + let sort_order = Arc::unwrap_or_clone(self.get_default_sort_order()?); + + self.metadata.default_spec = Arc::new( + Arc::unwrap_or_clone(self.metadata.default_spec) + .into_unbound() + .bind(schema.clone())?, + ); + SortOrder::builder() + .with_fields(sort_order.fields) + .build(&schema)?; + + let expired_metadata_logs = self.expire_metadata_log(); + self.update_snapshot_log()?; + self.metadata.try_normalize()?; + + if let Some(hist_entry) = self.previous_history_entry.take() { + self.metadata.metadata_log.push(hist_entry); + } + + Ok(TableMetadataBuildResult { + metadata: self.metadata, + changes: self.changes, + expired_metadata_logs, + }) + } + + fn expire_metadata_log(&mut self) -> Vec { + let max_size = self + .metadata + .properties + .get(PROPERTY_FORMAT_VERSION) + .and_then(|v| v.parse::().ok()) + .unwrap_or(PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT) + .max(1); + + if self.metadata.metadata_log.len() > max_size { + self.metadata + .metadata_log + .drain(0..self.metadata.metadata_log.len() - max_size) + .collect() + } else { + Vec::new() + } + } + + fn update_snapshot_log(&mut self) -> Result<()> { + let intermediate_snapshots = self.get_intermediate_snapshots(); + let has_removed_snapshots = self + .changes + .iter() + .any(|update| matches!(update, TableUpdate::RemoveSnapshots { .. 
})); + + if intermediate_snapshots.is_empty() && !has_removed_snapshots { + return Ok(()); + } + + let mut new_snapshot_log = Vec::new(); + for log_entry in &self.metadata.snapshot_log { + let snapshot_id = log_entry.snapshot_id; + if self.metadata.snapshots.contains_key(&snapshot_id) { + if !intermediate_snapshots.contains(&snapshot_id) { + new_snapshot_log.push(log_entry.clone()); + } + } else if has_removed_snapshots { + // any invalid entry causes the history before it to be removed. otherwise, there could be + // history gaps that cause time-travel queries to produce incorrect results. for example, + // if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is removed, the history cannot be + // [(t1, s1), (t3, s3)] because it appears that s3 was current during the time between t2 + // and t3 when in fact s2 was the current snapshot. + new_snapshot_log.clear(); + } + } + + if let Some(current_snapshot_id) = self.metadata.current_snapshot_id { + let last_id = new_snapshot_log.last().map(|entry| entry.snapshot_id); + if last_id != Some(current_snapshot_id) { + return Err(Error::new( + ErrorKind::DataInvalid, + "Cannot set invalid snapshot log: latest entry is not the current snapshot", + )); + } + }; + + self.metadata.snapshot_log = new_snapshot_log; + Ok(()) + } + + /// Finds intermediate snapshots that have not been committed as the current snapshot. + /// + /// Transactions can create snapshots that are never the current snapshot because several + /// changes are combined by the transaction into one table metadata update. when each + /// intermediate snapshot is added to table metadata, it is added to the snapshot log, assuming + /// that it will be the current snapshot. when there are multiple snapshot updates, the log must + /// be corrected by suppressing the intermediate snapshot entries. + /// + /// A snapshot is an intermediate snapshot if it was added but is not the current snapshot. 
+ fn get_intermediate_snapshots(&self) -> HashSet { + let added_snapshot_ids = self + .changes + .iter() + .filter_map(|update| match update { + TableUpdate::AddSnapshot { snapshot } => Some(snapshot.snapshot_id()), + _ => None, + }) + .collect::>(); + + self.changes + .iter() + .filter_map(|update| match update { + TableUpdate::SetSnapshotRef { + ref_name, + reference, + } => { + if added_snapshot_ids.contains(&reference.snapshot_id) + && ref_name == MAIN_BRANCH + && reference.snapshot_id != self.metadata.current_snapshot_id.unwrap_or(-1) + { + Some(reference.snapshot_id) + } else { + None + } + } + _ => None, + }) + .collect() + } + + fn reassign_ids( + schema: Schema, + spec: UnboundPartitionSpec, + sort_order: SortOrder, + ) -> Result<(Schema, PartitionSpec, SortOrder)> { + // Re-assign field ids and schema ids for a new table. + let previous_id_to_name = schema.field_id_to_name_map().clone(); + let fresh_schema = schema + .into_builder() + .with_schema_id(DEFAULT_SCHEMA_ID) + .with_reassigned_field_ids(u32::try_from(DEFAULT_PARTITION_SPEC_ID).unwrap_or_default()) + .build()?; + + // Re-build partition spec with new ids + let mut fresh_spec = PartitionSpecBuilder::new(fresh_schema.clone()); + for field in spec.fields() { + let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot find source column with id {} for partition column {} in schema.", + field.source_id, field.name + ), + ) + })?; + fresh_spec = + fresh_spec.add_partition_field(source_field_name, &field.name, field.transform)?; + } + let fresh_spec = fresh_spec.build()?; + + // Re-build sort order with new ids + let mut fresh_order = SortOrder::builder(); + for mut field in sort_order.fields { + let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot find source column with id {} for sort column in schema.", + field.source_id + ), + 
) + })?; + let new_field_id = fresh_schema + .field_by_name(source_field_name) + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + format!( + "Cannot find source column with name {} for sort column in re-assigned schema.", + source_field_name + ), + ) + })?.id; + field.source_id = new_field_id; + fresh_order.with_sort_field(field); + } + let fresh_sort_order = fresh_order.build(&fresh_schema)?; + + Ok((fresh_schema, fresh_spec, fresh_sort_order)) + } + + fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> i32 { + self.metadata + .schemas + .iter() + .find_map(|(id, schema)| new_schema.is_same_schema(schema).then_some(*id)) + .unwrap_or_else(|| self.get_highest_schema_id() + 1) + } + + fn get_highest_schema_id(&self) -> i32 { + *self + .metadata + .schemas + .keys() + .max() + .unwrap_or(&self.metadata.current_schema_id) + } + + fn get_current_schema(&self) -> Result<&SchemaRef> { + self.metadata + .schemas + .get(&self.metadata.current_schema_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Current schema with id '{}' not found in table metadata.", + self.metadata.current_schema_id + ), + ) + }) + } + + fn get_default_sort_order(&self) -> Result { + self.metadata + .sort_orders + .get(&self.metadata.default_sort_order_id) + .cloned() + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Default sort order with id '{}' not found in table metadata.", + self.metadata.default_sort_order_id + ), + ) + }) + } + + /// If a compatible spec already exists, use the same ID. Otherwise, use 1 more than the highest ID. 
+ fn reuse_or_create_new_spec_id(&self, new_spec: &UnboundPartitionSpec) -> i32 { + self.metadata + .partition_specs + .iter() + .find_map(|(id, old_spec)| old_spec.is_compatible_with(new_spec).then_some(*id)) + .unwrap_or_else(|| { + self.get_highest_spec_id() + .map(|id| id + 1) + .unwrap_or(DEFAULT_PARTITION_SPEC_ID) + }) + } + + fn get_highest_spec_id(&self) -> Option { + self.metadata.partition_specs.keys().max().copied() + } + + /// If a compatible sort-order already exists, use the same ID. Otherwise, use 1 more than the highest ID. + fn reuse_or_create_new_sort_id(&self, new_sort_order: &SortOrder) -> i64 { + if new_sort_order.is_unsorted() { + return SortOrder::unsorted_order().order_id; + } + + self.metadata + .sort_orders + .iter() + .find_map(|(id, sort_order)| { + sort_order.fields.eq(&new_sort_order.fields).then_some(*id) + }) + .unwrap_or_else(|| { + self.highest_sort_order_id() + .unwrap_or(SortOrder::unsorted_order().order_id) + + 1 + }) + } + + fn highest_sort_order_id(&self) -> Option { + self.metadata.sort_orders.keys().max().copied() + } +} + +impl From for TableMetadata { + fn from(result: TableMetadataBuildResult) -> Self { + result.metadata + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::spec::{ + NestedField, NullOrder, Operation, PrimitiveType, Schema, SnapshotRetention, SortDirection, + SortField, StructType, Summary, Transform, Type, UnboundPartitionField, + }; + + const TEST_LOCATION: &str = "s3://bucket/test/location"; + const LAST_ASSIGNED_COLUMN_ID: i32 = 2; + + fn schema() -> Schema { + Schema::builder() + .with_fields(vec![ + NestedField::required(0, "x", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(1, "y", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(2, "z", Type::Primitive(PrimitiveType::Long)).into(), + ]) + .build() + .unwrap() + } + + fn sort_order() -> SortOrder { + let schema = schema(); + SortOrder::builder() + .with_order_id(1) + .with_sort_field(SortField 
{ + source_id: 2, + transform: Transform::Bucket(4), + direction: SortDirection::Descending, + null_order: NullOrder::First, + }) + .build(&schema) + .unwrap() + } + + fn partition_spec() -> UnboundPartitionSpec { + UnboundPartitionSpec::builder() + .with_spec_id(0) + .add_partition_field(1, "y", Transform::Identity) + .unwrap() + .build() + } + + fn builder_without_changes(format_version: FormatVersion) -> TableMetadataBuilder { + TableMetadataBuilder::new( + schema(), + partition_spec(), + sort_order(), + TEST_LOCATION.to_string(), + format_version, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .metadata + .into_builder("s3://bucket/test/location/metadata/metadata1.json") + } + + #[test] + fn test_minimal_build() { + let metadata = TableMetadataBuilder::new( + schema(), + partition_spec(), + sort_order(), + TEST_LOCATION.to_string(), + FormatVersion::V1, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .metadata; + + assert_eq!(metadata.format_version, FormatVersion::V1); + assert_eq!(metadata.location, TEST_LOCATION); + assert_eq!(metadata.current_schema_id, 0); + assert_eq!(metadata.default_spec.spec_id(), 0); + assert_eq!(metadata.default_sort_order_id, 1); + assert_eq!(metadata.last_partition_id, 1000); + assert_eq!(metadata.last_column_id, 2); + assert_eq!(metadata.snapshots.len(), 0); + assert_eq!(metadata.refs.len(), 0); + assert_eq!(metadata.properties.len(), 0); + assert_eq!(metadata.metadata_log.len(), 0); + assert_eq!(metadata.last_sequence_number, 0); + assert_eq!(metadata.last_column_id, LAST_ASSIGNED_COLUMN_ID); + + // Test can serialize v1 + let _ = serde_json::to_string(&metadata).unwrap(); + + // Test can serialize v2 + let metadata = metadata + .into_builder("s3://bucket/test/location/metadata/metadata1.json") + .upgrade_format_version(FormatVersion::V2) + .unwrap() + .build() + .unwrap() + .metadata; + + assert_eq!(metadata.format_version, FormatVersion::V2); + let _ = serde_json::to_string(&metadata).unwrap(); + } + + 
#[test] + fn test_reassigns_ids() { + let schema = Schema::builder() + .with_schema_id(10) + .with_fields(vec![ + NestedField::required(11, "a", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(12, "b", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required( + 13, + "struct", + Type::Struct(StructType::new(vec![NestedField::required( + 14, + "nested", + Type::Primitive(PrimitiveType::Long), + ) + .into()])), + ) + .into(), + NestedField::required(15, "c", Type::Primitive(PrimitiveType::Long)).into(), + ]) + .build() + .unwrap(); + let spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(20) + .add_partition_field("a", "a", Transform::Identity) + .unwrap() + .add_partition_field("struct.nested", "nested_partition", Transform::Identity) + .unwrap() + .build() + .unwrap(); + let sort_order = SortOrder::builder() + .with_fields(vec![SortField { + source_id: 11, + transform: Transform::Identity, + direction: SortDirection::Ascending, + null_order: NullOrder::First, + }]) + .with_order_id(10) + .build(&schema) + .unwrap(); + + let (fresh_schema, fresh_spec, fresh_sort_order) = + TableMetadataBuilder::reassign_ids(schema, spec.into_unbound(), sort_order).unwrap(); + + let expected_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(0, "a", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(1, "b", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required( + 2, + "struct", + Type::Struct(StructType::new(vec![NestedField::required( + 4, + "nested", + Type::Primitive(PrimitiveType::Long), + ) + .into()])), + ) + .into(), + NestedField::required(3, "c", Type::Primitive(PrimitiveType::Long)).into(), + ]) + .build() + .unwrap(); + + let expected_spec = PartitionSpec::builder(expected_schema.clone()) + .with_spec_id(0) + .add_partition_field("a", "a", Transform::Identity) + .unwrap() + .add_partition_field("struct.nested", "nested_partition", Transform::Identity) + .unwrap() + 
.build() + .unwrap(); + + let expected_sort_order = SortOrder::builder() + .with_fields(vec![SortField { + source_id: 0, + transform: Transform::Identity, + direction: SortDirection::Ascending, + null_order: NullOrder::First, + }]) + .with_order_id(1) + .build(&expected_schema) + .unwrap(); + + assert_eq!(fresh_schema, expected_schema); + assert_eq!(fresh_spec, expected_spec); + assert_eq!(fresh_sort_order, expected_sort_order); + } + + #[test] + fn test_ids_are_reassigned_for_new_metadata() { + let schema = schema().into_builder().with_schema_id(10).build().unwrap(); + + let metadata = TableMetadataBuilder::new( + schema, + partition_spec(), + sort_order(), + TEST_LOCATION.to_string(), + FormatVersion::V1, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .metadata; + + assert_eq!(metadata.current_schema_id, 0); + assert_eq!(metadata.current_schema().schema_id(), 0); + } + + #[test] + fn test_new_metadata_changes() { + let changes = TableMetadataBuilder::new( + schema(), + partition_spec(), + sort_order(), + TEST_LOCATION.to_string(), + FormatVersion::V1, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .changes; + + pretty_assertions::assert_eq!(changes, vec![ + TableUpdate::SetLocation { + location: TEST_LOCATION.to_string() + }, + TableUpdate::AddSchema { + last_column_id: Some(LAST_ASSIGNED_COLUMN_ID), + schema: schema(), + }, + TableUpdate::SetCurrentSchema { schema_id: -1 }, + TableUpdate::AddSpec { + // Because this is a new tables, field-ids are assigned + // partition_spec() has None set for field-id + spec: PartitionSpec::builder(schema()) + .with_spec_id(0) + .add_unbound_field(UnboundPartitionField { + name: "y".to_string(), + transform: Transform::Identity, + source_id: 1, + field_id: Some(1000) + }) + .unwrap() + .build() + .unwrap() + .into_unbound(), + }, + TableUpdate::SetDefaultSpec { spec_id: -1 }, + TableUpdate::AddSortOrder { + sort_order: sort_order(), + }, + TableUpdate::SetDefaultSortOrder { sort_order_id: -1 }, + ]); + } 
+ + #[test] + fn test_add_partition_spec() { + let builder = builder_without_changes(FormatVersion::V2); + + let added_spec = UnboundPartitionSpec::builder() + .with_spec_id(10) + .add_partition_fields(vec![ + UnboundPartitionField { + // The previous field - has field_id set + name: "y".to_string(), + transform: Transform::Identity, + source_id: 1, + field_id: Some(1000), + }, + UnboundPartitionField { + // A new field without field id - should still be without field id in changes + name: "z".to_string(), + transform: Transform::Identity, + source_id: 2, + field_id: None, + }, + ]) + .unwrap() + .build(); + + let build_result = builder + .add_partition_spec(added_spec.clone()) + .unwrap() + .build() + .unwrap(); + + // Spec id should be re-assigned + let expected_change = added_spec.with_spec_id(1); + let expected_spec = PartitionSpec::builder(schema()) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + name: "y".to_string(), + transform: Transform::Identity, + source_id: 1, + field_id: Some(1000), + }) + .unwrap() + .add_unbound_field(UnboundPartitionField { + name: "z".to_string(), + transform: Transform::Identity, + source_id: 2, + field_id: Some(1001), + }) + .unwrap() + .build() + .unwrap(); + + assert_eq!(build_result.changes.len(), 1); + assert_eq!( + build_result.metadata.partition_spec_by_id(1), + Some(&Arc::new(expected_spec.into_schemaless())) + ); + assert_eq!(build_result.metadata.default_spec.spec_id(), 0); + assert_eq!(build_result.metadata.last_partition_id, 1001); + pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSpec { + spec: expected_change + }); + } + + #[test] + fn test_set_default_partition_spec() { + let builder = builder_without_changes(FormatVersion::V2); + let schema = builder.get_current_schema().unwrap().clone(); + let added_spec = UnboundPartitionSpec::builder() + .with_spec_id(10) + .add_partition_field(1, "y_bucket[2]", Transform::Bucket(2)) + .unwrap() + .build(); + + let build_result = builder 
+ .add_partition_spec(added_spec.clone()) + .unwrap() + .set_default_partition_spec(-1) + .unwrap() + .build() + .unwrap(); + + let expected_spec = PartitionSpec::builder(schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + name: "y_bucket[2]".to_string(), + transform: Transform::Bucket(2), + source_id: 1, + field_id: Some(1001), + }) + .unwrap() + .build() + .unwrap(); + + assert_eq!(build_result.changes.len(), 2); + assert_eq!(build_result.metadata.default_spec, Arc::new(expected_spec)); + assert_eq!(build_result.changes, vec![ + TableUpdate::AddSpec { + // Should contain the actual ID that was used + spec: added_spec.with_spec_id(1) + }, + TableUpdate::SetDefaultSpec { spec_id: -1 } + ]); + } + + #[test] + fn test_set_existing_default_partition_spec() { + let builder = builder_without_changes(FormatVersion::V2); + // Add and set an unbound spec as current + let unbound_spec = UnboundPartitionSpec::builder().with_spec_id(1).build(); + let build_result = builder + .add_partition_spec(unbound_spec.clone()) + .unwrap() + .set_default_partition_spec(-1) + .unwrap() + .build() + .unwrap(); + + assert_eq!(build_result.changes.len(), 2); + assert_eq!(build_result.changes[0], TableUpdate::AddSpec { + spec: unbound_spec.clone() + }); + assert_eq!(build_result.changes[1], TableUpdate::SetDefaultSpec { + spec_id: -1 + }); + assert_eq!( + build_result.metadata.default_spec, + Arc::new( + unbound_spec + .bind(build_result.metadata.current_schema().clone()) + .unwrap() + ) + ); + + // Set old spec again + let build_result = build_result + .metadata + .into_builder("s3://bucket/test/location/metadata/metadata1.json") + .set_default_partition_spec(0) + .unwrap() + .build() + .unwrap(); + + assert_eq!(build_result.changes.len(), 1); + assert_eq!(build_result.changes[0], TableUpdate::SetDefaultSpec { + spec_id: 0 + }); + assert_eq!( + build_result.metadata.default_spec, + Arc::new( + partition_spec() + .bind(build_result.metadata.current_schema().clone()) + 
.unwrap() + ) + ); + } + + #[test] + fn test_add_sort_order() { + let builder = builder_without_changes(FormatVersion::V2); + + let added_sort_order = SortOrder::builder() + .with_order_id(10) + .with_fields(vec![SortField { + source_id: 1, + transform: Transform::Identity, + direction: SortDirection::Ascending, + null_order: NullOrder::First, + }]) + .build(&schema()) + .unwrap(); + + let build_result = builder + .add_sort_order(added_sort_order.clone()) + .unwrap() + .build() + .unwrap(); + + let expected_sort_order = added_sort_order.with_order_id(2); + + assert_eq!(build_result.changes.len(), 1); + assert_eq!(build_result.metadata.sort_orders.keys().max(), Some(&2)); + pretty_assertions::assert_eq!( + build_result.metadata.sort_order_by_id(2), + Some(&Arc::new(expected_sort_order.clone())) + ); + pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSortOrder { + sort_order: expected_sort_order + }); + } + + #[test] + fn test_add_compatible_schema() { + let builder = builder_without_changes(FormatVersion::V2); + + let added_schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(0, "x", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(1, "y", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(2, "z", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(3, "a", Type::Primitive(PrimitiveType::Long)).into(), + ]) + .build() + .unwrap(); + + let build_result = builder + .add_current_schema(added_schema.clone()) + .unwrap() + .build() + .unwrap(); + + assert_eq!(build_result.changes.len(), 2); + assert_eq!(build_result.metadata.schemas.keys().max(), Some(&1)); + pretty_assertions::assert_eq!( + build_result.metadata.schema_by_id(1), + Some(&Arc::new(added_schema.clone())) + ); + pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSchema { + last_column_id: Some(3), + schema: added_schema + }); + assert_eq!(build_result.changes[1], 
TableUpdate::SetCurrentSchema {
+            schema_id: -1
+        });
+    }
+
+    #[test] // Schema added and made current in one change => recorded as schema_id -1.
+    fn test_set_current_schema_change_is_minus_one_if_schema_was_added_in_this_change() {
+        let builder = builder_without_changes(FormatVersion::V2);
+
+        let added_schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![
+                NestedField::required(0, "x", Type::Primitive(PrimitiveType::Long)).into(),
+                NestedField::required(1, "y", Type::Primitive(PrimitiveType::Long)).into(),
+                NestedField::required(2, "z", Type::Primitive(PrimitiveType::Long)).into(),
+                NestedField::required(3, "a", Type::Primitive(PrimitiveType::Long)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let build_result = builder
+            .add_schema(added_schema.clone())
+            .set_current_schema(1) // id of the schema added on the previous line
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(build_result.changes.len(), 2); // one change per builder call above
+        assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
+            schema_id: -1 // NOTE(review): presumably -1 stands for "the schema added last" — confirm
+        });
+    }
+
+    #[test] // Building straight from TableMetadataBuilder::new leaves metadata_log empty.
+    fn test_no_metadata_log_for_create_table() {
+        let build_result = TableMetadataBuilder::new(
+            schema(),
+            partition_spec(),
+            sort_order(),
+            TEST_LOCATION.to_string(),
+            FormatVersion::V2,
+            HashMap::new(),
+        )
+        .unwrap()
+        .build()
+        .unwrap();
+
+        assert_eq!(build_result.metadata.metadata_log.len(), 0);
+    }
+
+    #[test] // Rebuilding from existing metadata logs the path passed to into_builder.
+    fn test_from_metadata_generates_metadata_log() {
+        let metadata_path = "s3://bucket/test/location/metadata/metadata1.json";
+        let builder = TableMetadataBuilder::new(
+            schema(),
+            partition_spec(),
+            sort_order(),
+            TEST_LOCATION.to_string(),
+            FormatVersion::V2,
+            HashMap::new(),
+        )
+        .unwrap()
+        .build()
+        .unwrap()
+        .metadata
+        .into_builder(metadata_path); // presumably the location of the metadata being replaced — confirm
+
+        let builder = builder
+            .add_default_sort_order(SortOrder::unsorted_order()) // apply a change before rebuilding
+            .unwrap();
+
+        let build_result = builder.build().unwrap();
+
+        assert_eq!(build_result.metadata.metadata_log.len(), 1);
+        assert_eq!(
+            build_result.metadata.metadata_log[0].metadata_file,
+            metadata_path
+        );
+    }
+
+    #[test] // set_ref: unknown snapshot id errors; a known one lands in snapshot_log.
+    fn test_set_ref() {
+        let builder = builder_without_changes(FormatVersion::V2);
+
+        let snapshot = Snapshot::builder()
+            .with_snapshot_id(1)
+            .with_timestamp_ms(builder.last_updated_ms() + 1) // 1 ms after the builder's last update
+            .with_sequence_number(0)
+            .with_schema_id(0)
+            .with_manifest_list("/snap-1.avro")
+            .with_summary(Summary {
+                operation: Operation::Append,
+                other: HashMap::from_iter(vec![
+                    (
+                        "spark.app.id".to_string(),
+                        "local-1662532784305".to_string(),
+                    ),
+                    ("added-data-files".to_string(), "4".to_string()),
+                    ("added-records".to_string(), "4".to_string()),
+                    ("added-files-size".to_string(), "6001".to_string()),
+                ]),
+            })
+            .build();
+
+        let builder = builder.add_snapshot(snapshot.clone()).unwrap();
+
+        assert!(builder
+            .clone() // keeps `builder` available for the valid set_ref below
+            .set_ref(MAIN_BRANCH, SnapshotReference {
+                snapshot_id: 10, // this test only added snapshot 1
+                retention: SnapshotRetention::Branch {
+                    min_snapshots_to_keep: Some(10),
+                    max_snapshot_age_ms: None,
+                    max_ref_age_ms: None,
+                },
+            })
+            .unwrap_err()
+            .to_string()
+            .contains("Cannot set 'main' to unknown snapshot: '10'"));
+
+        let build_result = builder
+            .set_ref(MAIN_BRANCH, SnapshotReference {
+                snapshot_id: 1, // the snapshot added above
+                retention: SnapshotRetention::Branch {
+                    min_snapshots_to_keep: Some(10),
+                    max_snapshot_age_ms: None,
+                    max_ref_age_ms: None,
+                },
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+        assert_eq!(build_result.metadata.snapshots.len(), 1);
+        assert_eq!(
+            build_result.metadata.snapshot_by_id(1),
+            Some(&Arc::new(snapshot.clone()))
+        );
+        assert_eq!(build_result.metadata.snapshot_log, vec![SnapshotLog {
+            snapshot_id: 1,
+            timestamp_ms: snapshot.timestamp_ms()
+        }])
+    }
+
+    #[test] // Two snapshots put on main within one build: snapshot_log keeps only the last.
+    fn test_snapshot_log_skips_intermediates() {
+        let builder = builder_without_changes(FormatVersion::V2);
+
+        let snapshot_1 = Snapshot::builder()
+            .with_snapshot_id(1)
+            .with_timestamp_ms(builder.last_updated_ms() + 1)
+            .with_sequence_number(0)
+            .with_schema_id(0)
+            .with_manifest_list("/snap-1.avro")
+            .with_summary(Summary {
+                operation: Operation::Append,
+                other: HashMap::from_iter(vec![
+                    (
+                        "spark.app.id".to_string(),
+                        "local-1662532784305".to_string(),
+                    ),
+                    ("added-data-files".to_string(), "4".to_string()),
+                    ("added-records".to_string(), "4".to_string()),
+                    ("added-files-size".to_string(), "6001".to_string()),
+                ]),
+            })
+            .build();
+
+        let snapshot_2 = Snapshot::builder()
+            .with_snapshot_id(2)
+            .with_timestamp_ms(builder.last_updated_ms() + 1) // same timestamp as snapshot_1
+            .with_sequence_number(0)
+            .with_schema_id(0)
+            .with_manifest_list("/snap-1.avro") // NOTE(review): same path as snapshot_1 — confirm intentional
+            .with_summary(Summary {
+                operation: Operation::Append,
+                other: HashMap::from_iter(vec![
+                    (
+                        "spark.app.id".to_string(),
+                        "local-1662532784305".to_string(),
+                    ),
+                    ("added-data-files".to_string(), "4".to_string()),
+                    ("added-records".to_string(), "4".to_string()),
+                    ("added-files-size".to_string(), "6001".to_string()),
+                ]),
+            })
+            .build();
+
+        let result = builder
+            .add_snapshot(snapshot_1)
+            .unwrap()
+            .set_ref(MAIN_BRANCH, SnapshotReference {
+                snapshot_id: 1, // main first points at snapshot 1 ...
+                retention: SnapshotRetention::Branch {
+                    min_snapshots_to_keep: Some(10),
+                    max_snapshot_age_ms: None,
+                    max_ref_age_ms: None,
+                },
+            })
+            .unwrap()
+            .append_snapshot(snapshot_2.clone(), Some(MAIN_BRANCH)) // ... then snapshot 2 is appended to main
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(result.metadata.snapshot_log, vec![SnapshotLog {
+            snapshot_id: 2, // no entry for the intermediate snapshot 1
+            timestamp_ms: snapshot_2.timestamp_ms()
+        }]);
+        assert_eq!(result.metadata.current_snapshot().unwrap().snapshot_id(), 2);
+    }
+
+    #[test] // Adding a snapshot whose id was already added returns an error.
+    fn test_cannot_add_duplicate_snapshot_id() {
+        let builder = builder_without_changes(FormatVersion::V2);
+
+        let snapshot = Snapshot::builder()
+            .with_snapshot_id(2)
+            .with_timestamp_ms(builder.last_updated_ms() + 1)
+            .with_sequence_number(0)
+            .with_schema_id(0)
+            .with_manifest_list("/snap-1.avro")
+            .with_summary(Summary {
+                operation: Operation::Append,
+                other: HashMap::from_iter(vec![
+                    (
+                        "spark.app.id".to_string(),
+                        "local-1662532784305".to_string(),
+                    ),
+                    ("added-data-files".to_string(), "4".to_string()),
+                    ("added-records".to_string(), "4".to_string()),
+                    ("added-files-size".to_string(), "6001".to_string()),
+                ]),
+            })
+            .build();
+
+        let builder = builder.add_snapshot(snapshot.clone()).unwrap();
+        builder.add_snapshot(snapshot).unwrap_err(); // second add of the same snapshot must be an Err
+    }
+
+    #[test] // A field-less current schema is accepted by the setter but rejected by build().
+    fn test_add_incompatible_current_schema_fails() {
+        let builder = builder_without_changes(FormatVersion::V2);
+
+        let added_schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![]) // schema with no fields at all
+            .build()
+            .unwrap();
+
+        let err = builder
+            .add_current_schema(added_schema)
+            .unwrap()
+            .build() // the failure is only reported here
+            .unwrap_err();
+
+        assert!(err
+            .to_string()
+            .contains("Cannot find partition source field"));
+    }
+
+    #[test] // V1 builder rejects a partition spec whose field ids are 1000 and 1002.
+    fn test_add_partition_spec_for_v1_requires_sequential_ids() {
+        let builder = builder_without_changes(FormatVersion::V1);
+
+        let added_spec = UnboundPartitionSpec::builder()
+            .with_spec_id(10)
+            .add_partition_fields(vec![
+                UnboundPartitionField {
+                    name: "y".to_string(),
+                    transform: Transform::Identity,
+                    source_id: 1,
+                    field_id: Some(1000),
+                },
+                UnboundPartitionField {
+                    name: "z".to_string(),
+                    transform: Transform::Identity,
+                    source_id: 2,
+                    field_id: Some(1002), // skips 1001, so the ids are not sequential
+                },
+            ])
+            .unwrap()
+            .build();
+
+        let err = builder.add_partition_spec(added_spec).unwrap_err();
+        assert!(err.to_string().contains(
+            "Cannot add partition spec with non-sequential field ids to format version 1 table"
+        ));
+    }
+}