Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine Expr::Wildcard and Wxpr::QualifiedWildcard, add wildcard() expr fn #8105

Merged
merged 1 commit into from
Nov 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
| Expr::AggregateFunction { .. }
| Expr::Sort { .. }
| Expr::WindowFunction { .. }
| Expr::Wildcard
| Expr::QualifiedWildcard { .. }
| Expr::Wildcard { .. }
| Expr::Placeholder(_) => {
is_applicable = false;
VisitRecursion::Stop
Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,8 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
Expr::Sort { .. } => {
internal_err!("Create physical name does not support sort expression")
}
Expr::Wildcard => internal_err!("Create physical name does not support wildcard"),
Expr::QualifiedWildcard { .. } => {
internal_err!("Create physical name does not support qualified wildcard")
Expr::Wildcard { .. } => {
internal_err!("Create physical name does not support wildcard")
}
Expr::Placeholder(_) => {
internal_err!("Create physical name does not support placeholder")
Expand Down
29 changes: 14 additions & 15 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
use datafusion_common::{DataFusionError, ScalarValue, UnnestOptions};
use datafusion_execution::config::SessionConfig;
use datafusion_expr::expr::{GroupingSet, Sort};
use datafusion_expr::Expr::Wildcard;
use datafusion_expr::{
array_agg, avg, col, count, exists, expr, in_subquery, lit, max, out_ref_col,
scalar_subquery, sum, AggregateFunction, Expr, ExprSchemable, WindowFrame,
scalar_subquery, sum, wildcard, AggregateFunction, Expr, ExprSchemable, WindowFrame,
WindowFrameBound, WindowFrameUnits, WindowFunction,
};
use datafusion_physical_expr::var_provider::{VarProvider, VarType};
Expand All @@ -64,8 +63,8 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
let df_results = ctx
.table("t1")
.await?
.aggregate(vec![col("b")], vec![count(Wildcard)])?
.sort(vec![count(Wildcard).sort(true, false)])?
.aggregate(vec![col("b")], vec![count(wildcard())])?
.sort(vec![count(wildcard()).sort(true, false)])?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this shows the difference in API for users (calling wildcard()), which I think is more consistent with other functions

.explain(false, false)?
.collect()
.await?;
Expand Down Expand Up @@ -99,8 +98,8 @@ async fn test_count_wildcard_on_where_in() -> Result<()> {
Arc::new(
ctx.table("t2")
.await?
.aggregate(vec![], vec![count(Expr::Wildcard)])?
.select(vec![count(Expr::Wildcard)])?
.aggregate(vec![], vec![count(wildcard())])?
.select(vec![count(wildcard())])?
.into_unoptimized_plan(),
// Usually, into_optimized_plan() should be used here, but due to
// https://github.com/apache/arrow-datafusion/issues/5771,
Expand Down Expand Up @@ -136,8 +135,8 @@ async fn test_count_wildcard_on_where_exist() -> Result<()> {
.filter(exists(Arc::new(
ctx.table("t2")
.await?
.aggregate(vec![], vec![count(Expr::Wildcard)])?
.select(vec![count(Expr::Wildcard)])?
.aggregate(vec![], vec![count(wildcard())])?
.select(vec![count(wildcard())])?
.into_unoptimized_plan(),
// Usually, into_optimized_plan() should be used here, but due to
// https://github.com/apache/arrow-datafusion/issues/5771,
Expand Down Expand Up @@ -172,7 +171,7 @@ async fn test_count_wildcard_on_window() -> Result<()> {
.await?
.select(vec![Expr::WindowFunction(expr::WindowFunction::new(
WindowFunction::AggregateFunction(AggregateFunction::Count),
vec![Expr::Wildcard],
vec![wildcard()],
vec![],
vec![Expr::Sort(Sort::new(Box::new(col("a")), false, true))],
WindowFrame {
Expand Down Expand Up @@ -202,17 +201,17 @@ async fn test_count_wildcard_on_aggregate() -> Result<()> {
let sql_results = ctx
.sql("select count(*) from t1")
.await?
.select(vec![count(Expr::Wildcard)])?
.select(vec![count(wildcard())])?
.explain(false, false)?
.collect()
.await?;

// add `.select(vec![count(Expr::Wildcard)])?` to make sure we can analyze all node instead of just top node.
// add `.select(vec![count(wildcard())])?` to make sure we can analyze all node instead of just top node.
let df_results = ctx
.table("t1")
.await?
.aggregate(vec![], vec![count(Expr::Wildcard)])?
.select(vec![count(Expr::Wildcard)])?
.aggregate(vec![], vec![count(wildcard())])?
.select(vec![count(wildcard())])?
.explain(false, false)?
.collect()
.await?;
Expand Down Expand Up @@ -248,8 +247,8 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
ctx.table("t2")
.await?
.filter(out_ref_col(DataType::UInt32, "t1.a").eq(col("t2.a")))?
.aggregate(vec![], vec![count(Wildcard)])?
.select(vec![col(count(Wildcard).to_string())])?
.aggregate(vec![], vec![count(wildcard())])?
.select(vec![col(count(wildcard()).to_string())])?
.into_unoptimized_plan(),
))
.gt(lit(ScalarValue::UInt8(Some(0)))),
Expand Down
29 changes: 14 additions & 15 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,12 @@ pub enum Expr {
InSubquery(InSubquery),
/// Scalar subquery
ScalarSubquery(Subquery),
/// Represents a reference to all available fields.
/// Represents a reference to all available fields in a specific schema,
/// with an optional (schema) qualifier.
///
/// This expr has to be resolved to a list of columns before translating logical
/// plan into physical plan.
Wildcard,
/// Represents a reference to all available fields in a specific schema.
///
/// This expr has to be resolved to a list of columns before translating logical
/// plan into physical plan.
QualifiedWildcard { qualifier: String },
Wildcard { qualifier: Option<String> },
/// List of grouping set expressions. Only valid in the context of an aggregate
/// GROUP BY expression list
GroupingSet(GroupingSet),
Expand Down Expand Up @@ -729,15 +725,14 @@ impl Expr {
Expr::Negative(..) => "Negative",
Expr::Not(..) => "Not",
Expr::Placeholder(_) => "Placeholder",
Expr::QualifiedWildcard { .. } => "QualifiedWildcard",
Expr::ScalarFunction(..) => "ScalarFunction",
Expr::ScalarSubquery { .. } => "ScalarSubquery",
Expr::ScalarUDF(..) => "ScalarUDF",
Expr::ScalarVariable(..) => "ScalarVariable",
Expr::Sort { .. } => "Sort",
Expr::TryCast { .. } => "TryCast",
Expr::WindowFunction { .. } => "WindowFunction",
Expr::Wildcard => "Wildcard",
Expr::Wildcard { .. } => "Wildcard",
}
}

Expand Down Expand Up @@ -1292,8 +1287,10 @@ impl fmt::Display for Expr {
write!(f, "{expr} IN ([{}])", expr_vec_fmt!(list))
}
}
Expr::Wildcard => write!(f, "*"),
Expr::QualifiedWildcard { qualifier } => write!(f, "{qualifier}.*"),
Expr::Wildcard { qualifier } => match qualifier {
Some(qualifier) => write!(f, "{qualifier}.*"),
None => write!(f, "*"),
},
Expr::GetIndexedField(GetIndexedField { field, expr }) => match field {
GetFieldAccess::NamedStructField { name } => {
write!(f, "({expr})[{name}]")
Expand Down Expand Up @@ -1613,10 +1610,12 @@ fn create_name(e: &Expr) -> Result<String> {
Expr::Sort { .. } => {
internal_err!("Create name does not support sort expression")
}
Expr::Wildcard => Ok("*".to_string()),
Expr::QualifiedWildcard { .. } => {
internal_err!("Create name does not support qualified wildcard")
}
Expr::Wildcard { qualifier } => match qualifier {
Some(qualifier) => internal_err!(
"Create name does not support qualified wildcard, got {qualifier}"
),
None => Ok("*".to_string()),
},
Expr::Placeholder(Placeholder { id, .. }) => Ok((*id).to_string()),
}
}
Expand Down
13 changes: 13 additions & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,19 @@ pub fn placeholder(id: impl Into<String>) -> Expr {
})
}

/// Create an '*' [`Expr::Wildcard`] expression that matches all columns
///
/// # Example
///
/// ```rust
/// # use datafusion_expr::{wildcard};
/// let p = wildcard();
/// assert_eq!(p.to_string(), "*")
/// ```
pub fn wildcard() -> Expr {
Expr::Wildcard { qualifier: None }
}
Comment on lines +111 to +113
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a fn wildcard_with_qualifier() or fn wildcard(qualifier: None)? Though it's not used in our examples.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that it was such a niche usecase that if anyone needed the qualifier they could make it explicitly via Expr::Wildcard{ qualifier: Some(...)}, but that is indeed messier

I would be happy to add qualified_wildcard or similar if others think it would be valuable


/// Return a new expression `left <op> right`
pub fn binary_expr(left: Expr, op: Operator, right: Expr) -> Expr {
Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right)))
Expand Down
15 changes: 6 additions & 9 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,13 @@ impl ExprSchemable for Expr {
plan_datafusion_err!("Placeholder type could not be resolved")
})
}
Expr::Wildcard => {
Expr::Wildcard { qualifier } => {
// Wildcard do not really have a type and do not appear in projections
Ok(DataType::Null)
match qualifier {
Some(_) => internal_err!("QualifiedWildcard expressions are not valid in a logical query plan"),
None => Ok(DataType::Null)
}
}
Expr::QualifiedWildcard { .. } => internal_err!(
"QualifiedWildcard expressions are not valid in a logical query plan"
),
Expr::GroupingSet(_) => {
// grouping sets do not really have a type and do not appear in projections
Ok(DataType::Null)
Expand Down Expand Up @@ -270,12 +270,9 @@ impl ExprSchemable for Expr {
| Expr::SimilarTo(Like { expr, pattern, .. }) => {
Ok(expr.nullable(input_schema)? || pattern.nullable(input_schema)?)
}
Expr::Wildcard => internal_err!(
Expr::Wildcard { .. } => internal_err!(
"Wildcard expressions are not valid in a logical query plan"
),
Expr::QualifiedWildcard { .. } => internal_err!(
"QualifiedWildcard expressions are not valid in a logical query plan"
),
Expr::GetIndexedField(GetIndexedField { expr, field }) => {
field_for_index(expr, field, input_schema).map(|x| x.is_nullable())
}
Expand Down
13 changes: 9 additions & 4 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1287,11 +1287,16 @@ pub fn project(
for e in expr {
let e = e.into();
match e {
Expr::Wildcard => {
Expr::Wildcard { qualifier: None } => {
projected_expr.extend(expand_wildcard(input_schema, &plan, None)?)
}
Expr::QualifiedWildcard { ref qualifier } => projected_expr
.extend(expand_qualified_wildcard(qualifier, input_schema, None)?),
Expr::Wildcard {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here is the only place where Wildcard and QualifiedWildcard are treated substantially differently

qualifier: Some(qualifier),
} => projected_expr.extend(expand_qualified_wildcard(
&qualifier,
input_schema,
None,
)?),
_ => projected_expr
.push(columnize_expr(normalize_col(e, &plan)?, input_schema)),
}
Expand Down Expand Up @@ -1590,7 +1595,7 @@ mod tests {

let plan = table_scan(Some("t1"), &employee_schema(), None)?
.join_using(t2, JoinType::Inner, vec!["id"])?
.project(vec![Expr::Wildcard])?
.project(vec![Expr::Wildcard { qualifier: None }])?
.build()?;

// id column should only show up once in projection
Expand Down
8 changes: 2 additions & 6 deletions datafusion/expr/src/tree_node/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ impl TreeNode for Expr {
| Expr::Literal(_)
| Expr::Exists { .. }
| Expr::ScalarSubquery(_)
| Expr::Wildcard
| Expr::QualifiedWildcard { .. }
| Expr::Wildcard {..}
| Expr::Placeholder (_) => vec![],
Expr::BinaryExpr(BinaryExpr { left, right, .. }) => {
vec![left.as_ref().clone(), right.as_ref().clone()]
Expand Down Expand Up @@ -350,10 +349,7 @@ impl TreeNode for Expr {
transform_vec(list, &mut transform)?,
negated,
)),
Expr::Wildcard => Expr::Wildcard,
Expr::QualifiedWildcard { qualifier } => {
Expr::QualifiedWildcard { qualifier }
}
Expr::Wildcard { qualifier } => Expr::Wildcard { qualifier },
Expr::GetIndexedField(GetIndexedField { expr, field }) => {
Expr::GetIndexedField(GetIndexedField::new(
transform_boxed(expr, &mut transform)?,
Expand Down
3 changes: 1 addition & 2 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,7 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
| Expr::Exists { .. }
| Expr::InSubquery(_)
| Expr::ScalarSubquery(_)
| Expr::Wildcard
| Expr::QualifiedWildcard { .. }
| Expr::Wildcard { .. }
| Expr::GetIndexedField { .. }
| Expr::Placeholder(_)
| Expr::OuterReferenceColumn { .. } => {}
Expand Down
Loading