chore: upgrade datafusion 33 #1775

Merged: 15 commits, Nov 17, 2023
Cargo.toml (16 additions & 15 deletions)
@@ -19,23 +19,24 @@ debug = "line-tables-only"
 
 [workspace.dependencies]
 # arrow
-arrow = { version = "47" }
-arrow-array = { version = "47" }
-arrow-buffer = { version = "47" }
-arrow-cast = { version = "47" }
-arrow-ord = { version = "47" }
-arrow-row = { version = "47" }
-arrow-schema = { version = "47" }
-arrow-select = { version = "47" }
-parquet = { version = "47" }
+arrow = { version = "48.0.1" }
+arrow-array = { version = "48.0.1" }
+arrow-buffer = { version = "48.0.1" }
+arrow-cast = { version = "48.0.1" }
+arrow-ord = { version = "48.0.1" }
+arrow-row = { version = "48.0.1" }
+arrow-schema = { version = "48.0.1" }
+arrow-select = { version = "48.0.1" }
+parquet = { version = "48.0.1" }
 
 # datafusion
-datafusion = { version = "32" }
-datafusion-expr = { version = "32" }
-datafusion-common = { version = "32" }
-datafusion-proto = { version = "32" }
-datafusion-sql = { version = "32" }
-datafusion-physical-expr = { version = "32" }
+datafusion = { version = "33.0.0" }
+datafusion-expr = { version = "33.0.0" }
+datafusion-common = { version = "33.0.0" }
+datafusion-proto = { version = "33.0.0" }
+datafusion-sql = { version = "33.0.0" }
+datafusion-physical-expr = { version = "33.0.0" }
 
 
 # serde
 serde = { version = "1", features = ["derive"] }
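
Note that the arrow 48.0.1 and datafusion 33.0.0 pins move together: DataFusion 33 is built against arrow 48, so bumping one family without the other would pull two incompatible arrow versions into the workspace. The sqlparser bump below likewise tracks the parser release DataFusion 33 depends on.
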
crates/deltalake-core/Cargo.toml (1 addition & 1 deletion)
@@ -111,7 +111,7 @@ reqwest = { version = "0.11.18", default-features = false, features = [
 # Datafusion
 dashmap = { version = "5", optional = true }
 
-sqlparser = { version = "0.38", optional = true }
+sqlparser = { version = "0.39", optional = true }
 
 # NOTE dependencies only for integration tests
 fs_extra = { version = "1.3.0", optional = true }
crates/deltalake-core/src/delta_datafusion/expr.rs (4 additions & 0 deletions)
@@ -70,6 +70,10 @@ impl<'a> ContextProvider for DeltaContextProvider<'a> {
     fn get_window_meta(&self, name: &str) -> Option<Arc<datafusion_expr::WindowUDF>> {
         self.state.window_functions().get(name).cloned()
     }
+
+    fn get_table_source(&self, _name: TableReference) -> DFResult<Arc<dyn TableSource>> {
+        unimplemented!()
+    }
 }
 
 /// Parse a string predicate into an `Expr`
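
Background for this hunk: DataFusion 33's SQL planner resolves tables through ContextProvider::get_table_source, so every provider must now supply the method, even one that, like DeltaContextProvider, only exists to parse expressions against an already-known schema and can simply refuse table lookups. A self-contained sketch of that pattern, with stand-in types rather than the real DataFusion trait:

use std::sync::Arc;

// Stand-ins for the datafusion_sql types, for illustration only.
type DFResult<T> = Result<T, String>;
struct TableReference;
trait TableSource {}

// Simplified analogue of the planner-facing provider trait in DataFusion 33.
trait ContextProvider {
    fn get_table_source(&self, name: TableReference) -> DFResult<Arc<dyn TableSource>>;
}

// An expression-only provider: it parses predicates against a schema it
// already holds, so resolving a table by name should never happen.
struct ExprOnlyProvider;

impl ContextProvider for ExprOnlyProvider {
    fn get_table_source(&self, _name: TableReference) -> DFResult<Arc<dyn TableSource>> {
        // Reaching this would be a logic error, hence the loud panic.
        unimplemented!("expression parsing never resolves tables")
    }
}

fn main() {
    let _provider = ExprOnlyProvider;
}
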
crates/deltalake-core/src/delta_datafusion/mod.rs (188 additions & 380 deletions)

Large diffs are not rendered by default.
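
The bulk of the upgrade lives in this file. Judging from the call sites in the tests further down, DeltaTableState::datafusion_table_statistics now returns a DataFusionResult<Statistics> built from DataFusion 33's Precision-based statistics types rather than the old Option-based struct; that fallible signature is why the tests switch from datafusion_table_statistics() to datafusion_table_statistics()?.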

crates/deltalake-core/src/operations/mod.rs (2 additions & 2 deletions)
@@ -208,7 +208,7 @@ mod datafusion_utils {
         metrics::{ExecutionPlanMetricsSet, MetricsSet},
         ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
     };
-    use datafusion_common::DFSchema;
+    use datafusion_common::{DFSchema, Statistics};
     use datafusion_expr::Expr;
     use futures::{Stream, StreamExt};
@@ -334,7 +334,7 @@ mod datafusion_utils {
         }))
     }
 
-    fn statistics(&self) -> datafusion_common::Statistics {
+    fn statistics(&self) -> DataFusionResult<Statistics> {
         self.parent.statistics()
     }
 
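
The signature change above reflects DataFusion 33 making plan statistics fallible: statistics() now returns a Result, so a wrapper forwards the whole Result instead of a bare Statistics value. A minimal stand-alone sketch of that forwarding pattern, using stand-in types rather than the real ExecutionPlan trait:

// Stand-ins for the datafusion types, for illustration only.
#[derive(Debug, PartialEq)]
struct Statistics {
    num_rows: Option<usize>,
}
type DataFusionResult<T> = Result<T, String>;

trait Plan {
    fn statistics(&self) -> DataFusionResult<Statistics>;
}

struct Leaf;
impl Plan for Leaf {
    fn statistics(&self) -> DataFusionResult<Statistics> {
        Ok(Statistics { num_rows: Some(4) })
    }
}

// A pass-through node, like the metrics wrapper in the hunk above: it adds
// no statistics of its own, so it forwards the child's Result, errors included.
struct Wrapper<P: Plan> {
    parent: P,
}
impl<P: Plan> Plan for Wrapper<P> {
    fn statistics(&self) -> DataFusionResult<Statistics> {
        self.parent.statistics()
    }
}

fn main() {
    let plan = Wrapper { parent: Leaf };
    assert_eq!(plan.statistics(), Ok(Statistics { num_rows: Some(4) }));
}
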
crates/deltalake-core/src/operations/optimize.rs (2 additions & 2 deletions)
@@ -1185,10 +1185,10 @@ pub(super) mod zorder {
             .ok_or(DataFusionError::NotImplemented(
                 "z-order on zero columns.".to_string(),
             ))?;
-        let columns = columns
+        let columns: Vec<ArrayRef> = columns
             .iter()
             .map(|col| col.clone().into_array(length))
-            .collect_vec();
+            .try_collect()?;
         let array = zorder_key(&columns)?;
         Ok(ColumnarValue::Array(array))
     }
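
Context for the collect change: in DataFusion 33, ColumnarValue::into_array returns a Result, so the map now yields Result<ArrayRef> items, and itertools' collect_vec (which would collect the Results themselves) gives way to try_collect, which short-circuits on the first error. A self-contained sketch of the same pattern, assuming itertools is on the path as it is in this crate; the parse step stands in for into_array:

use itertools::Itertools;

fn main() -> Result<(), std::num::ParseIntError> {
    let columns = ["1", "2", "3"];
    // Each step can fail, mirroring col.clone().into_array(length) above;
    // try_collect stops at the first Err and propagates it via `?`.
    let parsed: Vec<i64> = columns.iter().map(|s| s.parse::<i64>()).try_collect()?;
    assert_eq!(parsed, vec![1, 2, 3]);
    Ok(())
}
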
crates/deltalake-core/src/operations/transaction/state.rs (11 additions & 4 deletions)
@@ -188,9 +188,12 @@ impl<'a> AddContainer<'a> {
                 Some(v) => serde_json::Value::String(v.to_string()),
                 None => serde_json::Value::Null,
             };
-            to_correct_scalar_value(&value, data_type).unwrap_or(
-                get_null_of_arrow_type(data_type).expect("Could not determine null type"),
-            )
+            to_correct_scalar_value(&value, data_type)
+                .ok()
+                .flatten()
+                .unwrap_or(
+                    get_null_of_arrow_type(data_type).expect("Could not determine null type"),
+                )
         } else if let Ok(Some(statistics)) = add.get_stats() {
             let values = if get_max {
                 statistics.max_values
@@ -200,7 +203,11 @@ impl<'a> AddContainer<'a> {
 
             values
                 .get(&column.name)
-                .and_then(|f| to_correct_scalar_value(f.as_value()?, data_type))
+                .and_then(|f| {
+                    to_correct_scalar_value(f.as_value()?, data_type)
+                        .ok()
+                        .flatten()
+                })
                 .unwrap_or(
                     get_null_of_arrow_type(data_type).expect("Could not determine null type"),
                 )
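
The .ok().flatten() chains above handle to_correct_scalar_value now returning Result<Option<ScalarValue>> rather than a bare Option: a hard parse failure and a merely-unrepresentable value are both treated as "no statistic", falling back to a typed null. A minimal stand-alone sketch of that collapse, where parse stands in for to_correct_scalar_value:

// `parse` stands in for to_correct_scalar_value: Err is a real failure,
// Ok(None) means "no usable value", Ok(Some(v)) is a usable statistic.
fn parse(s: &str) -> Result<Option<i64>, String> {
    if s.starts_with('#') {
        return Err("malformed input".to_string());
    }
    Ok(s.parse().ok())
}

fn main() {
    let typed_null = 0; // stands in for get_null_of_arrow_type(data_type)
    // .ok() turns Result<Option<T>, E> into Option<Option<T>>, .flatten()
    // merges the two layers, and .unwrap_or supplies the typed-null fallback.
    assert_eq!(parse("42").ok().flatten().unwrap_or(typed_null), 42);
    assert_eq!(parse("oops").ok().flatten().unwrap_or(typed_null), 0);
    assert_eq!(parse("#bad").ok().flatten().unwrap_or(typed_null), 0);
}
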
crates/deltalake-core/tests/integration_datafusion.rs (118 additions & 38 deletions)
@@ -46,6 +46,7 @@ use std::error::Error
 mod common;
 
 mod local {
+    use datafusion::common::stats::Precision;
     use deltalake_core::writer::JsonWriter;
 
     use super::*;
@@ -281,67 +282,146 @@ mod local {
 
     #[tokio::test]
     async fn test_datafusion_stats() -> Result<()> {
+        // Validate a table that contains statistics for all files
         let table = open_table("./tests/data/delta-0.8.0").await.unwrap();
-        let statistics = table.state.datafusion_table_statistics();
+        let statistics = table.state.datafusion_table_statistics()?;
 
-        assert_eq!(statistics.num_rows, Some(4),);
+        assert_eq!(statistics.num_rows, Precision::Exact(4_usize),);
 
-        assert_eq!(statistics.total_byte_size, Some(440 + 440));
+        assert_eq!(
+            statistics.total_byte_size,
+            Precision::Exact((440 + 440) as usize)
+        );
 
+        let column_stats = statistics.column_statistics.get(0).unwrap();
+        assert_eq!(column_stats.null_count, Precision::Exact(0));
         assert_eq!(
-            statistics
-                .column_statistics
-                .clone()
-                .unwrap()
-                .iter()
-                .map(|x| x.null_count)
-                .collect::<Vec<Option<usize>>>(),
-            vec![Some(0)],
+            column_stats.max_value,
+            Precision::Exact(ScalarValue::from(4_i32))
+        );
+        assert_eq!(
+            column_stats.min_value,
+            Precision::Exact(ScalarValue::from(0_i32))
         );
 
         let ctx = SessionContext::new();
         ctx.register_table("test_table", Arc::new(table))?;
 
-        let batches = ctx
+        let actual = ctx
            .sql("SELECT max(value), min(value) FROM test_table")
            .await?
            .collect()
            .await?;
 
-        assert_eq!(batches.len(), 1);
-        let batch = &batches[0];
+        let expected = vec![
+            "+-----------------------+-----------------------+",
+            "| MAX(test_table.value) | MIN(test_table.value) |",
+            "+-----------------------+-----------------------+",
+            "| 4                     | 0                     |",
+            "+-----------------------+-----------------------+",
+        ];
+        assert_batches_sorted_eq!(&expected, &actual);
+
+        // Validate a table that does not contain column statistics
+        let table = open_table("./tests/data/delta-0.2.0").await.unwrap();
+        let statistics = table.state.datafusion_table_statistics()?;
+
+        assert_eq!(statistics.num_rows, Precision::Absent);
+
         assert_eq!(
-            batch.column(0).as_ref(),
-            Arc::new(Int32Array::from(vec![4])).as_ref(),
+            statistics.total_byte_size,
+            Precision::Exact((400 + 404 + 396) as usize)
         );
+        let column_stats = statistics.column_statistics.get(0).unwrap();
+        assert_eq!(column_stats.null_count, Precision::Absent);
+        assert_eq!(column_stats.max_value, Precision::Absent);
+        assert_eq!(column_stats.min_value, Precision::Absent);
+
+        ctx.register_table("test_table2", Arc::new(table))?;
+        let actual = ctx
+            .sql("SELECT max(value), min(value) FROM test_table2")
+            .await?
+            .collect()
+            .await?;
+
+        let expected = vec![
+            "+------------------------+------------------------+",
+            "| MAX(test_table2.value) | MIN(test_table2.value) |",
+            "+------------------------+------------------------+",
+            "| 3                      | 1                      |",
+            "+------------------------+------------------------+",
+        ];
+        assert_batches_sorted_eq!(&expected, &actual);
+
+        // Validate a table that contains nested structures.
 
+        // This table is interesting since it goes through schema evolution.
+        // In particular 'new_column' contains statistics for when it
+        // is introduced (10) but the commit following (11) does not contain
+        // statistics for this column.
+        let table = open_table("./tests/data/delta-1.2.1-only-struct-stats")
+            .await
+            .unwrap();
+        let schema = table.get_schema().unwrap();
+        let statistics = table.state.datafusion_table_statistics()?;
+        assert_eq!(statistics.num_rows, Precision::Exact(12));
+
+        // `new_column` statistics
+        let stats = statistics
+            .column_statistics
+            .get(schema.index_of("new_column").unwrap())
+            .unwrap();
+        assert_eq!(stats.null_count, Precision::Absent);
+        assert_eq!(stats.min_value, Precision::Absent);
+        assert_eq!(stats.max_value, Precision::Absent);
+
+        // `date` statistics
+        let stats = statistics
+            .column_statistics
+            .get(schema.index_of("date").unwrap())
+            .unwrap();
+        assert_eq!(stats.null_count, Precision::Exact(0));
+        // 2022-10-24
         assert_eq!(
-            batch.column(1).as_ref(),
-            Arc::new(Int32Array::from(vec![0])).as_ref(),
+            stats.min_value,
+            Precision::Exact(ScalarValue::Date32(Some(19289)))
        );
-
         assert_eq!(
-            statistics
-                .column_statistics
-                .clone()
-                .unwrap()
-                .iter()
-                .map(|x| x.max_value.as_ref())
-                .collect::<Vec<Option<&ScalarValue>>>(),
-            vec![Some(&ScalarValue::from(4_i32))],
+            stats.max_value,
+            Precision::Exact(ScalarValue::Date32(Some(19289)))
         );
 
+        // `timestamp` statistics
+        let stats = statistics
+            .column_statistics
+            .get(schema.index_of("timestamp").unwrap())
+            .unwrap();
+        assert_eq!(stats.null_count, Precision::Exact(0));
+        // 2022-10-24T22:59:32.846Z
         assert_eq!(
-            statistics
-                .column_statistics
-                .clone()
-                .unwrap()
-                .iter()
-                .map(|x| x.min_value.as_ref())
-                .collect::<Vec<Option<&ScalarValue>>>(),
-            vec![Some(&ScalarValue::from(0_i32))],
+            stats.min_value,
+            Precision::Exact(ScalarValue::TimestampMicrosecond(
+                Some(1666652372846000),
+                None
+            ))
+        );
+        // 2022-10-24T22:59:46.083Z
+        assert_eq!(
+            stats.max_value,
+            Precision::Exact(ScalarValue::TimestampMicrosecond(
+                Some(1666652386083000),
+                None
+            ))
         );
 
+        // `nested_struct` statistics
+        let stats = statistics
+            .column_statistics
+            .get(schema.index_of("nested_struct").unwrap())
+            .unwrap();
+        assert_eq!(stats.null_count, Precision::Absent);
+        assert_eq!(stats.min_value, Precision::Absent);
+        assert_eq!(stats.max_value, Precision::Absent);
+
         Ok(())
     }
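
These assertions lean on DataFusion 33's Precision type, which replaces the old Option-based statistics fields and distinguishes exact values, estimates, and missing statistics. A simplified stand-alone model of the idea (the real enum lives in datafusion::common::stats, imported at the top of this module):

// Simplified model of Precision, for illustration; the real enum
// carries the same three variants.
#[derive(Debug, PartialEq)]
enum Precision<T> {
    Exact(T),   // known exactly, e.g. read from Delta file statistics
    Inexact(T), // an estimate; not exercised by this test
    Absent,     // no statistics recorded, e.g. the delta-0.2.0 table
}

fn main() {
    let with_stats: Precision<usize> = Precision::Exact(4);
    let without_stats: Precision<usize> = Precision::Absent;
    // "Unknown" is no longer conflated with a zero count the way
    // Option-based statistics could suggest.
    assert_ne!(without_stats, Precision::Exact(0));
    assert_eq!(with_stats, Precision::Exact(4));
    // Inexact would mark an estimated, rather than exact, value.
    assert_ne!(Precision::Inexact(4), with_stats);
}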

@@ -782,14 +862,14 @@ mod local {
 
         let expected_schema = ArrowSchema::new(vec![
             ArrowField::new("c3", ArrowDataType::Int32, true),
-            ArrowField::new("c1", ArrowDataType::Int32, false),
+            ArrowField::new("c1", ArrowDataType::Int32, true),
             ArrowField::new(
                 "c2",
                 ArrowDataType::Dictionary(
                     Box::new(ArrowDataType::UInt16),
                     Box::new(ArrowDataType::Utf8),
                 ),
-                false,
+                true,
             ),
         ]);

crates/deltalake-sql/src/planner.rs (5 additions & 0 deletions)
@@ -92,6 +92,7 @@ mod tests {
     use arrow_schema::{DataType, Field, Schema};
     use datafusion_common::config::ConfigOptions;
     use datafusion_common::DataFusionError;
+    use datafusion_common::Result as DataFusionResult;
     use datafusion_expr::logical_plan::builder::LogicalTableSource;
     use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource};
     use datafusion_sql::TableReference;
@@ -124,6 +125,10 @@ mod tests {
 
     impl ContextProvider for TestSchemaProvider {
         fn get_table_provider(&self, name: TableReference) -> DFResult<Arc<dyn TableSource>> {
+            self.get_table_source(name)
+        }
+
+        fn get_table_source(&self, name: TableReference) -> DFResult<Arc<dyn TableSource>> {
             match self.tables.get(name.table()) {
                 Some(table) => Ok(table.clone()),
                 _ => Err(DataFusionError::Plan(format!(
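
Rather than deleting the pre-33 entry point, the test provider keeps get_table_provider and forwards it to the new get_table_source, which is where DataFusion 33's SQL planner resolves tables; presumably this keeps the helper compiling for callers of either name during the rename.
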
python/Cargo.toml (1 addition & 1 deletion)
@@ -39,7 +39,7 @@ tokio = { workspace = true, features = ["rt-multi-thread"] }
 reqwest = { version = "*", features = ["native-tls-vendored"] }
 
 [dependencies.pyo3]
-version = "0.19"
+version = "0.20"
 features = ["extension-module", "abi3", "abi3-py38"]
 
 [dependencies.deltalake]