Support min_by/max_by group by aggregate #16163

Closed

Conversation

Contributor

@thirtiseven thirtiseven commented Jul 2, 2024

Description

Closes #16139

This PR adds support for min_by, which returns the value of one column associated with the minimum value of another column. It will be useful for spark-rapids.

Currently this PR only supports sort-based group by. I will try to add a hash-based group by as well, but I'm not yet sure how to do it, because the input column from Spark will be a struct column of value and order.

Related pr in spark-rapids: NVIDIA/spark-rapids#11123

For Spark, all orderable types (basic types and array/struct) are supported, except float and double with NaN values, because Spark has special handling for NaN in non-nested floating-point types.
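For reference, Spark's min_by semantics for a single group can be modeled in plain Python (a hedged sketch, not cuDF or Spark code; `spark_min_by` is a hypothetical helper name): nulls in the order column are ignored, an all-null order column yields null, NaN sorts above every other float, and ties keep the last row.

```python
import math

def spark_min_by(values, orders):
    # Model of Spark's min_by for one group (illustrative only).
    # None order entries are skipped; NaN is treated as the largest
    # float; "<=" keeps the LAST row among tied minimum orders.
    best = None  # (sort_key, value)
    for v, o in zip(values, orders):
        if o is None:
            continue
        key = (1, 0.0) if isinstance(o, float) and math.isnan(o) else (0, o)
        if best is None or key <= best[0]:
            best = (key, v)
    return None if best is None else best[1]

print(spark_min_by(["a", "b", "c"], [3, 1, 2]))       # -> b
print(spark_min_by(["a", "b", "c"], [1, 2, 1]))       # -> c (last tie wins)
print(spark_min_by(["a", "b"], [float("nan"), 1.0]))  # -> b (NaN sorts last)
print(spark_min_by(["a", "b"], [None, None]))         # -> None
```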

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

copy-pr-bot bot commented Jul 2, 2024

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@github-actions github-actions bot added libcudf Affects libcudf (C++/CUDA) code. CMake CMake build issue Java Affects Java cuDF API. labels Jul 2, 2024
@firestarman firestarman added feature request New feature or request non-breaking Non-breaking change labels Jul 3, 2024
@thirtiseven thirtiseven force-pushed the min_by_cudf_sort_only branch from aa6c36b to 101a929 Compare July 3, 2024 06:09
Signed-off-by: Haoyang Li <[email protected]>
@thirtiseven thirtiseven marked this pull request as ready for review July 5, 2024 10:31
@thirtiseven thirtiseven requested review from a team as code owners July 5, 2024 10:31
Contributor

@bdice bdice left a comment

Can you explain an example of where this new aggregation is a better choice than argmin + gather? libcudf already provides the essential building blocks for this kind of operation, and I don't see how this specialized implementation provides a significant benefit. I'm not sure if I find the claims in #16139 about memory pressure, single-pass aggregation, and performance to be compelling from a surface level view.

If we do go this route and decide this feature is necessary, we should also implement a max_by aggregation at the same time, for symmetry.

@thirtiseven
Contributor Author

Can you explain an example of where this new aggregation is a better choice than argmin + gather? libcudf already provides the essential building blocks for this kind of operation, and I don't see how this specialized implementation provides a significant benefit. I'm not sure if I find the claims in #16139 about memory pressure, single-pass aggregation, and performance to be compelling from a surface level view.

min_by is an aggregation in Spark but not in Pandas, so we would like to match Spark's behavior directly on the cuDF side. argmin/argmax + gather has a few feature gaps relative to Spark's min/max_by that are difficult to handle from the spark-rapids side, such as:

  • When the grouped order column is all nulls, Spark returns null, but argmin/argmax + gather returns the first element of the grouped value column.
  • NaN handling in float aggregation: Spark treats NaN as greater than any other floating-point value, while in cuDF the result of comparisons involving NaN is undefined.
  • On ties, Spark's min_by and max_by return the value at the last occurrence of the extreme order value. cuDF's argmin matches this, but argmax returns the first occurrence of the maximum.
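To make the gaps concrete, here is a plain-Python model (an illustrative sketch with hypothetical names, not libcudf code) of a naive per-group argmax + gather, which returns the first tied maximum and falls back to the first value when the order column is all nulls:

```python
def naive_max_by(values, orders):
    # Naive argmax + gather for one group: strict ">" keeps the FIRST
    # maximum, and an all-null order column falls through to values[0],
    # unlike Spark, which returns null and keeps the LAST tied maximum.
    idx, best = None, None
    for i, o in enumerate(orders):
        if o is None:
            continue
        if best is None or o > best:
            idx, best = i, o
    return values[0] if idx is None else values[idx]

print(naive_max_by(["a", "b", "c"], [1, 2, 2]))  # -> b; Spark's max_by gives c
print(naive_max_by(["a", "b"], [None, None]))    # -> a; Spark's max_by gives null
```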

We currently handle the all-nulls case by modifying the null masks and taking the argmin + gather route, to quickly meet our customer's need. As a next step, we'd like an independent implementation that supports float aggregation and max_by.

Another reason is that min_by and max_by are two special aggregations in Spark in that they operate on two different columns. Since, AFAIK, cuDF only supports aggregation on a single column, we need to pack the two columns into one struct column for cuDF to handle. Otherwise we would need special post-processing in spark-rapids: check whether any min/max_by appears in the aggregations, gather the results onto their value columns, and concatenate the results back into the original aggregation result table, which would be much slower and harder to implement.

If we do go this route and decide this feature is necessary, we should also implement a max_by aggregation at the same time, for symmetry.

Will do if we go this route.

@bdice
Contributor

bdice commented Jul 8, 2024

In terms of semantics, the proposed min_by would need to match the argmin + gather implementation exactly. In libcudf, null and NaN control are handled by separate arguments. For example, the collect set aggregation (https://docs.rapids.ai/api/libcudf/stable/group__aggregation__factories#gaebe680a414f3c942a631f609bcfb5781) accepts null_policy, null_equality, and nan_equality arguments. This is a better route to address the desired semantics if you can express it. There are also some examples of ordering policies in libcudf, like null_order.

We need to work on some improvements to argmin and argmax anyway, so this would be a good joint project for us. Currently we have “experimental” row comparators that we use for everything except for argmin calls, and we need to adopt those to expand support for nested list and struct columns. I can dig up the issue where this is discussed.
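As a rough illustration of what such policy arguments could look like (a hypothetical sketch in plain Python; `NullPolicy` and `nan_is_max` are modeled on libcudf's naming conventions, not its actual argmin signature):

```python
import math
from enum import Enum

class NullPolicy(Enum):
    EXCLUDE = 0  # skip nulls in the order column
    INCLUDE = 1  # treat nulls as the smallest value

def argmin_with_policies(orders, null_policy=NullPolicy.EXCLUDE, nan_is_max=True):
    # Returns the index of the minimum order, or None if no row qualifies.
    best = None  # (sort_key, index)
    for i, o in enumerate(orders):
        if o is None:
            if null_policy is NullPolicy.INCLUDE:
                return i  # first null is the minimum under INCLUDE
            continue
        key = (1, 0.0) if nan_is_max and isinstance(o, float) and math.isnan(o) else (0, o)
        if best is None or key < best[0]:
            best = (key, i)
    return None if best is None else best[1]

print(argmin_with_policies([3, 1, 2]))                        # -> 1
print(argmin_with_policies([float("nan"), 1.0]))              # -> 1
print(argmin_with_policies([2, None, 1], NullPolicy.INCLUDE)) # -> 1
print(argmin_with_policies([None, None]))                     # -> None
```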

Signed-off-by: Haoyang Li <[email protected]>
@thirtiseven thirtiseven marked this pull request as draft July 9, 2024 08:26
@bdice
Contributor

bdice commented Jul 9, 2024

@thirtiseven Here is the other issue I was thinking of: #14412 (comment)

@thirtiseven
Contributor Author

thirtiseven commented Jul 10, 2024

In terms of semantics, the proposed min_by would need to match the argmin + gather implementation exactly. In libcudf, null and NaN control are handled by separate arguments. For example, the collect set aggregation (https://docs.rapids.ai/api/libcudf/stable/group__aggregation__factories#gaebe680a414f3c942a631f609bcfb5781) accepts null_policy, null_equality, and nan_equality arguments. This is a better route to address the desired semantics if you can express it. There are also some examples of ordering policies in libcudf, like null_order.

That sounds good; adding those arguments seems workable. I will try this, maybe in a separate PR just for argmin/argmax, and make min/max_by simply: 1. unpack the struct column; 2. call argmin/argmax with the appropriate arguments; 3. gather from the struct column. Does this make sense to you?
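The three steps can be sketched in plain Python (illustrative only; it assumes the struct rows are (order, value) pairs with the ordering column first, and the helper name is hypothetical):

```python
def min_by_via_argmin(struct_rows):
    # 1. Unpack the struct column into its order child.
    orders = [row[0] for row in struct_rows]

    # 2. argmin over the order child: null orders are skipped, and "<="
    #    keeps the last tied minimum, matching Spark's semantics.
    idx = None
    for i, o in enumerate(orders):
        if o is not None and (idx is None or o <= orders[idx]):
            idx = i

    # 3. Gather from the struct column; null when no row had a valid order.
    return None if idx is None else struct_rows[idx]

print(min_by_via_argmin([(3, "a"), (1, "b"), (2, "c")]))  # -> (1, 'b')
print(min_by_via_argmin([(None, "a"), (None, "b")]))      # -> None
```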

We need to work on some improvements to argmin and argmax anyway, so this would be a good joint project for us. Currently we have “experimental” row comparators that we use for everything except for argmin calls, and we need to adopt those to expand support for nested list and struct columns. I can dig up the issue where this is discussed.
@thirtiseven Here is the other issue I was thinking of: #14412 (comment)

Thank you! I had been wondering how to write a hash-based implementation for min_by, since the value will always be a struct, but had no idea how.
For spark-rapids, the sort-based path is currently good enough according to perf tests. We'd love to write a hash-based min/max_by as a next step, after argmin/argmax supports nested types with row comparators.

Signed-off-by: Haoyang Li <[email protected]>
Signed-off-by: Haoyang Li <[email protected]>
@github-actions github-actions bot added Python Affects Python cuDF API. cudf.pandas Issues specific to cudf.pandas cudf.polars Issues specific to cudf.polars pylibcudf Issues specific to the pylibcudf package labels Jul 11, 2024
@thirtiseven thirtiseven changed the title Support min_by group by aggregate Support min_by/max_by group by aggregate Aug 13, 2024
@thirtiseven
Contributor Author

Updated the code to add support for max_by, and removed group_min_by.cu in favor of argmin + gather.

There is a small post-processing step to replace nulls with -1 after calling argmin. A comment in argmin's code says that indices corresponding to null values are initialized to ARGMIN_SENTINEL, but they are not initialized in the current code. I believe this is not a bug but an outdated comment; I don't have much context on it, so I filed issue #16542 about it.

Please take another look, thanks @bdice

@thirtiseven thirtiseven marked this pull request as ready for review August 13, 2024 09:49
Comment on lines 124 to 125
MAX_BY, ///< max reduction by another column
MIN_BY ///< min reduction by another column
Contributor

@ttnghia ttnghia Aug 16, 2024

It should be something like "retrieve values based on min/max of another column", since it is not entirely a "min/max reduction".

Contributor Author

updated.

Comment on lines 390 to 391
* `MAX_BY` returns the value of the element in the group that is the minimum
* according to the order_by column.
Contributor

@ttnghia ttnghia Aug 16, 2024

What is the order_by column here?

In addition, we should not mention "group", since we may run this aggregation in either groupby or reduction.

Note: The suggestion below is modified based on my other suggestion to support multiple min/max_by values.

Suggested change
- * `MAX_BY` returns the value of the element in the group that is the minimum
- * according to the order_by column.
+ * `MAX_BY` returns values of the rows corresponding to the maximum values of the child column specified by the `order_index` child column.

Contributor Author

updated.

Comment on lines 401 to 402
* `MIN_BY` returns the value of the element in the group that is the minimum
* according to the order_by column.
Contributor

Similar comment as for MAX_BY.

Contributor Author

updated.

{
if (cache.has_result(values, agg)) return;

auto argmax_result = detail::group_argmax(get_grouped_values().child(1),
Contributor

@ttnghia ttnghia Aug 16, 2024

We should add a type check for the input: verify that it is a structs column with at least 2 children.

Contributor Author

done.

Comment on lines +1265 to +1273
template <typename Source>
struct target_type_impl<Source, aggregation::MAX_BY> {
using type = struct_view;
};

// Computing MIN_BY of Source, use Source accumulator
template <typename Source>
struct target_type_impl<Source, aggregation::MIN_BY> {
using type = struct_view;
Contributor

So the output type is the same type as the input? If so, I would recommend adding a parameter to the constructor of the min_by/max_by aggregation to specify which column index to compute arg_min/arg_max on. That way, if in the future we need to compute min_by/max_by of more than one column (i.e., get values of several columns based on the same min/max value of the order-by column), it will trivially be supported.

Contributor Author

So the output type is the same type as the input?

Yes.

The struct is packed manually in the plugin when doing min/max_by; it is not part of the table's original schema. So we can always make the column that needs argmin/argmax the last column, without specifying an index.

Contributor Author

Oh, the current code actually always selects the second column. I will make the ordering column the first one and note it in a comment.

Contributor Author

done.

Contributor

the ordering column be the first one

Yes, the first column as the ordering column makes more sense than the last one 👍

Contributor

@ttnghia ttnghia left a comment

I recommend changing the constructors of the aggregations so we can support batch min_by/max_by.

Signed-off-by: Haoyang Li <[email protected]>
Signed-off-by: Haoyang Li <[email protected]>
@vyasr vyasr requested a review from bdice August 16, 2024 16:38
Comment on lines +124 to +125
MAX_BY, ///< retrieve values based on max of another column
MIN_BY ///< retrieve values based on min of another column
Contributor

Suggested change
- MAX_BY, ///< retrieve values based on max of another column
- MIN_BY  ///< retrieve values based on min of another column
+ MAX_BY, ///< retrieve rows based on max values of the first child column
+ MIN_BY  ///< retrieve rows based on min values of the first child column

{
if (cache.has_result(values, agg)) return;

CUDF_EXPECTS(values.num_children() >= 2, "Input column must have at least 2 children.");
Contributor

Suggested change
- CUDF_EXPECTS(values.num_children() >= 2, "Input column must have at least 2 children.");
+ CUDF_EXPECTS(values.type().id() == type_id::STRUCT && values.num_children() >= 2, "Input column must have at least 2 children.");


CUDF_EXPECTS(values.num_children() >= 2, "Input column must have at least 2 children.");

auto argmax_result = detail::group_argmax(get_grouped_values().child(0),
Contributor

Suggested change
- auto argmax_result = detail::group_argmax(get_grouped_values().child(0),
+ auto const argmax_result = detail::group_argmax(get_grouped_values().child(0),

stream,
mr);

auto sentinel = cudf::numeric_scalar<size_type>{cudf::detail::ARGMAX_SENTINEL};
Contributor

Suggested change
- auto sentinel = cudf::numeric_scalar<size_type>{cudf::detail::ARGMAX_SENTINEL};
+ auto const sentinel = cudf::numeric_scalar<size_type>{cudf::detail::ARGMAX_SENTINEL};

{
if (cache.has_result(values, agg)) return;

CUDF_EXPECTS(values.num_children() >= 2, "Input column must have at least 2 children.");
Contributor

Suggested change
- CUDF_EXPECTS(values.num_children() >= 2, "Input column must have at least 2 children.");
+ CUDF_EXPECTS(values.type().id() == type_id::STRUCT && values.num_children() >= 2, "Input column must have at least 2 children.");

Comment on lines +242 to +252
auto argmin_result = detail::group_argmin(get_grouped_values().child(0),
helper.num_groups(stream),
helper.group_labels(stream),
helper.key_sort_order(stream),
stream,
mr);

auto sentinel = cudf::numeric_scalar<size_type>{cudf::detail::ARGMIN_SENTINEL};

auto null_removed_map = cudf::replace_nulls(
argmin_result->view(), sentinel, stream, mr);
Contributor

Suggested change
- auto argmin_result = detail::group_argmin(get_grouped_values().child(0),
-                                           helper.num_groups(stream),
-                                           helper.group_labels(stream),
-                                           helper.key_sort_order(stream),
-                                           stream,
-                                           mr);
- auto sentinel = cudf::numeric_scalar<size_type>{cudf::detail::ARGMIN_SENTINEL};
- auto null_removed_map = cudf::replace_nulls(
-   argmin_result->view(), sentinel, stream, mr);
+ auto const argmin_result = detail::group_argmin(get_grouped_values().child(0),
+                                                 helper.num_groups(stream),
+                                                 helper.group_labels(stream),
+                                                 helper.key_sort_order(stream),
+                                                 stream,
+                                                 mr);
+ auto const sentinel = cudf::numeric_scalar<size_type>{cudf::detail::ARGMIN_SENTINEL};
+ auto const null_removed_map = cudf::replace_nulls(
+   argmin_result->view(), sentinel, stream, mr);

Comment on lines +217 to +218

auto null_removed_map = cudf::replace_nulls(
Contributor

Suggested change
- auto null_removed_map = cudf::replace_nulls(
+ auto const null_removed_map = cudf::replace_nulls(

Comment on lines +104 to +105
* @brief Internal API to calculate groupwise maximum value by another order column, packed in a
* structs column.
Contributor

Suggested change
- * @brief Internal API to calculate groupwise maximum value by another order column, packed in a
- * structs column.
+ * @brief Internal API to find rows corresponding to the groupwise maximum value of the first child in the values column.
+ *
+ * The values column is always given as a structs column having at least two children.

Comment on lines +115 to +121
* @param structs_column Structs column containing the values to get maximum from
* @param group_labels ID of group that the corresponding value belongs to
* @param num_groups Number of groups
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
*/
std::unique_ptr<column> group_max_by(column_view const& structs_column,
Contributor

Suggested change
- * @param structs_column Structs column containing the values to get maximum from
- * @param group_labels ID of group that the corresponding value belongs to
- * @param num_groups Number of groups
- * @param stream CUDA stream used for device memory operations and kernel launches.
- * @param mr Device memory resource used to allocate the returned column's device memory
- */
- std::unique_ptr<column> group_max_by(column_view const& structs_column,
+ * @param values The input structs column
+ * @param group_labels ID of groups that the corresponding value belongs to
+ * @param num_groups Number of groups
+ * @param stream CUDA stream used for device memory operations and kernel launches.
+ * @param mr Device memory resource used to allocate the returned column's device memory
+ */
+ std::unique_ptr<column> group_max_by(column_view const& values,

Comment on lines +128 to +129
* @brief Internal API to calculate groupwise minimum value by another order column, packed in a
* structs column.
Contributor

@ttnghia ttnghia Aug 16, 2024

Same suggestion for this function as for group_max_by.

@@ -91,6 +91,26 @@ TYPED_TEST(groupby_argmin_test, zero_valid_values)
test_single_agg(keys, vals, expect_keys, expect_vals, std::move(agg2), force_use_sort_impl::YES);
}

TYPED_TEST(groupby_argmin_test, zero_valid_values_xx)
Contributor

Why do we have this test?

Comment on lines +42 to +43
cudf::test::fixed_width_column_wrapper<K> values{4, 1, 2, 3, 4, 5, 6, 7, 8, 9};
cudf::test::structs_column_wrapper vals{orders, values};
Contributor

We should call the second column payload instead of values, since the entire structs column is the input values to groupby.

Contributor

(This applies for all new added tests)

@@ -0,0 +1,10 @@
[3231126][17:00:30:499434][info ] ----- RMM LOG BEGIN [PTDS DISABLED] -----
Contributor

Added by mistake?

@davidwendt
Contributor

Updated the code to add support for max_by, and removed group_min_by.cu in favor of argmin + gather.

There is a small post-processing step to replace nulls with -1 after calling argmin. A comment in argmin's code says that indices corresponding to null values are initialized to ARGMIN_SENTINEL, but they are not initialized in the current code. I believe this is not a bug but an outdated comment; I don't have much context on it, so I filed issue #16542 about it.

Please take another look, thanks @bdice

It seems this aggregation just calls other libcudf APIs: groupby(argmax), replace_nulls, and gather.
I'm not convinced it needs to be implemented in libcudf or that it provides any specific value.
The caller can simply make these three calls. I recommend closing this PR.
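For reference, the caller-side composition of those three calls can be modeled in plain Python (an illustrative sketch with hypothetical names, not the libcudf API):

```python
ARGMAX_SENTINEL = -1  # stand-in for cudf::detail::ARGMAX_SENTINEL

def group_max_by(keys, orders, values):
    # groupby(argmax): row index of the max order per group (first max wins,
    # null orders skipped), then replace_nulls with a sentinel, then gather.
    argmax = {}
    for i, (k, o) in enumerate(zip(keys, orders)):
        if o is not None and (k not in argmax or o > orders[argmax[k]]):
            argmax[k] = i
    out = {}
    for k in dict.fromkeys(keys):  # preserve first-seen group order
        idx = argmax.get(k, ARGMAX_SENTINEL)                      # replace_nulls
        out[k] = None if idx == ARGMAX_SENTINEL else values[idx]  # gather
    return out

print(group_max_by(["x", "x", "y"], [1, 3, 2], ["a", "b", "c"]))
# -> {'x': 'b', 'y': 'c'}
print(group_max_by(["x", "y"], [None, 5], ["a", "b"]))
# -> {'x': None, 'y': 'b'}
```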

Labels
CMake CMake build issue feature request New feature or request Java Affects Java cuDF API. libcudf Affects libcudf (C++/CUDA) code. non-breaking Non-breaking change
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

[FEA] Add min_by aggregate support
5 participants