Skip to content

Commit

Permalink
refactor: kernel table features
Browse files Browse the repository at this point in the history
Signed-off-by: Ion Koutsouris <[email protected]>
  • Loading branch information
ion-elgreco committed Jan 12, 2025
1 parent b831543 commit 1bb60ee
Show file tree
Hide file tree
Showing 15 changed files with 114 additions and 254 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ debug = true
debug = "line-tables-only"

[workspace.dependencies]
delta_kernel = { version = "0.6.0", features = ["default-engine"] }
delta_kernel = { version = "=0.6.0", features = ["default-engine"] }
#delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] }

# arrow
Expand Down Expand Up @@ -59,6 +59,8 @@ datafusion-sql = { version = "44" }
# serde
serde = { version = "1.0.194", features = ["derive"] }
serde_json = "1"
strum = { version = "*"}


# "stdlib"
bytes = { version = "1" }
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ datafusion-functions-aggregate = { workspace = true, optional = true }
# serde
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
strum = { workspace = true}

# "stdlib"
bytes = { workspace = true }
Expand Down
259 changes: 42 additions & 217 deletions crates/core/src/kernel/models/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use url::Url;
use super::schema::StructType;
use crate::kernel::{error::Error, DeltaResult};
use crate::TableProperty;
use delta_kernel::table_features::{ReaderFeatures, WriterFeatures};

/// Defines a file format used in table
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -183,18 +184,37 @@ impl Protocol {
mut self,
configuration: &HashMap<String, Option<String>>,
) -> Protocol {
fn parse_bool(value: &Option<String>) -> bool {
value
.as_ref()
.is_some_and(|v| v.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v))
}

if self.min_writer_version >= 7 {
// TODO: move this is in future to use delta_kernel::table_properties
let mut converted_writer_features = configuration
.iter()
.filter(|(_, value)| {
value
.as_ref()
.is_some_and(|v| v.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v))
})
.collect::<HashMap<&String, &Option<String>>>()
.keys()
.map(|key| (*key).clone().into())
.filter(|v| !matches!(v, WriterFeatures::Other(_)))
.filter_map(|(key, value)| match key.as_str() {
"delta.enableChangeDataFeed" if parse_bool(value) => {
Some(WriterFeatures::ChangeDataFeed)
}
"delta.appendOnly" if parse_bool(value) => Some(WriterFeatures::AppendOnly),
"delta.enableDeletionVectors" if parse_bool(value) => {
Some(WriterFeatures::DeletionVectors)
}
"delta.enableRowTracking" if parse_bool(value) => {
Some(WriterFeatures::RowTracking)
}
"delta.checkpointPolicy" if value.clone().unwrap_or_default() == "v2" => {
Some(WriterFeatures::V2Checkpoint)
}
_ => None,
})
.collect::<HashSet<WriterFeatures>>();

if configuration
Expand All @@ -215,13 +235,15 @@ impl Protocol {
if self.min_reader_version >= 3 {
let converted_reader_features = configuration
.iter()
.filter(|(_, value)| {
value
.as_ref()
.is_some_and(|v| v.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v))
.filter_map(|(key, value)| match key.as_str() {
"delta.enableDeletionVectors" if parse_bool(value) => {
Some(ReaderFeatures::DeletionVectors)
}
"delta.checkpointPolicy" if value.clone().unwrap_or_default() == "v2" => {
Some(ReaderFeatures::V2Checkpoint)
}
_ => None,
})
.map(|(key, _)| (*key).clone().into())
.filter(|v| !matches!(v, ReaderFeatures::Other(_)))
.collect::<HashSet<ReaderFeatures>>();
match self.reader_features {
Some(mut features) => {
Expand Down Expand Up @@ -459,225 +481,28 @@ impl fmt::Display for TableFeatures {
}
}

impl TableFeatures {
/// Convert table feature to respective reader or/and write feature
pub fn to_reader_writer_features(&self) -> (Option<ReaderFeatures>, Option<WriterFeatures>) {
let reader_feature = ReaderFeatures::try_from(self).ok();
let writer_feature = WriterFeatures::try_from(self).ok();
(reader_feature, writer_feature)
}
}

/// Features table readers can support as well as let users know
/// what is supported
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
#[serde(rename_all = "camelCase")]
pub enum ReaderFeatures {
/// Mapping of one column to another
ColumnMapping,
/// Deletion vectors for merge, update, delete
DeletionVectors,
/// timestamps without timezone support
#[serde(rename = "timestampNtz")]
TimestampWithoutTimezone,
/// version 2 of checkpointing
V2Checkpoint,
/// If we do not match any other reader features
#[serde(untagged)]
Other(String),
}

impl From<&parquet::record::Field> for ReaderFeatures {
fn from(value: &parquet::record::Field) -> Self {
match value {
parquet::record::Field::Str(feature) => match feature.as_str() {
"columnMapping" => ReaderFeatures::ColumnMapping,
"deletionVectors" | "delta.enableDeletionVectors" => {
ReaderFeatures::DeletionVectors
}
"timestampNtz" => ReaderFeatures::TimestampWithoutTimezone,
"v2Checkpoint" => ReaderFeatures::V2Checkpoint,
f => ReaderFeatures::Other(f.to_string()),
},
f => ReaderFeatures::Other(f.to_string()),
}
}
}

impl From<String> for ReaderFeatures {
fn from(value: String) -> Self {
value.as_str().into()
}
}

impl From<&str> for ReaderFeatures {
fn from(value: &str) -> Self {
match value {
"columnMapping" => ReaderFeatures::ColumnMapping,
"deletionVectors" => ReaderFeatures::DeletionVectors,
"timestampNtz" => ReaderFeatures::TimestampWithoutTimezone,
"v2Checkpoint" => ReaderFeatures::V2Checkpoint,
f => ReaderFeatures::Other(f.to_string()),
}
}
}

impl AsRef<str> for ReaderFeatures {
fn as_ref(&self) -> &str {
match self {
ReaderFeatures::ColumnMapping => "columnMapping",
ReaderFeatures::DeletionVectors => "deletionVectors",
ReaderFeatures::TimestampWithoutTimezone => "timestampNtz",
ReaderFeatures::V2Checkpoint => "v2Checkpoint",
ReaderFeatures::Other(f) => f,
}
}
}

impl fmt::Display for ReaderFeatures {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.as_ref())
}
}

impl TryFrom<&TableFeatures> for ReaderFeatures {
type Error = String;
type Error = strum::ParseError;

fn try_from(value: &TableFeatures) -> Result<Self, Self::Error> {
match ReaderFeatures::from(value.as_ref()) {
ReaderFeatures::Other(_) => {
Err(format!("Table feature {} is not a reader feature", value))
}
value => Ok(value),
}
}
}

/// Features table writers can support as well as let users know
/// what is supported
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
#[serde(rename_all = "camelCase")]
pub enum WriterFeatures {
/// Append Only Tables
AppendOnly,
/// Table invariants
Invariants,
/// Check constraints on columns
CheckConstraints,
/// CDF on a table
ChangeDataFeed,
/// Columns with generated values
GeneratedColumns,
/// Mapping of one column to another
ColumnMapping,
/// ID Columns
IdentityColumns,
/// Deletion vectors for merge, update, delete
DeletionVectors,
/// Row tracking on tables
RowTracking,
/// timestamps without timezone support
#[serde(rename = "timestampNtz")]
TimestampWithoutTimezone,
/// domain specific metadata
DomainMetadata,
/// version 2 of checkpointing
V2Checkpoint,
/// Iceberg compatibility support
IcebergCompatV1,
/// If we do not match any other reader features
#[serde(untagged)]
Other(String),
}

impl From<String> for WriterFeatures {
fn from(value: String) -> Self {
value.as_str().into()
}
}

impl From<&str> for WriterFeatures {
fn from(value: &str) -> Self {
match value {
"appendOnly" | "delta.appendOnly" => WriterFeatures::AppendOnly,
"invariants" => WriterFeatures::Invariants,
"checkConstraints" => WriterFeatures::CheckConstraints,
"changeDataFeed" | "delta.enableChangeDataFeed" => WriterFeatures::ChangeDataFeed,
"generatedColumns" => WriterFeatures::GeneratedColumns,
"columnMapping" => WriterFeatures::ColumnMapping,
"identityColumns" => WriterFeatures::IdentityColumns,
"deletionVectors" | "delta.enableDeletionVectors" => WriterFeatures::DeletionVectors,
"rowTracking" | "delta.enableRowTracking" => WriterFeatures::RowTracking,
"timestampNtz" => WriterFeatures::TimestampWithoutTimezone,
"domainMetadata" => WriterFeatures::DomainMetadata,
"v2Checkpoint" => WriterFeatures::V2Checkpoint,
"icebergCompatV1" => WriterFeatures::IcebergCompatV1,
f => WriterFeatures::Other(f.to_string()),
}
}
}

impl AsRef<str> for WriterFeatures {
fn as_ref(&self) -> &str {
match self {
WriterFeatures::AppendOnly => "appendOnly",
WriterFeatures::Invariants => "invariants",
WriterFeatures::CheckConstraints => "checkConstraints",
WriterFeatures::ChangeDataFeed => "changeDataFeed",
WriterFeatures::GeneratedColumns => "generatedColumns",
WriterFeatures::ColumnMapping => "columnMapping",
WriterFeatures::IdentityColumns => "identityColumns",
WriterFeatures::DeletionVectors => "deletionVectors",
WriterFeatures::RowTracking => "rowTracking",
WriterFeatures::TimestampWithoutTimezone => "timestampNtz",
WriterFeatures::DomainMetadata => "domainMetadata",
WriterFeatures::V2Checkpoint => "v2Checkpoint",
WriterFeatures::IcebergCompatV1 => "icebergCompatV1",
WriterFeatures::Other(f) => f,
}
}
}

impl fmt::Display for WriterFeatures {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.as_ref())
ReaderFeatures::try_from(value.as_ref())
}
}

impl TryFrom<&TableFeatures> for WriterFeatures {
type Error = String;
type Error = strum::ParseError;

fn try_from(value: &TableFeatures) -> Result<Self, Self::Error> {
match WriterFeatures::from(value.as_ref()) {
WriterFeatures::Other(_) => {
Err(format!("Table feature {} is not a writer feature", value))
}
value => Ok(value),
}
WriterFeatures::try_from(value.as_ref())
}
}

impl From<&parquet::record::Field> for WriterFeatures {
fn from(value: &parquet::record::Field) -> Self {
match value {
parquet::record::Field::Str(feature) => match feature.as_str() {
"appendOnly" => WriterFeatures::AppendOnly,
"invariants" => WriterFeatures::Invariants,
"checkConstraints" => WriterFeatures::CheckConstraints,
"changeDataFeed" => WriterFeatures::ChangeDataFeed,
"generatedColumns" => WriterFeatures::GeneratedColumns,
"columnMapping" => WriterFeatures::ColumnMapping,
"identityColumns" => WriterFeatures::IdentityColumns,
"deletionVectors" => WriterFeatures::DeletionVectors,
"rowTracking" => WriterFeatures::RowTracking,
"timestampNtz" => WriterFeatures::TimestampWithoutTimezone,
"domainMetadata" => WriterFeatures::DomainMetadata,
"v2Checkpoint" => WriterFeatures::V2Checkpoint,
"icebergCompatV1" => WriterFeatures::IcebergCompatV1,
f => WriterFeatures::Other(f.to_string()),
},
f => WriterFeatures::Other(f.to_string()),
}
impl TableFeatures {
/// Convert table feature to respective reader or/and write feature
pub fn to_reader_writer_features(&self) -> (Option<ReaderFeatures>, Option<WriterFeatures>) {
let reader_feature = ReaderFeatures::try_from(self).ok();
let writer_feature = WriterFeatures::try_from(self).ok();
(reader_feature, writer_feature)
}
}

Expand Down
6 changes: 4 additions & 2 deletions crates/core/src/kernel/snapshot/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,9 @@ pub(super) async fn list_log_files(

#[cfg(test)]
pub(super) mod tests {
use delta_kernel::table_features::{ReaderFeatures, WriterFeatures};
use deltalake_test::utils::*;
use maplit::hashset;
use tokio::task::JoinHandle;

use crate::{
Expand Down Expand Up @@ -637,8 +639,8 @@ pub(super) mod tests {
let expected = Protocol {
min_reader_version: 3,
min_writer_version: 7,
reader_features: Some(vec!["deletionVectors".into()].into_iter().collect()),
writer_features: Some(vec!["deletionVectors".into()].into_iter().collect()),
reader_features: Some(hashset! {ReaderFeatures::DeletionVectors}),
writer_features: Some(hashset! {WriterFeatures::DeletionVectors}),
};
assert_eq!(protocol, expected);

Expand Down
17 changes: 13 additions & 4 deletions crates/core/src/kernel/snapshot/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use arrow_array::{
Array, BooleanArray, Int32Array, Int64Array, ListArray, MapArray, StringArray, StructArray,
};
use delta_kernel::table_features::{ReaderFeatures, WriterFeatures};
use percent_encoding::percent_decode_str;

use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName};
Expand Down Expand Up @@ -63,10 +64,18 @@ pub(super) fn read_protocol(batch: &dyn ProvidesColumnByName) -> DeltaResult<Opt
return Ok(Some(Protocol {
min_reader_version: ex::read_primitive(min_reader_version, idx)?,
min_writer_version: ex::read_primitive(min_writer_version, idx)?,
reader_features: collect_string_list(&maybe_reader_features, idx)
.map(|v| v.into_iter().map(Into::into).collect()),
writer_features: collect_string_list(&maybe_writer_features, idx)
.map(|v| v.into_iter().map(Into::into).collect()),
reader_features: collect_string_list(&maybe_reader_features, idx).map(|v| {
v.into_iter()
.map(|v| TryInto::<ReaderFeatures>::try_into(v.as_str()))
.filter_map(|v| v.ok())
.collect()
}),
writer_features: collect_string_list(&maybe_writer_features, idx).map(|v| {
v.into_iter()
.map(|v| TryInto::<WriterFeatures>::try_into(v.as_str()))
.filter_map(|v| v.ok())
.collect()
}),
}));
}
}
Expand Down
Loading

0 comments on commit 1bb60ee

Please sign in to comment.