diff --git a/datafusion-examples/examples/planner_api.rs b/datafusion-examples/examples/planner_api.rs index 35cf766ba1af..6663f2269cc1 100644 --- a/datafusion-examples/examples/planner_api.rs +++ b/datafusion-examples/examples/planner_api.rs @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -use datafusion::error::Result; use datafusion::physical_plan::displayable; use datafusion::physical_planner::DefaultPhysicalPlanner; use datafusion::prelude::*; +use datafusion::{error::Result, physical_plan::ExecutionPlan}; use datafusion_expr::{LogicalPlan, PlanType}; /// This example demonstrates the process of converting logical plan @@ -82,9 +82,35 @@ async fn to_physical_plan_in_one_api_demo( .plan ); + let traversal = extract_node_ids_from_execution_plan_tree(physical_plan.as_ref()); + let expected_traversal = vec![ + Some(0), + Some(1), + Some(2), + Some(3), + Some(4), + Some(5), + Some(6), + Some(7), + Some(8), + Some(9), + ]; + assert_eq!(expected_traversal, traversal); Ok(()) } +fn extract_node_ids_from_execution_plan_tree( + physical_plan: &dyn ExecutionPlan, +) -> Vec> { + let mut traversed_nodes: Vec> = vec![]; + for child in physical_plan.children() { + let node_ids = extract_node_ids_from_execution_plan_tree(child.as_ref()); + traversed_nodes.extend(node_ids); + } + traversed_nodes.push(physical_plan.properties().node_id()); + traversed_nodes +} + /// Converts a logical plan into a physical plan by utilizing the analyzer, /// optimizer, and query planner APIs separately. This flavor gives more /// control over the planning process. diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 39625a55ca15..4f6f1cd7d95b 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -213,6 +213,22 @@ impl ExecutionPlan for ArrowExec { cache: self.cache.clone(), })) } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let new_cache = self.cache.clone().with_node_id(_node_id); + + Ok(Some(Arc::new(Self { + base_config: self.base_config.clone(), + projected_statistics: self.projected_statistics.clone(), + projected_schema: self.projected_schema.clone(), + projected_output_ordering: self.projected_output_ordering.clone(), + metrics: self.metrics.clone(), + cache: new_cache, + }))) + } } pub struct ArrowOpener { diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index ce72c4087424..aeab0f8c9a43 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -181,6 +181,22 @@ impl ExecutionPlan for AvroExec { cache: self.cache.clone(), })) } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let new_cache = self.cache.clone().with_node_id(_node_id); + + Ok(Some(Arc::new(Self { + base_config: self.base_config.clone(), + projected_statistics: self.projected_statistics.clone(), + projected_schema: self.projected_schema.clone(), + projected_output_ordering: self.projected_output_ordering.clone(), + metrics: self.metrics.clone(), + cache: new_cache, + }))) + } } #[cfg(feature = "avro")] diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 5ab32ed36e53..d4ff36d00712 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -448,6 +448,27 @@ impl ExecutionPlan for CsvExec { cache: self.cache.clone(), })) } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let new_cache = self.cache.clone().with_node_id(_node_id); + + Ok(Some(Arc::new(Self { + base_config: self.base_config.clone(), + projected_statistics: self.projected_statistics.clone(), + has_header: self.has_header, + delimiter: self.delimiter, + quote: self.quote, + escape: self.escape, + comment: self.comment, + newlines_in_values: self.newlines_in_values, + metrics: self.metrics.clone(), + file_compression_type: self.file_compression_type, + cache: new_cache, + }))) + } } /// A Config for [`CsvOpener`] diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index cf8f129a5036..c0a2ecf51fdb 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -222,6 +222,21 @@ impl ExecutionPlan for NdJsonExec { cache: self.cache.clone(), })) } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let new_cache = self.cache.clone().with_node_id(_node_id); + + Ok(Some(Arc::new(Self { + base_config: self.base_config.clone(), + projected_statistics: self.projected_statistics.clone(), + metrics: self.metrics.clone(), + file_compression_type: self.file_compression_type, + cache: new_cache, + }))) + } } /// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`] diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 85d6f8db2373..c3bd5245613d 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -766,6 +766,29 @@ impl ExecutionPlan for ParquetExec { schema_adapter_factory: self.schema_adapter_factory.clone(), })) } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let new_cache = self.cache.clone().with_node_id(_node_id); + + let new_plan = Self { + base_config: self.base_config.clone(), + projected_statistics: self.projected_statistics.clone(), + metrics: self.metrics.clone(), + predicate: self.predicate.clone(), + pruning_predicate: self.pruning_predicate.clone(), + page_pruning_predicate: self.page_pruning_predicate.clone(), + metadata_size_hint: self.metadata_size_hint, + parquet_file_reader_factory: self.parquet_file_reader_factory.clone(), + cache: new_cache, + table_parquet_options: self.table_parquet_options.clone(), + schema_adapter_factory: self.schema_adapter_factory.clone(), + }; + + Ok(Some(Arc::new(new_plan))) + } } fn should_enable_page_index( diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 88a90e1e1d09..a8c2cafe2ef3 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -63,6 +63,9 @@ use datafusion_optimizer::{ use datafusion_physical_expr::create_physical_expr; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::node_id::{ + annotate_node_id_for_execution_plan, NodeIdAnnotator, +}; use datafusion_physical_plan::ExecutionPlan; use datafusion_sql::parser::{DFParser, Statement}; use datafusion_sql::planner::{ContextProvider, ParserOptions, PlannerContext, SqlToRel}; @@ -732,9 +735,12 @@ impl SessionState { logical_plan: &LogicalPlan, ) -> datafusion_common::Result> { let logical_plan = self.optimize(logical_plan)?; - self.query_planner + let physical_plan = self + .query_planner .create_physical_plan(&logical_plan, self) - .await + .await?; + let mut id_annotator = NodeIdAnnotator::new(); + annotate_node_id_for_execution_plan(&physical_plan, &mut id_annotator) } /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 5aa255e7c341..b212d5896270 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -783,6 +783,24 @@ impl ExecutionPlan for AggregateExec { } } } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = AggregateExec::try_new_with_schema( + self.mode, + self.group_by.clone(), + self.aggr_expr.clone(), + self.filter_expr.clone(), + self.input().clone(), + Arc::clone(&self.input_schema), + Arc::clone(&self.schema), + )?; + let new_props: PlanProperties = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } fn create_schema( diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 287446328f8d..40eec50fd6e2 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -204,6 +204,21 @@ impl ExecutionPlan for AnalyzeExec { futures::stream::once(output), ))) } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = AnalyzeExec::new( + self.verbose, + self.show_statistics, + self.input.clone(), + self.schema.clone(), + ); + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// Creates the output of AnalyzeExec as a RecordBatch diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 7caf5b8ab65a..feff79536969 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -199,6 +199,17 @@ impl ExecutionPlan for CoalesceBatchesExec { fn fetch(&self) -> Option { self.fetch } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = + CoalesceBatchesExec::new(self.input.clone(), self.target_batch_size); + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// Stream for [`CoalesceBatchesExec`]. See [`CoalesceBatchesExec`] for more details. diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 486ae41901db..c818572e4fea 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -178,6 +178,16 @@ impl ExecutionPlan for CoalescePartitionsExec { fn supports_limit_pushdown(&self) -> bool { true } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = CoalescePartitionsExec::new(self.input.clone()); + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 0d2653c5c775..0377fcba2822 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -272,6 +272,11 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> { fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result { write!(self.f, "{:indent$}", "", indent = self.indent * 2)?; plan.fmt_as(self.t, self.f)?; + let node_id = plan + .properties() + .node_id() + .map_or("None".to_string(), |id| format!(", node_id={}", id)); + write!(self.f, "{node_id}")?; match self.show_metrics { ShowMetrics::None => {} ShowMetrics::Aggregated => { @@ -392,11 +397,19 @@ impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> { "" }; + let node_id = plan + .properties() + .node_id() + .map_or("node_id=None".to_string(), |id| format!("node_id={}", id)); + self.graphviz_builder.add_node( self.f, id, &label, - Some(&format!("{}{}{}", metrics, delimiter, statistics)), + Some(&format!( + "{}{}{}{}", + metrics, delimiter, statistics, node_id + )), )?; if let Some(parent_node_id) = self.parents.last() { diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index e1182719293d..213837af2308 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -403,6 +403,15 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { fn fetch(&self) -> Option { None } + + /// If supported, returns a copy of this `ExecutionPlan` node with the specified + /// node_id. Returns `None` otherwise. + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + Ok(None) + } } /// Extension trait provides an easy API to fetch various properties of @@ -444,6 +453,9 @@ pub trait ExecutionPlanProperties { /// See also [`ExecutionPlan::maintains_input_order`] and [`Self::output_ordering`] /// for related concepts. fn equivalence_properties(&self) -> &EquivalenceProperties; + + // Node Id of this ExecutionPlan node. See also [`ExecutionPlan::with_node_id`] + fn node_id(&self) -> Option; } impl ExecutionPlanProperties for Arc { @@ -462,6 +474,10 @@ impl ExecutionPlanProperties for Arc { fn equivalence_properties(&self) -> &EquivalenceProperties { self.properties().equivalence_properties() } + + fn node_id(&self) -> Option { + self.properties().node_id() + } } impl ExecutionPlanProperties for &dyn ExecutionPlan { @@ -480,6 +496,10 @@ impl ExecutionPlanProperties for &dyn ExecutionPlan { fn equivalence_properties(&self) -> &EquivalenceProperties { self.properties().equivalence_properties() } + + fn node_id(&self) -> Option { + self.properties().node_id() + } } /// Describes the execution mode of an operator's resulting stream with respect @@ -557,6 +577,8 @@ pub struct PlanProperties { pub execution_mode: ExecutionMode, /// See [ExecutionPlanProperties::output_ordering] output_ordering: Option, + /// See [ExecutionPlanProperties::node_id] + node_id: Option, } impl PlanProperties { @@ -573,6 +595,7 @@ impl PlanProperties { partitioning, execution_mode, output_ordering, + node_id: None, } } @@ -597,6 +620,12 @@ impl PlanProperties { self } + /// Overwrite node id with its new value. + pub fn with_node_id(mut self, node_id: usize) -> Self { + self.node_id = Some(node_id); + self + } + pub fn equivalence_properties(&self) -> &EquivalenceProperties { &self.eq_properties } @@ -613,6 +642,10 @@ impl PlanProperties { self.execution_mode } + pub fn node_id(&self) -> Option { + self.node_id + } + /// Get schema of the node. fn schema(&self) -> &SchemaRef { self.eq_properties.schema() diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 6aba3d817710..b8947121397c 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -295,6 +295,17 @@ impl ExecutionPlan for FilterExec { fn statistics(&self) -> Result { Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity) } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = + FilterExec::try_new(self.predicate.clone(), self.input.clone())?; + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// This function ensures that all bounds in the `ExprBoundaries` vector are diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 5dc27bc239d2..bdf6a3723225 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -253,6 +253,21 @@ impl ExecutionPlan for DataSinkExec { fn metrics(&self) -> Option { self.sink.metrics() } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = DataSinkExec::new( + self.input.clone(), + self.sink.clone(), + self.sink_schema.clone(), + self.sort_order.clone(), + ); + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// Create a output record batch with a count diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 0868ee721665..30c0950e9d24 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -265,6 +265,16 @@ impl ExecutionPlan for CrossJoinExec { self.right.statistics()?, )) } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = CrossJoinExec::new(self.left.clone(), self.right.clone()); + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// [left/right]_col_count are required in case the column statistics are None diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 7fac23ad5557..b7e855ebb591 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -823,6 +823,25 @@ impl ExecutionPlan for HashJoinExec { } Ok(stats) } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = HashJoinExec::try_new( + self.left.clone(), + self.right.clone(), + self.on.clone(), + self.filter.clone(), + self.join_type(), + self.projection.clone(), + self.partition_mode().clone(), + self.null_equals_null, + )?; + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// Reads the left (build) side of the input, buffering it in memory, to build a diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 511cb4c55fcd..8162bdeaabef 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -402,6 +402,24 @@ impl ExecutionPlan for SortMergeJoinExec { &self.schema, ) } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = SortMergeJoinExec::try_new( + self.left.clone(), + self.right.clone(), + self.on.clone(), + self.filter.clone(), + self.join_type(), + self.sort_options.clone(), + self.null_equals_null, + )?; + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// Metrics for SortMergeJoinExec diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index ac718a95e9f4..c0b727edcb9a 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -452,6 +452,26 @@ impl ExecutionPlan for SymmetricHashJoinExec { Ok(Statistics::new_unknown(&self.schema())) } + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = SymmetricHashJoinExec::try_new( + self.left.clone(), + self.right.clone(), + self.on.clone(), + self.filter.clone(), + self.join_type(), + self.null_equals_null, + self.left_sort_exprs.clone(), + self.right_sort_exprs.clone(), + self.mode, + )?; + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } + fn execute( &self, partition: usize, diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index fb86a008e2cd..b1e73b147c6c 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -66,6 +66,7 @@ pub mod joins; pub mod limit; pub mod memory; pub mod metrics; +pub mod node_id; pub mod placeholder_row; pub mod projection; pub mod recursive_query; @@ -80,7 +81,6 @@ pub mod unnest; pub mod values; pub mod windows; pub mod work_table; - pub mod udaf { pub use datafusion_physical_expr_functions_aggregate::aggregate::AggregateFunctionExpr; } diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 360e942226d2..af8bc1e019b3 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -200,6 +200,17 @@ impl ExecutionPlan for GlobalLimitExec { fn supports_limit_pushdown(&self) -> bool { true } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = + GlobalLimitExec::new(self.input.clone(), self.skip, self.fetch); + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// LocalLimitExec applies a limit to a single partition diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 6b2c78902eae..834a9ac17dc1 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -153,6 +153,20 @@ impl ExecutionPlan for MemoryExec { self.projection.clone(), )) } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = MemoryExec::try_new( + &self.partitions.clone(), + self.schema.clone(), + self.projection.clone(), + )?; + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } impl MemoryExec { diff --git a/datafusion/physical-plan/src/node_id.rs b/datafusion/physical-plan/src/node_id.rs new file mode 100644 index 000000000000..a03c747f7c15 --- /dev/null +++ b/datafusion/physical-plan/src/node_id.rs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +use std::sync::Arc; + +use crate::ExecutionPlan; + +use datafusion_common::DataFusionError; + +// Util for traversing ExecutionPlan tree and annotating node_id +pub struct NodeIdAnnotator { + next_id: usize, +} + +impl NodeIdAnnotator { + pub fn new() -> Self { + NodeIdAnnotator { next_id: 0 } + } + + fn annotate_execution_plan_with_node_id( + &mut self, + plan: Arc, + ) -> Result, DataFusionError> { + let plan_with_id = plan.clone().with_node_id(self.next_id)?.unwrap_or(plan); + self.next_id += 1; + Ok(plan_with_id) + } +} + +pub fn annotate_node_id_for_execution_plan( + plan: &Arc, + annotator: &mut NodeIdAnnotator, +) -> Result, DataFusionError> { + let mut new_children: Vec> = vec![]; + for child in plan.children() { + let new_child: Arc = + annotate_node_id_for_execution_plan(child, annotator)?; + new_children.push(new_child); + } + let new_plan = plan.clone().with_new_children(new_children)?; + let new_plan_with_id = annotator.annotate_execution_plan_with_node_id(new_plan)?; + Ok(new_plan_with_id) +} diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index 272211d5056e..73ba6c0785c1 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -175,6 +175,16 @@ impl ExecutionPlan for PlaceholderRowExec { None, )) } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = PlaceholderRowExec::new(self.schema.clone()); + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index d2bb8f2b0ead..3e89d7a66697 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -249,6 +249,17 @@ impl ExecutionPlan for ProjectionExec { fn supports_limit_pushdown(&self) -> bool { true } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = + ProjectionExec::try_new(self.expr.clone(), self.input.clone())?; + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// If e is a direct column reference, returns the field level diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index bd9303f97db0..28d06f28dadb 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -185,6 +185,21 @@ impl ExecutionPlan for RecursiveQueryExec { fn statistics(&self) -> Result { Ok(Statistics::new_unknown(&self.schema())) } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = RecursiveQueryExec::try_new( + self.name.clone(), + self.static_term.clone(), + self.recursive_term.clone(), + self.is_distinct, + )?; + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } impl DisplayAs for RecursiveQueryExec { diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 5a3fcb5029e1..8d33475635ac 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -666,6 +666,17 @@ impl ExecutionPlan for RepartitionExec { fn statistics(&self) -> Result { self.input.statistics() } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = + RepartitionExec::try_new(self.input.clone(), self.partitioning.clone())?; + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } impl RepartitionExec { diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 70a63e71ad2f..b20fca839d81 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -309,6 +309,20 @@ impl ExecutionPlan for PartialSortExec { fn statistics(&self) -> Result { self.input.statistics() } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = PartialSortExec::new( + self.expr.clone(), + self.input.clone(), + self.common_prefix_length, + ); + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } struct PartialSortStream { diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index a81b09948cca..328eb5673e72 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -946,6 +946,16 @@ impl ExecutionPlan for SortExec { fn fetch(&self) -> Option { self.fetch } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = SortExec::new(self.expr.clone(), self.input.clone()); + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 131fa71217cc..f1a6166267c6 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -296,6 +296,17 @@ impl ExecutionPlan for SortPreservingMergeExec { fn supports_limit_pushdown(&self) -> bool { true } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = + SortPreservingMergeExec::new(self.expr.clone(), self.input.clone()); + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index 9dc8b214420b..d8b6d7750521 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -281,6 +281,25 @@ impl ExecutionPlan for StreamingTableExec { metrics: self.metrics.clone(), })) } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = StreamingTableExec { + partitions: self.partitions.clone(), + projection: self.projection.clone(), + projected_schema: Arc::clone(&self.projected_schema), + projected_output_ordering: self.projected_output_ordering.clone(), + infinite: self.infinite, + limit: self.limit, + cache: self.cache.clone(), + metrics: self.metrics.clone(), + }; + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 78b25686054d..0d054663803c 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -264,6 +264,16 @@ impl ExecutionPlan for UnionExec { fn supports_limit_pushdown(&self) -> bool { true } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = UnionExec::new(self.inputs.clone()); + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// Combines multiple input streams by interleaving them. diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 67c2aaedbebf..3cc0637c17a5 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -180,6 +180,22 @@ impl ExecutionPlan for UnnestExec { fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = UnnestExec::new( + self.input.clone(), + self.list_column_indices.clone(), + self.struct_column_indices.clone(), + self.schema.clone(), + self.options.clone(), + ); + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } #[derive(Clone, Debug)] diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index 3ea27d62d80b..8ace3055270d 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -206,6 +206,19 @@ impl ExecutionPlan for ValuesExec { None, )) } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = ValuesExec::try_new_from_batches( + Arc::clone(&self.schema), + self.data.clone(), + )?; + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index afe9700ed08c..024552313d2d 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -262,6 +262,20 @@ impl ExecutionPlan for WindowAggExec { total_byte_size: Precision::Absent, }) } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = WindowAggExec::try_new( + self.window_expr.clone(), + self.input.clone(), + self.partition_keys.clone(), + )?; + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// Compute the window aggregate columns diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index ba95640a87c7..6bad724fa53b 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -214,6 +214,16 @@ impl ExecutionPlan for WorkTableExec { fn statistics(&self) -> Result { Ok(Statistics::new_unknown(&self.schema())) } + + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + let mut new_plan = WorkTableExec::new(self.name.clone(), self.schema.clone()); + let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } #[cfg(test)]