From 00510fb465003773d5766d8dbe6eeb614292f803 Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Tue, 5 Mar 2024 13:54:42 -0600 Subject: [PATCH 1/4] bump datafusion to 36 --- Cargo.toml | 13 ++++---- crates/core/Cargo.toml | 4 ++- .../core/src/data_catalog/unity/datafusion.rs | 4 +-- crates/core/src/delta_datafusion/expr.rs | 30 ++++++++++--------- crates/core/src/kernel/snapshot/log_data.rs | 7 ++++- crates/sql/src/parser.rs | 2 +- 6 files changed, 35 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 348cfd5310..07eb64a674 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,12 +46,13 @@ object_store = { version = "=0.9.0" } parquet = { version = "50" } # datafusion -datafusion = { version = "35" } -datafusion-expr = { version = "35" } -datafusion-common = { version = "35" } -datafusion-proto = { version = "35" } -datafusion-sql = { version = "35" } -datafusion-physical-expr = { version = "35" } +datafusion = { version = "36" } +datafusion-expr = { version = "36" } +datafusion-common = { version = "36" } +datafusion-proto = { version = "36" } +datafusion-sql = { version = "36" } +datafusion-physical-expr = { version = "36" } +datafusion-functions = { version = "36" } # serde serde = { version = "1.0.194", features = ["derive"] } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 8a2f2d6817..bd3229eb83 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -40,6 +40,7 @@ datafusion-common = { workspace = true, optional = true } datafusion-proto = { workspace = true, optional = true } datafusion-sql = { workspace = true, optional = true } datafusion-physical-expr = { workspace = true, optional = true } +datafusion-functions = { workspace = true, optional = true } # serde serde = { workspace = true, features = ["derive"] } @@ -95,7 +96,7 @@ reqwest = { version = "0.11.18", default-features = false, features = [ "rustls-tls", "json", ], optional = true } -sqlparser = { version = "0.41", optional = true } +sqlparser = { version = "0.44", optional = true } [dev-dependencies] criterion = "0.5" @@ -121,6 +122,7 @@ datafusion = [ "datafusion-proto", "datafusion-physical-expr", "datafusion-sql", + "datafusion-functions", "sqlparser", ] datafusion-ext = ["datafusion"] diff --git a/crates/core/src/data_catalog/unity/datafusion.rs b/crates/core/src/data_catalog/unity/datafusion.rs index 21246c865a..0ed539e708 100644 --- a/crates/core/src/data_catalog/unity/datafusion.rs +++ b/crates/core/src/data_catalog/unity/datafusion.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use dashmap::DashMap; use datafusion::catalog::schema::SchemaProvider; -use datafusion::catalog::{CatalogList, CatalogProvider}; +use datafusion::catalog::{CatalogProvider, CatalogProviderList}; use datafusion::datasource::TableProvider; use tracing::error; @@ -49,7 +49,7 @@ impl UnityCatalogList { } } -impl CatalogList for UnityCatalogList { +impl CatalogProviderList for UnityCatalogList { fn as_any(&self) -> &dyn Any { self } diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index 03849f4df9..4fe30fdad5 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -34,10 +34,10 @@ use datafusion_expr::{ expr::InList, AggregateUDF, Between, BinaryExpr, Cast, Expr, GetIndexedField, Like, TableSource, }; use datafusion_sql::planner::{ContextProvider, SqlToRel}; -use sqlparser::ast::escape_quoted_string; -use sqlparser::dialect::GenericDialect; -use sqlparser::parser::Parser; -use sqlparser::tokenizer::Tokenizer; +use datafusion_sql::sqlparser::ast::escape_quoted_string; +use datafusion_sql::sqlparser::dialect::GenericDialect; +use datafusion_sql::sqlparser::parser::Parser; +use datafusion_sql::sqlparser::tokenizer::Tokenizer; use crate::{DeltaResult, DeltaTableError}; @@ -100,7 +100,7 @@ pub(crate) fn parse_predicate_expression( let context_provider = DeltaContextProvider { state: df_state }; let sql_to_rel = SqlToRel::new_with_options(&context_provider, DeltaParserOptions::default().into()); - + Ok(sql_to_rel.sql_to_expr(sql, schema, &mut Default::default())?) } @@ -275,13 +275,14 @@ impl<'a> Display for SqlFormat<'a> { datafusion_expr::GetFieldAccess::ListIndex { key } => { write!(f, "{}[{}]", SqlFormat { expr }, SqlFormat { expr: key }) } - datafusion_expr::GetFieldAccess::ListRange { start, stop } => { + datafusion_expr::GetFieldAccess::ListRange { start, stop, stride } => { write!( f, - "{}[{}:{}]", - SqlFormat { expr }, - SqlFormat { expr: start }, - SqlFormat { expr: stop } + "{expr}[{start}:{stop}:{stride}]", + expr = SqlFormat { expr }, + start = SqlFormat { expr: start }, + stop = SqlFormat { expr: stop }, + stride = SqlFormat { expr: stride } ) } }, @@ -367,8 +368,9 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { mod test { use arrow_schema::DataType as ArrowDataType; use datafusion::prelude::SessionContext; - use datafusion_common::{Column, DFSchema, ScalarValue}; - use datafusion_expr::{cardinality, col, decode, lit, substring, Cast, Expr, ExprSchemable}; + use datafusion_common::{Column, ScalarValue, ToDFSchema}; + use datafusion_functions::encoding::expr_fn::decode; + use datafusion_expr::{cardinality, col, lit, substring, Cast, Expr, ExprSchemable}; use crate::delta_datafusion::DeltaSessionContext; use crate::kernel::{ArrayType, DataType, PrimitiveType, StructField, StructType}; @@ -572,7 +574,7 @@ mod test { ), simple!( col("value") - .cast_to::( + .cast_to( &arrow_schema::DataType::Utf8, &table .snapshot() @@ -581,7 +583,7 @@ mod test { .unwrap() .as_ref() .to_owned() - .try_into() + .to_dfschema() .unwrap() ) .unwrap() diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index 686a4110fe..2e53a24315 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -549,7 +549,12 @@ mod datafusion { _ => None, }) .collect::>>() - .map(|o| Precision::Exact(ScalarValue::Struct(Some(o), fields.clone()))) + .map(|o| { + let arrays = o.into_iter().map(|sv| sv.to_array()).collect::, datafusion_common::DataFusionError>>().unwrap(); + let sa = StructArray::new(fields.clone(), arrays, None); + Precision::Exact(ScalarValue::Struct(Arc::new(sa))) + + }) .unwrap_or(Precision::Absent); } _ => Precision::Absent, diff --git a/crates/sql/src/parser.rs b/crates/sql/src/parser.rs index 10e7252730..cf044b659d 100644 --- a/crates/sql/src/parser.rs +++ b/crates/sql/src/parser.rs @@ -164,7 +164,7 @@ impl<'a> DeltaParser<'a> { } pub fn parse_vacuum(&mut self) -> Result { - let table_name = self.parser.parse_object_name()?; + let table_name = self.parser.parse_object_name(false)?; match self.parser.peek_token().token { Token::Word(w) => match w.keyword { Keyword::RETAIN => { From aff2e34456a05b0c6a66113ed84608fe3ccc84d0 Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Tue, 5 Mar 2024 13:54:42 -0600 Subject: [PATCH 2/4] bump datafusion to 36 --- Cargo.toml | 13 ++++---- crates/core/Cargo.toml | 4 ++- .../core/src/data_catalog/unity/datafusion.rs | 4 +-- crates/core/src/delta_datafusion/expr.rs | 30 ++++++++++--------- crates/core/src/kernel/snapshot/log_data.rs | 7 ++++- crates/sql/src/parser.rs | 2 +- 6 files changed, 35 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5f7080ec7c..92f9a8a36d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,12 +46,13 @@ object_store = { version = "0.9" } parquet = { version = "50" } # datafusion -datafusion = { version = "35" } -datafusion-expr = { version = "35" } -datafusion-common = { version = "35" } -datafusion-proto = { version = "35" } -datafusion-sql = { version = "35" } -datafusion-physical-expr = { version = "35" } +datafusion = { version = "36" } +datafusion-expr = { version = "36" } +datafusion-common = { version = "36" } +datafusion-proto = { version = "36" } +datafusion-sql = { version = "36" } +datafusion-physical-expr = { version = "36" } +datafusion-functions = { version = "36" } # serde serde = { version = "1.0.194", features = ["derive"] } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 04b4a52275..7b5cd2adbc 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -40,6 +40,7 @@ datafusion-common = { workspace = true, optional = true } datafusion-proto = { workspace = true, optional = true } datafusion-sql = { workspace = true, optional = true } datafusion-physical-expr = { workspace = true, optional = true } +datafusion-functions = { workspace = true, optional = true } # serde serde = { workspace = true, features = ["derive"] } @@ -95,7 +96,7 @@ reqwest = { version = "0.11.18", default-features = false, features = [ "rustls-tls", "json", ], optional = true } -sqlparser = { version = "0.41", optional = true } +sqlparser = { version = "0.44", optional = true } [dev-dependencies] criterion = "0.5" @@ -121,6 +122,7 @@ datafusion = [ "datafusion-proto", "datafusion-physical-expr", "datafusion-sql", + "datafusion-functions", "sqlparser", ] datafusion-ext = ["datafusion"] diff --git a/crates/core/src/data_catalog/unity/datafusion.rs b/crates/core/src/data_catalog/unity/datafusion.rs index 21246c865a..0ed539e708 100644 --- a/crates/core/src/data_catalog/unity/datafusion.rs +++ b/crates/core/src/data_catalog/unity/datafusion.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use dashmap::DashMap; use datafusion::catalog::schema::SchemaProvider; -use datafusion::catalog::{CatalogList, CatalogProvider}; +use datafusion::catalog::{CatalogProvider, CatalogProviderList}; use datafusion::datasource::TableProvider; use tracing::error; @@ -49,7 +49,7 @@ impl UnityCatalogList { } } -impl CatalogList for UnityCatalogList { +impl CatalogProviderList for UnityCatalogList { fn as_any(&self) -> &dyn Any { self } diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index dfe234ad46..f9c31973eb 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -34,10 +34,10 @@ use datafusion_expr::{ expr::InList, AggregateUDF, Between, BinaryExpr, Cast, Expr, GetIndexedField, Like, TableSource, }; use datafusion_sql::planner::{ContextProvider, SqlToRel}; -use sqlparser::ast::escape_quoted_string; -use sqlparser::dialect::GenericDialect; -use sqlparser::parser::Parser; -use sqlparser::tokenizer::Tokenizer; +use datafusion_sql::sqlparser::ast::escape_quoted_string; +use datafusion_sql::sqlparser::dialect::GenericDialect; +use datafusion_sql::sqlparser::parser::Parser; +use datafusion_sql::sqlparser::tokenizer::Tokenizer; use crate::{DeltaResult, DeltaTableError}; @@ -100,7 +100,7 @@ pub(crate) fn parse_predicate_expression( let context_provider = DeltaContextProvider { state: df_state }; let sql_to_rel = SqlToRel::new_with_options(&context_provider, DeltaParserOptions::default().into()); - + Ok(sql_to_rel.sql_to_expr(sql, schema, &mut Default::default())?) } @@ -275,13 +275,14 @@ impl<'a> Display for SqlFormat<'a> { datafusion_expr::GetFieldAccess::ListIndex { key } => { write!(f, "{}[{}]", SqlFormat { expr }, SqlFormat { expr: key }) } - datafusion_expr::GetFieldAccess::ListRange { start, stop } => { + datafusion_expr::GetFieldAccess::ListRange { start, stop, stride } => { write!( f, - "{}[{}:{}]", - SqlFormat { expr }, - SqlFormat { expr: start }, - SqlFormat { expr: stop } + "{expr}[{start}:{stop}:{stride}]", + expr = SqlFormat { expr }, + start = SqlFormat { expr: start }, + stop = SqlFormat { expr: stop }, + stride = SqlFormat { expr: stride } ) } }, @@ -367,8 +368,9 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { mod test { use arrow_schema::DataType as ArrowDataType; use datafusion::prelude::SessionContext; - use datafusion_common::{Column, DFSchema, ScalarValue}; - use datafusion_expr::{cardinality, col, decode, lit, substring, Cast, Expr, ExprSchemable}; + use datafusion_common::{Column, ScalarValue, ToDFSchema}; + use datafusion_functions::encoding::expr_fn::decode; + use datafusion_expr::{cardinality, col, lit, substring, Cast, Expr, ExprSchemable}; use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext}; use crate::kernel::{ArrayType, DataType, PrimitiveType, StructField, StructType}; @@ -572,7 +574,7 @@ mod test { ), simple!( col("value") - .cast_to::( + .cast_to( &arrow_schema::DataType::Utf8, &table .snapshot() @@ -581,7 +583,7 @@ mod test { .unwrap() .as_ref() .to_owned() - .try_into() + .to_dfschema() .unwrap() ) .unwrap() diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index 972aeb6f9a..1152a92936 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -549,7 +549,12 @@ mod datafusion { _ => None, }) .collect::>>() - .map(|o| Precision::Exact(ScalarValue::Struct(Some(o), fields.clone()))) + .map(|o| { + let arrays = o.into_iter().map(|sv| sv.to_array()).collect::, datafusion_common::DataFusionError>>().unwrap(); + let sa = StructArray::new(fields.clone(), arrays, None); + Precision::Exact(ScalarValue::Struct(Arc::new(sa))) + + }) .unwrap_or(Precision::Absent); } _ => Precision::Absent, diff --git a/crates/sql/src/parser.rs b/crates/sql/src/parser.rs index 10e7252730..cf044b659d 100644 --- a/crates/sql/src/parser.rs +++ b/crates/sql/src/parser.rs @@ -164,7 +164,7 @@ impl<'a> DeltaParser<'a> { } pub fn parse_vacuum(&mut self) -> Result { - let table_name = self.parser.parse_object_name()?; + let table_name = self.parser.parse_object_name(false)?; match self.parser.peek_token().token { Token::Word(w) => match w.keyword { Keyword::RETAIN => { From 30d49ba7d69228b4dccfc58906daafda85301774 Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Thu, 28 Mar 2024 11:25:02 -0500 Subject: [PATCH 3/4] fmt --- crates/core/src/delta_datafusion/expr.rs | 10 +++++++--- crates/core/src/kernel/snapshot/log_data.rs | 7 +++++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index f9c31973eb..a43fe88bc4 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -100,7 +100,7 @@ pub(crate) fn parse_predicate_expression( let context_provider = DeltaContextProvider { state: df_state }; let sql_to_rel = SqlToRel::new_with_options(&context_provider, DeltaParserOptions::default().into()); - + Ok(sql_to_rel.sql_to_expr(sql, schema, &mut Default::default())?) } @@ -275,7 +275,11 @@ impl<'a> Display for SqlFormat<'a> { datafusion_expr::GetFieldAccess::ListIndex { key } => { write!(f, "{}[{}]", SqlFormat { expr }, SqlFormat { expr: key }) } - datafusion_expr::GetFieldAccess::ListRange { start, stop, stride } => { + datafusion_expr::GetFieldAccess::ListRange { + start, + stop, + stride, + } => { write!( f, "{expr}[{start}:{stop}:{stride}]", @@ -369,8 +373,8 @@ mod test { use arrow_schema::DataType as ArrowDataType; use datafusion::prelude::SessionContext; use datafusion_common::{Column, ScalarValue, ToDFSchema}; - use datafusion_functions::encoding::expr_fn::decode; use datafusion_expr::{cardinality, col, lit, substring, Cast, Expr, ExprSchemable}; + use datafusion_functions::encoding::expr_fn::decode; use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext}; use crate::kernel::{ArrayType, DataType, PrimitiveType, StructField, StructType}; diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index 1152a92936..b2c06210e9 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -550,10 +550,13 @@ mod datafusion { }) .collect::>>() .map(|o| { - let arrays = o.into_iter().map(|sv| sv.to_array()).collect::, datafusion_common::DataFusionError>>().unwrap(); + let arrays = o + .into_iter() + .map(|sv| sv.to_array()) + .collect::, datafusion_common::DataFusionError>>() + .unwrap(); let sa = StructArray::new(fields.clone(), arrays, None); Precision::Exact(ScalarValue::Struct(Arc::new(sa))) - }) .unwrap_or(Precision::Absent); } From 87ffda8857926edda122100f577255d918933e16 Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Mon, 1 Apr 2024 10:09:36 -0500 Subject: [PATCH 4/4] update executionplan children methods --- crates/core/src/delta_datafusion/expr.rs | 2 +- .../src/delta_datafusion/find_files/physical.rs | 8 +++++++- crates/core/src/delta_datafusion/mod.rs | 13 ++++++++++++- crates/core/src/operations/merge/barrier.rs | 5 +++++ 4 files changed, 25 insertions(+), 3 deletions(-) diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index a43fe88bc4..4317f7f214 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -608,7 +608,7 @@ mod test { ), simple!( cardinality(col("_list").range(col("value"), lit(10_i64))), - "cardinality(_list[value:10])".to_string() + "cardinality(_list[value:10:1])".to_string() ), ]; diff --git a/crates/core/src/delta_datafusion/find_files/physical.rs b/crates/core/src/delta_datafusion/find_files/physical.rs index 9b9238dd86..56c7ca9989 100644 --- a/crates/core/src/delta_datafusion/find_files/physical.rs +++ b/crates/core/src/delta_datafusion/find_files/physical.rs @@ -99,8 +99,14 @@ impl ExecutionPlan for FindFilesExec { fn with_new_children( self: Arc, - _children: Vec>, + children: Vec>, ) -> Result> { + if !children.is_empty() { + return Err(datafusion::error::DataFusionError::Plan( + "Children cannot be replaced in FindFilesExec".to_string(), + )); + } + Ok(self) } diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 057dcaca87..9a6cc7fa3b 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -804,7 +804,18 @@ impl ExecutionPlan for DeltaScan { self: Arc, children: Vec>, ) -> DataFusionResult> { - ExecutionPlan::with_new_children(self.parquet_scan.clone(), children) + if children.len() != 1 { + return Err(DataFusionError::Plan(format!( + "DeltaScan wrong number of children {}", + children.len() + ))); + } + Ok(Arc::new(DeltaScan { + table_uri: self.table_uri.clone(), + config: self.config.clone(), + parquet_scan: children[0].clone(), + logical_schema: self.logical_schema.clone(), + })) } fn execute( diff --git a/crates/core/src/operations/merge/barrier.rs b/crates/core/src/operations/merge/barrier.rs index f1df28c4a4..8cc0c2d804 100644 --- a/crates/core/src/operations/merge/barrier.rs +++ b/crates/core/src/operations/merge/barrier.rs @@ -95,6 +95,11 @@ impl ExecutionPlan for MergeBarrierExec { self: std::sync::Arc, children: Vec>, ) -> datafusion_common::Result> { + if children.len() != 1 { + return Err(DataFusionError::Plan( + "MergeBarrierExec wrong number of children".to_string(), + )); + } Ok(Arc::new(MergeBarrierExec::new( children[0].clone(), self.file_column.clone(),