From a6aeeca967111480fa492dd4d1a0388e392cbf5c Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Sun, 4 Aug 2024 14:18:54 +0200
Subject: [PATCH] feat: use delta planner in delete and merge

---
 crates/core/src/delta_datafusion/mod.rs     |   8 +-
 crates/core/src/delta_datafusion/planner.rs |   4 +-
 crates/core/src/operations/delete.rs        | 122 ++++++++++++++------
 crates/core/src/operations/merge/mod.rs     |  43 ++-----
 crates/core/src/operations/write.rs         |  23 +++-
 5 files changed, 120 insertions(+), 80 deletions(-)

diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 9cb10b7461..c2b410cb74 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -523,10 +523,10 @@ impl<'a> DeltaScanBuilder<'a> {
             None => DeltaScanConfigBuilder::new().build(self.snapshot)?,
         };
 
-        let schema = config
-            .schema
-            .clone()
-            .unwrap_or(self.snapshot.arrow_schema()?);
+        let schema = match config.schema.clone() {
+            Some(value) => Ok(value),
+            None => self.snapshot.arrow_schema(),
+        }?;
 
         let logical_schema = df_logical_schema(self.snapshot, &config)?;
 
diff --git a/crates/core/src/delta_datafusion/planner.rs b/crates/core/src/delta_datafusion/planner.rs
index 9c31410cec..f0af1092ca 100644
--- a/crates/core/src/delta_datafusion/planner.rs
+++ b/crates/core/src/delta_datafusion/planner.rs
@@ -41,9 +41,7 @@ pub struct DeltaPlanner<T: ExtensionPlanner> {
 }
 
 #[async_trait]
-impl<T: ExtensionPlanner + Send + Sync + 'static + Clone> QueryPlanner
-    for DeltaPlanner<T>
-{
+impl<T: ExtensionPlanner + Send + Sync + 'static + Clone> QueryPlanner for DeltaPlanner<T> {
     async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
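Two notes on the hunks above. In `DeltaScanBuilder`, the old `unwrap_or(self.snapshot.arrow_schema()?)` evaluated its fallback eagerly: `arrow_schema()` ran, and its `?` could propagate an error, even when `config.schema` was already `Some`. The `match` form only computes the fallback in the `None` arm. In `planner.rs`, the per-operation planners are folded into one generic `DeltaPlanner<T: ExtensionPlanner>` that injects operation-specific planning as a type parameter. A minimal sketch of that delegation pattern, using simplified stand-in traits (this `ExtensionPlanner` is a one-method stand-in, not the real DataFusion trait):

```rust
// Stand-in for DataFusion's ExtensionPlanner: given a logical node id,
// optionally produce an operation-specific physical plan.
trait ExtensionPlanner {
    fn plan_extension(&self, node_id: &str) -> Option<String>;
}

// One generic planner replaces per-operation planners such as MergePlanner:
// the operation-specific behaviour is injected as a type parameter.
struct DeltaPlanner<T: ExtensionPlanner> {
    extension_planner: T,
}

impl<T: ExtensionPlanner> DeltaPlanner<T> {
    fn create_physical_plan(&self, node_id: &str) -> String {
        // Delegate to the injected planner; fall back to default planning.
        self.extension_planner
            .plan_extension(node_id)
            .unwrap_or_else(|| format!("default plan for {node_id}"))
    }
}

// The delete operation supplies its own extension planner ...
struct DeleteMetricExtensionPlanner;

impl ExtensionPlanner for DeleteMetricExtensionPlanner {
    fn plan_extension(&self, node_id: &str) -> Option<String> {
        (node_id == "delete_source_count").then(|| "MetricObserverExec".to_string())
    }
}

fn main() {
    // ... and wires it in once, mirroring the patch's
    // `DeltaPlanner::<DeleteMetricExtensionPlanner> { .. }`.
    let planner = DeltaPlanner {
        extension_planner: DeleteMetricExtensionPlanner,
    };
    assert_eq!(
        planner.create_physical_plan("delete_source_count"),
        "MetricObserverExec"
    );
}
```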
diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index 0029ce46b4..7ab4ce79f2 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -17,22 +17,22 @@
 //! .await?;
 //! ````
 
+use crate::delta_datafusion::logical::MetricObserver;
+use crate::delta_datafusion::physical::{find_metric_node, get_metric, MetricObserverExec};
+use crate::delta_datafusion::planner::DeltaPlanner;
 use crate::logstore::LogStoreRef;
-use core::panic;
+use async_trait::async_trait;
 use datafusion::dataframe::DataFrame;
 use datafusion::datasource::provider_as_source;
+use datafusion::error::Result as DataFusionResult;
 use datafusion::execution::context::{SessionContext, SessionState};
-use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::projection::ProjectionExec;
+use datafusion::physical_plan::metrics::MetricBuilder;
 use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
 use datafusion::prelude::Expr;
-use datafusion_common::scalar::ScalarValue;
-use datafusion_common::DFSchema;
-use datafusion_expr::{lit, LogicalPlanBuilder};
-use datafusion_physical_expr::{
-    expressions::{self},
-    PhysicalExpr,
-};
+use datafusion_common::ScalarValue;
+use datafusion_expr::{lit, Extension, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode};
+
 use futures::future::BoxFuture;
 use std::sync::Arc;
 use std::time::{Instant, SystemTime, UNIX_EPOCH};
@@ -43,19 +43,22 @@ use serde::Serialize;
 
 use super::cdc::should_write_cdc;
 use super::datafusion_utils::Expression;
 use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
-use super::write::{execute_non_empty_expr_cdc, WriterStatsConfig};
+use super::write::WriterStatsConfig;
 use crate::delta_datafusion::expr::fmt_expr_to_sql;
 use crate::delta_datafusion::{
-    find_files, register_store, DataFusionMixins, DeltaScanBuilder, DeltaScanConfigBuilder,
-    DeltaSessionContext, DeltaTableProvider,
+    find_files, register_store, DataFusionMixins, DeltaScanConfigBuilder, DeltaSessionContext,
+    DeltaTableProvider,
 };
 use crate::errors::DeltaResult;
 use crate::kernel::{Action, Add, Remove};
-use crate::operations::write::{write_execution_plan, SchemaMode};
+use crate::operations::write::{write_execution_plan, write_execution_plan_cdc, SchemaMode};
 use crate::protocol::DeltaOperation;
 use crate::table::state::DeltaTableState;
-use crate::DeltaTable;
+use crate::{DeltaTable, DeltaTableError};
+
+const SOURCE_COUNT_ID: &str = "delete_source_count";
+const SOURCE_COUNT_METRIC: &str = "num_source_rows";
 
 /// Delete Records from the Delta Table.
 /// See this module's documentation for more information
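The two new constants give the observer node a stable id (`SOURCE_COUNT_ID`) and a name for the counter it publishes (`SOURCE_COUNT_METRIC`). Conceptually, that observer is a pass-through operator which adds each batch's row count to a shared counter as data streams by. A self-contained stand-in for the behaviour (shapes simplified; not the real `MetricObserverExec` API):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

const SOURCE_COUNT_METRIC: &str = "num_source_rows";

// Stand-in for an Arrow record batch; only the row count matters here.
struct Batch {
    rows: Vec<u64>,
}

impl Batch {
    fn num_rows(&self) -> usize {
        self.rows.len()
    }
}

// Pass-through observer: it counts rows without touching the data, the same
// idea as MetricObserverExec bumping `num_source_rows` once per batch.
struct MetricObserver {
    num_source_rows: Arc<AtomicUsize>,
}

impl MetricObserver {
    fn observe(&self, batch: &Batch) {
        self.num_source_rows
            .fetch_add(batch.num_rows(), Ordering::Relaxed);
        // In the real operator the batch then flows through unchanged.
    }
}

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    let observer = MetricObserver {
        num_source_rows: counter.clone(),
    };
    for batch in &[Batch { rows: vec![1, 2, 3] }, Batch { rows: vec![4] }] {
        observer.observe(batch);
    }
    println!("{SOURCE_COUNT_METRIC} = {}", counter.load(Ordering::Relaxed)); // 4
}
```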
@@ -82,9 +85,9 @@ pub struct DeleteMetrics {
     /// Number of files removed
     pub num_removed_files: usize,
     /// Number of rows removed
-    pub num_deleted_rows: Option<usize>,
+    pub num_deleted_rows: usize,
     /// Number of rows copied in the process of deleting files
-    pub num_copied_rows: Option<usize>,
+    pub num_copied_rows: usize,
     /// Time taken to execute the entire operation
     pub execution_time_ms: u64,
     /// Time taken to scan the file for matches
@@ -133,6 +136,36 @@ impl DeleteBuilder {
     }
 }
 
+#[derive(Clone)]
+struct DeleteMetricExtensionPlanner {}
+
+#[async_trait]
+impl ExtensionPlanner for DeleteMetricExtensionPlanner {
+    async fn plan_extension(
+        &self,
+        _planner: &dyn PhysicalPlanner,
+        node: &dyn UserDefinedLogicalNode,
+        _logical_inputs: &[&LogicalPlan],
+        physical_inputs: &[Arc<dyn ExecutionPlan>],
+        _session_state: &SessionState,
+    ) -> DataFusionResult<Option<Arc<dyn ExecutionPlan>>> {
+        if let Some(metric_observer) = node.as_any().downcast_ref::<MetricObserver>() {
+            if metric_observer.id.eq(SOURCE_COUNT_ID) {
+                return Ok(Some(MetricObserverExec::try_new(
+                    SOURCE_COUNT_ID.into(),
+                    physical_inputs,
+                    |batch, metrics| {
+                        MetricBuilder::new(metrics)
+                            .global_counter(SOURCE_COUNT_METRIC)
+                            .add(batch.num_rows());
+                    },
+                )?));
+            }
+        }
+        Ok(None)
+    }
+}
+
 #[allow(clippy::too_many_arguments)]
 async fn excute_non_empty_expr(
     snapshot: &DeltaTableState,
@@ -149,19 +182,33 @@ async fn excute_non_empty_expr(
     let mut actions: Vec<Action> = Vec::new();
     let table_partition_cols = snapshot.metadata().partition_columns.clone();
 
+    let delta_planner = DeltaPlanner::<DeleteMetricExtensionPlanner> {
+        extension_planner: DeleteMetricExtensionPlanner {},
+    };
+
+    let state = state.clone().with_query_planner(Arc::new(delta_planner));
+
     let scan_config = DeltaScanConfigBuilder::default()
         .with_file_column(false)
         .with_schema(snapshot.input_schema()?)
-        .build(&snapshot)?;
+        .build(snapshot)?;
 
     let target_provider = Arc::new(
         DeltaTableProvider::try_new(snapshot.clone(), log_store.clone(), scan_config.clone())?
             .with_files(rewrite.to_vec()),
     );
     let target_provider = provider_as_source(target_provider);
-    let plan = LogicalPlanBuilder::scan("target", target_provider.clone(), None)?.build()?;
+    let source = LogicalPlanBuilder::scan("target", target_provider.clone(), None)?.build()?;
+
+    let source = LogicalPlan::Extension(Extension {
+        node: Arc::new(MetricObserver {
+            id: "delete_source_count".into(),
+            input: source,
+            enable_pushdown: false,
+        }),
+    });
 
-    let df = DataFrame::new(state.clone(), plan);
+    let df = DataFrame::new(state.clone(), source);
 
     let writer_stats_config = WriterStatsConfig::new(
         snapshot.table_config().num_indexed_cols(),
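The rewrite path above now builds the scan, wraps it in a `MetricObserver` extension node keyed by an id, and hands the wrapped plan to DataFusion; the custom planner recognises the node by that id and swaps in the counting operator, and `find_metric_node` later locates it again by the same id (in the real code it walks the executed physical plan). A toy model of the wrap-then-find-by-id flow, with a stand-in plan enum rather than DataFusion's `LogicalPlan`:

```rust
// Stand-in logical plan: a scan, optionally wrapped by extension nodes.
#[derive(Debug)]
enum LogicalPlan {
    Scan { table: &'static str },
    Extension { id: &'static str, input: Box<LogicalPlan> },
}

// Analogue of find_metric_node: walk the plan and locate a node by id.
fn find_by_id<'a>(plan: &'a LogicalPlan, id: &str) -> Option<&'a LogicalPlan> {
    match plan {
        LogicalPlan::Extension { id: node_id, input } => {
            if *node_id == id {
                Some(plan)
            } else {
                find_by_id(input, id)
            }
        }
        LogicalPlan::Scan { .. } => None,
    }
}

fn main() {
    // Build the scan, then wrap it -- the same shadowing style as the patch:
    let source = LogicalPlan::Scan { table: "target" };
    let source = LogicalPlan::Extension {
        id: "delete_source_count",
        input: Box::new(source),
    };
    assert!(find_by_id(&source, "delete_source_count").is_some());
}
```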
@@ -199,16 +246,19 @@
 
     actions.extend(add_actions);
 
-    let read_records = Some(df.clone().count().await?);
-    let filter_records = filter.metrics().and_then(|m| m.output_rows());
+    let source_count = find_metric_node(SOURCE_COUNT_ID, &filter).ok_or_else(|| {
+        DeltaTableError::Generic("Unable to locate expected metric node".into())
+    })?;
+    let source_count_metrics = source_count.metrics().unwrap();
+    let read_records = get_metric(&source_count_metrics, SOURCE_COUNT_METRIC);
+    let filter_records = filter.metrics().and_then(|m| m.output_rows()).unwrap_or(0);
+
     metrics.num_copied_rows = filter_records;
-    metrics.num_deleted_rows = read_records
-        .zip(filter_records)
-        .map(|(read, filter)| read - filter);
+    metrics.num_deleted_rows = read_records - filter_records;
 
     // CDC logic, simply filters data with predicate and adds the _change_type="delete" as literal column
-    if let Ok(true) = should_write_cdc(&snapshot) {
+    if let Ok(true) = should_write_cdc(snapshot) {
         // Create CDC scan
         let change_type_lit = lit(ScalarValue::Utf8(Some("delete".to_string())));
         let cdc_filter = df
@@ -460,8 +510,8 @@ mod tests {
         assert_eq!(table.get_files_count(), 0);
         assert_eq!(metrics.num_added_files, 0);
         assert_eq!(metrics.num_removed_files, 1);
-        assert_eq!(metrics.num_deleted_rows, None);
-        assert_eq!(metrics.num_copied_rows, None);
+        assert_eq!(metrics.num_deleted_rows, 0);
+        assert_eq!(metrics.num_copied_rows, 0);
 
         let commit_info = table.history(None).await.unwrap();
         let last_commit = &commit_info[0];
@@ -476,8 +526,8 @@ mod tests {
         assert_eq!(table.version(), 2);
         assert_eq!(metrics.num_added_files, 0);
         assert_eq!(metrics.num_removed_files, 0);
-        assert_eq!(metrics.num_deleted_rows, None);
-        assert_eq!(metrics.num_copied_rows, None);
+        assert_eq!(metrics.num_deleted_rows, 0);
+        assert_eq!(metrics.num_copied_rows, 0);
     }
 
     #[tokio::test]
@@ -548,8 +598,8 @@ mod tests {
         assert_eq!(metrics.num_added_files, 1);
         assert_eq!(metrics.num_removed_files, 1);
         assert!(metrics.scan_time_ms > 0);
-        assert_eq!(metrics.num_deleted_rows, Some(1));
-        assert_eq!(metrics.num_copied_rows, Some(3));
+        assert_eq!(metrics.num_deleted_rows, 1);
+        assert_eq!(metrics.num_copied_rows, 3);
 
         let commit_info = table.history(None).await.unwrap();
         let last_commit = &commit_info[0];
@@ -703,8 +753,8 @@ mod tests {
 
         assert_eq!(metrics.num_added_files, 0);
         assert_eq!(metrics.num_removed_files, 1);
-        assert_eq!(metrics.num_deleted_rows, None);
-        assert_eq!(metrics.num_copied_rows, None);
+        assert_eq!(metrics.num_deleted_rows, 0);
+        assert_eq!(metrics.num_copied_rows, 0);
         assert!(metrics.scan_time_ms > 0);
 
         let expected = vec![
@@ -764,8 +814,8 @@ mod tests {
 
         assert_eq!(metrics.num_added_files, 0);
         assert_eq!(metrics.num_removed_files, 1);
-        assert_eq!(metrics.num_deleted_rows, Some(1));
-        assert_eq!(metrics.num_copied_rows, Some(0));
+        assert_eq!(metrics.num_deleted_rows, 1);
+        assert_eq!(metrics.num_copied_rows, 0);
         assert!(metrics.scan_time_ms > 0);
 
         let expected = [
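With the observer in place, the delete metrics drop their `Option` wrapper: `num_source_rows` comes from the observer, `num_copied_rows` is whatever the rewrite filter emitted (the rows kept), and the difference is what was deleted. Paths that never rewrite a file now report 0 instead of `None`, which is what the updated tests assert. The arithmetic, using the numbers from the test above that rewrites a 4-row file and keeps 3 rows:

```rust
// The rewrite keeps rows that do NOT match the delete predicate, so:
//   num_copied_rows  = rows emitted by the filter (kept)
//   num_deleted_rows = rows seen by the source observer - rows kept
fn delete_metrics(num_source_rows: usize, filter_output_rows: usize) -> (usize, usize) {
    let num_copied_rows = filter_output_rows;
    let num_deleted_rows = num_source_rows - num_copied_rows;
    (num_deleted_rows, num_copied_rows)
}

fn main() {
    // One file with 4 rows, 1 row matching the predicate:
    assert_eq!(delete_metrics(4, 3), (1, 3));
}
```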
diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs
index 73a97436f1..ea54e4e211 100644
--- a/crates/core/src/operations/merge/mod.rs
+++ b/crates/core/src/operations/merge/mod.rs
@@ -35,15 +35,13 @@ use std::time::Instant;
 use async_trait::async_trait;
 use datafusion::datasource::provider_as_source;
 use datafusion::error::Result as DataFusionResult;
-use datafusion::execution::context::{QueryPlanner, SessionConfig};
+use datafusion::execution::context::SessionConfig;
 use datafusion::logical_expr::build_join_schema;
-use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
+use datafusion::physical_plan::metrics::MetricBuilder;
+use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
 use datafusion::{
     execution::context::SessionState,
-    physical_plan::{
-        metrics::{MetricBuilder, MetricsSet},
-        ExecutionPlan,
-    },
+    physical_plan::ExecutionPlan,
     prelude::{DataFrame, SessionContext},
 };
 use datafusion_common::tree_node::{Transformed, TreeNode};
@@ -68,7 +66,8 @@ use super::datafusion_utils::{into_expr, maybe_into_expr, Expression};
 use super::transaction::{CommitProperties, PROTOCOL};
 use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression};
 use crate::delta_datafusion::logical::MetricObserver;
-use crate::delta_datafusion::physical::{find_metric_node, MetricObserverExec};
+use crate::delta_datafusion::physical::{find_metric_node, get_metric, MetricObserverExec};
+use crate::delta_datafusion::planner::DeltaPlanner;
 use crate::delta_datafusion::{
     execute_plan_to_batch, register_store, DeltaColumn, DeltaScanConfigBuilder,
     DeltaSessionConfig, DeltaTableProvider,
@@ -576,7 +575,7 @@ pub struct MergeMetrics {
     /// Time taken to rewrite the matched files
     pub rewrite_time_ms: u64,
 }
-
+#[derive(Clone)]
 struct MergeMetricExtensionPlanner {}
 
 #[async_trait]
@@ -1017,7 +1016,11 @@ async fn execute(
     let exec_start = Instant::now();
 
     let current_metadata = snapshot.metadata();
-    let state = state.with_query_planner(Arc::new(MergePlanner {}));
+    let merge_planner = DeltaPlanner::<MergeMetricExtensionPlanner> {
+        extension_planner: MergeMetricExtensionPlanner {},
+    };
+
+    let state = state.with_query_planner(Arc::new(merge_planner));
 
     // TODO: Given the join predicate, remove any expression that involve the
     // source table and keep expressions that only involve the target table.
@@ -1486,9 +1489,6 @@ async fn execute(
     let source_count_metrics = source_count.metrics().unwrap();
     let target_count_metrics = op_count.metrics().unwrap();
 
-    fn get_metric(metrics: &MetricsSet, name: &str) -> usize {
-        metrics.sum_by_name(name).map(|m| m.as_usize()).unwrap_or(0)
-    }
     metrics.num_source_rows = get_metric(&source_count_metrics, SOURCE_COUNT_METRIC);
     metrics.num_target_rows_inserted = get_metric(&target_count_metrics, TARGET_INSERTED_METRIC);
@@ -1555,25 +1555,6 @@ fn remove_table_alias(expr: Expr, table_alias: &str) -> Expr {
         .data
 }
 
-// TODO: Abstract MergePlanner into DeltaPlanner to support other delta operations in the future.
-struct MergePlanner {}
-
-#[async_trait]
-impl QueryPlanner for MergePlanner {
-    async fn create_physical_plan(
-        &self,
-        logical_plan: &LogicalPlan,
-        session_state: &SessionState,
-    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
-        let planner = Arc::new(Box::new(DefaultPhysicalPlanner::with_extension_planners(
-            vec![Arc::new(MergeMetricExtensionPlanner {})],
-        )));
-        planner
-            .create_physical_plan(logical_plan, session_state)
-            .await
-    }
-}
-
 impl std::future::IntoFuture for MergeBuilder {
     type Output = DeltaResult<(DeltaTable, MergeMetrics)>;
     type IntoFuture = BoxFuture<'static, Self::Output>;
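Merge's local `get_metric` helper (its body is visible in the removed lines above) moves to `delta_datafusion::physical` so delete can reuse it, and the hand-rolled `MergePlanner` gives way to the generic `DeltaPlanner`. A sketch of how the shared helper and the counter pattern fit together, assuming a `datafusion` dependency whose metrics API matches the version this crate pins:

```rust
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};

// Same body as the helper removed from merge, now shared from
// delta_datafusion::physical:
fn get_metric(metrics: &MetricsSet, name: &str) -> usize {
    metrics.sum_by_name(name).map(|m| m.as_usize()).unwrap_or(0)
}

fn main() {
    let metrics = ExecutionPlanMetricsSet::new();
    // The per-batch pattern the extension planners install:
    MetricBuilder::new(&metrics).global_counter("num_source_rows").add(3);
    MetricBuilder::new(&metrics).global_counter("num_source_rows").add(4);
    // sum_by_name folds every sample registered under that name:
    assert_eq!(get_metric(&metrics.clone_inner(), "num_source_rows"), 7);
}
```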
diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs
index 8386c07a9a..bc5c1850d8 100644
--- a/crates/core/src/operations/write.rs
+++ b/crates/core/src/operations/write.rs
@@ -54,7 +54,9 @@ use super::writer::{DeltaWriter, WriterConfig};
 use super::CreateBuilder;
 use crate::delta_datafusion::expr::fmt_expr_to_sql;
 use crate::delta_datafusion::expr::parse_predicate_expression;
-use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder};
+use crate::delta_datafusion::{
+    find_files, register_store, DeltaScanBuilder, DeltaScanConfigBuilder,
+};
 use crate::delta_datafusion::{DataFusionMixins, DeltaDataChecker};
 use crate::errors::{DeltaResult, DeltaTableError};
 use crate::kernel::{Action, Add, AddCDCFile, Metadata, PartitionsExt, Remove, StructType};
@@ -573,11 +575,15 @@ async fn execute_non_empty_expr(
     let input_schema = snapshot.input_schema()?;
     let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;
 
+    let scan_config = DeltaScanConfigBuilder::new()
+        .with_schema(snapshot.input_schema()?)
+        .build(snapshot)?;
+
     let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
         .with_files(rewrite)
         // Use input schema which doesn't wrap partition values, otherwise divide_by_partition_value won't work on UTF8 partitions
         // Since it can't fetch a scalar from a dictionary type
-        .with_schema(snapshot.input_schema()?)
+        .with_scan_config(scan_config)
         .build()
         .await?;
     let scan = Arc::new(scan);
@@ -711,7 +717,7 @@ async fn prepare_predicate_actions(
         find_files(snapshot, log_store.clone(), &state, Some(predicate.clone())).await?;
 
     let mut actions = execute_non_empty_expr(
-        &snapshot,
+        snapshot,
         log_store,
         state,
         partition_columns,
@@ -752,19 +758,24 @@ async fn execute_non_empty_expr_cdc_all_actions(
     writer_stats_config: WriterStatsConfig,
 ) -> DeltaResult<Option<Vec<Action>>> {
     let current_state_add_actions = &snapshot.file_actions()?;
+
+    let scan_config = DeltaScanConfigBuilder::new()
+        .with_schema(snapshot.input_schema()?)
+        .build(snapshot)?;
+
     // Since all files get removed, check to write CDC
     let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
         .with_files(current_state_add_actions)
         // Use input schema which doesn't wrap partition values, otherwise divide_by_partition_value won't work on UTF8 partitions
         // Since it can't fetch a scalar from a dictionary type
-        .with_schema(snapshot.input_schema()?)
+        .with_scan_config(scan_config)
         .build()
         .await?;
 
     let input_schema = snapshot.input_schema()?;
     let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;
 
-    Ok(execute_non_empty_expr_cdc(
+    execute_non_empty_expr_cdc(
         snapshot,
         log_store,
         state,
@@ -775,7 +786,7 @@ async fn execute_non_empty_expr_cdc_all_actions(
         writer_properties,
         writer_stats_config,
     )
-    .await?)
+    .await
 }
 
 impl std::future::IntoFuture for WriteBuilder {
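Beyond swapping `.with_schema(..)` for a prebuilt `DeltaScanConfig` (the same config style delete now uses, still passing the unwrapped input schema so `divide_by_partition_value` works on UTF8 partitions), the last hunk is a small cleanup: `Ok(expr.await?)` unwraps a `Result` only to rewrap it. When the error types already line up, returning the awaited result directly is equivalent, e.g.:

```rust
use futures::executor::block_on;

async fn inner() -> Result<u32, String> {
    Ok(7)
}

// Before: `?` unwraps the Ok value, and Ok(...) immediately rewraps it.
async fn wrapped() -> Result<u32, String> {
    Ok(inner().await?)
}

// After: the error type already matches, so pass the result through.
async fn direct() -> Result<u32, String> {
    inner().await
}

fn main() {
    assert_eq!(block_on(wrapped()), block_on(direct()));
}
```

Note the two forms differ only when `?` performs an error conversion via `From`; neither call site here relies on that.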