Skip to content

Commit

Permalink
Match pandas join ordering obligations in pandas-compatible mode (#14428
Browse files Browse the repository at this point in the history
)

If we pass sort=True to merges we are on the hook to sort the result in order with respect to the key columns. If those key columns have repeated values there is still some space for ambiguity. Currently we get a result back whose order (for the repeated key values) is determined by the gather map that libcudf returns for the join. This does not come with any ordering guarantees.

When sort=False, pandas has join-type dependent ordering guarantees which we also do not match. To fix this, in pandas-compatible mode only, reorder the gather maps according to the order of the input keys. When sort=False this means that our result matches pandas ordering. When sort=True, it ensures that (if we use a stable sort) the tie-break for equal sort keys is the input dataframe order.

While we're here, switch from argsort + gather to sort_by_key when sorting results.

- Closes #14001

Authors:
  - Lawrence Mitchell (https://github.com/wence-)

Approvers:
  - Ashwin Srinath (https://github.com/shwina)
  - Bradley Dice (https://github.com/bdice)

URL: #14428
  • Loading branch information
wence- authored Nov 17, 2023
1 parent a51ab18 commit d2069f4
Show file tree
Hide file tree
Showing 3 changed files with 373 additions and 14 deletions.
1 change: 1 addition & 0 deletions python/cudf/cudf/core/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@
PANDAS_LT_153 = PANDAS_VERSION < version.parse("1.5.3")
PANDAS_GE_200 = PANDAS_VERSION >= version.parse("2.0.0")
PANDAS_GE_210 = PANDAS_VERSION >= version.parse("2.1.0")
PANDAS_GE_220 = PANDAS_VERSION >= version.parse("2.2.0")
125 changes: 111 additions & 14 deletions python/cudf/cudf/core/join/join.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
from __future__ import annotations

import itertools
import warnings
from typing import Any, ClassVar, List, Optional

import cudf
from cudf import _lib as libcudf
from cudf._lib.types import size_type_dtype
from cudf.core.copy_types import GatherMap
from cudf.core.join._join_helpers import (
_coerce_to_tuple,
Expand Down Expand Up @@ -94,7 +96,44 @@ def __init__(
self.lhs = lhs.copy(deep=False)
self.rhs = rhs.copy(deep=False)
self.how = how
self.sort = sort
# If the user requests that the result is sorted or we're in
# pandas-compatible mode we have various obligations on the
# output order:
#
# compat-> | False | True
# sort | |
# ---------+--------------------------+-------------------------------
# False| no obligation | ordering as per pandas docs(*)
# True | sorted lexicographically | sorted lexicographically(*)
#
# (*) If two keys are equal, tiebreak is to use input table order.
#
# In pandas-compat mode, we have obligations on the order to
# match pandas (even if sort=False), see
# pandas.pydata.org/docs/reference/api/pandas.DataFrame.merge.html.
# The ordering requirements differ depending on which join
# type is specified:
#
# - left: preserve key order (only keeping left keys)
# - right: preserve key order (only keeping right keys)
# - inner: preserve key order (of left keys)
# - outer: sort keys lexicographically
# - cross (not supported): preserve key order (of left keys)
#
# Moreover, in all cases, whenever there is a tiebreak
# situation (for sorting or otherwise), the deciding order is
# "input table order"
self.sort = sort or (
cudf.get_option("mode.pandas_compatible") and how == "outer"
)
self.preserve_key_order = cudf.get_option(
"mode.pandas_compatible"
) and how in {
"inner",
"outer",
"left",
"right",
}
self.lsuffix, self.rsuffix = suffixes

# At this point validation guarantees that if on is not None we
Expand Down Expand Up @@ -160,6 +199,55 @@ def __init__(
}
)

def _gather_maps(self, left_cols, right_cols):
# Produce gather maps for the join, optionally reordering to
# match pandas-order in compat mode.
maps = self._joiner(
left_cols,
right_cols,
how=self.how,
)
if not self.preserve_key_order:
return maps
# We should only get here if we're in a join on which
# pandas-compat places some ordering obligation (which
# precludes a semi-join)
# We must perform this reordering even if sort=True since the
# obligation to ensure tiebreaks appear in input table order
# means that the gather maps must be permuted into an original
# order.
assert self.how in {"inner", "outer", "left", "right"}
# And hence both maps returned from the libcudf join should be
# non-None.
assert all(m is not None for m in maps)
lengths = [len(left_cols[0]), len(right_cols[0])]
# Only nullify those maps that need it.
nullify = [
self.how not in {"inner", "left"},
self.how not in {"inner", "right"},
]
# To reorder maps so that they are in order of the input
# tables, we gather from iota on both right and left, and then
# sort the gather maps with those two columns as key.
key_order = list(
itertools.chain.from_iterable(
libcudf.copying.gather(
[cudf.core.column.arange(n, dtype=size_type_dtype)],
map_,
nullify=null,
)
for map_, n, null in zip(maps, lengths, nullify)
)
)
return libcudf.sort.sort_by_key(
list(maps),
# If how is right, right map is primary sort key.
key_order[:: -1 if self.how == "right" else 1],
[True] * len(key_order),
["last"] * len(key_order),
stable=True,
)

def perform_merge(self) -> cudf.DataFrame:
left_join_cols = []
right_join_cols = []
Expand All @@ -184,12 +272,9 @@ def perform_merge(self) -> cudf.DataFrame:
left_key.set(self.lhs, lcol_casted, validate=False)
right_key.set(self.rhs, rcol_casted, validate=False)

left_rows, right_rows = self._joiner(
left_join_cols,
right_join_cols,
how=self.how,
left_rows, right_rows = self._gather_maps(
left_join_cols, right_join_cols
)

gather_kwargs = {
"keep_index": self._using_left_index or self._using_right_index,
}
Expand Down Expand Up @@ -305,6 +390,11 @@ def _sort_result(self, result: cudf.DataFrame) -> cudf.DataFrame:
# same order as given in 'on'. If the indices are used as
# keys, the index will be sorted. If one index is specified,
# the key columns on the other side will be used to sort.
# In pandas-compatible mode, tie-breaking for multiple equal
# sort keys is to produce output in input dataframe order.
# This is taken care of by using a stable sort here, and (in
# pandas-compat mode) reordering the gather maps before
# producing the input result.
by: List[Any] = []
if self._using_left_index and self._using_right_index:
by.extend(result._index._data.columns)
Expand All @@ -313,15 +403,22 @@ def _sort_result(self, result: cudf.DataFrame) -> cudf.DataFrame:
if not self._using_right_index:
by.extend([result._data[col.name] for col in self._right_keys])
if by:
to_sort = cudf.DataFrame._from_data(dict(enumerate(by)))
sort_order = GatherMap.from_column_unchecked(
cudf.core.column.as_column(to_sort.argsort()),
len(result),
nullify=False,
keep_index = self._using_left_index or self._using_right_index
if keep_index:
to_sort = [*result._index._columns, *result._columns]
index_names = result._index.names
else:
to_sort = [*result._columns]
index_names = None
result_columns = libcudf.sort.sort_by_key(
to_sort,
by,
[True] * len(by),
["last"] * len(by),
stable=True,
)
result = result._gather(
sort_order,
keep_index=self._using_left_index or self._using_right_index,
result = result._from_columns_like_self(
result_columns, result._column_names, index_names
)
return result

Expand Down
Loading

0 comments on commit d2069f4

Please sign in to comment.