Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement HOST_UDF aggregation for reduction and segmented reduction #17645

Merged
merged 43 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
2da9137
Implement `HOST_UDF` aggregation for reduction and segmented reduction
ttnghia Dec 20, 2024
fca442a
Merge branch 'branch-25.02' into host_udf_reduction
ttnghia Dec 20, 2024
4b5aa95
Fix example
ttnghia Dec 20, 2024
5982f3e
Update docs
ttnghia Dec 20, 2024
edaa007
Separate one base class into 4 base classes
ttnghia Dec 24, 2024
07579d0
Fix compile errors
ttnghia Dec 24, 2024
2a497e1
Refactor the base classes
ttnghia Dec 27, 2024
86ad3bb
Simplify the interface for reduction and segmented reduction
ttnghia Dec 28, 2024
f27c9fd
Rewrite tests
ttnghia Dec 28, 2024
2deeb3b
Merge branch 'branch-25.02' into host_udf_reduction
ttnghia Dec 28, 2024
41cf444
Rewrite comments and reformat
ttnghia Dec 28, 2024
e4cabfd
Reformat
ttnghia Dec 28, 2024
ca83ecf
Change return type to `const&`
ttnghia Dec 28, 2024
54a1e2d
Fix docs
ttnghia Dec 29, 2024
b3e5ce6
Fix inheritant property
ttnghia Dec 29, 2024
9b3e4fb
Fix forward declaration
ttnghia Dec 29, 2024
51d4aa6
Update copyright years
ttnghia Jan 2, 2025
dafe5ab
Merge branch 'branch-25.02' into host_udf_reduction
ttnghia Jan 4, 2025
f59d6b1
Refactor groupby base class, further simplifying it
ttnghia Jan 6, 2025
f315813
Merge branch 'branch-25.02' into host_udf_reduction
ttnghia Jan 6, 2025
8111e64
Fix style
ttnghia Jan 6, 2025
d7686aa
Fix typo
ttnghia Jan 7, 2025
98c5e77
Update cpp/include/cudf/aggregation/host_udf.hpp
ttnghia Jan 7, 2025
28329a7
Fix typo
ttnghia Jan 7, 2025
c3b52da
Fix typo
ttnghia Jan 7, 2025
89e8380
Remove unused header
ttnghia Jan 7, 2025
4f6d340
Update cpp/include/cudf/aggregation/host_udf.hpp
ttnghia Jan 8, 2025
9d9429a
Update cpp/include/cudf/aggregation/host_udf.hpp
ttnghia Jan 8, 2025
76b5687
Change parameter order
ttnghia Jan 8, 2025
38290ac
Extract reduction and segmented reduction tests
ttnghia Jan 10, 2025
1cb656b
Rename base classes
ttnghia Jan 10, 2025
78dc6a6
Rewrite callbacks
ttnghia Jan 10, 2025
9810e6e
Update copyright header
ttnghia Jan 10, 2025
cecb7f7
Misc
ttnghia Jan 10, 2025
1f24ab0
Rename variables
ttnghia Jan 10, 2025
a6b9699
Merge branch 'branch-25.02' into host_udf_reduction
ttnghia Jan 10, 2025
8b88982
Update cpp/include/cudf/aggregation/host_udf.hpp
ttnghia Jan 10, 2025
ad96501
Try to fix docs
ttnghia Jan 10, 2025
b274eec
Merge branch 'branch-25.02' into host_udf_reduction
ttnghia Jan 10, 2025
4b7c874
Try to fix docs build
ttnghia Jan 10, 2025
76e3a87
Test docs
ttnghia Jan 10, 2025
49b676a
Try fixing docs
ttnghia Jan 10, 2025
f68e7fd
Minimal fix for doc builds plus some minor typo fixes
vyasr Jan 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 38 additions & 9 deletions cpp/include/cudf/aggregation/host_udf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,26 @@ struct host_udf_base {
host_udf_base() = default;
virtual ~host_udf_base() = default;

/**
* @brief Define the possible data needed for reduction.
*/
enum class reduction_data_attribute : int32_t {
INPUT_VALUES, ///< The input values column.
OUTPUT_DTYPE, ///< Data type for the output result.
INIT_VALUE ///< The initial value for computing reduction.
};

/**
* @brief Define the possible data needed for segmented reduction.
*/
enum class segmented_reduction_data_attribute : int32_t {
INPUT_VALUES, ///< The input values column.
OUTPUT_DTYPE, ///< Data type for the output result.
INIT_VALUE, ///< The initial value for computing reduction.
NULL_POLICY, ///< To control null handling (INCLUDE or EXCLUDE nulls).
OFFSETS ///< The offsets defining segments for segmented reduction.
};

/**
* @brief Define the possible data needed for groupby aggregations.
*
Expand Down Expand Up @@ -131,7 +151,10 @@ struct host_udf_base {
/**
* @brief Hold all possible data types for the input of the aggregation in the derived class.
*/
using value_type = std::variant<groupby_data_attribute, std::unique_ptr<aggregation>>;
using value_type = std::variant<reduction_data_attribute,
segmented_reduction_data_attribute,
groupby_data_attribute,
std::unique_ptr<aggregation>>;
value_type value; ///< The actual data attribute, wrapped by this struct
///< as a wrapper is needed to define `hash` and `equal_to` functors.

Expand All @@ -142,7 +165,10 @@ struct host_udf_base {
* @brief Construct a new data attribute from an aggregation attribute.
* @param value_ An aggregation attribute
*/
template <typename T, CUDF_ENABLE_IF(std::is_same_v<T, groupby_data_attribute>)>
template <typename T,
CUDF_ENABLE_IF(std::is_same_v<T, reduction_data_attribute> ||
std::is_same_v<T, segmented_reduction_data_attribute> ||
std::is_same_v<T, groupby_data_attribute>)>
data_attribute(T value_) : value{value_}
{
ttnghia marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Down Expand Up @@ -220,7 +246,12 @@ struct host_udf_base {
* @brief Hold all possible types of the data that is passed to the derived class for executing
* the aggregation.
*/
using input_data_t = std::variant<column_view, size_type, device_span<size_type const>>;
using input_data_t = std::variant<column_view,
data_type,
std::optional<std::reference_wrapper<scalar const>>,
null_policy,
size_type,
device_span<size_type const>>;
ttnghia marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief Input to the aggregation, mapping from each data attribute to its actual data.
Expand All @@ -229,20 +260,18 @@ struct host_udf_base {
unordered_map<data_attribute, input_data_t, data_attribute::hash, data_attribute::equal_to>;

/**
* @brief Output type of the aggregation.
*
* Currently only a single type is supported as the output of the aggregation, but it will hold
* more type in the future when reduction is supported.
* @brief Output type of the aggregation. It can be either a scalar (for reduction) or a column
* (for segmented reduction or groupby aggregations).
*/
using output_t = std::variant<std::unique_ptr<column>>;
using output_t = std::variant<std::unique_ptr<scalar>, std::unique_ptr<column>>;

/**
* @brief Get the output when the input values column is empty.
*
* This is called in libcudf when the input values column is empty. In such situations libcudf
* tries to generate the output directly without unnecessarily evaluating the intermediate data.
*
* @param output_dtype The expected output data type
* @param output_dtype The output data type for reduction and segmented reduction results
* @param stream The CUDA stream to use for any kernel launches
* @param mr Device memory resource to use for any allocations
* @return The output result of the aggregation when input values is empty
Expand Down
4 changes: 3 additions & 1 deletion cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,9 @@ class udf_aggregation final : public rolling_aggregation {
/**
* @brief Derived class for specifying host-based UDF aggregation.
*/
class host_udf_aggregation final : public groupby_aggregation {
class host_udf_aggregation final : public groupby_aggregation,
public reduce_aggregation,
public segmented_reduce_aggregation {
public:
std::unique_ptr<host_udf_base> udf_ptr;

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/groupby/sort/host_udf_aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,9 @@ template CUDF_EXPORT std::unique_ptr<aggregation> make_host_udf_aggregation<aggr
std::unique_ptr<host_udf_base>);
template CUDF_EXPORT std::unique_ptr<groupby_aggregation>
make_host_udf_aggregation<groupby_aggregation>(std::unique_ptr<host_udf_base>);
template CUDF_EXPORT std::unique_ptr<reduce_aggregation>
make_host_udf_aggregation<reduce_aggregation>(std::unique_ptr<host_udf_base>);
template CUDF_EXPORT std::unique_ptr<segmented_reduce_aggregation>
make_host_udf_aggregation<segmented_reduce_aggregation>(std::unique_ptr<host_udf_base>);

} // namespace cudf
40 changes: 38 additions & 2 deletions cpp/src/reductions/reductions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

#include <cudf/aggregation/host_udf.hpp>
#include <cudf/column/column.hpp>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/copy.hpp>
Expand Down Expand Up @@ -144,6 +145,39 @@ struct reduce_dispatch_functor {
auto td_agg = static_cast<cudf::detail::merge_tdigest_aggregation const&>(agg);
return tdigest::detail::reduce_merge_tdigest(col, td_agg.max_centroids, stream, mr);
}
case aggregation::HOST_UDF: {
auto const& udf_ptr = dynamic_cast<cudf::detail::host_udf_aggregation const&>(agg).udf_ptr;
auto const data_attrs = [&]() -> host_udf_base::data_attribute_set_t {
if (auto tmp = udf_ptr->get_required_data(); !tmp.empty()) { return tmp; }
// Empty attribute set means everything.
return {host_udf_base::reduction_data_attribute::INPUT_VALUES,
host_udf_base::reduction_data_attribute::OUTPUT_DTYPE,
host_udf_base::reduction_data_attribute::INIT_VALUE};
}();

// Do not cache udf_input, as the actual input data may change from run to run.
host_udf_base::input_map_t udf_input;
for (auto const& attr : data_attrs) {
CUDF_EXPECTS(std::holds_alternative<host_udf_base::reduction_data_attribute>(attr.value),
"Invalid input data attribute for HOST_UDF reduction.");
switch (std::get<host_udf_base::reduction_data_attribute>(attr.value)) {
case host_udf_base::reduction_data_attribute::INPUT_VALUES:
udf_input.emplace(attr, col);
break;
case host_udf_base::reduction_data_attribute::OUTPUT_DTYPE:
udf_input.emplace(attr, output_dtype);
break;
case host_udf_base::reduction_data_attribute::INIT_VALUE:
udf_input.emplace(attr, init);
break;
default: CUDF_UNREACHABLE("Invalid input data attribute for HOST_UDF reduction.");
}
}
auto output = (*udf_ptr)(udf_input, stream, mr);
CUDF_EXPECTS(std::holds_alternative<std::unique_ptr<scalar>>(output),
"Invalid output type from HOST_UDF reduction.");
return std::get<std::unique_ptr<scalar>>(std::move(output));
} // case aggregation::HOST_UDF
default: CUDF_FAIL("Unsupported reduction operator");
}
}
Expand All @@ -161,9 +195,11 @@ std::unique_ptr<scalar> reduce(column_view const& col,
cudf::data_type_error);
if (init.has_value() && !(agg.kind == aggregation::SUM || agg.kind == aggregation::PRODUCT ||
agg.kind == aggregation::MIN || agg.kind == aggregation::MAX ||
agg.kind == aggregation::ANY || agg.kind == aggregation::ALL)) {
agg.kind == aggregation::ANY || agg.kind == aggregation::ALL ||
agg.kind == aggregation::HOST_UDF)) {
CUDF_FAIL(
"Initial value is only supported for SUM, PRODUCT, MIN, MAX, ANY, and ALL aggregation types");
"Initial value is only supported for SUM, PRODUCT, MIN, MAX, ANY, ALL, and HOST_UDF "
"aggregation types");
}

// Returns default scalar if input column is empty or all null
Expand Down
51 changes: 49 additions & 2 deletions cpp/src/reductions/segmented/reductions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/aggregation/host_udf.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
Expand Down Expand Up @@ -98,6 +100,49 @@ struct segmented_reduce_dispatch_functor {
}
case segmented_reduce_aggregation::NUNIQUE:
return segmented_nunique(col, offsets, null_handling, stream, mr);
case aggregation::HOST_UDF: {
auto const& udf_ptr = dynamic_cast<cudf::detail::host_udf_aggregation const&>(agg).udf_ptr;
auto const data_attrs = [&]() -> host_udf_base::data_attribute_set_t {
if (auto tmp = udf_ptr->get_required_data(); !tmp.empty()) { return tmp; }
// Empty attribute set means everything.
return {host_udf_base::segmented_reduction_data_attribute::INPUT_VALUES,
host_udf_base::segmented_reduction_data_attribute::OUTPUT_DTYPE,
host_udf_base::segmented_reduction_data_attribute::INIT_VALUE,
host_udf_base::segmented_reduction_data_attribute::NULL_POLICY,
host_udf_base::segmented_reduction_data_attribute::OFFSETS};
}();

// Do not cache udf_input, as the actual input data may change from run to run.
host_udf_base::input_map_t udf_input;
for (auto const& attr : data_attrs) {
CUDF_EXPECTS(
std::holds_alternative<host_udf_base::segmented_reduction_data_attribute>(attr.value),
"Invalid input data attribute for HOST_UDF segmented reduction.");
switch (std::get<host_udf_base::segmented_reduction_data_attribute>(attr.value)) {
case host_udf_base::segmented_reduction_data_attribute::INPUT_VALUES:
udf_input.emplace(attr, col);
break;
case host_udf_base::segmented_reduction_data_attribute::OUTPUT_DTYPE:
udf_input.emplace(attr, output_dtype);
break;
case host_udf_base::segmented_reduction_data_attribute::INIT_VALUE:
udf_input.emplace(attr, init);
break;
case host_udf_base::segmented_reduction_data_attribute::NULL_POLICY:
udf_input.emplace(attr, null_handling);
break;
case host_udf_base::segmented_reduction_data_attribute::OFFSETS:
udf_input.emplace(attr, offsets);
break;
default:
CUDF_UNREACHABLE("Invalid input data attribute for HOST_UDF segmented reduction.");
}
}
auto output = (*udf_ptr)(udf_input, stream, mr);
CUDF_EXPECTS(std::holds_alternative<std::unique_ptr<column>>(output),
"Invalid output type from HOST_UDF segmented reduction.");
return std::get<std::unique_ptr<column>>(std::move(output));
} // case aggregation::HOST_UDF
default: CUDF_FAIL("Unsupported aggregation type.");
}
}
Expand All @@ -117,9 +162,11 @@ std::unique_ptr<column> segmented_reduce(column_view const& segmented_values,
cudf::data_type_error);
if (init.has_value() && !(agg.kind == aggregation::SUM || agg.kind == aggregation::PRODUCT ||
agg.kind == aggregation::MIN || agg.kind == aggregation::MAX ||
agg.kind == aggregation::ANY || agg.kind == aggregation::ALL)) {
agg.kind == aggregation::ANY || agg.kind == aggregation::ALL ||
agg.kind == aggregation::HOST_UDF)) {
CUDF_FAIL(
"Initial value is only supported for SUM, PRODUCT, MIN, MAX, ANY, and ALL aggregation types");
"Initial value is only supported for SUM, PRODUCT, MIN, MAX, ANY, ALL, and HOST_UDF "
"aggregation types");
}

if (segmented_values.is_empty() && offsets.empty()) {
Expand Down
Loading
Loading