Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms committed Mar 27, 2024
1 parent 4c682db commit 7246f35
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 8 deletions.
15 changes: 7 additions & 8 deletions datafusion/core/src/physical_optimizer/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,28 +70,27 @@ impl PhysicalOptimizerRule for LimitPushdown {
}
}
impl LimitPushdown {}
fn new_global_limit_with_input() {
GlobalLimitExec::with_new_children(self: Arc<Self>, children)
}
fn new_global_limit_with_input() {}
// try to push down current limit, based on the son
fn push_down_limit(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
let modified =
if let Some(global_limit) = plan.as_any().downcast_ref::<GlobalLimitExec>() {
let input = global_limit.input().as_any();
if let Some(global_limit) = input.downcast_ref::<GlobalLimitExec>() {
} else if let Some(coalesce_partition_exec) =
if let Some(coalesce_partition_exec) =
input.downcast_ref::<CoalescePartitionsExec>()
{
general_swap(plan)
} else if let Some(coalesce_batch_exec) =
input.downcast_ref::<CoalesceBatchesExec>()
{
} else if let Some() = input.downcast_ref() {
general_swap(plan)
} else {
None
}
} else {
Ok(Transformed::no(plan))
};
Ok(modified.map_or(Transformed::no(plan), Transformed::yes))
}
fn swapping_coalesce_partition() {}
fn general_swap(plan: &GlobalLimitExec) -> Result<Option<Arc<dyn ExecutionPlan>>> {}
24 changes: 24 additions & 0 deletions datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use std::sync::Arc;

use crate::datasource::physical_plan::ArrowExec;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
Expand All @@ -28,7 +29,9 @@ use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use datafusion_physical_expr::{LexRequirement, PhysicalSortRequirement};
use datafusion_physical_plan::joins::{CrossJoinExec, HashJoinExec};
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::test::exec::StatisticsExec;
use datafusion_physical_plan::tree_node::PlanContext;

/// This utility function adds a `SortExec` above an operator according to the
Expand Down Expand Up @@ -112,3 +115,24 @@ pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
pub fn is_global_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<GlobalLimitExec>()
}
/// Check whether the given plan is a terminator of [`GlobalLimitExec`].
pub fn is_limit_terminator(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<HashJoinExec>()
|| plan.as_any().is::<CrossJoinExec>()
|| plan.as_any().is::<MemoryExec>()
|| plan.as_any().is::<NestedLoopJoinExec>()
|| plan.as_any().is::<UnionExec>()
|| plan.as_any().is::<MemoryExec>()
|| plan.as_any().is::<PartialSortExec>()
|| plan.as_any().is::<ArrowExec>()
|| plan.as_any().is::<AvroExec>()
|| plan.as_any().is::<CsvExecExec>()
|| plan.as_any().is::<NdJsonExec>()
|| plan.as_any().is::<StatisticsExec>()
|| plan.as_any().is::<PlaceholderRowExec>()
|| plan.as_any().is::<RecursiveQueryExec>()
|| plan.as_any().is::<StreamingTableExec>()
|| plan.as_any().is::<InterleaveExec>()
|| plan.as_any().is::<UnnestExec>()
|| plan.as_any().is::<WindowAggExec>()
}

0 comments on commit 7246f35

Please sign in to comment.