Skip to content

Commit

Permalink
Implement DISTINCT ON from Postgres (#7981)
Browse files Browse the repository at this point in the history
* Initial DISTINCT ON implementation

* Add a couple more tests

* Add comments in the replace_distinct_aggregate optimizer

* Run cargo fmt to fix CI

* Make DISTINCT ON planning more robust to support arbitrary selection expressions

* Add DISTINCT ON + join SLT

* Handle no DISTINCT ON expressions and extend the docs for the replace_distinct_aggregate optimizer

* Remove misleading DISTINCT ON SLT comment

* Add an EXPLAIN SLT for a basic DISTINCT ON query

* Revise comment in CommonSubexprEliminate::try_optimize_aggregate

* Implement qualified expression alias and extend test coverage

* Update datafusion/proto/proto/datafusion.proto

Co-authored-by: Jonah Gao <[email protected]>

* Accompanying generated changes to alias proto tag revision

* Remove obsolete comment

---------

Co-authored-by: Jonah Gao <[email protected]>
  • Loading branch information
gruuya and jonahgao authored Nov 13, 2023
1 parent 7889bf9 commit 2185842
Show file tree
Hide file tree
Showing 25 changed files with 879 additions and 99 deletions.
33 changes: 30 additions & 3 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::Operator;
use crate::{aggregate_function, ExprSchemable};
use arrow::datatypes::DataType;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{internal_err, DFSchema};
use datafusion_common::{internal_err, DFSchema, OwnedTableReference};
use datafusion_common::{plan_err, Column, DataFusionError, Result, ScalarValue};
use std::collections::HashSet;
use std::fmt;
Expand Down Expand Up @@ -187,13 +187,20 @@ pub enum Expr {
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Alias {
pub expr: Box<Expr>,
pub relation: Option<OwnedTableReference>,
pub name: String,
}

impl Alias {
pub fn new(expr: Expr, name: impl Into<String>) -> Self {
/// Create an alias with an optional schema/field qualifier.
pub fn new(
expr: Expr,
relation: Option<impl Into<OwnedTableReference>>,
name: impl Into<String>,
) -> Self {
Self {
expr: Box::new(expr),
relation: relation.map(|r| r.into()),
name: name.into(),
}
}
Expand Down Expand Up @@ -844,7 +851,27 @@ impl Expr {
asc,
nulls_first,
}) => Expr::Sort(Sort::new(Box::new(expr.alias(name)), asc, nulls_first)),
_ => Expr::Alias(Alias::new(self, name.into())),
_ => Expr::Alias(Alias::new(self, None::<&str>, name.into())),
}
}

/// Build a `self AS relation.name` alias, i.e. an alias carrying an optional qualifier.
///
/// A `Sort` expression is not wrapped itself: the alias is pushed down onto the
/// expression being sorted and the `asc` / `nulls_first` settings are kept as they were.
pub fn alias_qualified(
    self,
    relation: Option<impl Into<OwnedTableReference>>,
    name: impl Into<String>,
) -> Expr {
    match self {
        Expr::Sort(Sort {
            expr,
            asc,
            nulls_first,
        }) => {
            // Alias the sorted expression, then rebuild the sort around it.
            let aliased = expr.alias_qualified(relation, name);
            Expr::Sort(Sort::new(Box::new(aliased), asc, nulls_first))
        }
        other => Expr::Alias(Alias::new(other, relation, name)),
    }
}

Expand Down
7 changes: 7 additions & 0 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,13 @@ impl ExprSchemable for Expr {
self.nullable(input_schema)?,
)
.with_metadata(self.metadata(input_schema)?)),
Expr::Alias(Alias { relation, name, .. }) => Ok(DFField::new(
relation.clone(),
name,
self.get_type(input_schema)?,
self.nullable(input_schema)?,
)
.with_metadata(self.metadata(input_schema)?)),
_ => Ok(DFField::new_unqualified(
&self.display_name()?,
self.get_type(input_schema)?,
Expand Down
29 changes: 21 additions & 8 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use crate::expr_rewriter::{
rewrite_sort_cols_by_aggs,
};
use crate::logical_plan::{
Aggregate, Analyze, CrossJoin, Distinct, EmptyRelation, Explain, Filter, Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
Aggregate, Analyze, CrossJoin, Distinct, DistinctOn, EmptyRelation, Explain, Filter,
Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
Window,
};
Expand Down Expand Up @@ -551,16 +551,29 @@ impl LogicalPlanBuilder {
let left_plan: LogicalPlan = self.plan;
let right_plan: LogicalPlan = plan;

Ok(Self::from(LogicalPlan::Distinct(Distinct {
input: Arc::new(union(left_plan, right_plan)?),
})))
Ok(Self::from(LogicalPlan::Distinct(Distinct::All(Arc::new(
union(left_plan, right_plan)?,
)))))
}

/// Apply deduplication: only distinct (different) values are returned
pub fn distinct(self) -> Result<Self> {
Ok(Self::from(LogicalPlan::Distinct(Distinct {
input: Arc::new(self.plan),
})))
Ok(Self::from(LogicalPlan::Distinct(Distinct::All(Arc::new(
self.plan,
)))))
}

/// Wrap the current plan in a `DISTINCT ON` node.
///
/// * `on_expr` - the `DISTINCT ON` clause expressions rows are grouped by
/// * `select_expr` - the expressions projected for each group
/// * `sort_expr` - optional sorting expressions that decide which row of a group comes first
///
/// Any error returned by `DistinctOn::try_new` is propagated to the caller.
pub fn distinct_on(
    self,
    on_expr: Vec<Expr>,
    select_expr: Vec<Expr>,
    sort_expr: Option<Vec<Expr>>,
) -> Result<Self> {
    let input = Arc::new(self.plan);
    let node = DistinctOn::try_new(on_expr, select_expr, sort_expr, input)?;
    Ok(Self::from(LogicalPlan::Distinct(Distinct::On(node))))
}

/// Apply a join to `right` using explicitly specified columns and an
Expand Down
8 changes: 4 additions & 4 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ pub use ddl::{
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
Aggregate, Analyze, CrossJoin, DescribeTable, Distinct, EmptyRelation, Explain,
Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning,
PlanType, Prepare, Projection, Repartition, Sort, StringifiedPlan, Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
Aggregate, Analyze, CrossJoin, DescribeTable, Distinct, DistinctOn, EmptyRelation,
Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, Prepare, Projection, Repartition, Sort, StringifiedPlan,
Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
};
pub use statement::{
SetVariable, Statement, TransactionAccessMode, TransactionConclusion, TransactionEnd,
Expand Down
163 changes: 148 additions & 15 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use std::sync::Arc;
use super::dml::CopyTo;
use super::DdlStatement;
use crate::dml::CopyOptions;
use crate::expr::{Alias, Exists, InSubquery, Placeholder};
use crate::expr_rewriter::create_col_from_scalar_expr;
use crate::expr::{Alias, Exists, InSubquery, Placeholder, Sort as SortExpr};
use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols};
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
use crate::logical_plan::{DmlStatement, Statement};
Expand Down Expand Up @@ -163,7 +163,8 @@ impl LogicalPlan {
}) => projected_schema,
LogicalPlan::Projection(Projection { schema, .. }) => schema,
LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
LogicalPlan::Distinct(Distinct { input }) => input.schema(),
LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
LogicalPlan::Window(Window { schema, .. }) => schema,
LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
Expand Down Expand Up @@ -367,6 +368,16 @@ impl LogicalPlan {
LogicalPlan::Unnest(Unnest { column, .. }) => {
f(&Expr::Column(column.clone()))
}
LogicalPlan::Distinct(Distinct::On(DistinctOn {
on_expr,
select_expr,
sort_expr,
..
})) => on_expr
.iter()
.chain(select_expr.iter())
.chain(sort_expr.clone().unwrap_or(vec![]).iter())
.try_for_each(f),
// plans without expressions
LogicalPlan::EmptyRelation(_)
| LogicalPlan::Subquery(_)
Expand All @@ -377,7 +388,7 @@ impl LogicalPlan {
| LogicalPlan::Analyze(_)
| LogicalPlan::Explain(_)
| LogicalPlan::Union(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::Distinct(Distinct::All(_))
| LogicalPlan::Dml(_)
| LogicalPlan::Ddl(_)
| LogicalPlan::Copy(_)
Expand Down Expand Up @@ -405,7 +416,9 @@ impl LogicalPlan {
LogicalPlan::Union(Union { inputs, .. }) => {
inputs.iter().map(|arc| arc.as_ref()).collect()
}
LogicalPlan::Distinct(Distinct { input }) => vec![input],
LogicalPlan::Distinct(
Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
) => vec![input],
LogicalPlan::Explain(explain) => vec![&explain.plan],
LogicalPlan::Analyze(analyze) => vec![&analyze.input],
LogicalPlan::Dml(write) => vec![&write.input],
Expand Down Expand Up @@ -461,8 +474,11 @@ impl LogicalPlan {
Ok(Some(agg.group_expr.as_slice()[0].clone()))
}
}
LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
Ok(Some(select_expr[0].clone()))
}
LogicalPlan::Filter(Filter { input, .. })
| LogicalPlan::Distinct(Distinct { input, .. })
| LogicalPlan::Distinct(Distinct::All(input))
| LogicalPlan::Sort(Sort { input, .. })
| LogicalPlan::Limit(Limit { input, .. })
| LogicalPlan::Repartition(Repartition { input, .. })
Expand Down Expand Up @@ -823,10 +839,29 @@ impl LogicalPlan {
inputs: inputs.iter().cloned().map(Arc::new).collect(),
schema: schema.clone(),
})),
LogicalPlan::Distinct(Distinct { .. }) => {
Ok(LogicalPlan::Distinct(Distinct {
input: Arc::new(inputs[0].clone()),
}))
LogicalPlan::Distinct(distinct) => {
let distinct = match distinct {
Distinct::All(_) => Distinct::All(Arc::new(inputs[0].clone())),
Distinct::On(DistinctOn {
on_expr,
select_expr,
..
}) => {
let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
let select_expr = expr.split_off(on_expr.len());
Distinct::On(DistinctOn::try_new(
expr,
select_expr,
if !sort_expr.is_empty() {
Some(sort_expr)
} else {
None
},
Arc::new(inputs[0].clone()),
)?)
}
};
Ok(LogicalPlan::Distinct(distinct))
}
LogicalPlan::Analyze(a) => {
assert!(expr.is_empty());
Expand Down Expand Up @@ -1064,7 +1099,9 @@ impl LogicalPlan {
LogicalPlan::Subquery(_) => None,
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
LogicalPlan::Limit(Limit { fetch, .. }) => *fetch,
LogicalPlan::Distinct(Distinct { input }) => input.max_rows(),
LogicalPlan::Distinct(
Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
) => input.max_rows(),
LogicalPlan::Values(v) => Some(v.values.len()),
LogicalPlan::Unnest(_) => None,
LogicalPlan::Ddl(_)
Expand Down Expand Up @@ -1667,9 +1704,21 @@ impl LogicalPlan {
LogicalPlan::Statement(statement) => {
write!(f, "{}", statement.display())
}
LogicalPlan::Distinct(Distinct { .. }) => {
write!(f, "Distinct:")
}
LogicalPlan::Distinct(distinct) => match distinct {
Distinct::All(_) => write!(f, "Distinct:"),
Distinct::On(DistinctOn {
on_expr,
select_expr,
sort_expr,
..
}) => write!(
f,
"DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
expr_vec_fmt!(on_expr),
expr_vec_fmt!(select_expr),
if let Some(sort_expr) = sort_expr { expr_vec_fmt!(sort_expr) } else { "".to_string() },
),
},
LogicalPlan::Explain { .. } => write!(f, "Explain"),
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
LogicalPlan::Union(_) => write!(f, "Union"),
Expand Down Expand Up @@ -2132,9 +2181,93 @@ pub struct Limit {

/// Removes duplicate rows from the input
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Distinct {
pub enum Distinct {
/// Plain `DISTINCT` referencing all selection expressions
All(Arc<LogicalPlan>),
/// The `Postgres` addition, allowing separate control over DISTINCT'd and selected columns
On(DistinctOn),
}

/// Plan node for the Postgres-style `DISTINCT ON` clause.
///
/// Instances are built through `DistinctOn::try_new`, which rejects an empty `ON` list,
/// normalizes the `ON` expressions against `input`, and derives `schema` from
/// `select_expr`.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
/// The `DISTINCT ON` clause expression list (never empty when built via `try_new`)
pub on_expr: Vec<Expr>,
/// The selected projection expression list; the output schema is derived from it
pub select_expr: Vec<Expr>,
/// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
/// present. Note that those matching expressions actually wrap the `ON` expressions with
/// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
/// `None` when no sort expressions were supplied.
pub sort_expr: Option<Vec<Expr>>,
/// The logical plan that is being DISTINCT'd
pub input: Arc<LogicalPlan>,
/// The schema description of the DISTINCT ON output
pub schema: DFSchemaRef,
}

impl DistinctOn {
    /// Build a `DistinctOn` node on top of `input`.
    ///
    /// The `ON` expressions are normalized against `input`, and the output schema is
    /// derived from `select_expr` together with the input schema's metadata. When
    /// `sort_expr` is given it is validated and attached via [`Self::with_sort_expr`].
    ///
    /// # Errors
    ///
    /// Returns a plan error when `on_expr` is empty, and propagates any failure from
    /// normalization, schema construction, or sort-expression validation.
    pub fn try_new(
        on_expr: Vec<Expr>,
        select_expr: Vec<Expr>,
        sort_expr: Option<Vec<Expr>>,
        input: Arc<LogicalPlan>,
    ) -> Result<Self> {
        if on_expr.is_empty() {
            return plan_err!("No `ON` expressions provided");
        }

        let on_expr = normalize_cols(on_expr, input.as_ref())?;

        let fields = exprlist_to_fields(&select_expr, &input)?;
        let metadata = input.schema().metadata().clone();
        let schema = Arc::new(DFSchema::new_with_metadata(fields, metadata)?);

        // Start without any ordering; it is only attached after validation below.
        let node = DistinctOn {
            on_expr,
            select_expr,
            sort_expr: None,
            input,
            schema,
        };

        match sort_expr {
            Some(sort_expr) => node.with_sort_expr(sort_expr),
            None => Ok(node),
        }
    }

    /// Replace the sort expressions of `self`, consuming and returning it.
    ///
    /// The sort expressions are normalized against the input plan first. The leading
    /// ones must then wrap, in order, exactly the `ON` expressions.
    ///
    /// # Errors
    ///
    /// Returns a plan error when one of the leading entries is not an `Expr::Sort`,
    /// when a leading entry sorts on something other than the corresponding `ON`
    /// expression, or when there are fewer sort expressions than `ON` expressions.
    pub fn with_sort_expr(self, sort_expr: Vec<Expr>) -> Result<Self> {
        let sort_expr = normalize_cols(sort_expr, self.input.as_ref())?;

        // Single place that produces the "prefix does not match" error.
        let mismatch = || -> Result<Self> {
            plan_err!(
                "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
            )
        };

        // Walk the `ON` list and the sort list in lock-step; stop at the first problem.
        for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
            let sorted_on: &Expr = match sort {
                Expr::Sort(SortExpr { expr, .. }) => &**expr,
                _ => return plan_err!("Not a sort expression: {sort}"),
            };
            if on != sorted_on {
                return mismatch();
            }
        }

        // `zip` stops at the shorter side, so a too-short ORDER BY must be caught here.
        if sort_expr.len() < self.on_expr.len() {
            return mismatch();
        }

        Ok(Self {
            sort_expr: Some(sort_expr),
            ..self
        })
    }
}

/// Aggregates its input based on a set of grouping and aggregate
Expand Down
8 changes: 5 additions & 3 deletions datafusion/expr/src/tree_node/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,11 @@ impl TreeNode for Expr {
let mut transform = transform;

Ok(match self {
Expr::Alias(Alias { expr, name, .. }) => {
Expr::Alias(Alias::new(transform(*expr)?, name))
}
Expr::Alias(Alias {
expr,
relation,
name,
}) => Expr::Alias(Alias::new(transform(*expr)?, relation, name)),
Expr::Column(_) => self,
Expr::OuterReferenceColumn(_, _) => self,
Expr::Exists { .. } => self,
Expand Down
Loading

0 comments on commit 2185842

Please sign in to comment.