Performance improvements and simplifications for fixed size row-based rolling windows #17623

Merged on Jan 23, 2025 (23 commits).

Commits:
- df81991  Require that window offsets to rolling GPU kernel are in bounds (wence-, Dec 16, 2024)
- 1b9670a  Note in-bounds requirements when calling cudf::rolling_window (wence-, Dec 17, 2024)
- 584bd73  Remove unnecessary transform iterators (wence-, Dec 17, 2024)
- 554641e  Rolling collect list no longer needs to fix up window boundaries (wence-, Dec 18, 2024)
- 3c47cfc  Adapt rolling tests that relied on kernel-side in-bounds clamping (wence-, Dec 17, 2024)
- c14ffeb  Remove stray volatile (wence-, Dec 18, 2024)
- 8b64789  Remove test for bug that was fixed in CUDA 10.1 (wence-, Dec 18, 2024)
- 88796a3  No need for int64s (wence-, Dec 18, 2024)
- 432cacf  Remove assertion of post-conditions on groupby helper (wence-, Dec 18, 2024)
- 045b514  Abstract and share code for fixed window clamped iterators (wence-, Dec 18, 2024)
- a4c6cdb  Lead/lag nested postprocessing no longer needs to clamp (wence-, Dec 18, 2024)
- dbddc28  Adapt stream test (wence-, Dec 19, 2024)
- e22bf55  Also adapt java variable-size rolling tests (wence-, Dec 19, 2024)
- 3b3b15f  constexpr/const all the things (wence-, Jan 8, 2025)
- fedda39  Copyright updates (wence-, Jan 8, 2025)
- 4e43c2b  Flip template args (wence-, Jan 17, 2025)
- b1777fb  Use device_span rather than raw pointer (wence-, Jan 17, 2025)
- 65d0904  Merge remote-tracking branch 'upstream/branch-25.02' into wence/fea/r… (wence-, Jan 17, 2025)
- b6ed325  Minor doc comment (wence-, Jan 17, 2025)
- 62fa800  const all the things (wence-, Jan 17, 2025)
- 0223341  Undo usage of device_span to save two registers (wence-, Jan 20, 2025)
- a00525f  CUDF_EXPORT in namespace declaration (wence-, Jan 20, 2025)
- 45e289a  Merge branch 'branch-25.02' into wence/fea/rolling-perf (wence-, Jan 23, 2025)
7 changes: 7 additions & 0 deletions cpp/include/cudf/rolling.hpp
@@ -44,6 +44,8 @@ namespace CUDF_EXPORT cudf {
* - instead of storing NA/NaN for output rows that do not meet the minimum number of observations
* this function updates the valid bitmask of the column to indicate which elements are valid.
*
* @note Windows near the endpoints of the input are automatically clamped to be in-bounds.
*
* Notes on return column types:
* - The returned column for count aggregation always has `INT32` type.
* - The returned column for VARIANCE/STD aggregations always has `FLOAT64` type.
@@ -594,6 +596,11 @@ std::unique_ptr<column> grouped_range_rolling_window(
* column of the same type as the input. Therefore it is suggested to convert integer column types
* (especially low-precision integers) to `FLOAT32` or `FLOAT64` before doing a rolling `MEAN`.
*
* @note All entries in `preceding_window` and `following_window` must produce window extents that
* are in-bounds for the `input`. That is, for all `i`, it is required that the set of rows defined
* by the interval `[i - preceding_window[i] + 1, ..., i + following_window[i] + 1)` is a subset of
* `[0, input.size())`.
*
* @throws cudf::logic_error if window column type is not INT32
*
* @param[in] input The input column
14 changes: 3 additions & 11 deletions cpp/src/rolling/detail/lead_lag_nested.cuh
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
* Copyright (c) 2021-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -147,11 +147,7 @@ std::unique_ptr<column> compute_lead_lag_for_nested(aggregation::Kind op,
gather_map.begin<size_type>(),
cuda::proclaim_return_type<size_type>(
[following, input_size, null_index, row_offset] __device__(size_type i) {
// Note: grouped_*rolling_window() trims preceding/following to
// the beginning/end of the group. `rolling_window()` does not.
// Must trim _following[i] so as not to go past the column end.
auto _following = min(following[i], input_size - i - 1);
return (row_offset > _following) ? null_index : (i + row_offset);
return (row_offset > following[i]) ? null_index : (i + row_offset);
}));
} else {
thrust::transform(rmm::exec_policy(stream),
@@ -160,11 +156,7 @@ std::unique_ptr<column> compute_lead_lag_for_nested(aggregation::Kind op,
gather_map.begin<size_type>(),
cuda::proclaim_return_type<size_type>(
[preceding, input_size, null_index, row_offset] __device__(size_type i) {
// Note: grouped_*rolling_window() trims preceding/following to
// the beginning/end of the group. `rolling_window()` does not.
// Must trim _preceding[i] so as not to go past the column start.
auto _preceding = min(preceding[i], i + 1);
return (row_offset > (_preceding - 1)) ? null_index : (i - row_offset);
return (row_offset > (preceding[i] - 1)) ? null_index : (i - row_offset);
}));
}

35 changes: 17 additions & 18 deletions cpp/src/rolling/detail/rolling.cuh
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
* Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -1010,7 +1010,7 @@ class rolling_aggregation_postprocessor final : public cudf::detail::aggregation
* @param[out] output_valid_count Output count of valid values
* @param[in] device_operator The operator used to perform a single window operation
* @param[in] preceding_window_begin Rolling window size iterator, accumulates from
* in_col[i-preceding_window] to in_col[i] inclusive
* in_col[i-preceding_window + 1] to in_col[i] inclusive
* @param[in] following_window_begin Rolling window size iterator in the forward
* direction, accumulates from in_col[i] to in_col[i+following_window] inclusive
*/
@@ -1034,28 +1034,27 @@ __launch_bounds__(block_size) CUDF_KERNEL

size_type warp_valid_count{0};

auto active_threads = __ballot_sync(0xffff'ffffu, i < input.size());
while (i < input.size()) {
// to prevent overflow issues when computing bounds use int64_t
int64_t const preceding_window = preceding_window_begin[i];
int64_t const following_window = following_window_begin[i];
auto const num_rows = input.size();
auto active_threads = __ballot_sync(0xffff'ffffu, i < num_rows);
while (i < num_rows) {
// The caller is required to provide window bounds that will
// result in indexing that is in-bounds for the column. Therefore all
// of these calculations cannot overflow and we can do everything
// in size_type arithmetic. Moreover, we require that start <=
// end, i.e., the window is never "reversed" though it may be empty.
auto const preceding_window = preceding_window_begin[i];
auto const following_window = following_window_begin[i];

// compute bounds
auto const start = static_cast<size_type>(
min(static_cast<int64_t>(input.size()), max(int64_t{0}, i - preceding_window + 1)));
auto const end = static_cast<size_type>(
min(static_cast<int64_t>(input.size()), max(int64_t{0}, i + following_window + 1)));
auto const start_index = min(start, end);
auto const end_index = max(start, end);
size_type const start = i - preceding_window + 1;
size_type const end = i + following_window + 1;
wence- (author) commented on lines +1040 to +1049:
Please check my working for integer overflow here. preceding_window and following_window both have type size_type (as does i). So this is all happening in size_type arithmetic.

Since i is the row index, it is contained in [0, ..., num_rows). So i + 1 is always safe. So we just have to make sure that subtracting a negative preceding window never overflows, and adding a positive following window never overflows. This is handled by the clamping kernel.

So if we inline that logic, this looks like:

size_type start = i - cuda::std::min(i + 1, cuda::std::max(delta, i + 1 - num_rows)) + 1;
size_type end = i + cuda::std::max(- i - 1, cuda::std::min(delta, num_rows - i - 1)) + 1;

I think none of these operations overflow if num_rows == INT_MAX but please check.


// aggregate
// TODO: We should explore using shared memory to avoid redundant loads.
// This might require separating the kernel into a special version
// for dynamic and static sizes.

volatile bool output_is_valid = false;
output_is_valid = device_operator.template operator()<OutputType, has_nulls>(
input, default_outputs, output, start_index, end_index, i);
bool const output_is_valid = device_operator.template operator()<OutputType, has_nulls>(
input, default_outputs, output, start, end, i);

// set the mask
cudf::bitmask_type const result_mask{__ballot_sync(active_threads, output_is_valid)};
Expand All @@ -1068,7 +1067,7 @@ __launch_bounds__(block_size) CUDF_KERNEL

// process next element
i += stride;
active_threads = __ballot_sync(active_threads, i < input.size());
active_threads = __ballot_sync(active_threads, i < num_rows);
}

// sum the valid counts across the whole block
23 changes: 3 additions & 20 deletions cpp/src/rolling/detail/rolling_collect_list.cuh
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
* Copyright (c) 2021-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -154,8 +154,8 @@ std::pair<std::unique_ptr<column>, std::unique_ptr<column>> purge_null_entries(
template <typename PrecedingIter, typename FollowingIter>
std::unique_ptr<column> rolling_collect_list(column_view const& input,
column_view const& default_outputs,
PrecedingIter preceding_begin_raw,
FollowingIter following_begin_raw,
PrecedingIter preceding_begin,
FollowingIter following_begin,
size_type min_periods,
null_policy null_handling,
rmm::cuda_stream_view stream,
@@ -166,23 +166,6 @@ std::unique_ptr<column> rolling_collect_list(column_view const& input,

if (input.is_empty()) return empty_like(input);

// Fix up preceding/following iterators to respect column boundaries,
// similar to gpu_rolling().
// `rolling_window()` does not fix up preceding/following so as not to read past
// column boundaries.
// `grouped_rolling_window()` and `time_range_based_grouped_rolling_window() do.
auto preceding_begin = thrust::make_transform_iterator(
thrust::make_counting_iterator<size_type>(0),
cuda::proclaim_return_type<size_type>([preceding_begin_raw] __device__(auto i) {
return thrust::min(preceding_begin_raw[i], i + 1);
}));
auto following_begin = thrust::make_transform_iterator(
thrust::make_counting_iterator<size_type>(0),
cuda::proclaim_return_type<size_type>(
[following_begin_raw, size = input.size()] __device__(auto i) {
return thrust::min(following_begin_raw[i], size - i - 1);
}));

// Materialize collect list's offsets.
auto offsets =
create_collect_offsets(input.size(), preceding_begin, following_begin, min_periods, stream, mr);
37 changes: 11 additions & 26 deletions cpp/src/rolling/detail/rolling_fixed_window.cu
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,16 +15,15 @@
*/

#include "rolling.cuh"

#include <cudf_test/column_utilities.hpp>
#include "rolling_utils.cuh"

#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/memory_resource.hpp>

#include <cuda/functional>
#include <thrust/extrema.h>

namespace cudf::detail {

@@ -62,28 +61,14 @@ std::unique_ptr<column> rolling_window(column_view const& input,
stream,
mr);
} else {
// Clamp preceding/following to column boundaries.
// E.g. If preceding_window == 2, then for a column of 5 elements, preceding_window will be:
// [1, 2, 2, 2, 1]

auto const preceding_calc = cuda::proclaim_return_type<cudf::size_type>(
[preceding_window] __device__(size_type i) { return thrust::min(i + 1, preceding_window); });

auto const following_calc = cuda::proclaim_return_type<cudf::size_type>(
[col_size = input.size(), following_window] __device__(size_type i) {
return thrust::min(col_size - i - 1, following_window);
});

auto const preceding_column = expand_to_column(preceding_calc, input.size(), stream);
auto const following_column = expand_to_column(following_calc, input.size(), stream);
return cudf::detail::rolling_window(input,
default_outputs,
preceding_column->view().begin<cudf::size_type>(),
following_column->view().begin<cudf::size_type>(),
min_periods,
agg,
stream,
mr);
namespace utils = cudf::detail::rolling;
auto groups = utils::ungrouped{input.size()};
auto preceding =
utils::make_clamped_window_iterator<utils::direction::PRECEDING>(preceding_window, groups);
auto following =
utils::make_clamped_window_iterator<utils::direction::FOLLOWING>(following_window, groups);
return cudf::detail::rolling_window(
input, default_outputs, preceding, following, min_periods, agg, stream, mr);
}
}
} // namespace cudf::detail
121 changes: 121 additions & 0 deletions cpp/src/rolling/detail/rolling_utils.cuh
@@ -0,0 +1,121 @@
/*
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/detail/iterator.cuh>
#include <cudf/types.hpp>

#include <cuda/functional>

namespace CUDF_EXPORT cudf {

namespace detail::rolling {

/**
* @brief A group descriptor for an ungrouped rolling window.
*
* @param num_rows The number of rows to be rolled over.
*
* @note This is used for uniformity of interface between grouped and ungrouped iterator
* construction.
*/
struct ungrouped {
cudf::size_type num_rows;

[[nodiscard]] __device__ constexpr cudf::size_type label(cudf::size_type) const noexcept
{
return 0;
}
[[nodiscard]] __device__ constexpr cudf::size_type start(cudf::size_type) const noexcept
{
return 0;
}
[[nodiscard]] __device__ constexpr cudf::size_type end(cudf::size_type) const noexcept
{
return num_rows;
}
};

/**
* @brief A group descriptor for a grouped rolling window.
*
* @param labels The group labels, mapping from input rows to group.
* @param offsets The group offsets providing the endpoints of each group.
*
* @note This is used for uniformity of interface between grouped and ungrouped iterator
* construction.
*/
struct grouped {
// Taking raw pointers here to avoid stealing two registers for the sizes which are never needed.
cudf::size_type const* labels;
cudf::size_type const* offsets;

[[nodiscard]] __device__ constexpr cudf::size_type label(cudf::size_type i) const noexcept
{
return labels[i];
}
[[nodiscard]] __device__ constexpr cudf::size_type start(cudf::size_type label) const noexcept
{
return offsets[label];
}
[[nodiscard]] __device__ constexpr cudf::size_type end(cudf::size_type label) const noexcept
{
return offsets[label + 1];
}
};

enum class direction : bool {
PRECEDING,
FOLLOWING,
};

template <direction Direction, typename Grouping>
struct fixed_window_clamper {
Grouping groups;
cudf::size_type delta;
[[nodiscard]] __device__ constexpr cudf::size_type operator()(cudf::size_type i) const
{
auto const label = groups.label(i);
// i is contained in [start, end)
auto const start = groups.start(label);
auto const end = groups.end(label);
if constexpr (Direction == direction::PRECEDING) {
return cuda::std::min(i + 1 - start, cuda::std::max(delta, i + 1 - end));
} else {
return cuda::std::max(start - i - 1, cuda::std::min(delta, end - i - 1));
}
}
};

/**
* @brief Construct a clamped counting iterator for a row-based window offset
*
* @tparam Direction the direction of the window `PRECEDING` or `FOLLOWING`.
* @tparam Grouping the group specification.
* @param delta the window offset.
* @param grouper the grouping object.
*
* @return An iterator suitable for passing to `cudf::detail::rolling_window`
*/
template <direction Direction, typename Grouping>
[[nodiscard]] auto inline make_clamped_window_iterator(cudf::size_type delta, Grouping grouper)
{
return cudf::detail::make_counting_transform_iterator(
cudf::size_type{0}, fixed_window_clamper<Direction, Grouping>{grouper, delta});
}
} // namespace detail::rolling
} // namespace CUDF_EXPORT cudf
20 changes: 3 additions & 17 deletions cpp/src/rolling/detail/rolling_variable_window.cu
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -63,24 +63,10 @@ std::unique_ptr<column> rolling_window(column_view const& input,
} else {
auto defaults_col =
cudf::is_dictionary(input.type()) ? dictionary_column_view(input).indices() : input;
// Clamp preceding/following to column boundaries.
// E.g. If preceding_window == [2, 2, 2, 2, 2] for a column of 5 elements, the new
// preceding_window will be: [1, 2, 2, 2, 1]
auto const preceding_window_begin = cudf::detail::make_counting_transform_iterator(
0,
cuda::proclaim_return_type<size_type>(
[preceding = preceding_window.begin<size_type>()] __device__(size_type i) {
return thrust::min(i + 1, preceding[i]);
}));
auto const following_window_begin = cudf::detail::make_counting_transform_iterator(
0,
cuda::proclaim_return_type<size_type>(
[col_size = input.size(), following = following_window.begin<size_type>()] __device__(
size_type i) { return thrust::min(col_size - i - 1, following[i]); }));
wence- (author) commented on lines -66 to -79:

return cudf::detail::rolling_window(input,
empty_like(defaults_col)->view(),
preceding_window_begin,
following_window_begin,
preceding_window.begin<size_type>(),
following_window.begin<size_type>(),
min_periods,
agg,
stream,