Skip to content

Commit

Permalink
Support segmented reductions and null mask reductions (#9621)
Browse files Browse the repository at this point in the history
closes #9135 
closes #9552 

This PR adds support for numeric types to `simple_op`, `sum`, `prod`, `min`, `max`, `any`, `all`. Also, this PR adds `segmented_null_mask_reduction` to compute null mask reductions on segments.

Authors:
  - Michael Wang (https://github.com/isVoid)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Robert (Bobby) Evans (https://github.com/revans2)
  - Bradley Dice (https://github.com/bdice)
  - Jake Hemstad (https://github.com/jrhemstad)

URL: #9621
  • Loading branch information
isVoid authored Mar 10, 2022
1 parent dbe7b4f commit 7ff1956
Show file tree
Hide file tree
Showing 21 changed files with 1,612 additions and 32 deletions.
7 changes: 7 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,13 @@ add_library(
src/reductions/scan/scan.cpp
src/reductions/scan/scan_exclusive.cu
src/reductions/scan/scan_inclusive.cu
src/reductions/segmented_all.cu
src/reductions/segmented_any.cu
src/reductions/segmented_max.cu
src/reductions/segmented_min.cu
src/reductions/segmented_product.cu
src/reductions/segmented_reductions.cpp
src/reductions/segmented_sum.cu
src/reductions/std.cu
src/reductions/sum.cu
src/reductions/sum_of_squares.cu
Expand Down
7 changes: 4 additions & 3 deletions cpp/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ target_compile_options(

target_link_libraries(
cudf_datagen PUBLIC GTest::gmock GTest::gtest GTest::gmock_main GTest::gtest_main
benchmark::benchmark nvbench::nvbench Threads::Threads cudf
benchmark::benchmark nvbench::nvbench Threads::Threads cudf cudftestutil
)

target_include_directories(
Expand Down Expand Up @@ -175,9 +175,10 @@ ConfigureBench(TYPE_DISPATCHER_BENCH type_dispatcher/type_dispatcher.cu)
# ##################################################################################################
# * reduction benchmark ---------------------------------------------------------------------------
ConfigureBench(
REDUCTION_BENCH reduction/anyall.cpp reduction/dictionary.cpp reduction/reduce.cpp
reduction/scan.cpp reduction/minmax.cpp
REDUCTION_BENCH reduction/anyall.cpp reduction/dictionary.cpp reduction/minmax.cpp
reduction/reduce.cpp reduction/scan.cpp
)
ConfigureNVBench(REDUCTION_NVBENCH reduction/segment_reduce.cu)

# ##################################################################################################
# * reduction benchmark ---------------------------------------------------------------------------
Expand Down
134 changes: 134 additions & 0 deletions cpp/benchmarks/reduction/segment_reduce.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <benchmarks/fixture/rmm_pool_raii.hpp>
#include <nvbench/nvbench.cuh>

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_wrapper.hpp>

#include <cudf/aggregation.hpp>
#include <cudf/column/column.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/reduction.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <thrust/device_vector.h>

#include <memory>
#include <type_traits>
#include <vector>

namespace cudf {

bool constexpr is_boolean_output_agg(segmented_reduce_aggregation::Kind kind)
{
return kind == segmented_reduce_aggregation::ALL || kind == segmented_reduce_aggregation::ANY;
}

template <segmented_reduce_aggregation::Kind kind>
std::unique_ptr<segmented_reduce_aggregation> make_simple_aggregation()
{
switch (kind) {
case segmented_reduce_aggregation::SUM:
return make_sum_aggregation<segmented_reduce_aggregation>();
case segmented_reduce_aggregation::PRODUCT:
return make_product_aggregation<segmented_reduce_aggregation>();
case segmented_reduce_aggregation::MIN:
return make_min_aggregation<segmented_reduce_aggregation>();
case segmented_reduce_aggregation::MAX:
return make_max_aggregation<segmented_reduce_aggregation>();
case segmented_reduce_aggregation::ALL:
return make_all_aggregation<segmented_reduce_aggregation>();
case segmented_reduce_aggregation::ANY:
return make_any_aggregation<segmented_reduce_aggregation>();
default: CUDF_FAIL("Unsupported simple segmented aggregation");
}
}

template <typename InputType>
std::pair<std::unique_ptr<column>, thrust::device_vector<size_type>> make_test_data(
nvbench::state& state)
{
auto const column_size{size_type(state.get_int64("column_size"))};
auto const num_segments{size_type(state.get_int64("num_segments"))};

auto segment_length = column_size / num_segments;

test::UniformRandomGenerator<InputType> rand_gen(0, 100);
auto data_it = detail::make_counting_transform_iterator(
0, [&rand_gen](auto i) { return rand_gen.generate(); });

auto offset_it =
detail::make_counting_transform_iterator(0, [&column_size, &segment_length](auto i) {
return column_size < i * segment_length ? column_size : i * segment_length;
});

test::fixed_width_column_wrapper<InputType> input(data_it, data_it + column_size);
std::vector<size_type> h_offsets(offset_it, offset_it + num_segments + 1);
thrust::device_vector<size_type> d_offsets(h_offsets);

return std::make_pair(input.release(), d_offsets);
}

template <typename InputType, typename OutputType, aggregation::Kind kind>
std::enable_if_t<!is_boolean_output_agg(kind) || std::is_same_v<OutputType, bool>, void>
BM_Simple_Segmented_Reduction(nvbench::state& state,
nvbench::type_list<InputType, OutputType, nvbench::enum_type<kind>>)
{
// TODO: to be replaced by nvbench fixture once it's ready
cudf::rmm_pool_raii rmm_pool;

auto const column_size{size_type(state.get_int64("column_size"))};
auto [input, offsets] = make_test_data<InputType>(state);
auto agg = make_simple_aggregation<kind>();

state.add_element_count(column_size);
state.add_global_memory_reads<InputType>(column_size);
state.add_global_memory_writes<OutputType>(column_size);

state.exec(
nvbench::exec_tag::sync,
[input_view = input->view(), offset_span = device_span<size_type>{offsets}, &agg](
nvbench::launch& launch) {
segmented_reduce(
input_view, offset_span, *agg, data_type{type_to_id<OutputType>()}, null_policy::INCLUDE);
});
}

template <typename InputType, typename OutputType, aggregation::Kind kind>
std::enable_if_t<is_boolean_output_agg(kind) && !std::is_same_v<OutputType, bool>, void>
BM_Simple_Segmented_Reduction(nvbench::state& state,
nvbench::type_list<InputType, OutputType, nvbench::enum_type<kind>>)
{
state.skip("Invalid combination of dtype and aggregation type.");
}

using Types = nvbench::type_list<bool, int32_t, float, double>;
// Skip benchmarking MAX/ANY since they are covered by MIN/ALL respectively.
using AggKinds = nvbench::
enum_type_list<aggregation::SUM, aggregation::PRODUCT, aggregation::MIN, aggregation::ALL>;

NVBENCH_BENCH_TYPES(BM_Simple_Segmented_Reduction, NVBENCH_TYPE_AXES(Types, Types, AggKinds))
.set_name("segmented_reduction_simple")
.set_type_axes_names({"InputType", "OutputType", "AggregationKinds"})
.add_int64_axis("column_size", {100'000, 1'000'000, 10'000'000, 100'000'000})
.add_int64_axis("num_segments", {1'000, 10'000, 100'000});

} // namespace cudf
11 changes: 11 additions & 0 deletions cpp/include/cudf/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ class groupby_scan_aggregation : public virtual aggregation {
groupby_scan_aggregation() {}
};

/**
* @brief Derived class intended for segmented reduction usage.
*/
class segmented_reduce_aggregation : public virtual aggregation {
public:
~segmented_reduce_aggregation() override = default;

protected:
segmented_reduce_aggregation() {}
};

enum class udf_type : bool { CUDA, PTX };
enum class correlation_type : int32_t { PEARSON, KENDALL, SPEARMAN };

Expand Down
15 changes: 9 additions & 6 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ class aggregation_finalizer { // Declares the interface for the finalizer
*/
class sum_aggregation final : public rolling_aggregation,
public groupby_aggregation,
public groupby_scan_aggregation {
public groupby_scan_aggregation,
public segmented_reduce_aggregation {
public:
sum_aggregation() : aggregation(SUM) {}

Expand All @@ -166,7 +167,7 @@ class sum_aggregation final : public rolling_aggregation,
/**
* @brief Derived class for specifying a product aggregation
*/
class product_aggregation final : public groupby_aggregation {
class product_aggregation final : public groupby_aggregation, public segmented_reduce_aggregation {
public:
product_aggregation() : aggregation(PRODUCT) {}

Expand All @@ -187,7 +188,8 @@ class product_aggregation final : public groupby_aggregation {
*/
class min_aggregation final : public rolling_aggregation,
public groupby_aggregation,
public groupby_scan_aggregation {
public groupby_scan_aggregation,
public segmented_reduce_aggregation {
public:
min_aggregation() : aggregation(MIN) {}

Expand All @@ -208,7 +210,8 @@ class min_aggregation final : public rolling_aggregation,
*/
class max_aggregation final : public rolling_aggregation,
public groupby_aggregation,
public groupby_scan_aggregation {
public groupby_scan_aggregation,
public segmented_reduce_aggregation {
public:
max_aggregation() : aggregation(MAX) {}

Expand Down Expand Up @@ -248,7 +251,7 @@ class count_aggregation final : public rolling_aggregation,
/**
* @brief Derived class for specifying an any aggregation
*/
class any_aggregation final : public aggregation {
class any_aggregation final : public segmented_reduce_aggregation {
public:
any_aggregation() : aggregation(ANY) {}

Expand All @@ -267,7 +270,7 @@ class any_aggregation final : public aggregation {
/**
* @brief Derived class for specifying an all aggregation
*/
class all_aggregation final : public aggregation {
class all_aggregation final : public segmented_reduce_aggregation {
public:
all_aggregation() : aggregation(ALL) {}

Expand Down
99 changes: 89 additions & 10 deletions cpp/include/cudf/detail/null_mask.cuh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@
#include <cudf/column/column_device_view.cuh>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/detail/valid_if.cuh>
#include <cudf/null_mask.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>
Expand All @@ -32,6 +33,7 @@
#include <thrust/for_each.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/iterator/zip_iterator.h>

#include <algorithm>
#include <iterator>
Expand Down Expand Up @@ -279,7 +281,8 @@ rmm::device_uvector<size_type> segmented_count_bits(bitmask_type const* bitmask,
OffsetIterator first_bit_indices_end,
OffsetIterator last_bit_indices_begin,
count_bits_policy count_bits,
rmm::cuda_stream_view stream)
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto const num_ranges =
static_cast<size_type>(std::distance(first_bit_indices_begin, first_bit_indices_end));
Expand Down Expand Up @@ -329,14 +332,15 @@ rmm::device_uvector<size_type> segmented_count_bits(bitmask_type const* bitmask,
// set bits from the length of the segment.
auto segments_begin =
thrust::make_zip_iterator(first_bit_indices_begin, last_bit_indices_begin);
auto segments_size = thrust::transform_iterator(segments_begin, [] __device__(auto segment) {
auto const begin = thrust::get<0>(segment);
auto const end = thrust::get<1>(segment);
return end - begin;
});
auto segment_length_iterator =
thrust::transform_iterator(segments_begin, [] __device__(auto const& segment) {
auto const begin = thrust::get<0>(segment);
auto const end = thrust::get<1>(segment);
return end - begin;
});
thrust::transform(rmm::exec_policy(stream),
segments_size,
segments_size + num_ranges,
segment_length_iterator,
segment_length_iterator + num_ranges,
d_bit_counts.data(),
d_bit_counts.data(),
[] __device__(auto segment_size, auto segment_bit_count) {
Expand Down Expand Up @@ -438,7 +442,8 @@ std::vector<size_type> segmented_count_bits(bitmask_type const* bitmask,
first_bit_indices_end,
last_bit_indices_begin,
count_bits,
stream);
stream,
rmm::mr::get_current_device_resource());

// Copy the results back to the host.
return make_std_vector_sync(d_bit_counts, stream);
Expand Down Expand Up @@ -501,6 +506,80 @@ std::vector<size_type> segmented_null_count(bitmask_type const* bitmask,
return detail::segmented_count_unset_bits(bitmask, indices_begin, indices_end, stream);
}

/**
* @brief Create an output null mask whose validity is determined by the
* validity of any/all elements of segments of an input null mask.
*
* @tparam OffsetIterator Random-access input iterator type.
* @param bitmask Null mask residing in device memory whose segments will be
* reduced into a new mask.
* @param first_bit_indices_begin Random-access input iterator to the beginning
* of a sequence of indices of the first bit in each segment (inclusive).
* @param first_bit_indices_end Random-access input iterator to the end of a
* sequence of indices of the first bit in each segment (inclusive).
* @param last_bit_indices_begin Random-access input iterator to the beginning
* of a sequence of indices of the last bit in each segment (exclusive).
* @param null_handling If `null_policy::INCLUDE`, all elements in a segment
* must be valid for the reduced value to be valid. If `null_policy::EXCLUDE`,
* the reduction is valid if any element in the segment is valid.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned buffer's device memory.
* @return A pair containing the reduced null mask and number of nulls.
*/
template <typename OffsetIterator>
std::pair<rmm::device_buffer, size_type> segmented_null_mask_reduction(
bitmask_type const* bitmask,
OffsetIterator first_bit_indices_begin,
OffsetIterator first_bit_indices_end,
OffsetIterator last_bit_indices_begin,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto const segments_begin =
thrust::make_zip_iterator(first_bit_indices_begin, last_bit_indices_begin);
auto const segment_length_iterator =
thrust::make_transform_iterator(segments_begin, [] __device__(auto const& segment) {
auto const begin = thrust::get<0>(segment);
auto const end = thrust::get<1>(segment);
return end - begin;
});

auto const num_segments =
static_cast<size_type>(std::distance(first_bit_indices_begin, first_bit_indices_end));

if (bitmask == nullptr) {
return cudf::detail::valid_if(
segment_length_iterator,
segment_length_iterator + num_segments,
[] __device__(auto const& length) { return length > 0; },
stream,
mr);
}

auto const segment_valid_counts =
cudf::detail::segmented_count_bits(bitmask,
first_bit_indices_begin,
first_bit_indices_end,
last_bit_indices_begin,
cudf::detail::count_bits_policy::SET_BITS,
stream,
rmm::mr::get_current_device_resource());
auto const length_and_valid_count =
thrust::make_zip_iterator(segment_length_iterator, segment_valid_counts.begin());
return cudf::detail::valid_if(
length_and_valid_count,
length_and_valid_count + num_segments,
[null_handling] __device__(auto const& length_and_valid_count) {
auto const length = thrust::get<0>(length_and_valid_count);
auto const valid_count = thrust::get<1>(length_and_valid_count);
return (length > 0) and
((null_handling == null_policy::EXCLUDE) ? valid_count > 0 : valid_count == length);
},
stream,
mr);
}

} // namespace detail

} // namespace cudf
Loading

0 comments on commit 7ff1956

Please sign in to comment.