From 34d5fff9ab7d70b122f0c7e3ecb77bcfb47692eb Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Sun, 4 Aug 2024 13:56:15 +0200
Subject: [PATCH 1/3] feat: logical plan usage in delete

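Reworks delete to build a DataFusion logical plan instead of hand-assembling
a FilterExec over a raw DeltaScan. The scan schema now lives on
DeltaScanConfig, DeltaTableProvider can be pinned to an explicit file list
via with_files, and the CDC path filters the same DataFrame and appends a
literal _change_type column before writing. Rough shape of the new scan
construction (names as introduced in this patch):

    let scan_config = DeltaScanConfigBuilder::default()
        .with_schema(snapshot.input_schema()?)
        .build(snapshot)?;
    let provider = Arc::new(
        DeltaTableProvider::try_new(snapshot.clone(), log_store.clone(), scan_config)?
            .with_files(rewrite.to_vec()),
    );
    let plan = LogicalPlanBuilder::scan("target", provider_as_source(provider), None)?.build()?;
    let df = DataFrame::new(state.clone(), plan);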
---
 crates/core/src/delta_datafusion/mod.rs | 49 ++++++++++-----
 crates/core/src/operations/delete.rs    | 82 +++++++++++++++----------
 2 files changed, 83 insertions(+), 48 deletions(-)

diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 4c76ac156c..0ec6b8d61d 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -344,7 +344,10 @@ pub struct DeltaScanConfigBuilder {
     file_column_name: Option<String>,
     /// Whether to wrap partition values in a dictionary encoding to potentially save space
     wrap_partition_values: Option<bool>,
+    /// Whether to push the filter down into the parquet scan or only use it to prune files
     enable_parquet_pushdown: bool,
+    /// Schema to scan the table with
+    schema: Option<SchemaRef>,
 }
 
 impl Default for DeltaScanConfigBuilder {
@@ -354,6 +357,7 @@ impl Default for DeltaScanConfigBuilder {
             file_column_name: None,
             wrap_partition_values: None,
             enable_parquet_pushdown: true,
+            schema: None,
         }
     }
 }
@@ -392,6 +396,12 @@ impl DeltaScanConfigBuilder {
         self
     }
 
+    /// Use the provided [SchemaRef] for the [DeltaScan]
+    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
     /// Build a DeltaScanConfig and ensure no column name conflicts occur during downstream processing
     pub fn build(&self, snapshot: &DeltaTableState) -> DeltaResult<DeltaScanConfig> {
         let file_column_name = if self.include_file_column {
@@ -433,6 +443,7 @@ impl DeltaScanConfigBuilder {
             file_column_name,
             wrap_partition_values: self.wrap_partition_values.unwrap_or(true),
             enable_parquet_pushdown: self.enable_parquet_pushdown,
+            schema: self.schema.clone(),
         })
     }
 }
@@ -446,6 +457,8 @@ pub struct DeltaScanConfig {
     pub wrap_partition_values: bool,
     /// Allow pushdown of the scan filter
     pub enable_parquet_pushdown: bool,
+    /// Schema to read the table with
+    pub schema: Option<SchemaRef>,
 }
 
 #[derive(Debug)]
@@ -458,7 +471,6 @@ pub(crate) struct DeltaScanBuilder<'a> {
     limit: Option<usize>,
     files: Option<&'a [Add]>,
     config: Option<DeltaScanConfig>,
-    schema: Option<SchemaRef>,
 }
 
 impl<'a> DeltaScanBuilder<'a> {
@@ -476,7 +488,6 @@ impl<'a> DeltaScanBuilder<'a> {
             limit: None,
             files: None,
             config: None,
-            schema: None,
         }
     }
 
@@ -505,22 +516,17 @@ impl<'a> DeltaScanBuilder<'a> {
         self
     }
 
-    /// Use the provided [SchemaRef] for the [DeltaScan]
-    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
-        self.schema = Some(schema);
-        self
-    }
-
     pub async fn build(self) -> DeltaResult<DeltaScan> {
         let config = match self.config {
             Some(config) => config,
             None => DeltaScanConfigBuilder::new().build(self.snapshot)?,
         };
 
-        let schema = match self.schema {
-            Some(schema) => schema,
-            None => self.snapshot.arrow_schema()?,
-        };
+        let schema = config
+            .schema
+            .clone()
+            .unwrap_or(self.snapshot.arrow_schema()?);
+
         let logical_schema = df_logical_schema(self.snapshot, &config)?;
 
         let logical_schema = if let Some(used_columns) = self.projection {
@@ -742,6 +748,7 @@ pub struct DeltaTableProvider {
     log_store: LogStoreRef,
     config: DeltaScanConfig,
     schema: Arc<ArrowSchema>,
+    files: Option<Vec<Add>>,
 }
 
 impl DeltaTableProvider {
@@ -756,8 +763,15 @@ impl DeltaTableProvider {
             snapshot,
             log_store,
             config,
+            files: None,
         })
     }
+
+    /// Define which files to consider while building a scan, for advanced use cases
+    pub fn with_files(mut self, files: Vec<Add>) -> DeltaTableProvider {
+        self.files = Some(files);
+        self
+    }
 }
 
 #[async_trait]
@@ -792,15 +806,16 @@ impl TableProvider for DeltaTableProvider {
         register_store(self.log_store.clone(), session.runtime_env().clone());
         let filter_expr = conjunction(filters.iter().cloned());
 
-        let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
+        let mut scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
             .with_projection(projection)
             .with_limit(limit)
             .with_filter(filter_expr)
-            .with_scan_config(self.config.clone())
-            .build()
-            .await?;
+            .with_scan_config(self.config.clone());
 
-        Ok(Arc::new(scan))
+        if let Some(files) = &self.files {
+            scan = scan.with_files(files);
+        }
+        Ok(Arc::new(scan.build().await?))
     }
 
     fn supports_filters_pushdown(
diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index 7b88d0c4d4..0029ce46b4 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -19,6 +19,8 @@
 
 use crate::logstore::LogStoreRef;
 use core::panic;
+use datafusion::dataframe::DataFrame;
+use datafusion::datasource::provider_as_source;
 use datafusion::execution::context::{SessionContext, SessionState};
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::projection::ProjectionExec;
@@ -26,7 +28,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::Expr;
 use datafusion_common::scalar::ScalarValue;
 use datafusion_common::DFSchema;
-use datafusion_expr::lit;
+use datafusion_expr::{lit, LogicalPlanBuilder};
 use datafusion_physical_expr::{
     expressions::{self},
     PhysicalExpr,
@@ -45,7 +47,8 @@ use super::write::{execute_non_empty_expr_cdc, WriterStatsConfig};
 
 use crate::delta_datafusion::expr::fmt_expr_to_sql;
 use crate::delta_datafusion::{
-    find_files, register_store, DataFusionMixins, DeltaScanBuilder, DeltaSessionContext,
+    find_files, register_store, DataFusionMixins, DeltaScanBuilder, DeltaScanConfigBuilder,
+    DeltaSessionContext, DeltaTableProvider,
 };
 use crate::errors::DeltaResult;
 use crate::kernel::{Action, Add, Remove};
@@ -136,26 +139,29 @@ async fn excute_non_empty_expr(
     log_store: LogStoreRef,
     state: &SessionState,
     expression: &Expr,
-    metrics: &mut DeleteMetrics,
     rewrite: &[Add],
+    metrics: &mut DeleteMetrics,
     writer_properties: Option<WriterProperties>,
     partition_scan: bool,
 ) -> DeltaResult<Vec<Action>> {
     // For each identified file perform a parquet scan + filter + limit (1) + count.
     // If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file.
     let mut actions: Vec<Action> = Vec::new();
-
-    let input_schema = snapshot.input_schema()?;
-    let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;
     let table_partition_cols = snapshot.metadata().partition_columns.clone();
-    let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), state)
-        .with_files(rewrite)
-        // Use input schema which doesn't wrap partition values, otherwise divide_by_partition_value won't work on UTF8 partitions
-        // Since it can't fetch a scalar from a dictionary type
+
+    let scan_config = DeltaScanConfigBuilder::default()
+        .with_file_column(false)
         .with_schema(snapshot.input_schema()?)
-        .build()
-        .await?;
-    let scan = Arc::new(scan);
+        .build(&snapshot)?;
+
+    let target_provider = Arc::new(
+        DeltaTableProvider::try_new(snapshot.clone(), log_store.clone(), scan_config.clone())?
+            .with_files(rewrite.to_vec()),
+    );
+    let target_provider = provider_as_source(target_provider);
+    let plan = LogicalPlanBuilder::scan("target", target_provider.clone(), None)?.build()?;
+
+    let df = DataFrame::new(state.clone(), plan);
 
     let writer_stats_config = WriterStatsConfig::new(
         snapshot.table_config().num_indexed_cols(),
@@ -169,9 +175,11 @@ async fn excute_non_empty_expr(
         // Apply the negation of the filter and rewrite files
         let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone()))));
 
-        let predicate_expr = state.create_physical_expr(negated_expression, &input_dfschema)?;
-        let filter: Arc<dyn ExecutionPlan> =
-            Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
+        let filter = df
+            .clone()
+            .filter(negated_expression)?
+            .create_physical_plan()
+            .await?;
 
         let add_actions: Vec<Action> = write_execution_plan(
             Some(snapshot),
@@ -191,7 +199,7 @@ async fn excute_non_empty_expr(
 
         actions.extend(add_actions);
 
-        let read_records = scan.parquet_scan.metrics().and_then(|m| m.output_rows());
+        let read_records = Some(df.clone().count().await?);
         let filter_records = filter.metrics().and_then(|m| m.output_rows());
         metrics.num_copied_rows = filter_records;
         metrics.num_deleted_rows = read_records
@@ -200,19 +208,31 @@ async fn excute_non_empty_expr(
     }
 
     // CDC logic, simply filters data with predicate and adds the _change_type="delete" as literal column
-    if let Some(cdc_actions) = execute_non_empty_expr_cdc(
-        snapshot,
-        log_store,
-        state.clone(),
-        scan,
-        input_dfschema,
-        expression,
-        table_partition_cols,
-        writer_properties,
-        writer_stats_config,
-    )
-    .await?
-    {
+    if let Ok(true) = should_write_cdc(&snapshot) {
+        // Create CDC scan
+        let change_type_lit = lit(ScalarValue::Utf8(Some("delete".to_string())));
+        let cdc_filter = df
+            .filter(expression.clone())?
+            .with_column("_change_type", change_type_lit)?
+            .create_physical_plan()
+            .await?;
+
+        use crate::operations::write::write_execution_plan_cdc;
+        let cdc_actions = write_execution_plan_cdc(
+            Some(snapshot),
+            state.clone(),
+            cdc_filter,
+            table_partition_cols.clone(),
+            log_store.object_store(),
+            Some(snapshot.table_config().target_file_size() as usize),
+            None,
+            writer_properties,
+            false,
+            Some(SchemaMode::Overwrite), // Without Overwrite the table schema would be used instead of the plan schema; we need the plan schema since it carries the _change_type column
+            writer_stats_config,
+            None,
+        )
+        .await?;
         actions.extend(cdc_actions)
     }
 
@@ -243,8 +263,8 @@ async fn execute(
             log_store.clone(),
             &state,
             &predicate,
-            &mut metrics,
             &candidates.candidates,
+            &mut metrics,
             writer_properties,
             candidates.partition_scan,
         )

From 7f28ddfb12e156fa2c8f40b01952b403726935c4 Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Sun, 4 Aug 2024 14:18:32 +0200
Subject: [PATCH 2/3] feat: add delta planner

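Adds a generic DeltaPlanner<T> that plugs any Clone-able ExtensionPlanner
into DataFusion's DefaultPhysicalPlanner, so individual operations no longer
need their own QueryPlanner implementations, plus a small get_metric helper
in physical.rs for summing a named metric out of a MetricsSet. Intended
usage, as sketched in the module docs (names from the merge operation):

    let merge_planner = DeltaPlanner::<MergeMetricExtensionPlanner> {
        extension_planner: MergeMetricExtensionPlanner {},
    };
    let state = state.with_query_planner(Arc::new(merge_planner));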
---
 crates/core/src/delta_datafusion/mod.rs      |  1 +
 crates/core/src/delta_datafusion/physical.rs |  4 ++
 crates/core/src/delta_datafusion/planner.rs  | 59 ++++++++++++++++++++
 3 files changed, 64 insertions(+)
 create mode 100644 crates/core/src/delta_datafusion/planner.rs

diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 0ec6b8d61d..9cb10b7461 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -92,6 +92,7 @@ pub mod cdf;
 pub mod expr;
 pub mod logical;
 pub mod physical;
+pub mod planner;
 
 mod find_files;
 mod schema_adapter;
diff --git a/crates/core/src/delta_datafusion/physical.rs b/crates/core/src/delta_datafusion/physical.rs
index adbb7fb4fe..c37b85101e 100644
--- a/crates/core/src/delta_datafusion/physical.rs
+++ b/crates/core/src/delta_datafusion/physical.rs
@@ -178,3 +178,7 @@ pub(crate) fn find_metric_node(
 
     None
 }
+
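+/// Sum the values of the named metric across the set, defaulting to 0 when the metric is absent.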
+pub(crate) fn get_metric(metrics: &MetricsSet, name: &str) -> usize {
+    metrics.sum_by_name(name).map(|m| m.as_usize()).unwrap_or(0)
+}
diff --git a/crates/core/src/delta_datafusion/planner.rs b/crates/core/src/delta_datafusion/planner.rs
new file mode 100644
index 0000000000..9c31410cec
--- /dev/null
+++ b/crates/core/src/delta_datafusion/planner.rs
@@ -0,0 +1,59 @@
+//! Custom DataFusion query planners that convert user-defined logical nodes
+//! into physical plans; useful for collecting custom metrics in an operation.
+//!
+//! # Example
+//!
+//! ```ignore
+//! #[derive(Clone)]
+//! struct MergeMetricExtensionPlanner {}
+//!
+//! #[async_trait]
+//! impl ExtensionPlanner for MergeMetricExtensionPlanner {
+//!     async fn plan_extension(
+//!         &self,
+//!         planner: &dyn PhysicalPlanner,
+//!         node: &dyn UserDefinedLogicalNode,
+//!         _logical_inputs: &[&LogicalPlan],
+//!         physical_inputs: &[Arc<dyn ExecutionPlan>],
+//!         session_state: &SessionState,
+//!     ) -> DataFusionResult<Option<Arc<dyn ExecutionPlan>>> {
+//!         Ok(None)
+//!     }
+//! }
+//!
+//! let merge_planner = DeltaPlanner::<MergeMetricExtensionPlanner> {
+//!     extension_planner: MergeMetricExtensionPlanner {},
+//! };
+//!
+//! let state = state.with_query_planner(Arc::new(merge_planner));
+//! ```
+use std::sync::Arc;
+
+use crate::delta_datafusion::DataFusionResult;
+use async_trait::async_trait;
+use datafusion::physical_planner::PhysicalPlanner;
+use datafusion::{
+    execution::{context::QueryPlanner, session_state::SessionState},
+    physical_plan::ExecutionPlan,
+    physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner},
+};
+use datafusion_expr::LogicalPlan;
+
+/// Delta planner that plugs a custom [ExtensionPlanner] into DataFusion's default physical planner
+pub struct DeltaPlanner<T: ExtensionPlanner> {
+    /// custom extension planner
+    pub extension_planner: T,
+}
+
+#[async_trait]
+impl<T: ExtensionPlanner + std::marker::Send + Sync + 'static + Clone> QueryPlanner
+    for DeltaPlanner<T>
+{
+    async fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
+        let planner = Arc::new(Box::new(DefaultPhysicalPlanner::with_extension_planners(
+            vec![Arc::new(self.extension_planner.clone())],
+        )));
+        planner
+            .create_physical_plan(logical_plan, session_state)
+            .await
+    }
+}

From a6aeeca967111480fa492dd4d1a0388e392cbf5c Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Sun, 4 Aug 2024 14:18:54 +0200
Subject: [PATCH 3/3] feat: use delta planner in delete and merge

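Replaces the merge-specific MergePlanner with the shared DeltaPlanner and
gives delete its own DeleteMetricExtensionPlanner. The delete scan is
wrapped in a MetricObserver logical node whose physical counterpart counts
source rows, so num_deleted_rows and num_copied_rows become plain usize
values and the extra df.count() pass is dropped. Sketch of the wiring
introduced in this patch:

    let source = LogicalPlan::Extension(Extension {
        node: Arc::new(MetricObserver {
            id: SOURCE_COUNT_ID.into(),
            input: source,
            enable_pushdown: false,
        }),
    });
    // After execution, read the counter back from the physical plan:
    let source_count = find_metric_node(SOURCE_COUNT_ID, &filter)
        .ok_or_else(|| DeltaTableError::Generic("Unable to locate expected metric node".into()))?;
    let read_records = get_metric(&source_count.metrics().unwrap(), SOURCE_COUNT_METRIC);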
---
 crates/core/src/delta_datafusion/mod.rs     |   8 +-
 crates/core/src/delta_datafusion/planner.rs |   4 +-
 crates/core/src/operations/delete.rs        | 122 ++++++++++++++------
 crates/core/src/operations/merge/mod.rs     |  43 ++-----
 crates/core/src/operations/write.rs         |  23 +++-
 5 files changed, 120 insertions(+), 80 deletions(-)

diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 9cb10b7461..c2b410cb74 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -523,10 +523,10 @@ impl<'a> DeltaScanBuilder<'a> {
             None => DeltaScanConfigBuilder::new().build(self.snapshot)?,
         };
 
-        let schema = config
-            .schema
-            .clone()
-            .unwrap_or(self.snapshot.arrow_schema()?);
+        let schema = match config.schema.clone() {
+            Some(value) => Ok(value),
+            None => self.snapshot.arrow_schema(),
+        }?;
 
         let logical_schema = df_logical_schema(self.snapshot, &config)?;
 
diff --git a/crates/core/src/delta_datafusion/planner.rs b/crates/core/src/delta_datafusion/planner.rs
index 9c31410cec..f0af1092ca 100644
--- a/crates/core/src/delta_datafusion/planner.rs
+++ b/crates/core/src/delta_datafusion/planner.rs
@@ -41,9 +41,7 @@ pub struct DeltaPlanner<T: ExtensionPlanner> {
 }
 
 #[async_trait]
-impl<T: ExtensionPlanner + std::marker::Send + Sync + 'static + Clone> QueryPlanner
-    for DeltaPlanner<T>
-{
+impl<T: ExtensionPlanner + Send + Sync + 'static + Clone> QueryPlanner for DeltaPlanner<T> {
     async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index 0029ce46b4..7ab4ce79f2 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -17,22 +17,22 @@
 //!     .await?;
 //! ````
 
+use crate::delta_datafusion::logical::MetricObserver;
+use crate::delta_datafusion::physical::{find_metric_node, get_metric, MetricObserverExec};
+use crate::delta_datafusion::planner::DeltaPlanner;
 use crate::logstore::LogStoreRef;
-use core::panic;
+use async_trait::async_trait;
 use datafusion::dataframe::DataFrame;
 use datafusion::datasource::provider_as_source;
+use datafusion::error::Result as DataFusionResult;
 use datafusion::execution::context::{SessionContext, SessionState};
-use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::projection::ProjectionExec;
+use datafusion::physical_plan::metrics::MetricBuilder;
 use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
 use datafusion::prelude::Expr;
-use datafusion_common::scalar::ScalarValue;
-use datafusion_common::DFSchema;
-use datafusion_expr::{lit, LogicalPlanBuilder};
-use datafusion_physical_expr::{
-    expressions::{self},
-    PhysicalExpr,
-};
+use datafusion_common::ScalarValue;
+use datafusion_expr::{lit, Extension, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode};
+
 use futures::future::BoxFuture;
 use std::sync::Arc;
 use std::time::{Instant, SystemTime, UNIX_EPOCH};
@@ -43,19 +43,22 @@ use serde::Serialize;
 use super::cdc::should_write_cdc;
 use super::datafusion_utils::Expression;
 use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
-use super::write::{execute_non_empty_expr_cdc, WriterStatsConfig};
+use super::write::WriterStatsConfig;
 
 use crate::delta_datafusion::expr::fmt_expr_to_sql;
 use crate::delta_datafusion::{
-    find_files, register_store, DataFusionMixins, DeltaScanBuilder, DeltaScanConfigBuilder,
-    DeltaSessionContext, DeltaTableProvider,
+    find_files, register_store, DataFusionMixins, DeltaScanConfigBuilder, DeltaSessionContext,
+    DeltaTableProvider,
 };
 use crate::errors::DeltaResult;
 use crate::kernel::{Action, Add, Remove};
-use crate::operations::write::{write_execution_plan, SchemaMode};
+use crate::operations::write::{write_execution_plan, write_execution_plan_cdc, SchemaMode};
 use crate::protocol::DeltaOperation;
 use crate::table::state::DeltaTableState;
-use crate::DeltaTable;
+use crate::{DeltaTable, DeltaTableError};
+
+const SOURCE_COUNT_ID: &str = "delete_source_count";
+const SOURCE_COUNT_METRIC: &str = "num_source_rows";
 
 /// Delete Records from the Delta Table.
 /// See this module's documentation for more information
@@ -82,9 +85,9 @@ pub struct DeleteMetrics {
     /// Number of files removed
     pub num_removed_files: usize,
     /// Number of rows removed
-    pub num_deleted_rows: Option<usize>,
+    pub num_deleted_rows: usize,
     /// Number of rows copied in the process of deleting files
-    pub num_copied_rows: Option<usize>,
+    pub num_copied_rows: usize,
     /// Time taken to execute the entire operation
     pub execution_time_ms: u64,
     /// Time taken to scan the file for matches
@@ -133,6 +136,36 @@ impl DeleteBuilder {
     }
 }
 
+#[derive(Clone)]
+struct DeleteMetricExtensionPlanner {}
+
+#[async_trait]
+impl ExtensionPlanner for DeleteMetricExtensionPlanner {
+    async fn plan_extension(
+        &self,
+        _planner: &dyn PhysicalPlanner,
+        node: &dyn UserDefinedLogicalNode,
+        _logical_inputs: &[&LogicalPlan],
+        physical_inputs: &[Arc<dyn ExecutionPlan>],
+        _session_state: &SessionState,
+    ) -> DataFusionResult<Option<Arc<dyn ExecutionPlan>>> {
+        if let Some(metric_observer) = node.as_any().downcast_ref::<MetricObserver>() {
+            if metric_observer.id.eq(SOURCE_COUNT_ID) {
+                return Ok(Some(MetricObserverExec::try_new(
+                    SOURCE_COUNT_ID.into(),
+                    physical_inputs,
+                    |batch, metrics| {
+                        MetricBuilder::new(metrics)
+                            .global_counter(SOURCE_COUNT_METRIC)
+                            .add(batch.num_rows());
+                    },
+                )?));
+            }
+        }
+        Ok(None)
+    }
+}
+
 #[allow(clippy::too_many_arguments)]
 async fn excute_non_empty_expr(
     snapshot: &DeltaTableState,
@@ -149,19 +182,33 @@ async fn excute_non_empty_expr(
     let mut actions: Vec<Action> = Vec::new();
     let table_partition_cols = snapshot.metadata().partition_columns.clone();
 
+    let delete_planner = DeltaPlanner::<DeleteMetricExtensionPlanner> {
+        extension_planner: DeleteMetricExtensionPlanner {},
+    };
+
+    let state = state.clone().with_query_planner(Arc::new(delete_planner));
+
     let scan_config = DeltaScanConfigBuilder::default()
         .with_file_column(false)
         .with_schema(snapshot.input_schema()?)
-        .build(&snapshot)?;
+        .build(snapshot)?;
 
     let target_provider = Arc::new(
         DeltaTableProvider::try_new(snapshot.clone(), log_store.clone(), scan_config.clone())?
             .with_files(rewrite.to_vec()),
     );
     let target_provider = provider_as_source(target_provider);
-    let plan = LogicalPlanBuilder::scan("target", target_provider.clone(), None)?.build()?;
+    let source = LogicalPlanBuilder::scan("target", target_provider.clone(), None)?.build()?;
+
+    let source = LogicalPlan::Extension(Extension {
+        node: Arc::new(MetricObserver {
+            id: "delete_source_count".into(),
+            input: source,
+            enable_pushdown: false,
+        }),
+    });
 
-    let df = DataFrame::new(state.clone(), plan);
+    let df = DataFrame::new(state.clone(), source);
 
     let writer_stats_config = WriterStatsConfig::new(
         snapshot.table_config().num_indexed_cols(),
@@ -199,16 +246,19 @@ async fn excute_non_empty_expr(
 
         actions.extend(add_actions);
 
-        let read_records = Some(df.clone().count().await?);
-        let filter_records = filter.metrics().and_then(|m| m.output_rows());
+        let source_count = find_metric_node(SOURCE_COUNT_ID, &filter).ok_or_else(|| {
+            DeltaTableError::Generic("Unable to locate expected metric node".into())
+        })?;
+        let source_count_metrics = source_count.metrics().unwrap();
+        let read_records = get_metric(&source_count_metrics, SOURCE_COUNT_METRIC);
+        let filter_records = filter.metrics().and_then(|m| m.output_rows()).unwrap_or(0);
+
         metrics.num_copied_rows = filter_records;
-        metrics.num_deleted_rows = read_records
-            .zip(filter_records)
-            .map(|(read, filter)| read - filter);
+        metrics.num_deleted_rows = read_records - filter_records;
     }
 
     // CDC logic, simply filters data with predicate and adds the _change_type="delete" as literal column
-    if let Ok(true) = should_write_cdc(&snapshot) {
+    if let Ok(true) = should_write_cdc(snapshot) {
         // Create CDC scan
         let change_type_lit = lit(ScalarValue::Utf8(Some("delete".to_string())));
         let cdc_filter = df
@@ -460,8 +510,8 @@ mod tests {
         assert_eq!(table.get_files_count(), 0);
         assert_eq!(metrics.num_added_files, 0);
         assert_eq!(metrics.num_removed_files, 1);
-        assert_eq!(metrics.num_deleted_rows, None);
-        assert_eq!(metrics.num_copied_rows, None);
+        assert_eq!(metrics.num_deleted_rows, 0);
+        assert_eq!(metrics.num_copied_rows, 0);
 
         let commit_info = table.history(None).await.unwrap();
         let last_commit = &commit_info[0];
@@ -476,8 +526,8 @@ mod tests {
         assert_eq!(table.version(), 2);
         assert_eq!(metrics.num_added_files, 0);
         assert_eq!(metrics.num_removed_files, 0);
-        assert_eq!(metrics.num_deleted_rows, None);
-        assert_eq!(metrics.num_copied_rows, None);
+        assert_eq!(metrics.num_deleted_rows, 0);
+        assert_eq!(metrics.num_copied_rows, 0);
     }
 
     #[tokio::test]
@@ -548,8 +598,8 @@ mod tests {
         assert_eq!(metrics.num_added_files, 1);
         assert_eq!(metrics.num_removed_files, 1);
         assert!(metrics.scan_time_ms > 0);
-        assert_eq!(metrics.num_deleted_rows, Some(1));
-        assert_eq!(metrics.num_copied_rows, Some(3));
+        assert_eq!(metrics.num_deleted_rows, 1);
+        assert_eq!(metrics.num_copied_rows, 3);
 
         let commit_info = table.history(None).await.unwrap();
         let last_commit = &commit_info[0];
@@ -703,8 +753,8 @@ mod tests {
 
         assert_eq!(metrics.num_added_files, 0);
         assert_eq!(metrics.num_removed_files, 1);
-        assert_eq!(metrics.num_deleted_rows, None);
-        assert_eq!(metrics.num_copied_rows, None);
+        assert_eq!(metrics.num_deleted_rows, 0);
+        assert_eq!(metrics.num_copied_rows, 0);
         assert!(metrics.scan_time_ms > 0);
 
         let expected = vec![
@@ -764,8 +814,8 @@ mod tests {
 
         assert_eq!(metrics.num_added_files, 0);
         assert_eq!(metrics.num_removed_files, 1);
-        assert_eq!(metrics.num_deleted_rows, Some(1));
-        assert_eq!(metrics.num_copied_rows, Some(0));
+        assert_eq!(metrics.num_deleted_rows, 1);
+        assert_eq!(metrics.num_copied_rows, 0);
         assert!(metrics.scan_time_ms > 0);
 
         let expected = [
diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs
index 73a97436f1..ea54e4e211 100644
--- a/crates/core/src/operations/merge/mod.rs
+++ b/crates/core/src/operations/merge/mod.rs
@@ -35,15 +35,13 @@ use std::time::Instant;
 use async_trait::async_trait;
 use datafusion::datasource::provider_as_source;
 use datafusion::error::Result as DataFusionResult;
-use datafusion::execution::context::{QueryPlanner, SessionConfig};
+use datafusion::execution::context::SessionConfig;
 use datafusion::logical_expr::build_join_schema;
-use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
+use datafusion::physical_plan::metrics::MetricBuilder;
+use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
 use datafusion::{
     execution::context::SessionState,
-    physical_plan::{
-        metrics::{MetricBuilder, MetricsSet},
-        ExecutionPlan,
-    },
+    physical_plan::ExecutionPlan,
     prelude::{DataFrame, SessionContext},
 };
 use datafusion_common::tree_node::{Transformed, TreeNode};
@@ -68,7 +66,8 @@ use super::datafusion_utils::{into_expr, maybe_into_expr, Expression};
 use super::transaction::{CommitProperties, PROTOCOL};
 use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression};
 use crate::delta_datafusion::logical::MetricObserver;
-use crate::delta_datafusion::physical::{find_metric_node, MetricObserverExec};
+use crate::delta_datafusion::physical::{find_metric_node, get_metric, MetricObserverExec};
+use crate::delta_datafusion::planner::DeltaPlanner;
 use crate::delta_datafusion::{
     execute_plan_to_batch, register_store, DeltaColumn, DeltaScanConfigBuilder, DeltaSessionConfig,
     DeltaTableProvider,
@@ -576,7 +575,7 @@ pub struct MergeMetrics {
     /// Time taken to rewrite the matched files
     pub rewrite_time_ms: u64,
 }
-
+#[derive(Clone)]
 struct MergeMetricExtensionPlanner {}
 
 #[async_trait]
@@ -1017,7 +1016,11 @@ async fn execute(
     let exec_start = Instant::now();
 
     let current_metadata = snapshot.metadata();
-    let state = state.with_query_planner(Arc::new(MergePlanner {}));
+    let merge_planner = DeltaPlanner::<MergeMetricExtensionPlanner> {
+        extension_planner: MergeMetricExtensionPlanner {},
+    };
+
+    let state = state.with_query_planner(Arc::new(merge_planner));
 
     // TODO: Given the join predicate, remove any expression that involve the
     // source table and keep expressions that only involve the target table.
@@ -1486,9 +1489,6 @@ async fn execute(
 
     let source_count_metrics = source_count.metrics().unwrap();
     let target_count_metrics = op_count.metrics().unwrap();
-    fn get_metric(metrics: &MetricsSet, name: &str) -> usize {
-        metrics.sum_by_name(name).map(|m| m.as_usize()).unwrap_or(0)
-    }
 
     metrics.num_source_rows = get_metric(&source_count_metrics, SOURCE_COUNT_METRIC);
     metrics.num_target_rows_inserted = get_metric(&target_count_metrics, TARGET_INSERTED_METRIC);
@@ -1555,25 +1555,6 @@ fn remove_table_alias(expr: Expr, table_alias: &str) -> Expr {
     .data
 }
 
-// TODO: Abstract MergePlanner into DeltaPlanner to support other delta operations in the future.
-struct MergePlanner {}
-
-#[async_trait]
-impl QueryPlanner for MergePlanner {
-    async fn create_physical_plan(
-        &self,
-        logical_plan: &LogicalPlan,
-        session_state: &SessionState,
-    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
-        let planner = Arc::new(Box::new(DefaultPhysicalPlanner::with_extension_planners(
-            vec![Arc::new(MergeMetricExtensionPlanner {})],
-        )));
-        planner
-            .create_physical_plan(logical_plan, session_state)
-            .await
-    }
-}
-
 impl std::future::IntoFuture for MergeBuilder {
     type Output = DeltaResult<(DeltaTable, MergeMetrics)>;
     type IntoFuture = BoxFuture<'static, Self::Output>;
diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs
index 8386c07a9a..bc5c1850d8 100644
--- a/crates/core/src/operations/write.rs
+++ b/crates/core/src/operations/write.rs
@@ -54,7 +54,9 @@ use super::writer::{DeltaWriter, WriterConfig};
 use super::CreateBuilder;
 use crate::delta_datafusion::expr::fmt_expr_to_sql;
 use crate::delta_datafusion::expr::parse_predicate_expression;
-use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder};
+use crate::delta_datafusion::{
+    find_files, register_store, DeltaScanBuilder, DeltaScanConfigBuilder,
+};
 use crate::delta_datafusion::{DataFusionMixins, DeltaDataChecker};
 use crate::errors::{DeltaResult, DeltaTableError};
 use crate::kernel::{Action, Add, AddCDCFile, Metadata, PartitionsExt, Remove, StructType};
@@ -573,11 +575,15 @@ async fn execute_non_empty_expr(
     let input_schema = snapshot.input_schema()?;
     let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;
 
+    let scan_config = DeltaScanConfigBuilder::new()
+        .with_schema(snapshot.input_schema()?)
+        .build(snapshot)?;
+
     let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
         .with_files(rewrite)
         // Use input schema which doesn't wrap partition values, otherwise divide_by_partition_value won't work on UTF8 partitions
         // Since it can't fetch a scalar from a dictionary type
-        .with_schema(snapshot.input_schema()?)
+        .with_scan_config(scan_config)
         .build()
         .await?;
     let scan = Arc::new(scan);
@@ -711,7 +717,7 @@ async fn prepare_predicate_actions(
         find_files(snapshot, log_store.clone(), &state, Some(predicate.clone())).await?;
 
     let mut actions = execute_non_empty_expr(
-        &snapshot,
+        snapshot,
         log_store,
         state,
         partition_columns,
@@ -752,19 +758,24 @@ async fn execute_non_empty_expr_cdc_all_actions(
     writer_stats_config: WriterStatsConfig,
 ) -> DeltaResult<Option<Vec<Action>>> {
     let current_state_add_actions = &snapshot.file_actions()?;
+
+    let scan_config = DeltaScanConfigBuilder::new()
+        .with_schema(snapshot.input_schema()?)
+        .build(snapshot)?;
+
     // Since all files get removed, check to write CDC
     let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
         .with_files(current_state_add_actions)
         // Use input schema which doesn't wrap partition values, otherwise divide_by_partition_value won't work on UTF8 partitions
         // Since it can't fetch a scalar from a dictionary type
-        .with_schema(snapshot.input_schema()?)
+        .with_scan_config(scan_config)
         .build()
         .await?;
 
     let input_schema = snapshot.input_schema()?;
     let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;
 
-    Ok(execute_non_empty_expr_cdc(
+    execute_non_empty_expr_cdc(
         snapshot,
         log_store,
         state,
@@ -775,7 +786,7 @@ async fn execute_non_empty_expr_cdc_all_actions(
         writer_properties,
         writer_stats_config,
     )
-    .await?)
+    .await
 }
 
 impl std::future::IntoFuture for WriteBuilder {