Support min_by/max_by group by aggregate #16163
Conversation
Can you give an example of where this new aggregation is a better choice than argmin + gather? libcudf already provides the essential building blocks for this kind of operation, and I don't see how this specialized implementation provides a significant benefit. I'm not sure I find the claims in #16139 about memory pressure, single-pass aggregation, and performance compelling from a surface-level view.
If we do go this route and decide this feature is necessary, we should also implement a max_by aggregation at the same time, for symmetry.
min_by is an aggregation in Spark but not in Pandas, so we would like to match Spark's behavior directly on the cuDF side. argmin/max + gather has a few feature gaps relative to Spark's min/max_by that are difficult to handle from the spark-rapids side, such as the all-nulls case and NaN handling for floating-point types.
For now we have handled the all-nulls case by modifying the null masks and using the argmin+gather route, to quickly support our customer's need. But as a next step, we'd like an independent implementation to support float aggregation and max_by. Another reason is that min_by and max_by are two special aggregations in Spark that need to perform an aggregation over two different columns, so we need to package the two columns into one struct column for cuDF to handle, because AFAIK cuDF only supports aggregation on a single column. Otherwise we would need special post-processing in spark-rapids to check whether there are min/max_bys among the aggs, gather them against their value column, and concat the results back into the original agg result table, which would be much slower and harder to implement.
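As a rough illustration of this packing step, a minimal sketch using public libcudf column factories (this is not code from this PR or the plugin, and exact constructor defaults vary across cuDF versions):

```cpp
#include <cudf/column/column.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_view.hpp>

#include <rmm/device_buffer.hpp>

#include <memory>
#include <vector>

// Pack the ordering and payload columns into one structs column so the pair
// can travel through a single-column aggregation request: child 0 drives
// argmin/argmax, child 1 is the value to return.
std::unique_ptr<cudf::column> pack_for_min_by(cudf::column_view const& order,
                                              cudf::column_view const& value)
{
  std::vector<std::unique_ptr<cudf::column>> children;
  children.emplace_back(std::make_unique<cudf::column>(order));  // deep copy
  children.emplace_back(std::make_unique<cudf::column>(value));  // deep copy
  // No parent-level null mask here; each child keeps its own validity.
  return cudf::make_structs_column(
    order.size(), std::move(children), 0, rmm::device_buffer{});
}
```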
Will do if we go this route.
In terms of semantics, the proposed min_by would need to match the argmin + gather implementation exactly. In libcudf, null and NaN control are handled by separate arguments. For example, the collect set aggregation (https://docs.rapids.ai/api/libcudf/stable/group__aggregation__factories#gaebe680a414f3c942a631f609bcfb5781) accepts null_policy, null_equality, and nan_equality arguments. This is a better route to address the desired semantics if you can express it. There are also some examples of ordering policies in libcudf, like null_order. We need to work on some improvements to argmin and argmax anyway, so this would be a good joint project for us. Currently we have “experimental” row comparators that we use for everything except for argmin calls, and we need to adopt those to expand support for nested list and struct columns. I can dig up the issue where this is discussed.
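For concreteness, this is roughly what those knobs look like on the existing collect-set factory today; an argmin/argmax factory extended the same way is only hypothetical at this point:

```cpp
#include <cudf/aggregation.hpp>
#include <cudf/types.hpp>

// Existing factory: null and NaN semantics are explicit arguments.
auto const collect_set_agg =
  cudf::make_collect_set_aggregation<cudf::groupby_aggregation>(
    cudf::null_policy::EXCLUDE,      // drop nulls from each set
    cudf::null_equality::EQUAL,      // if kept, nulls compare equal
    cudf::nan_equality::ALL_EQUAL);  // NaNs compare equal to each other
```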
@thirtiseven Here is the other issue I was thinking of: #14412 (comment)
That sounds good; adding those arguments seems to work. I will try to do this, maybe in another PR just for argmin/argmax, and make min/max_by just: 1. unpack the struct column; 2. call argmin/argmax with different arguments; 3. do a gather with the struct column. Does this make sense to you?
Thank you! I was wondering how to write a hash-based implementation for min_by since the value will always be a struct (but I have no idea yet).
Updated the code to add support for max_by and removed group_min_by.cu in favor of argmin + gather. There is a small post-processing step to replace all nulls with -1 after calling argmin. A comment in argmin's code says indices corresponding to null values are initialized to ARGMIN_SENTINEL, but no such initialization happens in the current code. I think it's not a bug but an outdated comment, though I don't have much context about it; filed issue #16542 about it. Please take another look, thanks @bdice
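For readers following along, a minimal sketch of this route using public libcudf APIs (illustrative only, not the PR's internal sort-groupby code; signatures assume a recent cuDF):

```cpp
#include <cudf/aggregation.hpp>
#include <cudf/copying.hpp>
#include <cudf/groupby.hpp>
#include <cudf/replace.hpp>
#include <cudf/scalar/scalar.hpp>
#include <cudf/table/table_view.hpp>

#include <memory>
#include <utility>
#include <vector>

// min_by(value, order) grouped by keys, emulated as ARGMIN + replace_nulls + gather.
std::unique_ptr<cudf::table> min_by_via_argmin(cudf::table_view const& keys,
                                               cudf::column_view const& order,
                                               cudf::column_view const& value)
{
  cudf::groupby::groupby gb(keys);
  std::vector<cudf::groupby::aggregation_request> requests(1);
  requests[0].values = order;
  requests[0].aggregations.push_back(
    cudf::make_argmin_aggregation<cudf::groupby_aggregation>());
  // group_keys holds the unique key rows matching each output row.
  auto [group_keys, results] = gb.aggregate(requests);

  // All-null groups yield null indices; map them to -1 so that gather with
  // NULLIFY produces null output rows for those groups.
  auto const sentinel = cudf::numeric_scalar<cudf::size_type>{-1};
  auto const gather_map =
    cudf::replace_nulls(results[0].results[0]->view(), sentinel);

  return cudf::gather(cudf::table_view{{value}},
                      gather_map->view(),
                      cudf::out_of_bounds_policy::NULLIFY);
}
```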
cpp/include/cudf/aggregation.hpp
Outdated
MAX_BY, ///< max reduction by another column
MIN_BY  ///< min reduction by another column
It should be something like "retrieve values based on min/max of another column", since it is not entirely a "min/max reduction".
updated.
cpp/include/cudf/aggregation.hpp
Outdated
* `MAX_BY` returns the value of the element in the group that is the minimum
* according to the order_by column.
What is the order_by column here?
In addition, we should not mention "group" since we may run this aggregation in either groupby or reduction.
Note: The suggestion below is modified based on my other suggestion to support multiple min/max_by values.
Suggested change:
- * `MAX_BY` returns the value of the element in the group that is the minimum
- * according to the order_by column.
+ * `MAX_BY` returns values of the rows corresponding to the maximum values of the child column specified by the `order_index` child column.
updated.
cpp/include/cudf/aggregation.hpp
Outdated
* `MIN_BY` returns the value of the element in the group that is the minimum
* according to the order_by column.
Similar comment as for MAX_BY.
updated.
cpp/src/groupby/sort/aggregate.cpp
Outdated
{
  if (cache.has_result(values, agg)) return;

  auto argmax_result = detail::group_argmax(get_grouped_values().child(1),
We should add a type check for the input. We need to check that it is a structs column having at least 2 children.
done.
template <typename Source>
struct target_type_impl<Source, aggregation::MAX_BY> {
  using type = struct_view;
};

// Computing MIN_BY of Source, use Source accumulator
template <typename Source>
struct target_type_impl<Source, aggregation::MIN_BY> {
  using type = struct_view;
So the output type is the same type as the input? If so, I would recommend adding a parameter to the constructor of the min_by/max_by aggregation to specify which column index to compute arg_min/arg_max on. By doing so, if in the future we need to compute min_by/max_by of more than one column (i.e., get values of several columns based on the same min/max value of the order-by column), we will trivially have this supported.
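To illustrate the shape of this suggestion, a hypothetical sketch; `min_by_aggregation_options` and the `order_column_index` parameter are made-up names here and are not part of libcudf:

```cpp
#include <cudf/types.hpp>

// Hypothetical (not a libcudf API): the aggregation carries the index of the
// ordering child, so any number of payload children could later be gathered
// by the same argmin/argmax result.
struct min_by_aggregation_options {
  cudf::size_type order_column_index = 0;  // which child drives the arg-min
};

// A caller might then write something like:
//   auto agg = make_min_by_aggregation<cudf::groupby_aggregation>(
//     min_by_aggregation_options{.order_column_index = 0});
```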
So the output type is the same type as the input?
Yes.
The struct is packed manually in the plugin when doing min/max_by; it is not the original schema of the table. So we can just always make the column that needs argmin/max the last column, without specifying an index.
Oh, the current code actually always selects the second column. I will make the ordering column the first one and note it in a comment.
done.
the ordering column be the first one
Yes, the first column as the ordering column makes more sense than the last one 👍
Recommend changing the constructors of the aggregations so we can support batch min_by/max_by.
MAX_BY, ///< retrieve values based on max of another column
MIN_BY  ///< retrieve values based on min of another column
Suggested change:
- MAX_BY, ///< retrieve values based on max of another column
- MIN_BY ///< retrieve values based on min of another column
+ MAX_BY, ///< retrieve rows based on max values of the first child column
+ MIN_BY ///< retrieve rows based on min values of the first child column
{
  if (cache.has_result(values, agg)) return;

  CUDF_EXPECTS(values.num_children() >= 2, "Input column must have at least 2 children.");
Suggested change:
- CUDF_EXPECTS(values.num_children() >= 2, "Input column must have at least 2 children.");
+ CUDF_EXPECTS(values.type().id() == type_id::STRUCT && values.num_children() >= 2, "Input column must have at least 2 children.");
CUDF_EXPECTS(values.num_children() >= 2, "Input column must have at least 2 children.");

auto argmax_result = detail::group_argmax(get_grouped_values().child(0),
Suggested change:
- auto argmax_result = detail::group_argmax(get_grouped_values().child(0),
+ auto const argmax_result = detail::group_argmax(get_grouped_values().child(0),
                                          stream,
                                          mr);

auto sentinel = cudf::numeric_scalar<size_type>{cudf::detail::ARGMAX_SENTINEL};
Suggested change:
- auto sentinel = cudf::numeric_scalar<size_type>{cudf::detail::ARGMAX_SENTINEL};
+ auto const sentinel = cudf::numeric_scalar<size_type>{cudf::detail::ARGMAX_SENTINEL};
{
  if (cache.has_result(values, agg)) return;

  CUDF_EXPECTS(values.num_children() >= 2, "Input column must have at least 2 children.");
Suggested change:
- CUDF_EXPECTS(values.num_children() >= 2, "Input column must have at least 2 children.");
+ CUDF_EXPECTS(values.type().id() == type_id::STRUCT && values.num_children() >= 2, "Input column must have at least 2 children.");
auto argmin_result = detail::group_argmin(get_grouped_values().child(0),
                                          helper.num_groups(stream),
                                          helper.group_labels(stream),
                                          helper.key_sort_order(stream),
                                          stream,
                                          mr);

auto sentinel = cudf::numeric_scalar<size_type>{cudf::detail::ARGMIN_SENTINEL};

auto null_removed_map = cudf::replace_nulls(
  argmin_result->view(), sentinel, stream, mr);
Suggested change:
- auto argmin_result = detail::group_argmin(get_grouped_values().child(0),
-                                           helper.num_groups(stream),
-                                           helper.group_labels(stream),
-                                           helper.key_sort_order(stream),
-                                           stream,
-                                           mr);
- auto sentinel = cudf::numeric_scalar<size_type>{cudf::detail::ARGMIN_SENTINEL};
- auto null_removed_map = cudf::replace_nulls(
-   argmin_result->view(), sentinel, stream, mr);
+ auto const argmin_result = detail::group_argmin(get_grouped_values().child(0),
+                                                 helper.num_groups(stream),
+                                                 helper.group_labels(stream),
+                                                 helper.key_sort_order(stream),
+                                                 stream,
+                                                 mr);
+ auto const sentinel = cudf::numeric_scalar<size_type>{cudf::detail::ARGMIN_SENTINEL};
+ auto const null_removed_map = cudf::replace_nulls(
+   argmin_result->view(), sentinel, stream, mr);
auto null_removed_map = cudf::replace_nulls(
Suggested change:
- auto null_removed_map = cudf::replace_nulls(
+ auto const null_removed_map = cudf::replace_nulls(
* @brief Internal API to calculate groupwise maximum value by another order column, packed in a
* structs column.
Suggested change:
- * @brief Internal API to calculate groupwise maximum value by another order column, packed in a
- * structs column.
+ * @brief Internal API to find rows corresponding to the groupwise maximum value of the first child in the values column.
+ *
+ * The values column is always given as a structs column having at least two children.
* @param structs_column Structs column containing the values to get maximum from
* @param group_labels ID of group that the corresponding value belongs to
* @param num_groups Number of groups
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
*/
std::unique_ptr<column> group_max_by(column_view const& structs_column,
Suggested change:
- * @param structs_column Structs column containing the values to get maximum from
- * @param group_labels ID of group that the corresponding value belongs to
- * @param num_groups Number of groups
- * @param stream CUDA stream used for device memory operations and kernel launches.
- * @param mr Device memory resource used to allocate the returned column's device memory
- */
- std::unique_ptr<column> group_max_by(column_view const& structs_column,
+ * @param values The input structs column
+ * @param group_labels ID of groups that the corresponding value belongs to
+ * @param num_groups Number of groups
+ * @param stream CUDA stream used for device memory operations and kernel launches.
+ * @param mr Device memory resource used to allocate the returned column's device memory
+ */
+ std::unique_ptr<column> group_max_by(column_view const& values,
* @brief Internal API to calculate groupwise minimum value by another order column, packed in a
* structs column.
Same suggestion for this function as for group_max_by.
@@ -91,6 +91,26 @@ TYPED_TEST(groupby_argmin_test, zero_valid_values)
  test_single_agg(keys, vals, expect_keys, expect_vals, std::move(agg2), force_use_sort_impl::YES);
}

TYPED_TEST(groupby_argmin_test, zero_valid_values_xx)
Why do we have this test?
cudf::test::fixed_width_column_wrapper<K> values{4, 1, 2, 3, 4, 5, 6, 7, 8, 9};
cudf::test::structs_column_wrapper vals{orders, values};
We'd better call the second column payload instead of values, since the entire structs column is the input values into groupby.
(This applies to all newly added tests.)
@@ -0,0 +1,10 @@
[3231126][17:00:30:499434][info ] ----- RMM LOG BEGIN [PTDS DISABLED] -----
Added by mistake?
Seems like this aggregation is just calling other libcudf APIs:
Description
Closes #16139
This PR adds support for min_by, which returns the value of one column associated with the minimum value of another column. It will be useful for spark-rapids.
Currently this PR only supports sort-based groupby. I will try to add a hash-based groupby too, but I'm not sure how to do it right now because the input column from Spark will be a struct column of value and order.
Related pr in spark-rapids: NVIDIA/spark-rapids#11123
For Spark, all orderable types (basic types and array/struct) are supported, except float and double with NaN values, because Spark has special handling for NaN in non-nested floating-point types.
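To make the expected semantics concrete, a small sketch in the style of this PR's tests (values illustrative; assumes the struct layout with the ordering column first, as discussed above):

```cpp
#include <cudf_test/column_wrapper.hpp>

// Keys:  {1, 1, 2, 2}
// Order: {5, 2, 7, null}
// Value: {10, 20, 30, 40}
// Expected min_by per group: key 1 -> 20 (order 2 is the group minimum),
// key 2 -> 30 (the row with a null order is skipped).
cudf::test::fixed_width_column_wrapper<int32_t> keys{1, 1, 2, 2};
cudf::test::fixed_width_column_wrapper<int32_t> order{{5, 2, 7, 0}, {1, 1, 1, 0}};
cudf::test::fixed_width_column_wrapper<int32_t> value{10, 20, 30, 40};
cudf::test::structs_column_wrapper vals{order, value};
```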
Checklist