Skip to content

Commit

Permalink
Schema ambiguity checks when dereferencing
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Nov 20, 2024
1 parent 9ea86ca commit f9cc687
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 48 deletions.
78 changes: 58 additions & 20 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ impl DFSchema {
&self,
qualifier: Option<&TableReference>,
name: &str,
) -> Option<usize> {
) -> Result<Option<usize>> {
let mut matches = self
.iter()
.enumerate()
Expand All @@ -328,16 +328,53 @@ impl DFSchema {
// field to lookup is unqualified, no need to compare qualifier
(None, Some(_)) | (None, None) => f.name() == name,
})
.map(|(idx, _)| idx);
matches.next()
.map(|(idx, (q, _))| (idx, q));
let first_match = matches.next();
match first_match {
None => Ok(None),
Some((first_index, first_qualifier)) => {
let next_match = matches.next();
match next_match {
None => Ok(Some(first_index)),
Some((_, next_qualifier)) => {
match (first_qualifier, next_qualifier) {
(Some(q), Some(_)) => {
_schema_err!(SchemaError::DuplicateQualifiedField {
qualifier: Box::new(q.clone()),
name: name.to_string(),
})
}

(None, None) => {
_schema_err!(SchemaError::DuplicateUnqualifiedField {
name: name.to_string(),
})
}

_ => _schema_err!(SchemaError::AmbiguousReference {
field: Column {
relation: Some(
first_qualifier
.or(next_qualifier)
.unwrap()
.clone()
),
name: name.to_string(),
},
}),
}
}
}
}
}
}

/// Find the index of the column with the given qualifier and name,
/// returning `None` if not found
///
/// See [Self::index_of_column] for a version that returns an error if the
/// column is not found
pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
pub fn maybe_index_of_column(&self, col: &Column) -> Result<Option<usize>> {
self.index_of_column_by_name(col.relation.as_ref(), &col.name)
}

Expand All @@ -347,14 +384,15 @@ impl DFSchema {
/// See [Self::maybe_index_of_column] for a version that returns `None` if
/// the column is not found
pub fn index_of_column(&self, col: &Column) -> Result<usize> {
self.maybe_index_of_column(col)
self.maybe_index_of_column(col)?
.ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
}

/// Check if the column is in the current schema
pub fn is_column_from_schema(&self, col: &Column) -> bool {
self.index_of_column_by_name(col.relation.as_ref(), &col.name)
.is_some()
pub fn is_column_from_schema(&self, col: &Column) -> Result<bool> {
Ok(self
.index_of_column_by_name(col.relation.as_ref(), &col.name)?
.is_some())
}

/// Find the field with the given name
Expand All @@ -378,7 +416,7 @@ impl DFSchema {
) -> Result<(Option<&TableReference>, &Field)> {
if let Some(qualifier) = qualifier {
let idx = self
.index_of_column_by_name(Some(qualifier), name)
.index_of_column_by_name(Some(qualifier), name)?
.ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
} else {
Expand Down Expand Up @@ -490,7 +528,7 @@ impl DFSchema {
name: &str,
) -> Result<&Field> {
let idx = self
.index_of_column_by_name(Some(qualifier), name)
.index_of_column_by_name(Some(qualifier), name)?
.ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;

Ok(self.field(idx))
Expand Down Expand Up @@ -629,9 +667,9 @@ impl DFSchema {
let iter1 = fields1.iter();
let iter2 = fields2.iter();
fields1.len() == fields2.len() &&
// all fields have to be the same
// all fields have to be the same
iter1
.zip(iter2)
.zip(iter2)
.all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
}
(DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
Expand Down Expand Up @@ -668,9 +706,9 @@ impl DFSchema {
let iter1 = fields1.iter();
let iter2 = fields2.iter();
fields1.len() == fields2.len() &&
// all fields have to be the same
// all fields have to be the same
iter1
.zip(iter2)
.zip(iter2)
.all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
}
(DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
Expand Down Expand Up @@ -1178,8 +1216,8 @@ mod tests {
.to_string(),
expected_help
);
assert!(schema.index_of_column_by_name(None, "y").is_none());
assert!(schema.index_of_column_by_name(None, "t1.c0").is_none());
assert!(schema.index_of_column_by_name(None, "y")?.is_none());
assert!(schema.index_of_column_by_name(None, "t1.c0")?.is_none());

Ok(())
}
Expand Down Expand Up @@ -1268,28 +1306,28 @@ mod tests {
{
let col = Column::from_qualified_name("t1.c0");
let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
assert!(schema.is_column_from_schema(&col));
assert!(schema.is_column_from_schema(&col)?);
}

// qualified not exists
{
let col = Column::from_qualified_name("t1.c2");
let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
assert!(!schema.is_column_from_schema(&col));
assert!(!schema.is_column_from_schema(&col)?);
}

// unqualified exists
{
let col = Column::from_name("c0");
let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
assert!(schema.is_column_from_schema(&col));
assert!(schema.is_column_from_schema(&col)?);
}

// unqualified not exists
{
let col = Column::from_name("c2");
let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
assert!(!schema.is_column_from_schema(&col));
assert!(!schema.is_column_from_schema(&col)?);
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::sync::{Arc, OnceLock};

use super::dml::CopyTo;
use super::DdlStatement;
use crate::builder::{change_redundant_column, unnest_with_options};
use crate::builder::unnest_with_options;
use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction};
use crate::expr_rewriter::{
create_col_from_scalar_expr, normalize_cols, normalize_sorts, NamePreserver,
Expand Down Expand Up @@ -2193,7 +2193,7 @@ impl SubqueryAlias {
alias: impl Into<TableReference>,
) -> Result<Self> {
let alias = alias.into();
let fields = change_redundant_column(plan.schema().fields());
let fields = plan.schema().fields().clone();
let meta_data = plan.schema().as_ref().metadata().clone();
let schema: Schema =
DFSchema::from_unqualified_fields(fields.into(), meta_data)?.into();
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,7 @@ pub fn check_all_columns_from_schema(
schema: &DFSchema,
) -> Result<bool> {
for col in columns.iter() {
let exist = schema.is_column_from_schema(col);
let exist = schema.is_column_from_schema(col)?;
if !exist {
return Ok(false);
}
Expand Down
7 changes: 4 additions & 3 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ fn optimize_projections(
let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter());
let schema = aggregate.input.schema();
let necessary_indices =
RequiredIndicies::new().with_exprs(schema, all_exprs_iter);
RequiredIndicies::new().with_exprs(schema, all_exprs_iter)?;
let necessary_exprs = necessary_indices.get_required_exprs(schema);

return optimize_projections(
Expand Down Expand Up @@ -217,7 +217,8 @@ fn optimize_projections(

// Get all the required column indices at the input, either by the
// parent or window expression requirements.
let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr);
let required_indices =
child_reqs.with_exprs(&input_schema, &new_window_expr)?;

return optimize_projections(
Arc::unwrap_or_clone(window.input),
Expand Down Expand Up @@ -753,7 +754,7 @@ fn rewrite_projection_given_requirements(
let exprs_used = indices.get_at_indices(&expr);

let required_indices =
RequiredIndicies::new().with_exprs(input.schema(), exprs_used.iter());
RequiredIndicies::new().with_exprs(input.schema(), exprs_used.iter())?;

// rewrite the children projection, and if they are changed rewrite the
// projection down
Expand Down
22 changes: 10 additions & 12 deletions datafusion/optimizer/src/optimize_projections/required_indices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl RequiredIndicies {
// Add indices of the child fields referred to by the expressions in the
// parent
plan.apply_expressions(|e| {
self.add_expr(schema, e);
self.add_expr(schema, e)?;
Ok(TreeNodeRecursion::Continue)
})?;
Ok(self.compact())
Expand All @@ -111,17 +111,18 @@ impl RequiredIndicies {
///
/// * `input_schema`: The input schema to analyze for index requirements.
/// * `expr`: An expression for which we want to find necessary field indices.
fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) {
fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) -> Result<()> {
// TODO could remove these clones (and visit the expression directly)
let mut cols = expr.column_refs();
// Get outer-referenced (subquery) columns:
outer_columns(expr, &mut cols);
self.indices.reserve(cols.len());
for col in cols {
if let Some(idx) = input_schema.maybe_index_of_column(col) {
if let Some(idx) = input_schema.maybe_index_of_column(col)? {
self.indices.push(idx);
}
}
Ok(())
}

/// Adds the indices of the fields referred to by the given expressions
Expand All @@ -132,17 +133,14 @@ impl RequiredIndicies {
/// * `input_schema`: The input schema to analyze for index requirements.
/// * `exprs`: the expressions for which we want to find field indices.
pub fn with_exprs<'a>(
self,
mut self,
schema: &DFSchemaRef,
exprs: impl IntoIterator<Item = &'a Expr>,
) -> Self {
exprs
.into_iter()
.fold(self, |mut acc, expr| {
acc.add_expr(schema, expr);
acc
})
.compact()
) -> Result<Self> {
for expr in exprs {
self.add_expr(schema, expr)?;
}
Ok(self.compact())
}

/// Adds all `indices` into this instance.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1750,7 +1750,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.enumerate()
.map(|(i, c)| {
let column_index = table_schema
.index_of_column_by_name(None, &c)
.index_of_column_by_name(None, &c)?
.ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
if value_indices[column_index].is_some() {
return schema_err!(SchemaError::DuplicateUnqualifiedField {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/unparser/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ pub(crate) fn unproject_sort_expr(

// In case of aggregation there could be columns containing aggregation functions we need to unproject
if let Some(agg) = agg {
if agg.schema.is_column_from_schema(col_ref) {
if agg.schema.is_column_from_schema(col_ref)? {
let new_expr = unproject_agg_exprs(sort_expr.expr, agg, None)?;
sort_expr.expr = new_expr;
return Ok(sort_expr);
Expand Down
11 changes: 6 additions & 5 deletions datafusion/sqllogictest/test_files/join.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1215,18 +1215,19 @@ statement ok
create table t1(v1 int) as values(100);

## Query with Ambiguous column reference
query I
query error DataFusion error: Schema error: Schema contains duplicate qualified field name t1\.v1
select count(*)
from t1
right outer join t1
on t1.v1 > 0;
----
1

query I
query error
select t1.v1 from t1 join t1 using(v1) cross join (select struct('foo' as v1) as t1);
----
100
DataFusion error: Optimizer rule 'eliminate_cross_join' failed
caused by
Schema error: Schema contains duplicate qualified field name t1.v1


statement ok
drop table t1;
Expand Down
16 changes: 14 additions & 2 deletions datafusion/sqllogictest/test_files/select.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1807,11 +1807,23 @@ SELECT id AS col, id+1 AS col FROM users
----
1 2

# a reference is ambiguous
query error DataFusion error: Schema error: Ambiguous reference to unqualified field a
select a from (select 1 as a, 2 as a) t;

# t.a reference is ambiguous
query error DataFusion error: Schema error: Schema contains duplicate qualified field name t\.a
select t.a from (select 1 as a, 2 as a) t;

# TODO PostgreSQL disallows self-join without giving tables distinct aliases, but some other databases, e.g. Trino, do allow this, so this could work
# TODO When joining using USING, the condition columns should appear once in the output, and should be selectible using unqualified name only
query ITIT
query error
SELECT * FROM users JOIN users USING (id);
----
1 Tom 1 Tom
DataFusion error: expand_wildcard_rule
caused by
Schema error: Schema contains duplicate qualified field name users.id


statement ok
create view v as select count(id) from users;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1270,7 +1270,7 @@ fn apply_projection(table: DataFrame, substrait_schema: DFSchema) -> Result<Logi
.iter()
.map(|substrait_field| {
Ok(df_schema
.index_of_column_by_name(None, substrait_field.name().as_str())
.index_of_column_by_name(None, substrait_field.name().as_str())?
.unwrap())
})
.collect::<Result<_>>()?;
Expand Down

0 comments on commit f9cc687

Please sign in to comment.