From 7c075455aa44e95873aa7033707eecd4d356fc6b Mon Sep 17 00:00:00 2001 From: Yanxin Xiang Date: Sat, 23 Mar 2024 09:01:29 -0500 Subject: [PATCH] for debug iniyt init remove .gz --- .../src/physical_optimizer/limit_pushdown.rs | 96 +++++++++++++++++++ datafusion/core/src/physical_optimizer/mod.rs | 1 + .../core/src/physical_optimizer/utils.rs | 29 ++++++ 3 files changed, 126 insertions(+) create mode 100644 datafusion/core/src/physical_optimizer/limit_pushdown.rs diff --git a/datafusion/core/src/physical_optimizer/limit_pushdown.rs b/datafusion/core/src/physical_optimizer/limit_pushdown.rs new file mode 100644 index 0000000000000..53c9444e4d5d8 --- /dev/null +++ b/datafusion/core/src/physical_optimizer/limit_pushdown.rs @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! The [`LimitPushdown`] rule tries to push a limit operator further down a +//! physical plan. It only acts when the root of the plan is a global limit +//! (see `is_global_limit`); any other plan is returned unchanged. Otherwise +//! the plan is traversed top-down with `push_down_limit`, which inspects the +//! input of a limit node and, for the two input kinds it recognizes, hands the node to `general_swap`. +//! NOTE(review): `general_swap` is still an empty stub. 
No plan is actually rewritten by this rule yet. +//! TODO(review): confirm which operators a limit is meant to be swapped with. + +use std::sync::Arc; + +use crate::datasource::physical_plan::CsvExec; +use crate::physical_optimizer::utils::is_global_limit; +use crate::physical_optimizer::PhysicalOptimizerRule; +use crate::physical_plan::aggregates::AggregateExec; +use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; +use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::Result; +use datafusion_optimizer::push_down_limit; +use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec; +use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; +use hashbrown::raw::Global; +use itertools::Itertools; +pub struct LimitPushdown {} + +impl LimitPushdown { + fn new() -> Self { + Self {} + } +} + +impl PhysicalOptimizerRule for LimitPushdown { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> Result> { + // Nothing to push down unless the root of the plan is a global limit; return the plan unchanged. + if !is_global_limit(&plan) { + return Ok(plan); + } + // Walk the plan top-down, trying to push the limit down at each node (same approach as projection pushdown). + plan.transform_down(&push_down_limit).data() + } + + fn name() -> &str { + "LimitPushdown" + } + + fn schema_check(&self) -> bool { + true + } +} +impl LimitPushdown {} +fn new_global_limit_with_input() {} +// Try to push the current limit down, depending on the kind of its child (input) operator. NOTE(review): the result is only bound to `modified` and never returned; looks unfinished. +fn push_down_limit( + plan: Arc, +) -> Result>> { + let modified = + if let Some(global_limit) = plan.as_any().downcast_ref::() { + let input = global_limit.input().as_any(); + if let Some(coalesce_partition_exec) = + input.downcast_ref::() + { + general_swap(plan) + } else if let Some(coalesce_batch_exec) = + input.downcast_ref::() + { + general_swap(plan) + } else { + None + } + } else 
{ + Ok(Transformed::no(plan)) + }; +} +fn general_swap(plan: &GlobalLimitExec) -> Result>> {} diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index e990fead610d1..541fd51db780b 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -27,6 +27,7 @@ pub mod combine_partial_final_agg; pub mod enforce_distribution; pub mod enforce_sorting; pub mod join_selection; +pub mod limit_pushdown; pub mod limited_distinct_aggregation; pub mod optimizer; pub mod output_requirements; diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs index 2c0d042281e6f..1a375518d66a8 100644 --- a/datafusion/core/src/physical_optimizer/utils.rs +++ b/datafusion/core/src/physical_optimizer/utils.rs @@ -19,6 +19,7 @@ use std::sync::Arc; +use crate::datasource::physical_plan::ArrowExec; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort::SortExec; @@ -28,7 +29,9 @@ use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; use datafusion_physical_expr::{LexRequirement, PhysicalSortRequirement}; +use datafusion_physical_plan::joins::{CrossJoinExec, HashJoinExec}; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; +use datafusion_physical_plan::test::exec::StatisticsExec; use datafusion_physical_plan::tree_node::PlanContext; /// This utility function adds a `SortExec` above an operator according to the @@ -107,3 +110,29 @@ pub fn is_union(plan: &Arc) -> bool { pub fn is_repartition(plan: &Arc) -> bool { plan.as_any().is::() } + +/// Check whether the given operator is a [`GlobalLimitExec`]. 
+pub fn is_global_limit(plan: &Arc) -> bool { + plan.as_any().is::() +} +/// Check whether the given plan is one of the operator types treated as a terminator for a [`GlobalLimitExec`]; returns `true` if any one of the type checks below matches. NOTE(review): presumably these are the operators a limit must not be pushed below - confirm. +pub fn is_limit_terminator(plan: &Arc) -> bool { + plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() +}