single_distinct_to_groupby no longer drops qualifiers #4050
 "| 2 | 2 |",
 "| 4 | 1 |",
-"+-----+------------------------+",
+"+-------+------------------------+",
I don't understand why this plan would produce t.val as the column name but the query above it (SELECT val, count(distinct dict) FROM t GROUP BY val) still produced val 🤔
I only fixed this for the single_distinct_to_groupby case, but it would be good to fix this consistently for all cases. I will see if I can fix it in this PR, or file a new issue if it is not trivial.
Actually. I'm confused now as well ... perhaps this name should not have been qualified in the first place? I'll take another look
I understand this now. The test above is referencing a column named t.val and the physical plan drops column qualifiers:
Expr::Column(c) => {
    let idx = input_dfschema.index_of_column(c)?;
    Ok(Arc::new(Column::new(&c.name, idx)))
}
The updated test is referencing a column with an alias of t.val, and the physical plan uses the alias name, hence the different output.
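The naming difference described above can be sketched with a toy model. The `Column`, `Expr`, and `physical_name` definitions below are simplified, hypothetical stand-ins written for illustration; they are not DataFusion's real types or API. The sketch only shows the mechanism: a bare column reference keeps its unqualified name, while an alias string is carried through verbatim, dots included.

```rust
// Toy model only: simplified stand-ins, not DataFusion's real types.

#[derive(Debug, Clone)]
struct Column {
    relation: Option<String>, // table qualifier, e.g. "t"
    name: String,             // unqualified name, e.g. "val"
}

impl Column {
    fn new(relation: Option<&str>, name: &str) -> Self {
        Column {
            relation: relation.map(str::to_string),
            name: name.to_string(),
        }
    }

    /// Flattened logical name, e.g. "t.val" when a qualifier is present.
    fn flat_name(&self) -> String {
        match &self.relation {
            Some(r) => format!("{}.{}", r, self.name),
            None => self.name.clone(),
        }
    }
}

#[derive(Debug, Clone)]
enum Expr {
    Column(Column),
    Alias(Box<Expr>, String),
}

/// Output field name reported by the physical plan in this toy model.
fn physical_name(expr: &Expr) -> String {
    match expr {
        // Mirrors `Column::new(&c.name, idx)`: the qualifier is dropped.
        Expr::Column(c) => c.name.clone(),
        // The alias string is used as-is, so "t.val" stays "t.val".
        Expr::Alias(_, alias) => alias.clone(),
    }
}
```

In this model a column `t.val` is reported as `val`, whereas any expression aliased as the string `t.val` is reported as `t.val`, which matches the change in the expected test output.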
@@ -137,12 +137,14 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
     // - aggr expr
     let mut alias_expr: Vec<Expr> = Vec::new();
     for (alias, original_field) in group_expr_alias {
-        alias_expr.push(col(&alias).alias(original_field.name()));
+        alias_expr.push(col(&alias).alias(original_field.qualified_name()));
This is the main change
 Projection: part.p_brand, part.p_type, part.p_size, COUNT(DISTINCT partsupp.ps_suppkey) AS supplier_cnt
-Projection: group_alias_0 AS p_brand, group_alias_1 AS p_type, group_alias_2 AS p_size, COUNT(alias1) AS COUNT(DISTINCT partsupp.ps_suppkey)
+Projection: group_alias_0 AS part.p_brand, group_alias_1 AS part.p_type, group_alias_2 AS part.p_size, COUNT(alias1) AS COUNT(DISTINCT partsupp.ps_suppkey)
Note that this logical plan was invalid before this PR because the outer projection was looking for part.p_brand and the inner projection only provided p_brand.
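A minimal sketch of why the old plan was invalid, under the simplifying assumption that the outer projection resolves its inputs by exact match on the flattened name. `Field`, `inner_output_names`, and `missing_columns` are hypothetical helpers invented for this illustration; only the `name()` versus `qualified_name()` distinction comes from the diff in this PR.

```rust
// Toy model only: `Field` here is a stand-in, not DataFusion's DFField.
struct Field {
    qualifier: Option<String>,
    name: String,
}

impl Field {
    fn new(qualifier: Option<&str>, name: &str) -> Self {
        Field {
            qualifier: qualifier.map(str::to_string),
            name: name.to_string(),
        }
    }

    /// Unqualified name, e.g. "p_brand".
    fn name(&self) -> &str {
        &self.name
    }

    /// Qualified name, e.g. "part.p_brand".
    fn qualified_name(&self) -> String {
        match &self.qualifier {
            Some(q) => format!("{}.{}", q, self.name),
            None => self.name.clone(),
        }
    }
}

/// Names the inner projection exposes for the group-by fields.
/// `keep_qualifier = false` models the old behaviour, `true` the new one.
fn inner_output_names(fields: &[Field], keep_qualifier: bool) -> Vec<String> {
    fields
        .iter()
        .map(|f| {
            if keep_qualifier {
                f.qualified_name()
            } else {
                f.name().to_string()
            }
        })
        .collect()
}

/// Columns the outer projection asks for that the inner one does not provide.
/// Assumption: resolution is an exact string comparison.
fn missing_columns(required: &[&str], provided: &[String]) -> Vec<String> {
    required
        .iter()
        .filter(|r| !provided.iter().any(|p| p.as_str() == **r))
        .map(|r| r.to_string())
        .collect()
}
```

With unqualified aliases every `part.*` column requested by the outer projection is reported as missing; with qualified aliases nothing is missing.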
let fields = result[0]
    .schema()
    .fields()
    .iter()
    .map(|f| {
        let simple_name = match f.name().find('.') {
            Some(i) => f.name()[i + 1..].to_string(),
            _ => f.name().to_string(),
        };
        f.clone().with_name(simple_name)
    })
    .collect();
let result_schema = SchemaRef::new(Schema::new(fields));
let result = result
    .iter()
    .map(|b| {
        RecordBatch::try_new(result_schema.clone(), b.columns().to_vec())
            .map_err(|e| e.into())
    })
    .collect::<Result<Vec<_>>>()?;
We can't use the physical schema from query execution because it has ?table? as the qualifier, so this code just strips field names down to simple names.
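The stripping step can be isolated into a small standalone function. This is a sketch of the same first-dot rule, with `simple_name` being a hypothetical helper name; the example field name `?table?.p_brand` is an assumption about how such a qualified name might look.

```rust
/// Returns everything after the first '.' in `name`, or `name` unchanged
/// when it contains no dot. '.' is a single ASCII byte, so slicing at
/// `i + 1` always lands on a character boundary.
fn simple_name(name: &str) -> &str {
    match name.find('.') {
        Some(i) => &name[i + 1..],
        None => name,
    }
}
```

Because the rule looks only for the first dot, it would also truncate an unaliased expression name that happens to contain one: `COUNT(DISTINCT partsupp.ps_suppkey)` would become `ps_suppkey)`. That seems harmless where the aggregate is aliased (as with `supplier_cnt` here), but is worth keeping in mind if the helper is reused.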
// the original field may now be aliased with a name that matches the
// original qualified name
let table_ref: TableReference = field.name().as_str().into();
match table_ref {
    TableReference::Partial { schema, table } => {
        schema == qq && table == name
    }
    TableReference::Full { schema, table, .. } => {
        schema == qq && table == name
    }
    _ => false,
}
I'm not crazy about this but I don't see how else we can handle this unless we introduce some sort of alias-with-qualifier concept
Perhaps aliases should be Column instead of String?
enum Expr {
    Alias(Box<Expr>, Column), // (expr, alias)
    ...
}
I don't think aliases can be fully qualified identifiers.
I think the core disconnect is that DataFusion 'flattens' compound identifiers into strings for column names, which can then be referred to by anything that reads the plan that makes them.
Postgres deals with this type of issue by using indexes to identify columns, but that ends up with its own problems, as the indexes invariably get mixed up sometimes and debugging it is super tricky.
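To make the flattening concern concrete, here is a toy version of the lookup hack discussed in this thread: a flat field name is split on dots and its last two parts are compared with the requested qualifier and column name. `matches_qualified` is a hypothetical helper and plain string splitting stands in for the `TableReference` conversion, so treat it as a sketch rather than the PR's implementation.

```rust
/// Returns true when the flat `field_name` can be read as
/// `<qualifier>.<name>` (two parts) or `<anything>.<qualifier>.<name>`
/// (three parts, first part ignored). Any other shape does not match.
fn matches_qualified(field_name: &str, qualifier: &str, name: &str) -> bool {
    let parts: Vec<&str> = field_name.split('.').collect();
    match parts.as_slice() {
        [q, n] => *q == qualifier && *n == name,
        [_, q, n] => *q == qualifier && *n == name,
        _ => false,
    }
}
```

Once the identifier is a plain string, a field that was merely aliased as `part.p_brand` cannot be told apart from a genuine column `p_brand` of table `part`; both satisfy the check. That ambiguity is the trade-off raised in the comments above.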
Looks good to me @andygrove -- thank you
let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
👍
Thanks for the review @alamb. With this PR we can now support 21 out of 22 TPC-H queries in DataFusion and Ballista. Getting close...
So close!
Benchmark runs are scheduled for baseline = 396b5aa and contender = 8d6448e. 8d6448e is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Which issue does this PR close?
Closes #4049
Closes #3820
Includes changes from #4084
Rationale for this change
Simple bug fix so that we can serialize plans containing a single distinct aggregate
What changes are included in this PR?
TableReference from sql to common
DFSchema::index_of_column_by_name
Are there any user-facing changes?