Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use logical plan in delete, delta planner refactoring #2725

Merged
merged 3 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ pub mod cdf;
pub mod expr;
pub mod logical;
pub mod physical;
pub mod planner;

mod find_files;
mod schema_adapter;
Expand Down Expand Up @@ -344,7 +345,10 @@ pub struct DeltaScanConfigBuilder {
file_column_name: Option<String>,
/// Whether to wrap partition values in a dictionary encoding to potentially save space
wrap_partition_values: Option<bool>,
/// Whether to push down filter in end result or just prune the files
enable_parquet_pushdown: bool,
/// Schema to scan table with
schema: Option<SchemaRef>,
}

impl Default for DeltaScanConfigBuilder {
Expand All @@ -354,6 +358,7 @@ impl Default for DeltaScanConfigBuilder {
file_column_name: None,
wrap_partition_values: None,
enable_parquet_pushdown: true,
schema: None,
}
}
}
Expand Down Expand Up @@ -392,6 +397,12 @@ impl DeltaScanConfigBuilder {
self
}

/// Use the provided [SchemaRef] for the [DeltaScan]
pub fn with_schema(mut self, schema: SchemaRef) -> Self {
self.schema = Some(schema);
self
}

/// Build a DeltaScanConfig and ensure no column name conflicts occur during downstream processing
pub fn build(&self, snapshot: &DeltaTableState) -> DeltaResult<DeltaScanConfig> {
let file_column_name = if self.include_file_column {
Expand Down Expand Up @@ -433,6 +444,7 @@ impl DeltaScanConfigBuilder {
file_column_name,
wrap_partition_values: self.wrap_partition_values.unwrap_or(true),
enable_parquet_pushdown: self.enable_parquet_pushdown,
schema: self.schema.clone(),
})
}
}
Expand All @@ -446,6 +458,8 @@ pub struct DeltaScanConfig {
pub wrap_partition_values: bool,
/// Allow pushdown of the scan filter
pub enable_parquet_pushdown: bool,
/// Schema to read as
pub schema: Option<SchemaRef>,
}

#[derive(Debug)]
Expand All @@ -458,7 +472,6 @@ pub(crate) struct DeltaScanBuilder<'a> {
limit: Option<usize>,
files: Option<&'a [Add]>,
config: Option<DeltaScanConfig>,
schema: Option<SchemaRef>,
}

impl<'a> DeltaScanBuilder<'a> {
Expand All @@ -476,7 +489,6 @@ impl<'a> DeltaScanBuilder<'a> {
limit: None,
files: None,
config: None,
schema: None,
}
}

Expand Down Expand Up @@ -505,22 +517,17 @@ impl<'a> DeltaScanBuilder<'a> {
self
}

/// Use the provided [SchemaRef] for the [DeltaScan]
pub fn with_schema(mut self, schema: SchemaRef) -> Self {
self.schema = Some(schema);
self
}

pub async fn build(self) -> DeltaResult<DeltaScan> {
let config = match self.config {
Some(config) => config,
None => DeltaScanConfigBuilder::new().build(self.snapshot)?,
};

let schema = match self.schema {
Some(schema) => schema,
None => self.snapshot.arrow_schema()?,
};
let schema = match config.schema.clone() {
Some(value) => Ok(value),
None => self.snapshot.arrow_schema(),
}?;

let logical_schema = df_logical_schema(self.snapshot, &config)?;

let logical_schema = if let Some(used_columns) = self.projection {
Expand Down Expand Up @@ -742,6 +749,7 @@ pub struct DeltaTableProvider {
log_store: LogStoreRef,
config: DeltaScanConfig,
schema: Arc<ArrowSchema>,
files: Option<Vec<Add>>,
}

impl DeltaTableProvider {
Expand All @@ -756,8 +764,15 @@ impl DeltaTableProvider {
snapshot,
log_store,
config,
files: None,
})
}

    /// Define which files to consider while building a scan, for advanced use cases
pub fn with_files(mut self, files: Vec<Add>) -> DeltaTableProvider {
self.files = Some(files);
self
}
}

#[async_trait]
Expand Down Expand Up @@ -792,15 +807,16 @@ impl TableProvider for DeltaTableProvider {
register_store(self.log_store.clone(), session.runtime_env().clone());
let filter_expr = conjunction(filters.iter().cloned());

let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
let mut scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
.with_projection(projection)
.with_limit(limit)
.with_filter(filter_expr)
.with_scan_config(self.config.clone())
.build()
.await?;
.with_scan_config(self.config.clone());

Ok(Arc::new(scan))
if let Some(files) = &self.files {
scan = scan.with_files(files);
}
Ok(Arc::new(scan.build().await?))
}

fn supports_filters_pushdown(
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/delta_datafusion/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,7 @@ pub(crate) fn find_metric_node(

None
}

/// Sum every metric named `name` in `metrics` and return the total as a `usize`.
///
/// Returns 0 when no metric with that name is present in the set.
pub(crate) fn get_metric(metrics: &MetricsSet, name: &str) -> usize {
    // `map_or` replaces the `map(..).unwrap_or(..)` chain (clippy::map_unwrap_or).
    metrics.sum_by_name(name).map_or(0, |m| m.as_usize())
}
57 changes: 57 additions & 0 deletions crates/core/src/delta_datafusion/planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//! Custom planners for datafusion so that you can convert custom nodes; can be
//! used to trace custom metrics in an operation.
//!
//! # Example
//!
//! ```ignore
//! #[derive(Clone)]
//! struct MergeMetricExtensionPlanner {}
//!
//! #[async_trait]
//! impl ExtensionPlanner for MergeMetricExtensionPlanner {
//!     async fn plan_extension(
//!         &self,
//!         planner: &dyn PhysicalPlanner,
//!         node: &dyn UserDefinedLogicalNode,
//!         _logical_inputs: &[&LogicalPlan],
//!         physical_inputs: &[Arc<dyn ExecutionPlan>],
//!         session_state: &SessionState,
//!     ) -> DataFusionResult<Option<Arc<dyn ExecutionPlan>>> { /* ... */ }
//! }
//!
//! let merge_planner = DeltaPlanner::<MergeMetricExtensionPlanner> {
//!     extension_planner: MergeMetricExtensionPlanner {},
//! };
//!
//! let state = state.with_query_planner(Arc::new(merge_planner));
//! ```
use std::sync::Arc;

use crate::delta_datafusion::DataFusionResult;
use async_trait::async_trait;
use datafusion::physical_planner::PhysicalPlanner;
use datafusion::{
execution::{context::QueryPlanner, session_state::SessionState},
physical_plan::ExecutionPlan,
physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner},
};
use datafusion_expr::LogicalPlan;

/// Query planner that delegates to datafusion's default physical planner
/// extended with a single custom [ExtensionPlanner], so Delta's user-defined
/// logical nodes can be lowered to physical plans (e.g. to trace custom
/// metrics in an operation).
pub struct DeltaPlanner<T: ExtensionPlanner> {
    /// Custom extension planner used to translate user-defined logical nodes.
    pub extension_planner: T,
}

#[async_trait]
impl<T: ExtensionPlanner + Send + Sync + 'static + Clone> QueryPlanner for DeltaPlanner<T> {
    /// Lower `logical_plan` to an [ExecutionPlan] using the default physical
    /// planner extended with this planner's custom [ExtensionPlanner].
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        // A plain local value suffices: the previous `Arc::new(Box::new(..))`
        // wrapped a planner that is used once and dropped in two needless
        // layers of indirection (clippy::redundant_allocation).
        let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
            self.extension_planner.clone(),
        )]);
        planner
            .create_physical_plan(logical_plan, session_state)
            .await
    }
}
Loading
Loading