diff --git a/crates/core/src/kernel/models/actions.rs b/crates/core/src/kernel/models/actions.rs index 61a58b98f3..8284bb92e0 100644 --- a/crates/core/src/kernel/models/actions.rs +++ b/crates/core/src/kernel/models/actions.rs @@ -284,8 +284,8 @@ impl From<&str> for WriterFeatures { fn from(value: &str) -> Self { match value { "appendOnly" | "delta.appendOnly" => WriterFeatures::AppendOnly, - "invariants" | "delta.invariants" => WriterFeatures::Invariants, - "checkConstraints" | "delta.checkConstraints" => WriterFeatures::CheckConstraints, + "invariants" => WriterFeatures::Invariants, + "checkConstraints" => WriterFeatures::CheckConstraints, "changeDataFeed" | "delta.enableChangeDataFeed" => WriterFeatures::ChangeDataFeed, "generatedColumns" => WriterFeatures::GeneratedColumns, "columnMapping" => WriterFeatures::ColumnMapping, diff --git a/crates/core/src/operations/create.rs b/crates/core/src/operations/create.rs index 0f38939150..ed1f9d6ba8 100644 --- a/crates/core/src/operations/create.rs +++ b/crates/core/src/operations/create.rs @@ -1,10 +1,11 @@ //! Command for creating a new delta table // https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use futures::future::BoxFuture; +use maplit::hashset; use serde_json::Value; use super::transaction::{CommitBuilder, TableReference, PROTOCOL}; @@ -13,6 +14,9 @@ use crate::kernel::{ Action, DataType, Metadata, Protocol, ReaderFeatures, StructField, StructType, WriterFeatures, }; use crate::logstore::{LogStore, LogStoreRef}; +use crate::operations::set_tbl_properties::{ + apply_properties_to_protocol, convert_properties_to_features, +}; use crate::protocol::{DeltaOperation, SaveMode}; use crate::table::builder::ensure_table_uri; use crate::table::config::DeltaConfigKey; @@ -237,41 +241,28 @@ impl CreateBuilder { ) }; + let configuration = self.configuration; let contains_timestampntz = PROTOCOL.contains_timestampntz(&self.columns); + // TODO configure more permissive versions based on configuration. Also how should this ideally be handled? // We set the lowest protocol we can, and if subsequent writes use newer features we update metadata? - let (min_reader_version, min_writer_version, writer_features, reader_features) = - if contains_timestampntz { - let mut converted_writer_features = self - .configuration - .keys() - .map(|key| key.clone().into()) - .filter(|v| !matches!(v, WriterFeatures::Other(_))) - .collect::>(); - - let mut converted_reader_features = self - .configuration - .keys() - .map(|key| key.clone().into()) - .filter(|v| !matches!(v, ReaderFeatures::Other(_))) - .collect::>(); - converted_writer_features.insert(WriterFeatures::TimestampWithoutTimezone); - converted_reader_features.insert(ReaderFeatures::TimestampWithoutTimezone); - ( - 3, - 7, - Some(converted_writer_features), - Some(converted_reader_features), - ) - } else { - ( - PROTOCOL.default_reader_version(), - PROTOCOL.default_writer_version(), - None, - None, - ) - }; + let current_protocol = if contains_timestampntz { + Protocol { + min_reader_version: 3, + min_writer_version: 7, + writer_features: Some(hashset! {WriterFeatures::TimestampWithoutTimezone}), + reader_features: Some(hashset! {ReaderFeatures::TimestampWithoutTimezone}), + } + } else { + Protocol { + min_reader_version: PROTOCOL.default_reader_version(), + min_writer_version: PROTOCOL.default_writer_version(), + reader_features: None, + writer_features: None, + } + }; + let protocol = self .actions .iter() @@ -280,17 +271,23 @@ impl CreateBuilder { Action::Protocol(p) => p.clone(), _ => unreachable!(), }) - .unwrap_or_else(|| Protocol { - min_reader_version, - min_writer_version, - writer_features, - reader_features, - }); + .unwrap_or_else(|| current_protocol); + + let protocol = apply_properties_to_protocol( + &protocol, + &configuration + .iter() + .map(|(k, v)| (k.clone(), v.clone().unwrap())) + .collect::>(), + true, + )?; + + let protocol = convert_properties_to_features(protocol, &configuration); let mut metadata = Metadata::try_new( StructType::new(self.columns), self.partition_columns.unwrap_or_default(), - self.configuration, + configuration, )? .with_created_time(chrono::Utc::now().timestamp_millis()); if let Some(name) = self.name { diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index 4323bf025f..7923431d45 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -37,6 +37,7 @@ pub use ::datafusion::physical_plan::common::collect as collect_sendable_stream; use arrow::record_batch::RecordBatch; use optimize::OptimizeBuilder; use restore::RestoreBuilder; +use set_tbl_properties::SetTablePropertiesBuilder; #[cfg(feature = "datafusion")] pub mod constraints; @@ -48,6 +49,7 @@ mod load; pub mod load_cdf; #[cfg(feature = "datafusion")] pub mod merge; +pub mod set_tbl_properties; #[cfg(feature = "datafusion")] pub mod update; #[cfg(feature = "datafusion")] @@ -219,6 +221,11 @@ impl DeltaOps { pub fn drop_constraints(self) -> DropConstraintBuilder { DropConstraintBuilder::new(self.0.log_store, self.0.state.unwrap()) } + + /// Set table properties + pub fn set_tbl_properties(self) -> SetTablePropertiesBuilder { + SetTablePropertiesBuilder::new(self.0.log_store, self.0.state.unwrap()) + } } impl From for DeltaOps { diff --git a/crates/core/src/operations/set_tbl_properties.rs b/crates/core/src/operations/set_tbl_properties.rs new file mode 100644 index 0000000000..f275455158 --- /dev/null +++ b/crates/core/src/operations/set_tbl_properties.rs @@ -0,0 +1,312 @@ +//! Set table properties on a table + +use std::collections::{HashMap, HashSet}; + +use futures::future::BoxFuture; +use maplit::hashset; + +use super::transaction::{CommitBuilder, CommitProperties}; +use crate::kernel::{Action, Protocol, ReaderFeatures, WriterFeatures}; +use crate::logstore::LogStoreRef; +use crate::protocol::DeltaOperation; +use crate::table::state::DeltaTableState; +use crate::DeltaConfigKey; +use crate::DeltaTable; +use crate::{DeltaResult, DeltaTableError}; + +/// Remove constraints from the table +pub struct SetTablePropertiesBuilder { + /// A snapshot of the table's state + snapshot: DeltaTableState, + /// Name of the property + properties: HashMap, + /// Raise if property doesn't exist + raise_if_not_exists: bool, + /// Delta object store for handling data files + log_store: LogStoreRef, + /// Additional information to add to the commit + commit_properties: CommitProperties, +} + +impl SetTablePropertiesBuilder { + /// Create a new builder + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + Self { + properties: HashMap::new(), + raise_if_not_exists: true, + snapshot, + log_store, + commit_properties: CommitProperties::default(), + } + } + + /// Specify the properties to be removed + pub fn with_properties(mut self, table_properties: HashMap) -> Self { + self.properties = table_properties; + self + } + + /// Specify if you want to raise if the property does not exist + pub fn with_raise_if_not_exists(mut self, raise: bool) -> Self { + self.raise_if_not_exists = raise; + self + } + + /// Additional metadata to be added to commit info + pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self { + self.commit_properties = commit_properties; + self + } +} + +/// Will apply the properties to the protocol by either bumping the version or setting +/// features +pub fn apply_properties_to_protocol( + current_protocol: &Protocol, + new_properties: &HashMap, + raise_if_not_exists: bool, +) -> DeltaResult { + let mut parsed_properties: HashMap = HashMap::new(); + + for (key, value) in new_properties { + if let Ok(parsed_key) = key.parse::() { + parsed_properties.insert(parsed_key, value.to_string()); + } else if raise_if_not_exists { + return Err(DeltaTableError::Generic(format!( + "Error parsing property '{}':'{}'", + key, value + ))); + } + } + + let mut new_protocol = current_protocol.clone(); + + // Check and update delta.minReaderVersion + if let Some(min_reader_version) = parsed_properties.get(&DeltaConfigKey::MinReaderVersion) { + let new_min_reader_version = min_reader_version.parse::(); + match new_min_reader_version { + Ok(version) => match version { + 1..=3 => { + if version > new_protocol.min_reader_version { + new_protocol.min_reader_version = version + } + } + _ => { + return Err(DeltaTableError::Generic(format!( + "delta.minReaderVersion = '{}' is invalid, valid values are ['1','2','3']", + min_reader_version + ))) + } + }, + Err(_) => { + return Err(DeltaTableError::Generic(format!( + "delta.minReaderVersion = '{}' is invalid, valid values are ['1','2','3']", + min_reader_version + ))) + } + } + } + + // Check and update delta.minWriterVersion + if let Some(min_writer_version) = parsed_properties.get(&DeltaConfigKey::MinWriterVersion) { + let new_min_writer_version = min_writer_version.parse::(); + match new_min_writer_version { + Ok(version) => match version { + 2..=7 => { + if version > new_protocol.min_writer_version { + new_protocol.min_writer_version = version + } + } + _ => { + return Err(DeltaTableError::Generic(format!( + "delta.minWriterVersion = '{}' is invalid, valid values are ['2','3','4','5','6','7']", + min_writer_version + ))) + } + }, + Err(_) => { + return Err(DeltaTableError::Generic(format!( + "delta.minWriterVersion = '{}' is invalid, valid values are ['2','3','4','5','6','7']", + min_writer_version + ))) + } + } + } + + // Check enableChangeDataFeed and bump protocol or add writerFeature if writer versions is >=7 + if let Some(enable_cdf) = parsed_properties.get(&DeltaConfigKey::EnableChangeDataFeed) { + let if_enable_cdf = enable_cdf.to_ascii_lowercase().parse::(); + match if_enable_cdf { + Ok(true) => { + if new_protocol.min_writer_version >= 7 { + match new_protocol.writer_features { + Some(mut features) => { + features.insert(WriterFeatures::ChangeDataFeed); + new_protocol.writer_features = Some(features); + } + None => { + new_protocol.writer_features = + Some(hashset! {WriterFeatures::ChangeDataFeed}) + } + } + } else if new_protocol.min_writer_version <= 3 { + new_protocol.min_writer_version = 4 + } + } + Ok(false) => {} + _ => { + return Err(DeltaTableError::Generic(format!( + "delta.enableChangeDataFeed = '{}' is invalid, valid values are ['true']", + enable_cdf + ))) + } + } + } + + if let Some(enable_dv) = parsed_properties.get(&DeltaConfigKey::EnableDeletionVectors) { + let if_enable_dv = enable_dv.to_ascii_lowercase().parse::(); + match if_enable_dv { + Ok(true) => { + let writer_features = match new_protocol.writer_features { + Some(mut features) => { + features.insert(WriterFeatures::DeletionVectors); + features + } + None => hashset! {WriterFeatures::DeletionVectors}, + }; + let reader_features = match new_protocol.reader_features { + Some(mut features) => { + features.insert(ReaderFeatures::DeletionVectors); + features + } + None => hashset! {ReaderFeatures::DeletionVectors}, + }; + new_protocol.min_reader_version = 3; + new_protocol.min_writer_version = 7; + new_protocol.writer_features = Some(writer_features); + new_protocol.reader_features = Some(reader_features); + } + Ok(false) => {} + _ => { + return Err(DeltaTableError::Generic(format!( + "delta.enableDeletionVectors = '{}' is invalid, valid values are ['true']", + enable_dv + ))) + } + } + } + + Ok(new_protocol) +} + +/// Converts existing properties into features if the reader_version is >=3 or writer_version >=3 +/// only converts features that are "true" +pub fn convert_properties_to_features( + mut new_protocol: Protocol, + configuration: &HashMap>, +) -> Protocol { + if new_protocol.min_writer_version >= 7 { + let mut converted_writer_features = configuration + .iter() + .filter(|(_, value)| { + value.as_ref().map_or(false, |v| { + v.to_ascii_lowercase().parse::().is_ok_and(|v| v) + }) + }) + .collect::>>() + .keys() + .map(|key| (*key).clone().into()) + .filter(|v| !matches!(v, WriterFeatures::Other(_))) + .collect::>(); + + if configuration + .keys() + .any(|v| v.contains("delta.constraints.")) + { + converted_writer_features.insert(WriterFeatures::CheckConstraints); + } + + match new_protocol.writer_features { + Some(mut features) => { + features.extend(converted_writer_features); + new_protocol.writer_features = Some(features); + } + None => new_protocol.writer_features = Some(converted_writer_features), + } + } + if new_protocol.min_reader_version >= 3 { + let converted_reader_features = configuration + .iter() + .filter(|(_, value)| { + value.as_ref().map_or(false, |v| { + v.to_ascii_lowercase().parse::().is_ok_and(|v| v) + }) + }) + .map(|(key, _)| (*key).clone().into()) + .filter(|v| !matches!(v, ReaderFeatures::Other(_))) + .collect::>(); + match new_protocol.reader_features { + Some(mut features) => { + features.extend(converted_reader_features); + new_protocol.reader_features = Some(features); + } + None => new_protocol.reader_features = Some(converted_reader_features), + } + } + new_protocol +} + +impl std::future::IntoFuture for SetTablePropertiesBuilder { + type Output = DeltaResult; + + type IntoFuture = BoxFuture<'static, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + let mut this = self; + + Box::pin(async move { + let mut metadata = this.snapshot.metadata().clone(); + + let current_protocol = this.snapshot.protocol(); + let properties = this.properties; + + let new_protocol = apply_properties_to_protocol( + current_protocol, + &properties, + this.raise_if_not_exists, + )?; + + metadata.configuration.extend( + properties + .clone() + .into_iter() + .map(|(k, v)| (k, Some(v))) + .collect::>>(), + ); + + let final_protocol = + convert_properties_to_features(new_protocol, &metadata.configuration); + + let operation = DeltaOperation::SetTableProperties { properties }; + + let mut actions = vec![Action::Metadata(metadata)]; + + if current_protocol.ne(&final_protocol) { + actions.push(Action::Protocol(final_protocol)); + } + + let commit = CommitBuilder::from(this.commit_properties) + .with_actions(actions.clone()) + .build( + Some(&this.snapshot), + this.log_store.clone(), + operation.clone(), + )? + .await?; + Ok(DeltaTable::new_with_state( + this.log_store, + commit.snapshot(), + )) + }) + } +} diff --git a/crates/core/src/protocol/mod.rs b/crates/core/src/protocol/mod.rs index a9783bc39e..9cfa429fde 100644 --- a/crates/core/src/protocol/mod.rs +++ b/crates/core/src/protocol/mod.rs @@ -407,6 +407,13 @@ pub enum DeltaOperation { epoch_id: i64, }, + /// Set table properties operations + #[serde(rename_all = "camelCase")] + SetTableProperties { + /// Table properties that were added + properties: HashMap, + }, + #[serde(rename_all = "camelCase")] /// Represents a `Optimize` operation Optimize { @@ -461,6 +468,7 @@ impl DeltaOperation { DeltaOperation::Update { .. } => "UPDATE", DeltaOperation::Merge { .. } => "MERGE", DeltaOperation::StreamingUpdate { .. } => "STREAMING UPDATE", + DeltaOperation::SetTableProperties { .. } => "SET TBLPROPERTIES", DeltaOperation::Optimize { .. } => "OPTIMIZE", DeltaOperation::FileSystemCheck { .. } => "FSCK", DeltaOperation::Restore { .. } => "RESTORE", @@ -504,6 +512,7 @@ impl DeltaOperation { pub fn changes_data(&self) -> bool { match self { Self::Optimize { .. } + | Self::SetTableProperties { .. } | Self::VacuumStart { .. } | Self::VacuumEnd { .. } | Self::AddConstraint { .. } diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 8cf2262d2c..a6f5094b35 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -92,6 +92,12 @@ class RawDeltaTable: raise_if_not_exists: bool, custom_metadata: Optional[Dict[str, str]], ) -> None: ... + def set_table_properties( + self, + properties: Dict[str, str], + raise_if_not_exists: bool, + custom_metadata: Optional[Dict[str, str]], + ) -> None: ... def restore( self, target: Optional[Any], diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 31e71e8fc6..f10e5c4932 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -1896,6 +1896,24 @@ def drop_constraint( """ self.table._table.drop_constraints(name, raise_if_not_exists, custom_metadata) + def set_table_properties( + self, + properties: Dict[str, str], + raise_if_not_exists: bool = True, + custom_metadata: Optional[Dict[str, str]] = None, + ) -> None: + """ + Unset properties from the table. + Args: + properties: properties which set + raise_if_not_exists: set if should raise if not exists. + custom_metadata: custom metadata that will be added to the transaction commit. + Example: + """ + self.table._table.set_table_properties( + properties, raise_if_not_exists, custom_metadata + ) + class TableOptimizer: """API for various table optimization commands.""" diff --git a/python/src/lib.rs b/python/src/lib.rs index 6a8c4288b4..bf06fd4b21 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -37,6 +37,7 @@ use deltalake::operations::load_cdf::CdfLoadBuilder; use deltalake::operations::merge::MergeBuilder; use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType}; use deltalake::operations::restore::RestoreBuilder; +use deltalake::operations::set_tbl_properties::SetTablePropertiesBuilder; use deltalake::operations::transaction::{ CommitBuilder, CommitProperties, TableReference, PROTOCOL, }; @@ -1174,6 +1175,34 @@ impl RawDeltaTable { Ok(serde_json::to_string(&metrics).unwrap()) } + #[pyo3(signature = (properties, raise_if_not_exists, custom_metadata=None))] + pub fn set_table_properties( + &mut self, + properties: HashMap, + raise_if_not_exists: bool, + custom_metadata: Option>, + ) -> PyResult<()> { + let mut cmd = SetTablePropertiesBuilder::new( + self._table.log_store(), + self._table.snapshot().map_err(PythonError::from)?.clone(), + ) + .with_properties(properties) + .with_raise_if_not_exists(raise_if_not_exists); + + if let Some(metadata) = custom_metadata { + let json_metadata: Map = + metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); + cmd = cmd + .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); + }; + + let table = rt() + .block_on(cmd.into_future()) + .map_err(PythonError::from)?; + self._table.state = table.state; + Ok(()) + } + /// Execute the File System Check command (FSCK) on the delta table: removes old reference to files that /// have been deleted or are malformed #[pyo3(signature = (dry_run = true, custom_metadata = None))] diff --git a/python/tests/test_alter.py b/python/tests/test_alter.py index 4bc902d330..fb03acd23b 100644 --- a/python/tests/test_alter.py +++ b/python/tests/test_alter.py @@ -113,3 +113,195 @@ def test_drop_constraint_roundtrip_metadata( dt.alter.drop_constraint("check_price2", custom_metadata={"userName": "John Doe"}) assert dt.history(1)[0]["userName"] == "John Doe" + + +@pytest.mark.parametrize("min_writer_version", ["2", "3", "4", "5", "6", "7"]) +def test_set_table_properties_min_writer_version( + tmp_path: pathlib.Path, + sample_table: pa.Table, + min_writer_version: str, +): + write_deltalake(tmp_path, sample_table, mode="append", engine="rust") + dt = DeltaTable(tmp_path) + + configuration = {"delta.minWriterVersion": min_writer_version} + dt.alter.set_table_properties(configuration) + + protocol = dt.protocol() + + assert dt.metadata().configuration == configuration + assert protocol.min_reader_version == 1 + assert protocol.min_writer_version == int(min_writer_version) + + +def test_set_table_properties_invalid_min_writer_version( + tmp_path: pathlib.Path, sample_table: pa.Table +): + write_deltalake(tmp_path, sample_table, mode="append", engine="rust") + dt = DeltaTable(tmp_path) + with pytest.raises(DeltaError): + dt.alter.set_table_properties({"delta.minWriterVersion": "8"}) + + protocol = dt.protocol() + assert dt.metadata().configuration == {} + assert protocol.min_reader_version == 1 + assert protocol.min_writer_version == 2 + + +@pytest.mark.parametrize("min_reader_version", ["1", "2", "3"]) +def test_set_table_properties_min_reader_version( + tmp_path: pathlib.Path, + sample_table: pa.Table, + min_reader_version: str, +): + write_deltalake(tmp_path, sample_table, mode="append", engine="rust") + dt = DeltaTable(tmp_path) + configuration = {"delta.minReaderVersion": min_reader_version} + dt.alter.set_table_properties(configuration) + + protocol = dt.protocol() + assert dt.metadata().configuration == configuration + assert protocol.min_reader_version == int(min_reader_version) + assert protocol.min_writer_version == 2 + + +def test_set_table_properties_invalid_min_reader_version( + tmp_path: pathlib.Path, sample_table: pa.Table +): + write_deltalake(tmp_path, sample_table, mode="append", engine="rust") + dt = DeltaTable(tmp_path) + with pytest.raises(DeltaError): + dt.alter.set_table_properties({"delta.minReaderVersion": "8"}) + + protocol = dt.protocol() + assert dt.metadata().configuration == {} + assert protocol.min_reader_version == 1 + assert protocol.min_writer_version == 2 + + +def test_set_table_properties_enable_cdf( + tmp_path: pathlib.Path, sample_table: pa.Table +): + write_deltalake(tmp_path, sample_table, mode="append", engine="rust") + dt = DeltaTable(tmp_path) + dt.alter.set_table_properties({"delta.enableChangeDataFeed": "true"}) + + protocol = dt.protocol() + assert dt.metadata().configuration == {"delta.enableChangeDataFeed": "true"} + assert protocol.min_reader_version == 1 + assert protocol.min_writer_version == 4 + + +def test_set_table_properties_enable_cdf_invalid( + tmp_path: pathlib.Path, sample_table: pa.Table +): + write_deltalake(tmp_path, sample_table, mode="append", engine="rust") + dt = DeltaTable(tmp_path) + with pytest.raises(DeltaError): + dt.alter.set_table_properties({"delta.enableChangeDataFeed": "wrong"}) + + protocol = dt.protocol() + assert dt.metadata().configuration == {} + assert protocol.min_reader_version == 1 + assert protocol.min_writer_version == 2 + + +def test_set_table_properties_enable_cdf_value_false( + tmp_path: pathlib.Path, sample_table: pa.Table +): + write_deltalake(tmp_path, sample_table, mode="append", engine="rust") + dt = DeltaTable(tmp_path) + dt.alter.set_table_properties({"delta.enableChangeDataFeed": "false"}) + + protocol = dt.protocol() + assert dt.metadata().configuration == {"delta.enableChangeDataFeed": "false"} + assert protocol.min_reader_version == 1 + assert protocol.min_writer_version == 2 + + +def test_set_table_properties_enable_cdf_with_writer_version_bumped( + tmp_path: pathlib.Path, sample_table: pa.Table +): + write_deltalake(tmp_path, sample_table, mode="append", engine="rust") + dt = DeltaTable(tmp_path) + dt.alter.set_table_properties( + {"delta.enableChangeDataFeed": "true", "delta.minWriterVersion": "7"} + ) + + protocol = dt.protocol() + assert dt.metadata().configuration == { + "delta.enableChangeDataFeed": "true", + "delta.minWriterVersion": "7", + } + assert protocol.min_reader_version == 1 + assert protocol.min_writer_version == 7 + assert protocol.writer_features == ["changeDataFeed"] + + +def test_set_table_properties_enable_cdf_and_deletion_vectors( + tmp_path: pathlib.Path, sample_table: pa.Table +): + write_deltalake(tmp_path, sample_table, mode="append", engine="rust") + dt = DeltaTable(tmp_path) + dt.alter.set_table_properties( + {"delta.enableChangeDataFeed": "true", "delta.enableDeletionVectors": "true"} + ) + + protocol = dt.protocol() + assert dt.metadata().configuration == { + "delta.enableChangeDataFeed": "true", + "delta.enableDeletionVectors": "true", + } + assert protocol.min_reader_version == 3 + assert protocol.min_writer_version == 7 + assert list(sorted(protocol.writer_features)) == [ # type: ignore + "changeDataFeed", + "deletionVectors", + ] + assert protocol.reader_features == ["deletionVectors"] + + +def test_convert_checkConstraints_to_feature_after_version_upgrade( + tmp_path: pathlib.Path, sample_table: pa.Table +): + write_deltalake(tmp_path, sample_table) + + dt = DeltaTable(tmp_path) + + dt.alter.add_constraint({"check_price": "price >= 0"}) + + last_action = dt.history(1)[0] + assert last_action["operation"] == "ADD CONSTRAINT" + assert dt.version() == 1 + assert dt.metadata().configuration == { + "delta.constraints.check_price": "price >= 0" + } + assert dt.protocol().min_writer_version == 3 + + dt.alter.set_table_properties({"delta.enableDeletionVectors": "true"}) + + protocol = dt.protocol() + assert dt.metadata().configuration == { + "delta.constraints.check_price": "price >= 0", + "delta.enableDeletionVectors": "true", + } + assert protocol.min_reader_version == 3 + assert protocol.min_writer_version == 7 + assert list(sorted(protocol.writer_features)) == [ # type: ignore + "checkConstraints", + "deletionVectors", + ] + assert protocol.reader_features == ["deletionVectors"] + + +def test_set_table_properties_enable_dv(tmp_path: pathlib.Path, sample_table: pa.Table): + write_deltalake(tmp_path, sample_table, mode="append", engine="rust") + dt = DeltaTable(tmp_path) + dt.alter.set_table_properties({"delta.enableDeletionVectors": "true"}) + + protocol = dt.protocol() + assert dt.metadata().configuration == {"delta.enableDeletionVectors": "true"} + assert protocol.min_reader_version == 3 + assert protocol.min_writer_version == 7 + assert protocol.writer_features == ["deletionVectors"] + assert protocol.reader_features == ["deletionVectors"] diff --git a/python/tests/test_convert_to_delta.py b/python/tests/test_convert_to_delta.py index 164e817fb1..e7c59432e3 100644 --- a/python/tests/test_convert_to_delta.py +++ b/python/tests/test_convert_to_delta.py @@ -23,7 +23,7 @@ def test_local_convert_to_delta(tmp_path: pathlib.Path, sample_data: pa.Table): tmp_path, name=name, description=description, - configuration={"delta.AppendOnly": "True"}, + configuration={"delta.appendOnly": "true"}, custom_metadata={"userName": "John Doe"}, ) @@ -33,7 +33,7 @@ def test_local_convert_to_delta(tmp_path: pathlib.Path, sample_data: pa.Table): assert dt.files() == ["part-0.parquet"] assert dt.metadata().name == name assert dt.metadata().description == description - assert dt.metadata().configuration == {"delta.AppendOnly": "True"} + assert dt.metadata().configuration == {"delta.appendOnly": "true"} assert dt.history()[0]["userName"] == "John Doe" diff --git a/python/tests/test_create.py b/python/tests/test_create.py index ceca8178c3..4bb73183fe 100644 --- a/python/tests/test_create.py +++ b/python/tests/test_create.py @@ -13,7 +13,10 @@ def test_create_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table sample_data.schema, name="test_name", description="test_desc", - configuration={"delta.appendOnly": "false", "foo": "bar"}, + configuration={ + "delta.appendOnly": "true", + "delta.logRetentionDuration": "interval 2 days", + }, custom_metadata={"userName": "John Doe"}, ) @@ -21,9 +24,14 @@ def test_create_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table assert metadata.name == "test_name" assert metadata.description == "test_desc" - assert metadata.configuration == {"delta.appendOnly": "false", "foo": "bar"} + assert metadata.configuration == { + "delta.appendOnly": "true", + "delta.logRetentionDuration": "interval 2 days", + } assert dt.history()[0]["userName"] == "John Doe" + assert {*dt.protocol().writer_features} == {"appendOnly", "timestampNtz"} # type: ignore + def test_create_modes(tmp_path: pathlib.Path, sample_data: pa.Table): dt = DeltaTable.create(tmp_path, sample_data.schema, mode="error") @@ -56,6 +64,67 @@ def test_create_schema(tmp_path: pathlib.Path, sample_data: pa.Table): assert dt.schema().to_pyarrow() == sample_data.schema +def test_create_with_deletion_vectors_enabled( + tmp_path: pathlib.Path, sample_table: pa.Table +): + """append only is set to false so shouldn't be converted to a feature""" + dt = DeltaTable.create( + tmp_path, + sample_table.schema, + name="test_name", + description="test_desc", + configuration={ + "delta.appendOnly": "false", + "delta.enableDeletionVectors": "true", + }, + custom_metadata={"userName": "John Doe"}, + ) + + metadata = dt.metadata() + protocol = dt.protocol() + assert metadata.name == "test_name" + assert metadata.description == "test_desc" + assert metadata.configuration == { + "delta.appendOnly": "false", + "delta.enableDeletionVectors": "true", + } + assert protocol.min_reader_version == 3 + assert protocol.min_writer_version == 7 + assert protocol.writer_features == ["deletionVectors"] # type: ignore + assert protocol.reader_features == ["deletionVectors"] + assert dt.history()[0]["userName"] == "John Doe" + + +def test_create_higher_protocol_versions( + tmp_path: pathlib.Path, sample_table: pa.Table +): + dt = DeltaTable.create( + tmp_path, + sample_table.schema, + name="test_name", + description="test_desc", + configuration={ + "delta.appendOnly": "false", + "delta.minReaderVersion": "2", + "delta.minWriterVersion": "5", + }, + custom_metadata={"userName": "John Doe"}, + ) + + metadata = dt.metadata() + protocol = dt.protocol() + assert metadata.name == "test_name" + assert metadata.description == "test_desc" + assert metadata.configuration == { + "delta.appendOnly": "false", + "delta.minReaderVersion": "2", + "delta.minWriterVersion": "5", + } + assert protocol.min_reader_version == 2 + assert protocol.min_writer_version == 5 + assert dt.history()[0]["userName"] == "John Doe" + + def test_create_or_replace_existing_table( tmp_path: pathlib.Path, sample_data: pa.Table ): diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index 6e973ab18d..0a7e766cac 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -353,7 +353,7 @@ def test_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table, engin sample_data, name="test_name", description="test_desc", - configuration={"delta.appendOnly": "false", "foo": "bar"}, + configuration={"delta.appendOnly": "false"}, engine=engine, ) @@ -363,7 +363,7 @@ def test_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table, engin assert metadata.name == "test_name" assert metadata.description == "test_desc" - assert metadata.configuration == {"delta.appendOnly": "false", "foo": "bar"} + assert metadata.configuration == {"delta.appendOnly": "false"} @pytest.mark.parametrize("engine", ["pyarrow", "rust"])