Skip to content

Commit

Permalink
fix: use physical name for column name lookup in partitions (#1836)
Browse files Browse the repository at this point in the history
# Description
get_actions wrongly assumes that partition_columns from schema and
partitionValues from log must be the same. This is not true since
partition_columns are logical column names while partitionValues are
physical column names.

Tests pending

# Related Issue(s)

- closes #1835

# Documentation

https://github.com/delta-io/delta/blob/master/PROTOCOL.md#writer-requirements-for-column-mapping
"Track partition values and column level statistics with the physical
name of the column in the transaction log."

---------

Co-authored-by: Will Jones <[email protected]>
  • Loading branch information
aersam and wjones127 authored Nov 24, 2023
1 parent 7250544 commit ba043d6
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 3 deletions.
3 changes: 3 additions & 0 deletions crates/deltalake-core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ pub enum DeltaTableError {
#[from]
source: crate::kernel::Error,
},

#[error("Table metadata is invalid: {0}")]
MetadataError(String),
}

impl From<object_store::path::Error> for DeltaTableError {
Expand Down
3 changes: 3 additions & 0 deletions crates/deltalake-core/src/kernel/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ pub enum Error {
/// Invariant expression.
line: String,
},

#[error("Table metadata is invalid: {0}")]
MetadataError(String),
}

#[cfg(feature = "object_store")]
Expand Down
14 changes: 14 additions & 0 deletions crates/deltalake-core/src/kernel/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,20 @@ impl StructField {
self.nullable
}

/// Returns the physical name of the column
/// Equals the name if column mapping is not enabled on table
pub fn physical_name(&self) -> Result<&str, Error> {
// Even on mapping type id the physical name should be there for partitions
let phys_name = self.get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName);
match phys_name {
None => Ok(&self.name),
Some(MetadataValue::String(s)) => Ok(s),
Some(MetadataValue::Number(_)) => Err(Error::MetadataError(
"Unexpected type for physical name".to_string(),
)),
}
}

#[inline]
/// Returns the data type of the column
pub const fn data_type(&self) -> &DataType {
Expand Down
74 changes: 74 additions & 0 deletions crates/deltalake-core/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,80 @@ mod tests {
assert_eq!(expected, actions);
}

#[tokio::test]
async fn test_with_column_mapping() {
// test table with column mapping and partitions
let path = "./tests/data/table_with_column_mapping";
let table = crate::open_table(path).await.unwrap();
let actions = table.get_state().add_actions_table(true).unwrap();
let expected_columns: Vec<(&str, ArrayRef)> = vec![
(
"path",
Arc::new(array::StringArray::from(vec![
"BH/part-00000-4d6e745c-8e04-48d9-aa60-438228358f1a.c000.zstd.parquet",
"8v/part-00001-69b4a452-aeac-4ffa-bf5c-a0c2833d05eb.c000.zstd.parquet",
])),
),
(
"size_bytes",
Arc::new(array::Int64Array::from(vec![890, 810])),
),
(
"modification_time",
Arc::new(arrow::array::TimestampMillisecondArray::from(vec![
1699946088000,
1699946088000,
])),
),
(
"data_change",
Arc::new(array::BooleanArray::from(vec![true, true])),
),
(
"partition.Company Very Short",
Arc::new(array::StringArray::from(vec!["BMS", "BME"])),
),
("num_records", Arc::new(array::Int64Array::from(vec![4, 1]))),
(
"null_count.Company Very Short",
Arc::new(array::NullArray::new(2)),
),
("min.Company Very Short", Arc::new(array::NullArray::new(2))),
("max.Company Very Short", Arc::new(array::NullArray::new(2))),
("null_count.Super Name", Arc::new(array::NullArray::new(2))),
("min.Super Name", Arc::new(array::NullArray::new(2))),
("max.Super Name", Arc::new(array::NullArray::new(2))),
(
"tags.INSERTION_TIME",
Arc::new(array::StringArray::from(vec![
"1699946088000000",
"1699946088000001",
])),
),
(
"tags.MAX_INSERTION_TIME",
Arc::new(array::StringArray::from(vec![
"1699946088000000",
"1699946088000001",
])),
),
(
"tags.MIN_INSERTION_TIME",
Arc::new(array::StringArray::from(vec![
"1699946088000000",
"1699946088000001",
])),
),
(
"tags.OPTIMIZE_TARGET_SIZE",
Arc::new(array::StringArray::from(vec!["33554432", "33554432"])),
),
];
let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();

assert_eq!(expected, actions);
}

#[tokio::test]
async fn test_with_stats() {
// test table with stats
Expand Down
51 changes: 51 additions & 0 deletions crates/deltalake-core/src/table/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,14 @@ impl<'a> TableConfig<'a> {
.and_then(|o| o.as_ref().and_then(|v| v.parse().ok()))
.unwrap_or_default()
}

/// Return the column mapping mode according to delta.columnMapping.mode
pub fn column_mapping_mode(&self) -> ColumnMappingMode {
self.0
.get(DeltaConfigKey::ColumnMappingMode.as_ref())
.and_then(|o| o.as_ref().and_then(|v| v.parse().ok()))
.unwrap_or_default()
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -394,6 +402,49 @@ impl FromStr for CheckpointPolicy {
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
/// The Column Mapping modes used for reading and writing data
#[serde(rename_all = "camelCase")]
pub enum ColumnMappingMode {
/// No column mapping is applied
None,
/// Columns are mapped by their field_id in parquet
Id,
/// Columns are mapped to a physical name
Name,
}

impl Default for ColumnMappingMode {
fn default() -> Self {
Self::None
}
}

impl AsRef<str> for ColumnMappingMode {
fn as_ref(&self) -> &str {
match self {
Self::None => "none",
Self::Id => "id",
Self::Name => "name",
}
}
}

impl FromStr for ColumnMappingMode {
type Err = DeltaTableError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"none" => Ok(Self::None),
"id" => Ok(Self::Id),
"name" => Ok(Self::Name),
_ => Err(DeltaTableError::Generic(
"Invalid string for ColumnMappingMode".into(),
)),
}
}
}

const SECONDS_PER_MINUTE: u64 = 60;
const SECONDS_PER_HOUR: u64 = 60 * SECONDS_PER_MINUTE;
const SECONDS_PER_DAY: u64 = 24 * SECONDS_PER_HOUR;
Expand Down
38 changes: 35 additions & 3 deletions crates/deltalake-core/src/table/state_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use arrow_array::{
use arrow_schema::{DataType, Field, Fields, TimeUnit};
use itertools::Itertools;

use super::config::ColumnMappingMode;
use super::state::DeltaTableState;
use crate::errors::DeltaTableError;
use crate::kernel::{DataType as DeltaDataType, StructType};
Expand Down Expand Up @@ -145,7 +146,7 @@ impl DeltaTableState {
flatten: bool,
) -> Result<arrow::record_batch::RecordBatch, DeltaTableError> {
let metadata = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?;

let column_mapping_mode = self.table_config().column_mapping_mode();
let partition_column_types: Vec<arrow::datatypes::DataType> = metadata
.partition_columns
.iter()
Expand All @@ -167,13 +168,44 @@ impl DeltaTableState {
})
.collect::<HashMap<&str, _>>();

let physical_name_to_logical_name = match column_mapping_mode {
ColumnMappingMode::None => HashMap::with_capacity(0), // No column mapping, no need for this HashMap
ColumnMappingMode::Id | ColumnMappingMode::Name => metadata
.partition_columns
.iter()
.map(|name| -> Result<_, DeltaTableError> {
let physical_name = metadata
.schema
.field_with_name(name)
.or(Err(DeltaTableError::MetadataError(format!(
"Invalid partition column {0}",
name
))))?
.physical_name()
.map_err(|e| DeltaTableError::Kernel { source: e })?;
Ok((physical_name, name.as_str()))
})
.collect::<Result<HashMap<&str, &str>, DeltaTableError>>()?,
};
// Append values
for action in self.files() {
for (name, maybe_value) in action.partition_values.iter() {
let logical_name = match column_mapping_mode {
ColumnMappingMode::None => name.as_str(),
ColumnMappingMode::Id | ColumnMappingMode::Name => {
physical_name_to_logical_name.get(name.as_str()).ok_or(
DeltaTableError::MetadataError(format!(
"Invalid partition column {0}",
name
)),
)?
}
};
if let Some(value) = maybe_value {
builders.get_mut(name.as_str()).unwrap().append_value(value);
builders.get_mut(logical_name).unwrap().append_value(value);
// Unwrap is safe here since the name exists in the mapping where we check validity already
} else {
builders.get_mut(name.as_str()).unwrap().append_null();
builders.get_mut(logical_name).unwrap().append_null();
}
}
}
Expand Down
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"txnId":"0e8eece8-347f-4c77-bc4f-daf3a5985dc9","tableSizeBytes":1700,"numFiles":2,"numMetadata":1,"numProtocol":1,"setTransactions":[],"domainMetadata":[],"metadata":{"id":"592de637-dd77-4aaa-af00-97d723a7f1f1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"Company Very Short\",\"type\":\"string\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":1,\"delta.columnMapping.physicalName\":\"col-173b4db9-b5ad-427f-9e75-516aae37fbbb\"}},{\"name\":\"Super Name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":2,\"delta.columnMapping.physicalName\":\"col-3877fd94-0973-4941-ac6b-646849a1ff65\"}}]}","partitionColumns":["Company Very Short"],"configuration":{"delta.columnMapping.mode":"name","delta.autoOptimize.optimizeWrite":"true","delta.columnMapping.maxColumnId":"2","delta.targetFileSize":"33554432","delta.tuneFileSizesForRewrites":"true"},"createdTime":1699946083038},"protocol":{"minReaderVersion":2,"minWriterVersion":5},"histogramOpt":{"sortedBinBoundaries":[0,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,12582912,16777216,20971520,25165824,29360128,33554432,37748736,41943040,50331648,58720256,67108864,75497472,83886080,92274688,100663296,109051904,117440512,125829120,130023424,134217728,138412032,142606336,146800640,150994944,167772160,184549376,201326592,218103808,234881024,251658240,268435456,285212672,301989888,318767104,335544320,352321536,369098752,385875968,402653184,419430400,436207616,452984832,469762048,486539264,503316480,520093696,536870912,553648128,570425344,587202560,603979776,671088640,738197504,805306368,872415232,939524096,1006632960,1073741824,1140850688,1207959552,1275068416,1342177280,1409286144,1476395008,1610612736,1744830464,1879048192,2013265920,2147483648,2415919104,2684354560,2952790016,3221225472,3489660928,3758096384,4026531840,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944],"fileCounts":[2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"totalBytes":[1700,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"allFiles":[{"path":"8v/part-00001-69b4a452-aeac-4ffa-bf5c-a0c2833d05eb.c000.zstd.parquet","partitionValues":{"col-173b4db9-b5ad-427f-9e75-516aae37fbbb":"BME"},"size":810,"modificationTime":1699946088000,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":\"Timothy Lamb\"},\"maxValues\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":\"Timothy Lamb\"},\"nullCount\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":0}}","tags":{"INSERTION_TIME":"1699946088000001","MIN_INSERTION_TIME":"1699946088000001","MAX_INSERTION_TIME":"1699946088000001","OPTIMIZE_TARGET_SIZE":"33554432"}},{"path":"BH/part-00000-4d6e745c-8e04-48d9-aa60-438228358f1a.c000.zstd.parquet","partitionValues":{"col-173b4db9-b5ad-427f-9e75-516aae37fbbb":"BMS"},"size":890,"modificationTime":1699946088000,"dataChange":false,"stats":"{\"numRecords\":4,\"minValues\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":\"Anthony Johnson\"},\"maxValues\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":\"Stephanie Mcgrath\"},\"nullCount\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":0}}","tags":{"INSERTION_TIME":"1699946088000000","MIN_INSERTION_TIME":"1699946088000000","MAX_INSERTION_TIME":"1699946088000000","OPTIMIZE_TARGET_SIZE":"33554432"}}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"commitInfo":{"timestamp":1699946089972,"userId":"2797914831036774","userName":"[email protected]","operation":"WRITE","operationParameters":{"mode":"Overwrite","statsOnLoad":false,"partitionBy":"[\"Company Very Short\"]"},"notebook":{"notebookId":"3271485675102593"},"clusterId":"0428-070410-lm8e9giw","isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{"numFiles":"2","numOutputRows":"5","numOutputBytes":"1700"},"tags":{"restoresDeletedRows":"false"},"engineInfo":"Databricks-Runtime/13.3.x-photon-scala2.12","txnId":"0e8eece8-347f-4c77-bc4f-daf3a5985dc9"}}
{"metaData":{"id":"592de637-dd77-4aaa-af00-97d723a7f1f1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"Company Very Short\",\"type\":\"string\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":1,\"delta.columnMapping.physicalName\":\"col-173b4db9-b5ad-427f-9e75-516aae37fbbb\"}},{\"name\":\"Super Name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":2,\"delta.columnMapping.physicalName\":\"col-3877fd94-0973-4941-ac6b-646849a1ff65\"}}]}","partitionColumns":["Company Very Short"],"configuration":{"delta.columnMapping.mode":"name","delta.autoOptimize.optimizeWrite":"true","delta.columnMapping.maxColumnId":"2","delta.targetFileSize":"33554432","delta.tuneFileSizesForRewrites":"true"},"createdTime":1699946083038}}
{"protocol":{"minReaderVersion":2,"minWriterVersion":5}}
{"add":{"path":"BH/part-00000-4d6e745c-8e04-48d9-aa60-438228358f1a.c000.zstd.parquet","partitionValues":{"col-173b4db9-b5ad-427f-9e75-516aae37fbbb":"BMS"},"size":890,"modificationTime":1699946088000,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":\"Anthony Johnson\"},\"maxValues\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":\"Stephanie Mcgrath\"},\"nullCount\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":0}}","tags":{"INSERTION_TIME":"1699946088000000","MIN_INSERTION_TIME":"1699946088000000","MAX_INSERTION_TIME":"1699946088000000","OPTIMIZE_TARGET_SIZE":"33554432"}}}
{"add":{"path":"8v/part-00001-69b4a452-aeac-4ffa-bf5c-a0c2833d05eb.c000.zstd.parquet","partitionValues":{"col-173b4db9-b5ad-427f-9e75-516aae37fbbb":"BME"},"size":810,"modificationTime":1699946088000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":\"Timothy Lamb\"},\"maxValues\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":\"Timothy Lamb\"},\"nullCount\":{\"col-3877fd94-0973-4941-ac6b-646849a1ff65\":0}}","tags":{"INSERTION_TIME":"1699946088000001","MIN_INSERTION_TIME":"1699946088000001","MAX_INSERTION_TIME":"1699946088000001","OPTIMIZE_TARGET_SIZE":"33554432"}}}

0 comments on commit ba043d6

Please sign in to comment.