chore: upgrade datafusion 33 (#1775)
# Description

I've had to live on the bleeding edge of DataFusion since I discovered multiple bugs while implementing merge enhancements. This PR contains the changes necessary to use it.

DataFusion has made significant changes to how table statistics are represented. I refactored around the new representation and was able to trim a significant amount of code. This also fixes a bug where, for tables without column metadata (e.g. delta-0.2.0), we reported the number of records in a file as 0.
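For context, DataFusion 33 replaces the old `Option`-based fields on `Statistics` with a `Precision` wrapper. A minimal sketch of the new shape (types from `datafusion_common`; the values are illustrative, not taken from a real table):

```rust
use datafusion_common::stats::Precision;
use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};

fn example_stats() -> Statistics {
    Statistics {
        // Formerly Option<usize>; Precision distinguishes exact,
        // estimated, and unknown values instead of collapsing to None.
        num_rows: Precision::Exact(4),
        total_byte_size: Precision::Exact(880),
        // One entry per column. A file without column metadata now
        // reports Precision::Absent rather than a bogus count of 0.
        column_statistics: vec![ColumnStatistics {
            null_count: Precision::Exact(0),
            max_value: Precision::Exact(ScalarValue::from(4_i32)),
            min_value: Precision::Exact(ScalarValue::from(0_i32)),
            distinct_count: Precision::Absent,
        }],
    }
}
```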

## Fixes

The nullability status of partition columns is now accurately captured by DataFusion. Previously, if a partition column contained a null value, an error was returned; this should be resolved now. A sketch of what this means at the Arrow schema level is shown below.
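This mirrors the test change further down: partition columns surfaced to DataFusion are now declared nullable. A minimal sketch (field names borrowed from that test; the schema itself is illustrative):

```rust
use arrow_schema::{DataType, Field, Schema};

fn partitioned_table_schema() -> Schema {
    Schema::new(vec![
        // regular data column
        Field::new("c3", DataType::Int32, true),
        // partition column: previously declared non-nullable (false),
        // which turned a null partition value into an error; now nullable.
        Field::new("c1", DataType::Int32, true),
    ])
}
```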
Blajda authored Nov 17, 2023
1 parent 44a3760 commit 4d103a7
Showing 10 changed files with 348 additions and 443 deletions.
31 changes: 16 additions & 15 deletions Cargo.toml
@@ -19,23 +19,24 @@ debug = "line-tables-only"

[workspace.dependencies]
# arrow
-arrow = { version = "47" }
-arrow-array = { version = "47" }
-arrow-buffer = { version = "47" }
-arrow-cast = { version = "47" }
-arrow-ord = { version = "47" }
-arrow-row = { version = "47" }
-arrow-schema = { version = "47" }
-arrow-select = { version = "47" }
-parquet = { version = "47" }
+arrow = { version = "48.0.1" }
+arrow-array = { version = "48.0.1" }
+arrow-buffer = { version = "48.0.1" }
+arrow-cast = { version = "48.0.1" }
+arrow-ord = { version = "48.0.1" }
+arrow-row = { version = "48.0.1" }
+arrow-schema = { version = "48.0.1" }
+arrow-select = { version = "48.0.1" }
+parquet = { version = "48.0.1" }

# datafusion
-datafusion = { version = "32" }
-datafusion-expr = { version = "32" }
-datafusion-common = { version = "32" }
-datafusion-proto = { version = "32" }
-datafusion-sql = { version = "32" }
-datafusion-physical-expr = { version = "32" }
+datafusion = { version = "33.0.0" }
+datafusion-expr = { version = "33.0.0" }
+datafusion-common = { version = "33.0.0" }
+datafusion-proto = { version = "33.0.0" }
+datafusion-sql = { version = "33.0.0" }
+datafusion-physical-expr = { version = "33.0.0" }


# serde
serde = { version = "1", features = ["derive"] }
2 changes: 1 addition & 1 deletion crates/deltalake-core/Cargo.toml
@@ -111,7 +111,7 @@ reqwest = { version = "0.11.18", default-features = false, features = [
# Datafusion
dashmap = { version = "5", optional = true }

sqlparser = { version = "0.38", optional = true }
sqlparser = { version = "0.39", optional = true }

# NOTE dependencies only for integration tests
fs_extra = { version = "1.3.0", optional = true }
4 changes: 4 additions & 0 deletions crates/deltalake-core/src/delta_datafusion/expr.rs
@@ -70,6 +70,10 @@ impl<'a> ContextProvider for DeltaContextProvider<'a> {
fn get_window_meta(&self, name: &str) -> Option<Arc<datafusion_expr::WindowUDF>> {
self.state.window_functions().get(name).cloned()
}

+fn get_table_source(&self, _name: TableReference) -> DFResult<Arc<dyn TableSource>> {
+unimplemented!()
+}
}

/// Parse a string predicate into an `Expr`
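DataFusion 33 routes table lookup on `ContextProvider` through `get_table_source`, which is why the method shows up here. A sketch of why `unimplemented!()` is safe in this spot, assuming `DeltaContextProvider` is only ever used to parse column-level expressions (the `ExprOnlyProvider` type is hypothetical, for illustration):

```rust
use std::sync::Arc;

use datafusion_common::Result as DFResult;
use datafusion_expr::TableSource;
use datafusion_sql::TableReference;

struct ExprOnlyProvider;

impl ExprOnlyProvider {
    // The SQL planner only calls this when a statement references a
    // table. A provider used purely for expression parsing never
    // triggers a table lookup, so this path is unreachable in practice.
    fn get_table_source(&self, _name: TableReference) -> DFResult<Arc<dyn TableSource>> {
        unimplemented!("expression parsing never resolves tables")
    }
}
```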
568 changes: 188 additions & 380 deletions crates/deltalake-core/src/delta_datafusion/mod.rs

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions crates/deltalake-core/src/operations/mod.rs
@@ -208,7 +208,7 @@ mod datafusion_utils {
metrics::{ExecutionPlanMetricsSet, MetricsSet},
ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
};
-use datafusion_common::DFSchema;
+use datafusion_common::{DFSchema, Statistics};
use datafusion_expr::Expr;
use futures::{Stream, StreamExt};

@@ -334,7 +334,7 @@ }
}))
}

-fn statistics(&self) -> datafusion_common::Statistics {
+fn statistics(&self) -> DataFusionResult<Statistics> {
self.parent.statistics()
}

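The hunk above tracks a DataFusion 33 trait change: `ExecutionPlan::statistics` is now fallible and returns the `Precision`-based struct. A small sketch of the fallback an implementation can use when statistics cannot be computed, assuming `Statistics::new_unknown`, which fills every field with `Precision::Absent`:

```rust
use arrow_schema::Schema;
use datafusion_common::{Result as DataFusionResult, Statistics};

// Return fully-unknown statistics for a plan node's schema instead of
// guessing; every field comes back as Precision::Absent.
fn unknown_stats(schema: &Schema) -> DataFusionResult<Statistics> {
    Ok(Statistics::new_unknown(schema))
}
```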
4 changes: 2 additions & 2 deletions crates/deltalake-core/src/operations/optimize.rs
@@ -1185,10 +1185,10 @@ pub(super) mod zorder {
.ok_or(DataFusionError::NotImplemented(
"z-order on zero columns.".to_string(),
))?;
-let columns = columns
+let columns: Vec<ArrayRef> = columns
.iter()
.map(|col| col.clone().into_array(length))
-.collect_vec();
+.try_collect()?;
let array = zorder_key(&columns)?;
Ok(ColumnarValue::Array(array))
}
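The `.collect_vec()` to `.try_collect()?` switch above follows from `ColumnarValue::into_array` becoming fallible in DataFusion 33 (it now returns `Result<ArrayRef>`), so the iterator yields `Result`s. A sketch of the pattern in isolation:

```rust
use arrow_array::ArrayRef;
use datafusion_common::Result as DataFusionResult;
use datafusion_expr::ColumnarValue;
use itertools::Itertools; // provides try_collect

// Materialize scalar-or-array values to arrays of a common length,
// stopping at the first conversion error instead of panicking.
fn to_arrays(columns: &[ColumnarValue], length: usize) -> DataFusionResult<Vec<ArrayRef>> {
    columns
        .iter()
        .map(|col| col.clone().into_array(length))
        .try_collect()
}
```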
15 changes: 11 additions & 4 deletions crates/deltalake-core/src/operations/transaction/state.rs
@@ -188,9 +188,12 @@ impl<'a> AddContainer<'a> {
Some(v) => serde_json::Value::String(v.to_string()),
None => serde_json::Value::Null,
};
-to_correct_scalar_value(&value, data_type).unwrap_or(
-get_null_of_arrow_type(data_type).expect("Could not determine null type"),
-)
+to_correct_scalar_value(&value, data_type)
+.ok()
+.flatten()
+.unwrap_or(
+get_null_of_arrow_type(data_type).expect("Could not determine null type"),
+)
} else if let Ok(Some(statistics)) = add.get_stats() {
let values = if get_max {
statistics.max_values
@@ -200,7 +203,11 @@

values
.get(&column.name)
-.and_then(|f| to_correct_scalar_value(f.as_value()?, data_type))
+.and_then(|f| {
+to_correct_scalar_value(f.as_value()?, data_type)
+.ok()
+.flatten()
+})
.unwrap_or(
get_null_of_arrow_type(data_type).expect("Could not determine null type"),
)
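The `.ok().flatten()` chains above collapse the new `Result<Option<ScalarValue>>` return type of `to_correct_scalar_value` (an assumption inferred from this hunk) into the `Option` the surrounding code expects, treating a conversion error the same as a missing value:

```rust
// Collapse Result<Option<T>, E> into Option<T>: an error and a missing
// value both become None, which the caller then replaces with a typed
// null via get_null_of_arrow_type.
fn collapse<T, E>(result: Result<Option<T>, E>) -> Option<T> {
    result.ok().flatten()
}
```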
156 changes: 118 additions & 38 deletions crates/deltalake-core/tests/integration_datafusion.rs
@@ -46,6 +46,7 @@ use std::error::Error;
mod common;

mod local {
+use datafusion::common::stats::Precision;
use deltalake_core::writer::JsonWriter;

use super::*;
@@ -281,67 +282,146 @@ mod local {

#[tokio::test]
async fn test_datafusion_stats() -> Result<()> {
+// Validate a table that contains statistics for all files
let table = open_table("./tests/data/delta-0.8.0").await.unwrap();
-let statistics = table.state.datafusion_table_statistics();
+let statistics = table.state.datafusion_table_statistics()?;

-assert_eq!(statistics.num_rows, Some(4),);
+assert_eq!(statistics.num_rows, Precision::Exact(4_usize),);

-assert_eq!(statistics.total_byte_size, Some(440 + 440));
+assert_eq!(
+statistics.total_byte_size,
+Precision::Exact((440 + 440) as usize)
+);

+let column_stats = statistics.column_statistics.get(0).unwrap();
+assert_eq!(column_stats.null_count, Precision::Exact(0));
assert_eq!(
-statistics
-.column_statistics
-.clone()
-.unwrap()
-.iter()
-.map(|x| x.null_count)
-.collect::<Vec<Option<usize>>>(),
-vec![Some(0)],
+column_stats.max_value,
+Precision::Exact(ScalarValue::from(4_i32))
);
+assert_eq!(
+column_stats.min_value,
+Precision::Exact(ScalarValue::from(0_i32))
+);

let ctx = SessionContext::new();
ctx.register_table("test_table", Arc::new(table))?;

-let batches = ctx
+let actual = ctx
.sql("SELECT max(value), min(value) FROM test_table")
.await?
.collect()
.await?;

-assert_eq!(batches.len(), 1);
-let batch = &batches[0];
+let expected = vec![
+"+-----------------------+-----------------------+",
+"| MAX(test_table.value) | MIN(test_table.value) |",
+"+-----------------------+-----------------------+",
+"| 4 | 0 |",
+"+-----------------------+-----------------------+",
+];
+assert_batches_sorted_eq!(&expected, &actual);

+// Validate a table that does not contain column statistics
+let table = open_table("./tests/data/delta-0.2.0").await.unwrap();
+let statistics = table.state.datafusion_table_statistics()?;

+assert_eq!(statistics.num_rows, Precision::Absent);

assert_eq!(
-batch.column(0).as_ref(),
-Arc::new(Int32Array::from(vec![4])).as_ref(),
+statistics.total_byte_size,
+Precision::Exact((400 + 404 + 396) as usize)
);
+let column_stats = statistics.column_statistics.get(0).unwrap();
+assert_eq!(column_stats.null_count, Precision::Absent);
+assert_eq!(column_stats.max_value, Precision::Absent);
+assert_eq!(column_stats.min_value, Precision::Absent);

ctx.register_table("test_table2", Arc::new(table))?;
let actual = ctx
.sql("SELECT max(value), min(value) FROM test_table2")
.await?
.collect()
.await?;

+let expected = vec![
+"+------------------------+------------------------+",
+"| MAX(test_table2.value) | MIN(test_table2.value) |",
+"+------------------------+------------------------+",
+"| 3 | 1 |",
+"+------------------------+------------------------+",
+];
+assert_batches_sorted_eq!(&expected, &actual);

+// Validate a table that contains nested structures.

+// This table is interesting since it goes through schema evolution.
+// In particular, 'new_column' has statistics for the commit where it
+// is introduced (version 10), but the following commit (11) does not
+// contain statistics for this column.
let table = open_table("./tests/data/delta-1.2.1-only-struct-stats")
.await
.unwrap();
let schema = table.get_schema().unwrap();
let statistics = table.state.datafusion_table_statistics()?;
assert_eq!(statistics.num_rows, Precision::Exact(12));

+// `new_column` statistics
+let stats = statistics
+.column_statistics
+.get(schema.index_of("new_column").unwrap())
+.unwrap();
+assert_eq!(stats.null_count, Precision::Absent);
+assert_eq!(stats.min_value, Precision::Absent);
+assert_eq!(stats.max_value, Precision::Absent);

+// `date` statistics
+let stats = statistics
+.column_statistics
+.get(schema.index_of("date").unwrap())
+.unwrap();
+assert_eq!(stats.null_count, Precision::Exact(0));
+// 2022-10-24
assert_eq!(
-batch.column(1).as_ref(),
-Arc::new(Int32Array::from(vec![0])).as_ref(),
+stats.min_value,
+Precision::Exact(ScalarValue::Date32(Some(19289)))
);

assert_eq!(
-statistics
-.column_statistics
-.clone()
-.unwrap()
-.iter()
-.map(|x| x.max_value.as_ref())
-.collect::<Vec<Option<&ScalarValue>>>(),
-vec![Some(&ScalarValue::from(4_i32))],
+stats.max_value,
+Precision::Exact(ScalarValue::Date32(Some(19289)))
);

+// `timestamp` statistics
+let stats = statistics
+.column_statistics
+.get(schema.index_of("timestamp").unwrap())
+.unwrap();
+assert_eq!(stats.null_count, Precision::Exact(0));
+// 2022-10-24T22:59:32.846Z
assert_eq!(
-statistics
-.column_statistics
-.clone()
-.unwrap()
-.iter()
-.map(|x| x.min_value.as_ref())
-.collect::<Vec<Option<&ScalarValue>>>(),
-vec![Some(&ScalarValue::from(0_i32))],
+stats.min_value,
+Precision::Exact(ScalarValue::TimestampMicrosecond(
+Some(1666652372846000),
+None
+))
);
+// 2022-10-24T22:59:46.083Z
+assert_eq!(
+stats.max_value,
+Precision::Exact(ScalarValue::TimestampMicrosecond(
+Some(1666652386083000),
+None
+))
+);

+// `nested_struct` statistics
+let stats = statistics
+.column_statistics
+.get(schema.index_of("nested_struct").unwrap())
+.unwrap();
+assert_eq!(stats.null_count, Precision::Absent);
+assert_eq!(stats.min_value, Precision::Absent);
+assert_eq!(stats.max_value, Precision::Absent);

Ok(())
}

@@ -782,14 +862,14 @@ mod local {

let expected_schema = ArrowSchema::new(vec![
ArrowField::new("c3", ArrowDataType::Int32, true),
ArrowField::new("c1", ArrowDataType::Int32, false),
ArrowField::new("c1", ArrowDataType::Int32, true),
ArrowField::new(
"c2",
ArrowDataType::Dictionary(
Box::new(ArrowDataType::UInt16),
Box::new(ArrowDataType::Utf8),
),
-false,
+true,
),
]);

5 changes: 5 additions & 0 deletions crates/deltalake-sql/src/planner.rs
@@ -92,6 +92,7 @@ mod tests {
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_common::DataFusionError;
+use datafusion_common::Result as DataFusionResult;
use datafusion_expr::logical_plan::builder::LogicalTableSource;
use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource};
use datafusion_sql::TableReference;
@@ -124,6 +125,10 @@ mod tests {

impl ContextProvider for TestSchemaProvider {
fn get_table_provider(&self, name: TableReference) -> DFResult<Arc<dyn TableSource>> {
+self.get_table_source(name)
+}
+
+fn get_table_source(&self, name: TableReference) -> DFResult<Arc<dyn TableSource>> {
match self.tables.get(name.table()) {
Some(table) => Ok(table.clone()),
_ => Err(DataFusionError::Plan(format!(
2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -39,7 +39,7 @@ tokio = { workspace = true, features = ["rt-multi-thread"] }
reqwest = { version = "*", features = ["native-tls-vendored"] }

[dependencies.pyo3]
version = "0.19"
version = "0.20"
features = ["extension-module", "abi3", "abi3-py38"]

[dependencies.deltalake]
Expand Down
