Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python, rust): add set table properties operation #2264

Merged
merged 25 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7c808f6
init
ion-elgreco Mar 8, 2024
f049fcf
add tests
ion-elgreco Mar 8, 2024
444b4a7
add tests for create
ion-elgreco Mar 8, 2024
510ce05
remove config to feature in from_str for invariants/check constraints
ion-elgreco Mar 8, 2024
a3c02e9
only convert configs that have "true"
ion-elgreco Mar 8, 2024
9a708fc
init
ion-elgreco Mar 8, 2024
070e780
add tests
ion-elgreco Mar 8, 2024
e29188c
add tests for create
ion-elgreco Mar 8, 2024
48461cd
remove config to feature in from_str for invariants/check constraints
ion-elgreco Mar 8, 2024
be87d69
only convert configs that have "true"
ion-elgreco Mar 8, 2024
4b43481
Merge branch 'feat/set_table_properties' of https://github.com/ion-el…
ion-elgreco Mar 16, 2024
7c59317
Merge branch 'main' of https://github.com/ion-elgreco/delta-rs into f…
ion-elgreco Mar 16, 2024
e82abfb
use new commitbuilder
ion-elgreco Mar 16, 2024
9456a43
clippy
ion-elgreco Mar 16, 2024
ee9affe
Merge branch 'main' into feat/set_table_properties
ion-elgreco Mar 19, 2024
fd68f8b
Merge branch 'main' into feat/set_table_properties
ion-elgreco Mar 20, 2024
77c0c09
Merge branch 'main' into feat/set_table_properties
ion-elgreco Mar 23, 2024
148fe05
Merge branch 'main' into feat/set_table_properties
ion-elgreco Mar 28, 2024
943203e
Merge branch 'main' into feat/set_table_properties
ion-elgreco Mar 28, 2024
3691199
Merge branch 'main' into feat/set_table_properties
ion-elgreco Mar 29, 2024
c7f0283
Merge branch 'main' into feat/set_table_properties
ion-elgreco Apr 1, 2024
bfe15b0
Merge branch 'main' into feat/set_table_properties
ion-elgreco Apr 10, 2024
45ceeff
Merge remote-tracking branch 'upstream/main' into feat/set_table_prop…
rtyler May 15, 2024
1252b9c
fix: accommodate changes in Commit APIs
rtyler May 15, 2024
51169c6
rm state merge, not required anymore
ion-elgreco May 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/core/src/kernel/models/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,8 @@ impl From<&str> for WriterFeatures {
fn from(value: &str) -> Self {
match value {
"appendOnly" | "delta.appendOnly" => WriterFeatures::AppendOnly,
"invariants" | "delta.invariants" => WriterFeatures::Invariants,
"checkConstraints" | "delta.checkConstraints" => WriterFeatures::CheckConstraints,
"invariants" => WriterFeatures::Invariants,
"checkConstraints" => WriterFeatures::CheckConstraints,
"changeDataFeed" | "delta.enableChangeDataFeed" => WriterFeatures::ChangeDataFeed,
"generatedColumns" => WriterFeatures::GeneratedColumns,
"columnMapping" => WriterFeatures::ColumnMapping,
Expand Down
75 changes: 36 additions & 39 deletions crates/core/src/operations/create.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
//! Command for creating a new delta table
// https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala

use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;

use futures::future::BoxFuture;
use maplit::hashset;
use serde_json::Value;

use super::transaction::{CommitBuilder, TableReference, PROTOCOL};
Expand All @@ -13,6 +14,9 @@ use crate::kernel::{
Action, DataType, Metadata, Protocol, ReaderFeatures, StructField, StructType, WriterFeatures,
};
use crate::logstore::{LogStore, LogStoreRef};
use crate::operations::set_tbl_properties::{
apply_properties_to_protocol, convert_properties_to_features,
};
use crate::protocol::{DeltaOperation, SaveMode};
use crate::table::builder::ensure_table_uri;
use crate::table::config::DeltaConfigKey;
Expand Down Expand Up @@ -237,41 +241,28 @@ impl CreateBuilder {
)
};

let configuration = self.configuration;
let contains_timestampntz = PROTOCOL.contains_timestampntz(&self.columns);

// TODO configure more permissive versions based on configuration. Also how should this ideally be handled?
// We set the lowest protocol we can, and if subsequent writes use newer features we update metadata?

let (min_reader_version, min_writer_version, writer_features, reader_features) =
if contains_timestampntz {
let mut converted_writer_features = self
.configuration
.keys()
.map(|key| key.clone().into())
.filter(|v| !matches!(v, WriterFeatures::Other(_)))
.collect::<HashSet<WriterFeatures>>();

let mut converted_reader_features = self
.configuration
.keys()
.map(|key| key.clone().into())
.filter(|v| !matches!(v, ReaderFeatures::Other(_)))
.collect::<HashSet<ReaderFeatures>>();
converted_writer_features.insert(WriterFeatures::TimestampWithoutTimezone);
converted_reader_features.insert(ReaderFeatures::TimestampWithoutTimezone);
(
3,
7,
Some(converted_writer_features),
Some(converted_reader_features),
)
} else {
(
PROTOCOL.default_reader_version(),
PROTOCOL.default_writer_version(),
None,
None,
)
};
let current_protocol = if contains_timestampntz {
Protocol {
min_reader_version: 3,
min_writer_version: 7,
writer_features: Some(hashset! {WriterFeatures::TimestampWithoutTimezone}),
reader_features: Some(hashset! {ReaderFeatures::TimestampWithoutTimezone}),
}
} else {
Protocol {
min_reader_version: PROTOCOL.default_reader_version(),
min_writer_version: PROTOCOL.default_writer_version(),
reader_features: None,
writer_features: None,
}
};

let protocol = self
.actions
.iter()
Expand All @@ -280,17 +271,23 @@ impl CreateBuilder {
Action::Protocol(p) => p.clone(),
_ => unreachable!(),
})
.unwrap_or_else(|| Protocol {
min_reader_version,
min_writer_version,
writer_features,
reader_features,
});
.unwrap_or_else(|| current_protocol);

let protocol = apply_properties_to_protocol(
&protocol,
&configuration
.iter()
.map(|(k, v)| (k.clone(), v.clone().unwrap()))
.collect::<HashMap<String, String>>(),
true,
)?;

let protocol = convert_properties_to_features(protocol, &configuration);

let mut metadata = Metadata::try_new(
StructType::new(self.columns),
self.partition_columns.unwrap_or_default(),
self.configuration,
configuration,
)?
.with_created_time(chrono::Utc::now().timestamp_millis());
if let Some(name) = self.name {
Expand Down
7 changes: 7 additions & 0 deletions crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub use ::datafusion::physical_plan::common::collect as collect_sendable_stream;
use arrow::record_batch::RecordBatch;
use optimize::OptimizeBuilder;
use restore::RestoreBuilder;
use set_tbl_properties::SetTablePropertiesBuilder;

#[cfg(feature = "datafusion")]
pub mod constraints;
Expand All @@ -48,6 +49,7 @@ mod load;
pub mod load_cdf;
#[cfg(feature = "datafusion")]
pub mod merge;
pub mod set_tbl_properties;
#[cfg(feature = "datafusion")]
pub mod update;
#[cfg(feature = "datafusion")]
Expand Down Expand Up @@ -219,6 +221,11 @@ impl DeltaOps {
pub fn drop_constraints(self) -> DropConstraintBuilder {
DropConstraintBuilder::new(self.0.log_store, self.0.state.unwrap())
}

/// Set table properties
pub fn set_tbl_properties(self) -> SetTablePropertiesBuilder {
SetTablePropertiesBuilder::new(self.0.log_store, self.0.state.unwrap())
}
}

impl From<DeltaTable> for DeltaOps {
Expand Down
Loading
Loading