Skip to content

Commit

Permalink
feat: Support non-coalescing joins in default engine (pola-rs#16036)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored May 3, 2024
1 parent 8322323 commit dc45fc0
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 16 deletions.
9 changes: 8 additions & 1 deletion crates/polars-lazy/src/physical_plan/streaming/checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@ pub(super) fn streamable_join(args: &JoinArgs) -> bool {
let supported = match args.how {
#[cfg(feature = "cross_join")]
JoinType::Cross => true,
JoinType::Inner | JoinType::Left | JoinType::Outer { .. } => true,
JoinType::Inner | JoinType::Left => {
// no-coalescing not yet supported in streaming
matches!(
args.coalesce,
JoinCoalesce::JoinSpecific | JoinCoalesce::CoalesceColumns
)
},
JoinType::Outer { .. } => true,
_ => false,
};
supported && !args.validation.needs_checks()
Expand Down
29 changes: 17 additions & 12 deletions crates/polars-ops/src/frame/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ pub trait DataFrameJoinOps: IntoDf {
let left_df = self.to_df();
args.validation.is_valid_join(&args.how)?;

let should_coalesce = args.coalesce.coalesce(&args.how);

#[cfg(feature = "cross_join")]
if let JoinType::Cross = args.how {
return left_df.cross_join(other, args.suffix.as_deref(), args.slice);
Expand Down Expand Up @@ -202,13 +204,12 @@ pub trait DataFrameJoinOps: IntoDf {
if selected_left.len() == 1 {
let s_left = &selected_left[0];
let s_right = &selected_right[0];
let drop_names: Option<&[&str]> = if should_coalesce { None } else { Some(&[]) };
return match args.how {
JoinType::Inner => {
left_df._inner_join_from_series(other, s_left, s_right, args, _verbose, None)
},
JoinType::Left => {
left_df._left_join_from_series(other, s_left, s_right, args, _verbose, None)
},
JoinType::Inner => left_df
._inner_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
JoinType::Left => left_df
._left_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
JoinType::Outer => left_df._outer_join_from_series(other, s_left, s_right, args),
#[cfg(feature = "semi_anti_join")]
JoinType::Anti => left_df._semi_anti_join_from_series(
Expand Down Expand Up @@ -265,7 +266,12 @@ pub trait DataFrameJoinOps: IntoDf {

let lhs_keys = prepare_keys_multiple(&selected_left, args.join_nulls)?.into_series();
let rhs_keys = prepare_keys_multiple(&selected_right, args.join_nulls)?.into_series();
let names_right = selected_right.iter().map(|s| s.name()).collect::<Vec<_>>();

let drop_names = if should_coalesce {
Some(selected_right.iter().map(|s| s.name()).collect::<Vec<_>>())
} else {
Some(vec![])
};

// Multiple keys.
match args.how {
Expand All @@ -278,16 +284,15 @@ pub trait DataFrameJoinOps: IntoDf {
},
JoinType::Outer => {
let names_left = selected_left.iter().map(|s| s.name()).collect::<Vec<_>>();
let coalesce = args.coalesce;
args.coalesce = JoinCoalesce::KeepColumns;
let suffix = args.suffix.clone();
let out = left_df._outer_join_from_series(other, &lhs_keys, &rhs_keys, args);

if coalesce.coalesce(&JoinType::Outer) {
if should_coalesce {
Ok(_coalesce_outer_join(
out?,
&names_left,
&names_right,
drop_names.as_ref().unwrap(),
suffix.as_deref(),
left_df,
))
Expand All @@ -301,15 +306,15 @@ pub trait DataFrameJoinOps: IntoDf {
&rhs_keys,
args,
_verbose,
Some(&names_right),
drop_names.as_deref(),
),
JoinType::Left => left_df._left_join_from_series(
other,
&lhs_keys,
&rhs_keys,
args,
_verbose,
Some(&names_right),
drop_names.as_deref(),
),
#[cfg(feature = "semi_anti_join")]
JoinType::Anti | JoinType::Semi => self._join_impl(
Expand Down
7 changes: 7 additions & 0 deletions py-polars/polars/dataframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -6182,6 +6182,7 @@ def join(
suffix: str = "_right",
validate: JoinValidation = "m:m",
join_nulls: bool = False,
coalesce: bool | None = None,
) -> DataFrame:
"""
Join in SQL-like fashion.
Expand Down Expand Up @@ -6236,6 +6237,11 @@ def join(
- This is currently not supported the streaming engine.
join_nulls
Join on null values. By default null values will never produce matches.
coalesce
Coalescing behavior (merging of join columns).
- None: -> join specific.
- True: -> Always coalesce join columns.
- False: -> Never coalesce join columns.
Returns
-------
Expand Down Expand Up @@ -6336,6 +6342,7 @@ def join(
suffix=suffix,
validate=validate,
join_nulls=join_nulls,
coalesce=coalesce,
)
.collect(_eager=True)
)
Expand Down
10 changes: 7 additions & 3 deletions py-polars/polars/lazyframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -3711,6 +3711,7 @@ def join_asof(
Force the physical plan to evaluate the computation of both DataFrames up to
the join in parallel.
Examples
--------
>>> from datetime import datetime
Expand Down Expand Up @@ -3814,6 +3815,7 @@ def join(
suffix: str = "_right",
validate: JoinValidation = "m:m",
join_nulls: bool = False,
coalesce: bool | None = None,
allow_parallel: bool = True,
force_parallel: bool = False,
) -> Self:
Expand All @@ -3837,8 +3839,6 @@ def join(
right table
* *outer*
Returns all rows when there is a match in either left or right table
* *outer_coalesce*
Same as 'outer', but coalesces the key columns
* *cross*
Returns the Cartesian product of rows from both tables
* *semi*
Expand Down Expand Up @@ -3871,6 +3871,11 @@ def join(
- This is currently not supported the streaming engine.
join_nulls
Join on null values. By default null values will never produce matches.
coalesce
Coalescing behavior (merging of join columns).
- None: -> join specific.
- True: -> Always coalesce join columns.
- False: -> Never coalesce join columns.
allow_parallel
Allow the physical plan to optionally evaluate the computation of both
DataFrames up to the join in parallel.
Expand Down Expand Up @@ -3980,7 +3985,6 @@ def join(
msg = "must specify `on` OR `left_on` and `right_on`"
raise ValueError(msg)

coalesce = None
if how == "outer_coalesce":
coalesce = True

Expand Down
30 changes: 30 additions & 0 deletions py-polars/tests/unit/operations/test_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -960,3 +960,33 @@ def test_cross_join_slice_pushdown() -> None:
},
schema={"x": pl.UInt16, "x_": pl.UInt16},
)
assert_frame_equal(result, expected)


@pytest.mark.parametrize("how", ["left", "inner"])
@typing.no_type_check
def test_join_coalesce(how: str) -> None:
a = pl.LazyFrame({"a": [1, 2], "b": [1, 2]})
b = pl.LazyFrame(
{
"a": [1, 2, 1, 2],
"b": [5, 7, 8, 9],
"c": [1, 2, 1, 2],
}
)

how = "inner"
q = a.join(b, on="a", coalesce=False, how=how)
out = q.collect()
assert q.schema == out.schema
assert out.columns == ["a", "b", "a_right", "b_right", "c"]

q = a.join(b, on=["a", "b"], coalesce=False, how=how)
out = q.collect()
assert q.schema == out.schema
assert out.columns == ["a", "b", "a_right", "b_right", "c"]

q = a.join(b, on=["a", "b"], coalesce=True, how=how)
out = q.collect()
assert q.schema == out.schema
assert out.columns == ["a", "b", "c"]

0 comments on commit dc45fc0

Please sign in to comment.