Skip to content

Commit

Permalink
[BUG]: joins with duplicate column names and qualified table expansion (
Browse files Browse the repository at this point in the history
#3074)

- closes #3070
- closes #3072
- closes #3073
  • Loading branch information
universalmind303 authored Oct 18, 2024
1 parent 5795adc commit b04e56f
Show file tree
Hide file tree
Showing 13 changed files with 338 additions and 61 deletions.
2 changes: 2 additions & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1726,6 +1726,8 @@ class LogicalPlanBuilder:
right_on: list[PyExpr],
join_type: JoinType,
strategy: JoinStrategy | None = None,
join_prefix: str | None = None,
join_suffix: str | None = None,
) -> LogicalPlanBuilder: ...
def concat(self, other: LogicalPlanBuilder) -> LogicalPlanBuilder: ...
def add_monotonically_increasing_id(self, column_name: str | None) -> LogicalPlanBuilder: ...
Expand Down
45 changes: 44 additions & 1 deletion daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1688,10 +1688,13 @@ def join(
right_on: Optional[Union[List[ColumnInputType], ColumnInputType]] = None,
how: str = "inner",
strategy: Optional[str] = None,
prefix: Optional[str] = None,
suffix: Optional[str] = None,
) -> "DataFrame":
"""Column-wise join of the current DataFrame with an ``other`` DataFrame, similar to a SQL ``JOIN``
If the two DataFrames have duplicate non-join key column names, "right." will be prepended to the conflicting right columns.
If the two DataFrames have duplicate non-join key column names, "right." will be prepended to the conflicting right columns by default. You can change this behavior by passing `prefix`, `suffix`, or both to the function.
If `prefix` is passed, it will be prepended to the conflicting right columns. If `suffix` is passed, it will be appended to the conflicting right columns. If only `suffix` is passed, the default "right." prefix is not applied.
.. NOTE::
Although self joins are supported, we currently duplicate the logical plan for the right side
Expand All @@ -1716,6 +1719,42 @@ def join(
<BLANKLINE>
(Showing first 2 of 2 rows)
>>> import daft
>>> from daft import col
>>> df1 = daft.from_pydict({ "a": ["w", "x", "y"], "b": [1, 2, 3] })
>>> df2 = daft.from_pydict({ "a": ["x", "y", "z"], "b": [20, 30, 40] })
>>> joined_df = df1.join(df2, left_on=[col("a"), col("b")], right_on=[col("a"), col("b")/10], prefix="right_")
>>> joined_df.show()
╭──────┬───────┬─────────╮
│ a ┆ b ┆ right_b │
│ --- ┆ --- ┆ --- │
│ Utf8 ┆ Int64 ┆ Int64 │
╞══════╪═══════╪═════════╡
│ x ┆ 2 ┆ 20 │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ y ┆ 3 ┆ 30 │
╰──────┴───────┴─────────╯
<BLANKLINE>
(Showing first 2 of 2 rows)
>>> import daft
>>> from daft import col
>>> df1 = daft.from_pydict({ "a": ["w", "x", "y"], "b": [1, 2, 3] })
>>> df2 = daft.from_pydict({ "a": ["x", "y", "z"], "b": [20, 30, 40] })
>>> joined_df = df1.join(df2, left_on=[col("a"), col("b")], right_on=[col("a"), col("b")/10], suffix="_right")
>>> joined_df.show()
╭──────┬───────┬─────────╮
│ a ┆ b ┆ b_right │
│ --- ┆ --- ┆ --- │
│ Utf8 ┆ Int64 ┆ Int64 │
╞══════╪═══════╪═════════╡
│ x ┆ 2 ┆ 20 │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ y ┆ 3 ┆ 30 │
╰──────┴───────┴─────────╯
<BLANKLINE>
(Showing first 2 of 2 rows)
Args:
other (DataFrame): the right DataFrame to join on.
on (Optional[Union[List[ColumnInputType], ColumnInputType]], optional): key or keys to join on [use if the keys on the left and right side match.]. Defaults to None.
Expand All @@ -1724,6 +1763,8 @@ def join(
how (str, optional): what type of join to perform; currently "inner", "left", "right", "outer", "anti", and "semi" are supported. Defaults to "inner".
strategy (Optional[str]): The join strategy (algorithm) to use; currently "hash", "sort_merge", "broadcast", and None are supported, where None
chooses the join strategy automatically during query optimization. The default is None.
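suffix (Optional[str], optional): Suffix to append to the conflicting right column names in case of a name collision. Defaults to None (no suffix is appended).
prefix (Optional[str], optional): Prefix to prepend to the conflicting right column names in case of a name collision. Defaults to None, in which case "right." is used unless `suffix` is given.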
Raises:
ValueError: if `on` is passed in and `left_on` or `right_on` is not None.
Expand Down Expand Up @@ -1756,6 +1797,8 @@ def join(
right_on=right_exprs,
how=join_type,
strategy=join_strategy,
join_prefix=prefix,
join_suffix=suffix,
)
return DataFrame(builder)

Expand Down
4 changes: 4 additions & 0 deletions daft/logical/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,13 +255,17 @@ def join( # type: ignore[override]
right_on: list[Expression],
how: JoinType = JoinType.Inner,
strategy: JoinStrategy | None = None,
join_suffix: str | None = None,
join_prefix: str | None = None,
) -> LogicalPlanBuilder:
builder = self._builder.join(
right._builder,
[expr._expr for expr in left_on],
[expr._expr for expr in right_on],
how,
strategy,
join_suffix,
join_prefix,
)
return LogicalPlanBuilder(builder)

Expand Down
11 changes: 10 additions & 1 deletion src/daft-plan/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,13 +443,16 @@ impl LogicalPlanBuilder {
Ok(self.with_new_plan(pivot_logical_plan))
}

#[allow(clippy::too_many_arguments)]
pub fn join<Right: Into<LogicalPlanRef>>(
&self,
right: Right,
left_on: Vec<ExprRef>,
right_on: Vec<ExprRef>,
join_type: JoinType,
join_strategy: Option<JoinStrategy>,
join_suffix: Option<&str>,
join_prefix: Option<&str>,
) -> DaftResult<Self> {
let logical_plan: LogicalPlan = logical_ops::Join::try_new(
self.plan.clone(),
Expand All @@ -458,6 +461,8 @@ impl LogicalPlanBuilder {
right_on,
join_type,
join_strategy,
join_suffix,
join_prefix,
)?
.into();
Ok(self.with_new_plan(logical_plan))
Expand Down Expand Up @@ -868,14 +873,16 @@ impl PyLogicalPlanBuilder {
)?
.into())
}

#[allow(clippy::too_many_arguments)]
pub fn join(
&self,
right: &Self,
left_on: Vec<PyExpr>,
right_on: Vec<PyExpr>,
join_type: JoinType,
join_strategy: Option<JoinStrategy>,
join_suffix: Option<&str>,
join_prefix: Option<&str>,
) -> PyResult<Self> {
Ok(self
.builder
Expand All @@ -885,6 +892,8 @@ impl PyLogicalPlanBuilder {
pyexprs_to_exprs(right_on),
join_type,
join_strategy,
join_suffix,
join_prefix,
)?
.into())
}
Expand Down
4 changes: 4 additions & 0 deletions src/daft-plan/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ mod test {
vec![col("id")],
JoinType::Inner,
None,
None,
None,
)?
.filter(col("first_name").eq(lit("hello")))?
.select(vec![col("first_name")])?
Expand Down Expand Up @@ -237,6 +239,8 @@ Project1 --> Limit0
vec![col("id")],
JoinType::Inner,
None,
None,
None,
)?
.filter(col("first_name").eq(lit("hello")))?
.select(vec![col("first_name")])?
Expand Down
12 changes: 11 additions & 1 deletion src/daft-plan/src/logical_ops/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,16 @@ impl std::hash::Hash for Join {
}

impl Join {
#[allow(clippy::too_many_arguments)]
pub(crate) fn try_new(
left: Arc<LogicalPlan>,
right: Arc<LogicalPlan>,
left_on: Vec<ExprRef>,
right_on: Vec<ExprRef>,
join_type: JoinType,
join_strategy: Option<JoinStrategy>,
join_suffix: Option<&str>,
join_prefix: Option<&str>,
) -> logical_plan::Result<Self> {
let (left_on, _) = resolve_exprs(left_on, &left.schema(), false).context(CreationSnafu)?;
let (right_on, _) =
Expand Down Expand Up @@ -124,7 +127,14 @@ impl Join {
} else {
let mut new_name = name.clone();
while names_so_far.contains(&new_name) {
new_name = format!("right.{}", new_name);
if let Some(prefix) = join_prefix {
new_name = format!("{}{}", prefix, new_name);
} else if join_suffix.is_none() {
new_name = format!("right.{}", new_name);
}
if let Some(suffix) = join_suffix {
new_name = format!("{}{}", new_name, suffix);
}
}
names_so_far.insert(new_name.clone());

Expand Down
40 changes: 37 additions & 3 deletions src/daft-plan/src/logical_optimization/rules/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,8 @@ mod tests {
join_on.clone(),
how,
None,
None,
None,
)?
.filter(pred.clone())?
.build();
Expand All @@ -686,7 +688,15 @@ mod tests {
left_scan_plan.filter(pred)?
};
let expected = expected_left_filter_scan
.join(&right_scan_plan, join_on.clone(), join_on, how, None)?
.join(
&right_scan_plan,
join_on.clone(),
join_on,
how,
None,
None,
None,
)?
.build();
assert_optimized_plan_eq(plan, expected)?;
Ok(())
Expand Down Expand Up @@ -720,6 +730,8 @@ mod tests {
join_on.clone(),
how,
None,
None,
None,
)?
.filter(pred.clone())?
.build();
Expand All @@ -738,6 +750,8 @@ mod tests {
join_on,
how,
None,
None,
None,
)?
.build();
assert_optimized_plan_eq(plan, expected)?;
Expand Down Expand Up @@ -785,6 +799,8 @@ mod tests {
join_on.clone(),
how,
None,
None,
None,
)?
.filter(pred.clone())?
.build();
Expand All @@ -811,6 +827,8 @@ mod tests {
join_on,
how,
None,
None,
None,
)?
.build();
assert_optimized_plan_eq(plan, expected)?;
Expand All @@ -835,7 +853,15 @@ mod tests {
let join_on = vec![col("b")];
let pred = col("a").lt(lit(2));
let plan = left_scan_plan
.join(&right_scan_plan, join_on.clone(), join_on, how, None)?
.join(
&right_scan_plan,
join_on.clone(),
join_on,
how,
None,
None,
None,
)?
.filter(pred)?
.build();
// should not push down filter
Expand All @@ -862,7 +888,15 @@ mod tests {
let join_on = vec![col("b")];
let pred = col("c").lt(lit(2.0));
let plan = left_scan_plan
.join(&right_scan_plan, join_on.clone(), join_on, how, None)?
.join(
&right_scan_plan,
join_on.clone(),
join_on,
how,
None,
None,
None,
)?
.filter(pred)?
.build();
// should not push down filter
Expand Down
11 changes: 10 additions & 1 deletion src/daft-plan/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,16 @@ impl LogicalPlan {
[input1, input2] => match self {
Self::Source(_) => panic!("Source nodes don't have children, with_new_children() should never be called for Source ops"),
Self::Concat(_) => Self::Concat(Concat::try_new(input1.clone(), input2.clone()).unwrap()),
Self::Join(Join { left_on, right_on, join_type, join_strategy, .. }) => Self::Join(Join::try_new(input1.clone(), input2.clone(), left_on.clone(), right_on.clone(), *join_type, *join_strategy).unwrap()),
Self::Join(Join { left_on, right_on, join_type, join_strategy, .. }) => Self::Join(Join::try_new(
input1.clone(),
input2.clone(),
left_on.clone(),
right_on.clone(),
*join_type,
*join_strategy,
None, // The suffix is already eagerly computed in the constructor
None // the prefix is already eagerly computed in the constructor
).unwrap()),
_ => panic!("Logical op {} has one input, but got two", self),
},
_ => panic!("Logical ops should never have more than 2 inputs, but got: {}", children.len())
Expand Down
2 changes: 2 additions & 0 deletions src/daft-plan/src/physical_planner/translate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,8 @@ mod tests {
vec![col("a"), col("b")],
JoinType::Inner,
Some(JoinStrategy::Hash),
None,
None,
)?
.build();
logical_to_physical(logical_plan, cfg)
Expand Down
2 changes: 2 additions & 0 deletions src/daft-sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ mod tests {
vec![col("id")],
JoinType::Inner,
None,
None,
None,
)?
.select(vec![col("*")])?
.build();
Expand Down
Loading

0 comments on commit b04e56f

Please sign in to comment.