single_distinct_to_groupby no longer drops qualifiers #4050

Merged · 11 commits · Nov 2, 2022
4 changes: 2 additions & 2 deletions benchmarks/expected-plans/q16.txt
@@ -1,6 +1,6 @@
Sort: supplier_cnt DESC NULLS FIRST, part.p_brand ASC NULLS LAST, part.p_type ASC NULLS LAST, part.p_size ASC NULLS LAST
Projection: part.p_brand, part.p_type, part.p_size, COUNT(DISTINCT partsupp.ps_suppkey) AS supplier_cnt
-Projection: group_alias_0 AS p_brand, group_alias_1 AS p_type, group_alias_2 AS p_size, COUNT(alias1) AS COUNT(DISTINCT partsupp.ps_suppkey)
+Projection: group_alias_0 AS part.p_brand, group_alias_1 AS part.p_type, group_alias_2 AS part.p_size, COUNT(alias1) AS COUNT(DISTINCT partsupp.ps_suppkey)
Comment on lines 2 to +3 (Member Author):
Note that this logical plan was invalid before this PR because the outer projection was looking for part.p_brand and the inner projection only provided p_brand.

Aggregate: groupBy=[[group_alias_0, group_alias_1, group_alias_2]], aggr=[[COUNT(alias1)]]
Aggregate: groupBy=[[part.p_brand AS group_alias_0, part.p_type AS group_alias_1, part.p_size AS group_alias_2, partsupp.ps_suppkey AS alias1]], aggr=[[]]
LeftAnti Join: partsupp.ps_suppkey = __sq_1.s_suppkey
@@ -10,4 +10,4 @@ Sort: supplier_cnt DESC NULLS FIRST, part.p_brand ASC NULLS LAST, part.p_type AS
TableScan: part projection=[p_partkey, p_brand, p_type, p_size]
Projection: supplier.s_suppkey AS s_suppkey, alias=__sq_1
Filter: supplier.s_comment LIKE Utf8("%Customer%Complaints%")
-TableScan: supplier projection=[s_suppkey, s_comment]
+TableScan: supplier projection=[s_suppkey, s_comment]
19 changes: 17 additions & 2 deletions datafusion/common/src/dfschema.rs
@@ -23,7 +23,7 @@ use std::convert::TryFrom;
use std::sync::Arc;

use crate::error::{DataFusionError, Result, SchemaError};
-use crate::{field_not_found, Column};
+use crate::{field_not_found, Column, TableReference};

use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -203,7 +203,22 @@ impl DFSchema {
// qualifier and name.
(Some(q), Some(field_q)) => q == field_q && field.name() == name,
// field to lookup is qualified but current field is unqualified.
-(Some(_), None) => false,
+(Some(qq), None) => {
+    // the original field may now be aliased with a name that matches the
+    // original qualified name
+    let table_ref: TableReference = field.name().as_str().into();
+    match table_ref {
+        TableReference::Partial { schema, table } => {
+            schema == qq && table == name
+        }
+        TableReference::Full {
+            catalog,
+            schema,
+            table,
+        } if catalog.is_empty() => schema == qq && table == name,
+        _ => false,
+    }
+}
// field to lookup is unqualified, no need to compare qualifier
(None, Some(_)) | (None, None) => field.name() == name,
})
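
For readers unfamiliar with TableReference, the new arm can be read as: treat the field's own name as a possibly dotted table reference and compare its parts against the requested qualifier and column. Below is a minimal sketch of that rule, not from this PR; the helper name alias_matches is hypothetical, and it assumes only the Partial/Full variants and the From<&str> conversion visible in the diff above.

    use datafusion_common::TableReference;

    /// Hypothetical helper mirroring the new match arm above.
    fn alias_matches(field_name: &str, qualifier: &str, name: &str) -> bool {
        let table_ref: TableReference = field_name.into();
        match table_ref {
            // "part.p_brand" parses as schema = "part", table = "p_brand"
            TableReference::Partial { schema, table } => schema == qualifier && table == name,
            TableReference::Full { catalog, schema, table } if catalog.is_empty() => {
                schema == qualifier && table == name
            }
            _ => false,
        }
    }

    fn main() {
        // a field literally named "part.p_brand" (e.g. produced by an alias)
        // now satisfies a lookup for qualifier "part" and column "p_brand"
        assert!(alias_matches("part.p_brand", "part", "p_brand"));
        // an undotted field name still does not match a qualified lookup
        assert!(!alias_matches("p_brand", "part", "p_brand"));
    }

With this, projections that alias a column back to its qualified name, as the optimizer now does, remain resolvable by qualified references further up the plan.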
2 changes: 2 additions & 0 deletions datafusion/common/src/lib.rs
@@ -26,6 +26,7 @@ pub mod parsers;
mod pyarrow;
pub mod scalar;
pub mod stats;
+mod table_reference;
pub mod test_util;

pub use column::Column;
@@ -34,6 +35,7 @@ pub use error::{field_not_found, DataFusionError, Result, SchemaError};
pub use parsers::parse_interval;
pub use scalar::{ScalarType, ScalarValue};
pub use stats::{ColumnStatistics, Statistics};
+pub use table_reference::{ResolvedTableReference, TableReference};

/// Downcast an Arrow Array to a concrete type, return an `DataFusionError::Internal` if the cast is
/// not possible. In normal usage of DataFusion the downcast should always succeed.
14 changes: 7 additions & 7 deletions datafusion/core/tests/sql/group_by.rs
@@ -654,13 +654,13 @@ async fn group_by_dictionary() {
.expect("ran plan correctly");

let expected = vec![
-"+-----+------------------------+",
-"| val | COUNT(DISTINCT t.dict) |",
-"+-----+------------------------+",
-"| 1   | 2                      |",
-"| 2   | 2                      |",
-"| 4   | 1                      |",
-"+-----+------------------------+",
+"+-------+------------------------+",
Contributor:
I don't understand why this plan would produce t.val as the column name but the query above it (SELECT val, count(distinct dict) FROM t GROUP BY val) still produced val 🤔

Member Author:
I only fixed this for the single_distinct_to_groupby case, but it would be good to fix this consistently for all cases. I will see if I can fix it in this PR, or file a new issue if it is not trivial.

Member Author:
Actually, I'm confused now as well ... perhaps this name should not have been qualified in the first place? I'll take another look.

Member Author (@andygrove, Nov 1, 2022):
I understand this now. The test above is referencing a column named t.val and the physical plan drops column qualifiers:

        Expr::Column(c) => {
            // the physical column is built from the bare name only, so the
            // relation qualifier (the `t` in `t.val`) is discarded here
            let idx = input_dfschema.index_of_column(c)?;
            Ok(Arc::new(Column::new(&c.name, idx)))
        }

The updated test is referencing a column with an alias of t.val, and the physical plan uses the alias name, hence the different output.

+"| t.val | COUNT(DISTINCT t.dict) |",
+"+-------+------------------------+",
+"| 1     | 2                      |",
+"| 2     | 2                      |",
+"| 4     | 1                      |",
+"+-------+------------------------+",
];
assert_batches_sorted_eq!(expected, &results);
}
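
To make the naming behavior discussed in the thread above concrete, here is a minimal sketch with simplified stand-in types, not DataFusion's real Expr/Column: lowering a qualified column keeps only the bare name, while an explicit alias such as "t.val" is carried through verbatim, which is why the expected header changes from val to t.val.

    // Simplified stand-ins for illustration only.
    #[allow(dead_code)]
    struct Column {
        relation: Option<String>,
        name: String,
    }

    enum Expr {
        Column(Column),
        Alias(Box<Expr>, String),
    }

    // Mirrors the quoted physical-planning snippet: a column contributes only its
    // bare name, while an alias is used as-is.
    fn physical_name(expr: &Expr) -> String {
        match expr {
            Expr::Column(c) => c.name.clone(),
            Expr::Alias(_, alias) => alias.clone(),
        }
    }

    fn main() {
        let qualified = || Expr::Column(Column {
            relation: Some("t".to_string()),
            name: "val".to_string(),
        });
        // a qualified column reference loses its qualifier in the output header
        assert_eq!(physical_name(&qualified()), "val");
        // an alias of "t.val" is preserved, matching the updated expected output above
        assert_eq!(physical_name(&Expr::Alias(Box::new(qualified()), "t.val".to_string())), "t.val");
    }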
12 changes: 7 additions & 5 deletions datafusion/optimizer/src/single_distinct_to_groupby.rs
@@ -137,12 +137,14 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
// - aggr expr
let mut alias_expr: Vec<Expr> = Vec::new();
for (alias, original_field) in group_expr_alias {
-alias_expr.push(col(&alias).alias(original_field.name()));
+alias_expr.push(col(&alias).alias(original_field.qualified_name()));
Member Author:
This is the main change

}
for (i, expr) in new_aggr_exprs.iter().enumerate() {
alias_expr.push(columnize_expr(
-expr.clone()
-    .alias(schema.clone().fields()[i + group_expr.len()].name()),
+expr.clone().alias(
+    schema.clone().fields()[i + group_expr.len()]
+        .qualified_name(),
+),
&outer_aggr_schema,
));
}
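
The substance of the fix is swapping name() for qualified_name() when the rewritten expressions are re-aliased. A rough sketch of the difference, using a simplified stand-in for DFField rather than DataFusion's actual type:

    // Simplified stand-in for DFField, for illustration only.
    struct Field {
        qualifier: Option<String>,
        name: String,
    }

    impl Field {
        // just the column name, e.g. "a"
        fn name(&self) -> &str {
            &self.name
        }

        // the relation-qualified name, e.g. "test.a"; aliasing the projection with
        // this value is what lets outer references like `test.a` keep resolving
        fn qualified_name(&self) -> String {
            match &self.qualifier {
                Some(q) => format!("{}.{}", q, self.name),
                None => self.name.clone(),
            }
        }
    }

    fn main() {
        let f = Field {
            qualifier: Some("test".to_string()),
            name: "a".to_string(),
        };
        assert_eq!(f.name(), "a");
        assert_eq!(f.qualified_name(), "test.a");
    }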
@@ -362,7 +364,7 @@ mod tests {
.build()?;

// Should work
-let expected = "Projection: group_alias_0 AS a, COUNT(alias1) AS COUNT(DISTINCT test.b) [a:UInt32, COUNT(DISTINCT test.b):Int64;N]\
+let expected = "Projection: group_alias_0 AS test.a, COUNT(alias1) AS COUNT(DISTINCT test.b) [a:UInt32, COUNT(DISTINCT test.b):Int64;N]\
\n Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]] [group_alias_0:UInt32, COUNT(alias1):Int64;N]\
\n Aggregate: groupBy=[[test.a AS group_alias_0, test.b AS alias1]], aggr=[[]] [group_alias_0:UInt32, alias1:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
@@ -409,7 +411,7 @@
)?
.build()?;
// Should work
-let expected = "Projection: group_alias_0 AS a, COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias1) AS MAX(DISTINCT test.b) [a:UInt32, COUNT(DISTINCT test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\
+let expected = "Projection: group_alias_0 AS test.a, COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias1) AS MAX(DISTINCT test.b) [a:UInt32, COUNT(DISTINCT test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\
\n Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1), MAX(alias1)]] [group_alias_0:UInt32, COUNT(alias1):Int64;N, MAX(alias1):UInt32;N]\
\n Aggregate: groupBy=[[test.a AS group_alias_0, test.b AS alias1]], aggr=[[]] [group_alias_0:UInt32, alias1:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
28 changes: 28 additions & 0 deletions datafusion/proto/src/lib.rs
@@ -252,6 +252,34 @@ mod roundtrip_tests {
Ok(())
}

#[tokio::test]
async fn roundtrip_single_count_distinct() -> Result<(), DataFusionError> {
let ctx = SessionContext::new();

let schema = Schema::new(vec![
Field::new("a", DataType::Int64, true),
Field::new("b", DataType::Decimal128(15, 2), true),
]);

ctx.register_csv(
"t1",
"testdata/test.csv",
CsvReadOptions::default().schema(&schema),
)
.await?;

let query = "SELECT a, COUNT(DISTINCT b) as b_cd FROM t1 GROUP BY a";
let plan = ctx.sql(query).await?.to_logical_plan()?;

println!("{:?}", plan);

let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
Contributor:
👍

Ok(())
}

#[tokio::test]
async fn roundtrip_logical_plan_with_extension() -> Result<(), DataFusionError> {
let ctx = SessionContext::new();
3 changes: 1 addition & 2 deletions datafusion/sql/src/lib.rs
@@ -20,8 +20,7 @@

pub mod parser;
pub mod planner;
-mod table_reference;
pub mod utils;

+pub use datafusion_common::{ResolvedTableReference, TableReference};
pub use sqlparser;
-pub use table_reference::{ResolvedTableReference, TableReference};
2 changes: 1 addition & 1 deletion datafusion/sql/src/planner.rs
@@ -46,8 +46,8 @@ use std::str::FromStr;
use std::sync::Arc;
use std::{convert::TryInto, vec};

-use crate::table_reference::TableReference;
use crate::utils::{make_decimal_type, normalize_ident, resolve_columns};
+use datafusion_common::TableReference;
use datafusion_common::{
field_not_found, Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
};