From 34d5fff9ab7d70b122f0c7e3ecb77bcfb47692eb Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Sun, 4 Aug 2024 13:56:15 +0200
Subject: [PATCH 1/3] feat: logical plan usage in delete

---
 crates/core/src/delta_datafusion/mod.rs | 49 ++++++++++-----
 crates/core/src/operations/delete.rs    | 82 +++++++++++++++----------
 2 files changed, 83 insertions(+), 48 deletions(-)

diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 4c76ac156c..0ec6b8d61d 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -344,7 +344,10 @@ pub struct DeltaScanConfigBuilder {
     file_column_name: Option<String>,
     /// Whether to wrap partition values in a dictionary encoding to potentially save space
     wrap_partition_values: Option<bool>,
+    /// Whether to push the filter down into the final result, or only use it to prune files
     enable_parquet_pushdown: bool,
+    /// Schema to scan the table with
+    schema: Option<SchemaRef>,
 }
 
 impl Default for DeltaScanConfigBuilder {
@@ -354,6 +357,7 @@
             file_column_name: None,
             wrap_partition_values: None,
             enable_parquet_pushdown: true,
+            schema: None,
         }
     }
 }
@@ -392,6 +396,12 @@ impl DeltaScanConfigBuilder {
         self
     }
 
+    /// Use the provided [SchemaRef] for the [DeltaScan]
+    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
     /// Build a DeltaScanConfig and ensure no column name conflicts occur during downstream processing
     pub fn build(&self, snapshot: &DeltaTableState) -> DeltaResult<DeltaScanConfig> {
         let file_column_name = if self.include_file_column {
@@ -433,6 +443,7 @@ impl DeltaScanConfigBuilder {
             file_column_name,
             wrap_partition_values: self.wrap_partition_values.unwrap_or(true),
             enable_parquet_pushdown: self.enable_parquet_pushdown,
+            schema: self.schema.clone(),
         })
     }
 }
@@ -446,6 +457,8 @@ pub struct DeltaScanConfig {
     pub wrap_partition_values: bool,
     /// Allow pushdown of the scan filter
     pub enable_parquet_pushdown: bool,
+    /// Schema to read as
+    pub schema: Option<SchemaRef>,
 }
 
 #[derive(Debug)]
@@ -458,7 +471,6 @@ pub(crate) struct DeltaScanBuilder<'a> {
     limit: Option<usize>,
     files: Option<&'a [Add]>,
     config: Option<DeltaScanConfig>,
-    schema: Option<SchemaRef>,
 }
 
 impl<'a> DeltaScanBuilder<'a> {
@@ -476,7 +488,6 @@ impl<'a> DeltaScanBuilder<'a> {
             limit: None,
             files: None,
             config: None,
-            schema: None,
         }
     }
 
@@ -505,22 +516,17 @@ impl<'a> DeltaScanBuilder<'a> {
         self
     }
 
-    /// Use the provided [SchemaRef] for the [DeltaScan]
-    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
-        self.schema = Some(schema);
-        self
-    }
-
     pub async fn build(self) -> DeltaResult<DeltaScan> {
         let config = match self.config {
             Some(config) => config,
            None => DeltaScanConfigBuilder::new().build(self.snapshot)?,
         };
 
-        let schema = match self.schema {
-            Some(schema) => schema,
-            None => self.snapshot.arrow_schema()?,
-        };
+        let schema = config
+            .schema
+            .clone()
+            .unwrap_or(self.snapshot.arrow_schema()?);
+
         let logical_schema = df_logical_schema(self.snapshot, &config)?;
 
         let logical_schema = if let Some(used_columns) = self.projection {
@@ -742,6 +748,7 @@ pub struct DeltaTableProvider {
     log_store: LogStoreRef,
     config: DeltaScanConfig,
     schema: Arc<ArrowSchema>,
+    files: Option<Vec<Add>>,
 }
 
 impl DeltaTableProvider {
@@ -756,8 +763,15 @@ impl DeltaTableProvider {
             snapshot,
             log_store,
             config,
+            files: None,
         })
     }
+
+    /// Define which files to consider while building a scan, for advanced use cases
+    pub fn with_files(mut self, files: Vec<Add>) -> DeltaTableProvider {
+        self.files = Some(files);
+        self
+    }
 }
 
 #[async_trait]
@@ -792,15 +806,16 @@ impl TableProvider for DeltaTableProvider {
         register_store(self.log_store.clone(), session.runtime_env().clone());
         let filter_expr = conjunction(filters.iter().cloned());
 
-        let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
+        let mut scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
             .with_projection(projection)
             .with_limit(limit)
             .with_filter(filter_expr)
-            .with_scan_config(self.config.clone())
-            .build()
-            .await?;
+            .with_scan_config(self.config.clone());
 
-        Ok(Arc::new(scan))
+        if let Some(files) = &self.files {
+            scan = scan.with_files(files);
+        }
+        Ok(Arc::new(scan.build().await?))
     }
 
     fn supports_filters_pushdown(
diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index 7b88d0c4d4..0029ce46b4 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -19,6 +19,8 @@
 use crate::logstore::LogStoreRef;
 
 use core::panic;
+use datafusion::dataframe::DataFrame;
+use datafusion::datasource::provider_as_source;
 use datafusion::execution::context::{SessionContext, SessionState};
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::projection::ProjectionExec;
@@ -26,7 +28,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::Expr;
 use datafusion_common::scalar::ScalarValue;
 use datafusion_common::DFSchema;
-use datafusion_expr::lit;
+use datafusion_expr::{lit, LogicalPlanBuilder};
 use datafusion_physical_expr::{
     expressions::{self},
     PhysicalExpr,
@@ -45,7 +47,8 @@ use super::write::{execute_non_empty_expr_cdc, WriterStatsConfig};
 
 use crate::delta_datafusion::expr::fmt_expr_to_sql;
 use crate::delta_datafusion::{
-    find_files, register_store, DataFusionMixins, DeltaScanBuilder, DeltaSessionContext,
+    find_files, register_store, DataFusionMixins, DeltaScanBuilder, DeltaScanConfigBuilder,
+    DeltaSessionContext, DeltaTableProvider,
 };
 use crate::errors::DeltaResult;
 use crate::kernel::{Action, Add, Remove};
@@ -136,26 +139,29 @@ async fn excute_non_empty_expr(
     log_store: LogStoreRef,
     state: &SessionState,
     expression: &Expr,
-    metrics: &mut DeleteMetrics,
     rewrite: &[Add],
+    metrics: &mut DeleteMetrics,
     writer_properties: Option<WriterProperties>,
     partition_scan: bool,
 ) -> DeltaResult<Vec<Action>> {
     // For each identified file perform a parquet scan + filter + limit (1) + count.
     // If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file.
     let mut actions: Vec<Action> = Vec::new();
-
-    let input_schema = snapshot.input_schema()?;
-    let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;
     let table_partition_cols = snapshot.metadata().partition_columns.clone();
 
-    let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), state)
-        .with_files(rewrite)
-        // Use input schema which doesn't wrap partition values, otherwise divide_by_partition_value won't work on UTF8 partitions
-        // Since it can't fetch a scalar from a dictionary type
+
+    let scan_config = DeltaScanConfigBuilder::default()
+        .with_file_column(false)
         .with_schema(snapshot.input_schema()?)
-        .build()
-        .await?;
-    let scan = Arc::new(scan);
+        .build(&snapshot)?;
+
+    let target_provider = Arc::new(
+        DeltaTableProvider::try_new(snapshot.clone(), log_store.clone(), scan_config.clone())?
+            .with_files(rewrite.to_vec()),
+    );
+    let target_provider = provider_as_source(target_provider);
+    let plan = LogicalPlanBuilder::scan("target", target_provider.clone(), None)?.build()?;
+
+    let df = DataFrame::new(state.clone(), plan);
 
     let writer_stats_config = WriterStatsConfig::new(
         snapshot.table_config().num_indexed_cols(),
@@ -169,9 +175,11 @@
         // Apply the negation of the filter and rewrite files
         let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone()))));
 
-        let predicate_expr = state.create_physical_expr(negated_expression, &input_dfschema)?;
-        let filter: Arc<dyn ExecutionPlan> =
-            Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
+        let filter = df
+            .clone()
+            .filter(negated_expression)?
+            .create_physical_plan()
+            .await?;
 
         let add_actions: Vec<Action> = write_execution_plan(
             Some(snapshot),
@@ -191,7 +199,7 @@
 
         actions.extend(add_actions);
 
-        let read_records = scan.parquet_scan.metrics().and_then(|m| m.output_rows());
+        let read_records = Some(df.clone().count().await?);
         let filter_records = filter.metrics().and_then(|m| m.output_rows());
         metrics.num_copied_rows = filter_records;
         metrics.num_deleted_rows = read_records
@@ -200,19 +208,31 @@
     }
 
     // CDC logic, simply filters data with predicate and adds the _change_type="delete" as literal column
-    if let Some(cdc_actions) = execute_non_empty_expr_cdc(
-        snapshot,
-        log_store,
-        state.clone(),
-        scan,
-        input_dfschema,
-        expression,
-        table_partition_cols,
-        writer_properties,
-        writer_stats_config,
-    )
-    .await?
-    {
+    if let Ok(true) = should_write_cdc(&snapshot) {
+        // Create CDC scan
+        let change_type_lit = lit(ScalarValue::Utf8(Some("delete".to_string())));
+        let cdc_filter = df
+            .filter(expression.clone())?
+            .with_column("_change_type", change_type_lit)?
+            .create_physical_plan()
+            .await?;
+
+        use crate::operations::write::write_execution_plan_cdc;
+        let cdc_actions = write_execution_plan_cdc(
+            Some(snapshot),
+            state.clone(),
+            cdc_filter,
+            table_partition_cols.clone(),
+            log_store.object_store(),
+            Some(snapshot.table_config().target_file_size() as usize),
+            None,
+            writer_properties,
+            false,
+            Some(SchemaMode::Overwrite), // If not Overwrite, the table schema is used instead of the plan schema; we need the plan schema here since it carries the _change_type column
+            writer_stats_config,
+            None,
+        )
+        .await?;
         actions.extend(cdc_actions)
     }
 
@@ -243,8 +263,8 @@ async fn execute(
             log_store.clone(),
             &state,
             &predicate,
-            &mut metrics,
             &candidates.candidates,
+            &mut metrics,
             writer_properties,
             candidates.partition_scan,
         )

From 7f28ddfb12e156fa2c8f40b01952b403726935c4 Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Sun, 4 Aug 2024 14:18:32 +0200
Subject: [PATCH 2/3] feat: add delta planner

---
 crates/core/src/delta_datafusion/mod.rs      |  1 +
 crates/core/src/delta_datafusion/physical.rs |  4 ++
 crates/core/src/delta_datafusion/planner.rs  | 64 ++++++++++++++++++++
 3 files changed, 69 insertions(+)
 create mode 100644 crates/core/src/delta_datafusion/planner.rs

diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 0ec6b8d61d..9cb10b7461 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -92,6 +92,7 @@ pub mod cdf;
 pub mod expr;
 pub mod logical;
 pub mod physical;
+pub mod planner;
 
 mod find_files;
 mod schema_adapter;
diff --git a/crates/core/src/delta_datafusion/physical.rs b/crates/core/src/delta_datafusion/physical.rs
index adbb7fb4fe..c37b85101e 100644
--- a/crates/core/src/delta_datafusion/physical.rs
+++ b/crates/core/src/delta_datafusion/physical.rs
@@ -178,3 +178,7 @@ pub(crate) fn find_metric_node(
 
     None
 }
+
+pub(crate) fn get_metric(metrics: &MetricsSet, name: &str) -> usize {
+    metrics.sum_by_name(name).map(|m| m.as_usize()).unwrap_or(0)
+}
diff --git a/crates/core/src/delta_datafusion/planner.rs b/crates/core/src/delta_datafusion/planner.rs
new file mode 100644
index 0000000000..9c31410cec
--- /dev/null
+++ b/crates/core/src/delta_datafusion/planner.rs
@@ -0,0 +1,64 @@
+//! Custom DataFusion query planners that convert user-defined logical nodes; these
+//! can be used to trace custom metrics in an operation
+//!
+//! # Example
+//!
+//! ```ignore
+//! #[derive(Clone)]
+//! struct MergeMetricExtensionPlanner {}
+//!
+//! #[async_trait]
+//! impl ExtensionPlanner for MergeMetricExtensionPlanner {
+//!     async fn plan_extension(
+//!         &self,
+//!         planner: &dyn PhysicalPlanner,
+//!         node: &dyn UserDefinedLogicalNode,
+//!         _logical_inputs: &[&LogicalPlan],
+//!         physical_inputs: &[Arc<dyn ExecutionPlan>],
+//!         session_state: &SessionState,
+//!     ) -> DataFusionResult<Option<Arc<dyn ExecutionPlan>>> {
+//!         Ok(None)
+//!     }
+//! }
+//!
+//! let merge_planner = DeltaPlanner::<MergeMetricExtensionPlanner> {
+//!     extension_planner: MergeMetricExtensionPlanner {}
+//! };
+//!
+//! let state = state.with_query_planner(Arc::new(merge_planner));
+//! ```
+use std::sync::Arc;
+
+use crate::delta_datafusion::DataFusionResult;
+use async_trait::async_trait;
+use datafusion::physical_planner::PhysicalPlanner;
+use datafusion::{
+    execution::{context::QueryPlanner, session_state::SessionState},
+    physical_plan::ExecutionPlan,
+    physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner},
+};
+use datafusion_expr::LogicalPlan;
+
+/// Delta planner
+pub struct DeltaPlanner<T: ExtensionPlanner> {
+    /// custom extension planner
+    pub extension_planner: T,
+}
+
+#[async_trait]
+impl<T: ExtensionPlanner + std::marker::Send + Sync + 'static + Clone> QueryPlanner
+    for DeltaPlanner<T>
+{
+    async fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
+        let planner = Arc::new(Box::new(DefaultPhysicalPlanner::with_extension_planners(
+            vec![Arc::new(self.extension_planner.clone())],
+        )));
+        planner
+            .create_physical_plan(logical_plan, session_state)
+            .await
+    }
+}

From a6aeeca967111480fa492dd4d1a0388e392cbf5c Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Sun, 4 Aug 2024 14:18:54 +0200
Subject: [PATCH 3/3] feat: use delta planner in delete and merge

---
 crates/core/src/delta_datafusion/mod.rs     |   8 +-
 crates/core/src/delta_datafusion/planner.rs |   4 +-
 crates/core/src/operations/delete.rs        | 122 ++++++++++++++------
 crates/core/src/operations/merge/mod.rs     |  43 ++-----
 crates/core/src/operations/write.rs         |  23 +++-
 5 files changed, 120 insertions(+), 80 deletions(-)

diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 9cb10b7461..c2b410cb74 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -523,10 +523,10 @@ impl<'a> DeltaScanBuilder<'a> {
             None => DeltaScanConfigBuilder::new().build(self.snapshot)?,
         };
 
-        let schema = config
-            .schema
-            .clone()
-            .unwrap_or(self.snapshot.arrow_schema()?);
+        let schema = match config.schema.clone() {
+            Some(value) => Ok(value),
+            None => self.snapshot.arrow_schema(),
+        }?;
 
         let logical_schema = df_logical_schema(self.snapshot, &config)?;
 
diff --git a/crates/core/src/delta_datafusion/planner.rs b/crates/core/src/delta_datafusion/planner.rs
index 9c31410cec..f0af1092ca 100644
--- a/crates/core/src/delta_datafusion/planner.rs
+++ b/crates/core/src/delta_datafusion/planner.rs
@@ -46,9 +46,7 @@ pub struct DeltaPlanner<T: ExtensionPlanner> {
 }
 
 #[async_trait]
-impl<T: ExtensionPlanner + std::marker::Send + Sync + 'static + Clone> QueryPlanner
-    for DeltaPlanner<T>
-{
+impl<T: ExtensionPlanner + Send + Sync + 'static + Clone> QueryPlanner for DeltaPlanner<T> {
     async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index 0029ce46b4..7ab4ce79f2 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -17,22 +17,22 @@
 //!     .await?;
 //! ````
 
+use crate::delta_datafusion::logical::MetricObserver;
+use crate::delta_datafusion::physical::{find_metric_node, get_metric, MetricObserverExec};
+use crate::delta_datafusion::planner::DeltaPlanner;
 use crate::logstore::LogStoreRef;
 
-use core::panic;
+use async_trait::async_trait;
 use datafusion::dataframe::DataFrame;
 use datafusion::datasource::provider_as_source;
+use datafusion::error::Result as DataFusionResult;
 use datafusion::execution::context::{SessionContext, SessionState};
-use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::projection::ProjectionExec;
+use datafusion::physical_plan::metrics::MetricBuilder;
 use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
 use datafusion::prelude::Expr;
-use datafusion_common::scalar::ScalarValue;
-use datafusion_common::DFSchema;
-use datafusion_expr::{lit, LogicalPlanBuilder};
-use datafusion_physical_expr::{
-    expressions::{self},
-    PhysicalExpr,
-};
+use datafusion_common::ScalarValue;
+use datafusion_expr::{lit, Extension, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode};
+
 use futures::future::BoxFuture;
 use std::sync::Arc;
 use std::time::{Instant, SystemTime, UNIX_EPOCH};
@@ -45,7 +47,8 @@ use serde::Serialize;
 use super::cdc::should_write_cdc;
 use super::datafusion_utils::Expression;
 use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
-use super::write::{execute_non_empty_expr_cdc, WriterStatsConfig};
+use super::write::WriterStatsConfig;
 
 use crate::delta_datafusion::expr::fmt_expr_to_sql;
 use crate::delta_datafusion::{
-    find_files, register_store, DataFusionMixins, DeltaScanBuilder, DeltaScanConfigBuilder,
-    DeltaSessionContext, DeltaTableProvider,
+    find_files, register_store, DataFusionMixins, DeltaScanConfigBuilder, DeltaSessionContext,
+    DeltaTableProvider,
 };
 use crate::errors::DeltaResult;
 use crate::kernel::{Action, Add, Remove};
-use crate::operations::write::{write_execution_plan, SchemaMode};
+use crate::operations::write::{write_execution_plan, write_execution_plan_cdc, SchemaMode};
 use crate::protocol::DeltaOperation;
 use crate::table::state::DeltaTableState;
-use crate::DeltaTable;
+use crate::{DeltaTable, DeltaTableError};
+
+const SOURCE_COUNT_ID: &str = "delete_source_count";
+const SOURCE_COUNT_METRIC: &str = "num_source_rows";
 
 /// Delete Records from the Delta Table.
 /// See this module's documentation for more information
@@ -82,9 +85,9 @@ pub struct DeleteMetrics {
     /// Number of files removed
     pub num_removed_files: usize,
     /// Number of rows removed
-    pub num_deleted_rows: Option<usize>,
+    pub num_deleted_rows: usize,
     /// Number of rows copied in the process of deleting files
-    pub num_copied_rows: Option<usize>,
+    pub num_copied_rows: usize,
     /// Time taken to execute the entire operation
     pub execution_time_ms: u64,
     /// Time taken to scan the file for matches
@@ -133,6 +136,36 @@ impl DeleteBuilder {
     }
 }
 
+#[derive(Clone)]
+struct DeleteMetricExtensionPlanner {}
+
+#[async_trait]
+impl ExtensionPlanner for DeleteMetricExtensionPlanner {
+    async fn plan_extension(
+        &self,
+        _planner: &dyn PhysicalPlanner,
+        node: &dyn UserDefinedLogicalNode,
+        _logical_inputs: &[&LogicalPlan],
+        physical_inputs: &[Arc<dyn ExecutionPlan>],
+        _session_state: &SessionState,
+    ) -> DataFusionResult<Option<Arc<dyn ExecutionPlan>>> {
+        if let Some(metric_observer) = node.as_any().downcast_ref::<MetricObserver>() {
+            if metric_observer.id.eq(SOURCE_COUNT_ID) {
+                return Ok(Some(MetricObserverExec::try_new(
+                    SOURCE_COUNT_ID.into(),
+                    physical_inputs,
+                    |batch, metrics| {
+                        MetricBuilder::new(metrics)
+                            .global_counter(SOURCE_COUNT_METRIC)
+                            .add(batch.num_rows());
+                    },
+                )?));
+            }
+        }
+        Ok(None)
+    }
+}
+
 #[allow(clippy::too_many_arguments)]
 async fn excute_non_empty_expr(
     snapshot: &DeltaTableState,
@@ -149,19 +182,33 @@ async fn excute_non_empty_expr(
     let mut actions: Vec<Action> = Vec::new();
     let table_partition_cols = snapshot.metadata().partition_columns.clone();
 
+    let delete_planner = DeltaPlanner::<DeleteMetricExtensionPlanner> {
+        extension_planner: DeleteMetricExtensionPlanner {},
+    };
+
+    let state = state.clone().with_query_planner(Arc::new(delete_planner));
+
     let scan_config = DeltaScanConfigBuilder::default()
         .with_file_column(false)
         .with_schema(snapshot.input_schema()?)
-        .build(&snapshot)?;
+        .build(snapshot)?;
 
     let target_provider = Arc::new(
         DeltaTableProvider::try_new(snapshot.clone(), log_store.clone(), scan_config.clone())?
             .with_files(rewrite.to_vec()),
     );
     let target_provider = provider_as_source(target_provider);
-    let plan = LogicalPlanBuilder::scan("target", target_provider.clone(), None)?.build()?;
+    let source = LogicalPlanBuilder::scan("target", target_provider.clone(), None)?.build()?;
+
+    let source = LogicalPlan::Extension(Extension {
+        node: Arc::new(MetricObserver {
+            id: SOURCE_COUNT_ID.into(),
+            input: source,
+            enable_pushdown: false,
+        }),
+    });
 
-    let df = DataFrame::new(state.clone(), plan);
+    let df = DataFrame::new(state.clone(), source);
 
     let writer_stats_config = WriterStatsConfig::new(
         snapshot.table_config().num_indexed_cols(),
@@ -199,16 +246,19 @@
 
         actions.extend(add_actions);
 
-        let read_records = Some(df.clone().count().await?);
-        let filter_records = filter.metrics().and_then(|m| m.output_rows());
+        let source_count = find_metric_node(SOURCE_COUNT_ID, &filter).ok_or_else(|| {
+            DeltaTableError::Generic("Unable to locate expected metric node".into())
+        })?;
+        let source_count_metrics = source_count.metrics().unwrap();
+        let read_records = get_metric(&source_count_metrics, SOURCE_COUNT_METRIC);
+        let filter_records = filter.metrics().and_then(|m| m.output_rows()).unwrap_or(0);
+
         metrics.num_copied_rows = filter_records;
-        metrics.num_deleted_rows = read_records
-            .zip(filter_records)
-            .map(|(read, filter)| read - filter);
+        metrics.num_deleted_rows = read_records - filter_records;
     }
 
     // CDC logic, simply filters data with predicate and adds the _change_type="delete" as literal column
-    if let Ok(true) = should_write_cdc(&snapshot) {
+    if let Ok(true) = should_write_cdc(snapshot) {
         // Create CDC scan
         let change_type_lit = lit(ScalarValue::Utf8(Some("delete".to_string())));
@@ -460,8 +510,8 @@
         assert_eq!(table.get_files_count(), 0);
         assert_eq!(metrics.num_added_files, 0);
         assert_eq!(metrics.num_removed_files, 1);
-        assert_eq!(metrics.num_deleted_rows, None);
-        assert_eq!(metrics.num_copied_rows, None);
+        assert_eq!(metrics.num_deleted_rows, 0);
+        assert_eq!(metrics.num_copied_rows, 0);
 
         let commit_info = table.history(None).await.unwrap();
         let last_commit = &commit_info[0];
@@ -476,8 +526,8 @@
         assert_eq!(table.version(), 2);
         assert_eq!(metrics.num_added_files, 0);
         assert_eq!(metrics.num_removed_files, 0);
-        assert_eq!(metrics.num_deleted_rows, None);
-        assert_eq!(metrics.num_copied_rows, None);
+        assert_eq!(metrics.num_deleted_rows, 0);
+        assert_eq!(metrics.num_copied_rows, 0);
     }
 
     #[tokio::test]
@@ -548,8 +598,8 @@
         assert_eq!(metrics.num_added_files, 1);
         assert_eq!(metrics.num_removed_files, 1);
         assert!(metrics.scan_time_ms > 0);
-        assert_eq!(metrics.num_deleted_rows, Some(1));
-        assert_eq!(metrics.num_copied_rows, Some(3));
+        assert_eq!(metrics.num_deleted_rows, 1);
+        assert_eq!(metrics.num_copied_rows, 3);
 
         let commit_info = table.history(None).await.unwrap();
         let last_commit = &commit_info[0];
@@ -703,8 +753,8 @@
 
         assert_eq!(metrics.num_added_files, 0);
         assert_eq!(metrics.num_removed_files, 1);
-        assert_eq!(metrics.num_deleted_rows, None);
-        assert_eq!(metrics.num_copied_rows, None);
+        assert_eq!(metrics.num_deleted_rows, 0);
+        assert_eq!(metrics.num_copied_rows, 0);
         assert!(metrics.scan_time_ms > 0);
 
         let expected = vec![
@@ -764,8 +814,8 @@
 
         assert_eq!(metrics.num_added_files, 0);
         assert_eq!(metrics.num_removed_files, 1);
-        assert_eq!(metrics.num_deleted_rows, Some(1));
-        assert_eq!(metrics.num_copied_rows, Some(0));
+        assert_eq!(metrics.num_deleted_rows, 1);
+        assert_eq!(metrics.num_copied_rows, 0);
         assert!(metrics.scan_time_ms > 0);
 
         let expected = [
diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs
index 73a97436f1..ea54e4e211 100644
--- a/crates/core/src/operations/merge/mod.rs
+++ b/crates/core/src/operations/merge/mod.rs
@@ -35,15 +35,13 @@ use std::time::Instant;
 
 use async_trait::async_trait;
 use datafusion::datasource::provider_as_source;
 use datafusion::error::Result as DataFusionResult;
-use datafusion::execution::context::{QueryPlanner, SessionConfig};
+use datafusion::execution::context::SessionConfig;
 use datafusion::logical_expr::build_join_schema;
-use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
+use datafusion::physical_plan::metrics::MetricBuilder;
+use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
 use datafusion::{
     execution::context::SessionState,
-    physical_plan::{
-        metrics::{MetricBuilder, MetricsSet},
-        ExecutionPlan,
-    },
+    physical_plan::ExecutionPlan,
     prelude::{DataFrame, SessionContext},
 };
 use datafusion_common::tree_node::{Transformed, TreeNode};
@@ -68,7 +66,8 @@ use super::datafusion_utils::{into_expr, maybe_into_expr, Expression};
 use super::transaction::{CommitProperties, PROTOCOL};
 use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression};
 use crate::delta_datafusion::logical::MetricObserver;
-use crate::delta_datafusion::physical::{find_metric_node, MetricObserverExec};
+use crate::delta_datafusion::physical::{find_metric_node, get_metric, MetricObserverExec};
+use crate::delta_datafusion::planner::DeltaPlanner;
 use crate::delta_datafusion::{
     execute_plan_to_batch, register_store, DeltaColumn, DeltaScanConfigBuilder,
     DeltaSessionConfig, DeltaTableProvider,
@@ -576,7 +575,7 @@ pub struct MergeMetrics {
     /// Time taken to rewrite the matched files
     pub rewrite_time_ms: u64,
 }
-
+#[derive(Clone)]
 struct MergeMetricExtensionPlanner {}
 
 #[async_trait]
@@ -1017,7 +1016,11 @@ async fn execute(
     let exec_start = Instant::now();
     let current_metadata = snapshot.metadata();
 
-    let state = state.with_query_planner(Arc::new(MergePlanner {}));
+    let merge_planner = DeltaPlanner::<MergeMetricExtensionPlanner> {
+        extension_planner: MergeMetricExtensionPlanner {},
+    };
+
+    let state = state.with_query_planner(Arc::new(merge_planner));
 
     // TODO: Given the join predicate, remove any expression that involve the
     // source table and keep expressions that only involve the target table.
@@ -1486,9 +1489,6 @@ async fn execute(
     let source_count_metrics = source_count.metrics().unwrap();
     let target_count_metrics = op_count.metrics().unwrap();
 
-    fn get_metric(metrics: &MetricsSet, name: &str) -> usize {
-        metrics.sum_by_name(name).map(|m| m.as_usize()).unwrap_or(0)
-    }
     metrics.num_source_rows = get_metric(&source_count_metrics, SOURCE_COUNT_METRIC);
     metrics.num_target_rows_inserted = get_metric(&target_count_metrics, TARGET_INSERTED_METRIC);
 
@@ -1555,25 +1555,6 @@ fn remove_table_alias(expr: Expr, table_alias: &str) -> Expr {
         .data
 }
 
-// TODO: Abstract MergePlanner into DeltaPlanner to support other delta operations in the future.
-struct MergePlanner {}
-
-#[async_trait]
-impl QueryPlanner for MergePlanner {
-    async fn create_physical_plan(
-        &self,
-        logical_plan: &LogicalPlan,
-        session_state: &SessionState,
-    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
-        let planner = Arc::new(Box::new(DefaultPhysicalPlanner::with_extension_planners(
-            vec![Arc::new(MergeMetricExtensionPlanner {})],
-        )));
-        planner
-            .create_physical_plan(logical_plan, session_state)
-            .await
-    }
-}
-
 impl std::future::IntoFuture for MergeBuilder {
     type Output = DeltaResult<(DeltaTable, MergeMetrics)>;
     type IntoFuture = BoxFuture<'static, Self::Output>;
diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs
index 8386c07a9a..bc5c1850d8 100644
--- a/crates/core/src/operations/write.rs
+++ b/crates/core/src/operations/write.rs
@@ -54,7 +54,9 @@
 use super::writer::{DeltaWriter, WriterConfig};
 use super::CreateBuilder;
 use crate::delta_datafusion::expr::fmt_expr_to_sql;
 use crate::delta_datafusion::expr::parse_predicate_expression;
-use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder};
+use crate::delta_datafusion::{
+    find_files, register_store, DeltaScanBuilder, DeltaScanConfigBuilder,
+};
 use crate::delta_datafusion::{DataFusionMixins, DeltaDataChecker};
 use crate::errors::{DeltaResult, DeltaTableError};
 use crate::kernel::{Action, Add, AddCDCFile, Metadata, PartitionsExt, Remove, StructType};
@@ -573,11 +575,15 @@ async fn execute_non_empty_expr(
     let input_schema = snapshot.input_schema()?;
     let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;
 
+    let scan_config = DeltaScanConfigBuilder::new()
+        .with_schema(snapshot.input_schema()?)
+        .build(snapshot)?;
+
     let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
         .with_files(rewrite)
         // Use input schema which doesn't wrap partition values, otherwise divide_by_partition_value won't work on UTF8 partitions
         // Since it can't fetch a scalar from a dictionary type
-        .with_schema(snapshot.input_schema()?)
+        .with_scan_config(scan_config)
         .build()
         .await?;
     let scan = Arc::new(scan);
@@ -711,7 +717,7 @@ async fn prepare_predicate_actions(
         find_files(snapshot, log_store.clone(), &state, Some(predicate.clone())).await?;
 
     let mut actions = execute_non_empty_expr(
-        &snapshot,
+        snapshot,
         log_store,
         state,
         partition_columns,
@@ -752,19 +758,24 @@ async fn execute_non_empty_expr_cdc_all_actions(
     writer_stats_config: WriterStatsConfig,
 ) -> DeltaResult<Option<Vec<Action>>> {
     let current_state_add_actions = &snapshot.file_actions()?;
+
+    let scan_config = DeltaScanConfigBuilder::new()
+        .with_schema(snapshot.input_schema()?)
+        .build(snapshot)?;
+
     // Since all files get removed, check to write CDC
     let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
         .with_files(current_state_add_actions)
         // Use input schema which doesn't wrap partition values, otherwise divide_by_partition_value won't work on UTF8 partitions
        // Since it can't fetch a scalar from a dictionary type
-        .with_schema(snapshot.input_schema()?)
+        .with_scan_config(scan_config)
         .build()
         .await?;
 
     let input_schema = snapshot.input_schema()?;
     let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;
 
-    Ok(execute_non_empty_expr_cdc(
+    execute_non_empty_expr_cdc(
         snapshot,
         log_store,
         state,
@@ -775,7 +786,7 @@
         writer_properties,
         writer_stats_config,
     )
-    .await?)
+    .await
 }
 
 impl std::future::IntoFuture for WriteBuilder {
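
Taken together, PATCH 1 lets a caller pin both the schema a Delta scan reads with and the exact file set it considers. Below is a minimal sketch of driving the new DeltaScanConfigBuilder::with_schema and DeltaTableProvider::with_files hooks from DataFusion; it is illustrative only — the table location, registration name, and query are assumptions, not part of the patches.

use std::sync::Arc;

use datafusion::prelude::SessionContext;
use deltalake_core::delta_datafusion::{
    register_store, DataFusionMixins, DeltaScanConfigBuilder, DeltaTableProvider,
};
use deltalake_core::{open_table, DeltaResult};

async fn scan_pinned_files() -> DeltaResult<()> {
    // Hypothetical table location; substitute a real table URI.
    let table = open_table("./data/example_table").await?;
    let snapshot = table.snapshot()?.clone();
    let log_store = table.log_store();

    // Scan with the unwrapped input schema (PATCH 1: with_schema on the config builder).
    let config = DeltaScanConfigBuilder::default()
        .with_schema(snapshot.input_schema()?)
        .build(&snapshot)?;

    // Pin the scan to an explicit set of Add actions (PATCH 1: with_files);
    // here we simply pass all current files.
    let files = snapshot.file_actions()?;
    let provider = Arc::new(
        DeltaTableProvider::try_new(snapshot, log_store.clone(), config)?.with_files(files),
    );

    let ctx = SessionContext::new();
    register_store(log_store, ctx.runtime_env());
    ctx.register_table("target", provider)?;
    ctx.sql("SELECT count(*) FROM target").await?.show().await?;
    Ok(())
}

Routing the file selection through the TableProvider, rather than through DeltaScanBuilder directly, is what lets delete build an ordinary DataFusion logical plan over exactly the candidate files.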
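For PATCH 2 and 3, the following crate-internal sketch shows how an operation can wire DeltaPlanner together with a MetricObserver node, mirroring what PATCH 3 does in delete. It assumes it lives inside the core crate (MetricObserver and MetricObserverExec are pub(crate)); the names MY_OP_ID, MY_OP_METRIC, MyOpMetricExtensionPlanner, and observe are illustrative, not part of the patches.

use std::sync::Arc;

use async_trait::async_trait;
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::metrics::MetricBuilder;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNode};

use crate::delta_datafusion::logical::MetricObserver;
use crate::delta_datafusion::physical::MetricObserverExec;
use crate::delta_datafusion::planner::DeltaPlanner;

const MY_OP_ID: &str = "my_op_source_count";
const MY_OP_METRIC: &str = "num_source_rows";

#[derive(Clone)]
struct MyOpMetricExtensionPlanner {}

#[async_trait]
impl ExtensionPlanner for MyOpMetricExtensionPlanner {
    async fn plan_extension(
        &self,
        _planner: &dyn PhysicalPlanner,
        node: &dyn UserDefinedLogicalNode,
        _logical_inputs: &[&LogicalPlan],
        physical_inputs: &[Arc<dyn ExecutionPlan>],
        _session_state: &SessionState,
    ) -> DataFusionResult<Option<Arc<dyn ExecutionPlan>>> {
        // Swap the logical MetricObserver for a physical exec that counts rows.
        if let Some(observer) = node.as_any().downcast_ref::<MetricObserver>() {
            if observer.id.eq(MY_OP_ID) {
                return Ok(Some(MetricObserverExec::try_new(
                    MY_OP_ID.into(),
                    physical_inputs,
                    |batch, metrics| {
                        MetricBuilder::new(metrics)
                            .global_counter(MY_OP_METRIC)
                            .add(batch.num_rows());
                    },
                )?));
            }
        }
        // Not our node: defer to other planners.
        Ok(None)
    }
}

/// Install the planner on a SessionState and wrap a plan with the observer.
fn observe(state: SessionState, input: LogicalPlan) -> (SessionState, LogicalPlan) {
    let planner = DeltaPlanner::<MyOpMetricExtensionPlanner> {
        extension_planner: MyOpMetricExtensionPlanner {},
    };
    let state = state.with_query_planner(Arc::new(planner));
    let observed = LogicalPlan::Extension(Extension {
        node: Arc::new(MetricObserver {
            id: MY_OP_ID.into(),
            input,
            enable_pushdown: false,
        }),
    });
    (state, observed)
}

After the physical plan has executed, the recorded value can be read back with find_metric_node(MY_OP_ID, &plan) and get_metric(&metrics, MY_OP_METRIC), exactly as the delete operation does for its num_deleted_rows / num_copied_rows metrics.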