diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index d2c2bd7a1281..2ffa10a27788 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1238,20 +1238,13 @@ mod tests { col("int_col").sort(false, true), ]], Ok(vec![vec![ - PhysicalSortExpr { - expr: physical_col("string_col", &schema).unwrap(), - options: SortOptions { - descending: false, - nulls_first: false, - }, - }, - PhysicalSortExpr { - expr: physical_col("int_col", &schema).unwrap(), - options: SortOptions { - descending: true, - nulls_first: true, - }, - }, + PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap()) + .asc() + .nulls_last(), + + PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap()) + .desc() + .nulls_first() ]]) ), ]; diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 745ec543c31a..e35aec26bb23 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -30,6 +30,51 @@ use datafusion_common::Result; use datafusion_expr_common::columnar_value::ColumnarValue; /// Represents Sort operation for a column in a RecordBatch +/// +/// Example: +/// ``` +/// # use std::any::Any; +/// # use std::fmt::Display; +/// # use std::hash::Hasher; +/// # use std::sync::Arc; +/// # use arrow::array::RecordBatch; +/// # use datafusion_common::Result; +/// # use arrow::compute::SortOptions; +/// # use arrow::datatypes::{DataType, Schema}; +/// # use datafusion_expr_common::columnar_value::ColumnarValue; +/// # use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +/// # use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; +/// # // this crate doesn't have a physical expression implementation +/// # // so make a really simple one +/// # #[derive(Clone, Debug, PartialEq, Eq, Hash)] +/// # struct MyPhysicalExpr; +/// # impl PhysicalExpr for MyPhysicalExpr { +/// # fn as_any(&self) -> &dyn Any {todo!() } +/// # fn data_type(&self, input_schema: &Schema) -> Result {todo!()} +/// # fn nullable(&self, input_schema: &Schema) -> Result {todo!() } +/// # fn evaluate(&self, batch: &RecordBatch) -> Result {todo!() } +/// # fn children(&self) -> Vec<&Arc> {todo!()} +/// # fn with_new_children(self: Arc, children: Vec>) -> Result> {todo!()} +/// # fn dyn_hash(&self, _state: &mut dyn Hasher) {todo!()} +/// # } +/// # impl Display for MyPhysicalExpr { +/// # fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "a") } +/// # } +/// # impl PartialEq for MyPhysicalExpr { +/// # fn eq(&self, _other: &dyn Any) -> bool { true } +/// # } +/// # fn col(name: &str) -> Arc { Arc::new(MyPhysicalExpr) } +/// // Sort by a ASC +/// let options = SortOptions::default(); +/// let sort_expr = PhysicalSortExpr::new(col("a"), options); +/// assert_eq!(sort_expr.to_string(), "a ASC"); +/// +/// // Sort by a DESC NULLS LAST +/// let sort_expr = PhysicalSortExpr::new_default(col("a")) +/// .desc() +/// .nulls_last(); +/// assert_eq!(sort_expr.to_string(), "a DESC NULLS LAST"); +/// ``` #[derive(Clone, Debug)] pub struct PhysicalSortExpr { /// Physical expression representing the column to sort @@ -43,6 +88,35 @@ impl PhysicalSortExpr { pub fn new(expr: Arc, options: SortOptions) -> Self { Self { expr, options } } + + /// Create a new PhysicalSortExpr with default [`SortOptions`] + pub fn new_default(expr: Arc) -> Self { + Self::new(expr, SortOptions::default()) + } + + /// Set the sort sort options to ASC + pub fn asc(mut self) -> Self { + self.options.descending = false; + self + } + + /// Set the sort sort options to DESC + pub fn desc(mut self) -> Self { + self.options.descending = true; + self + } + + /// Set the sort sort options to NULLS FIRST + pub fn nulls_first(mut self) -> Self { + self.options.nulls_first = true; + self + } + + /// Set the sort sort options to NULLS LAST + pub fn nulls_last(mut self) -> Self { + self.options.nulls_first = false; + self + } } impl PartialEq for PhysicalSortExpr { @@ -60,7 +134,7 @@ impl Hash for PhysicalSortExpr { } } -impl std::fmt::Display for PhysicalSortExpr { +impl Display for PhysicalSortExpr { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "{} {}", self.expr, to_str(&self.options)) } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index fa9628abdfbb..18bdedae119e 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1022,9 +1022,8 @@ mod tests { impl SortedUnboundedExec { fn compute_properties(schema: SchemaRef) -> PlanProperties { let mut eq_properties = EquivalenceProperties::new(schema); - eq_properties.add_new_orderings(vec![vec![PhysicalSortExpr::new( + eq_properties.add_new_orderings(vec![vec![PhysicalSortExpr::new_default( Arc::new(Column::new("c1", 0)), - SortOptions::default(), )]]); let mode = ExecutionMode::Unbounded; PlanProperties::new(eq_properties, Partitioning::UnknownPartitioning(1), mode) @@ -1560,10 +1559,9 @@ mod tests { cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())), }; let mut plan = SortExec::new( - vec![PhysicalSortExpr::new( - Arc::new(Column::new("c1", 0)), - SortOptions::default(), - )], + vec![PhysicalSortExpr::new_default(Arc::new(Column::new( + "c1", 0, + )))], Arc::new(source), ); plan = plan.with_fetch(Some(9)); diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 4d333175bf75..f83bb58d08dd 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -1174,9 +1174,7 @@ mod tests { let mut eq_properties = EquivalenceProperties::new(schema); eq_properties.add_new_orderings(vec![columns .iter() - .map(|expr| { - PhysicalSortExpr::new(Arc::clone(expr), SortOptions::default()) - }) + .map(|expr| PhysicalSortExpr::new_default(Arc::clone(expr))) .collect::>()]); let mode = ExecutionMode::Unbounded; PlanProperties::new(eq_properties, Partitioning::Hash(columns, 3), mode) @@ -1286,10 +1284,9 @@ mod tests { congestion_cleared: Arc::new(Mutex::new(false)), }; let spm = SortPreservingMergeExec::new( - vec![PhysicalSortExpr::new( - Arc::new(Column::new("c1", 0)), - SortOptions::default(), - )], + vec![PhysicalSortExpr::new_default(Arc::new(Column::new( + "c1", 0, + )))], Arc::new(source), ); let spm_task = SpawnedTask::spawn(collect(Arc::new(spm), task_ctx));