Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fea cleanup stream part1 #1653

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cpp/include/cugraph/dendrogram.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ class Dendrogram {
public:
// Appends a new level to the dendrogram: allocates a device vector holding
// `num_verts` vertex ids on `stream_view` (using memory resource `mr`) and
// records `first_index` as this level's first vertex index.
// NOTE(review): scraped diff interleaved the old cudaStream_t and new
// rmm::cuda_stream_view lines; this is the reconstructed post-change code.
void add_level(vertex_t first_index,
               vertex_t num_verts,
               rmm::cuda_stream_view stream_view,
               rmm::mr::device_memory_resource *mr = rmm::mr::get_current_device_resource())
{
  level_ptr_.push_back(
    std::make_unique<rmm::device_uvector<vertex_t>>(num_verts, stream_view, mr));
  level_first_index_.push_back(first_index);
}

Expand Down
58 changes: 29 additions & 29 deletions cpp/include/cugraph/utilities/collect_comm.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ collect_values_for_keys(raft::comms::comms_t const &comm,
VertexIterator1 collect_key_first,
VertexIterator1 collect_key_last,
KeyToGPUIdOp key_to_gpu_id_op,
cudaStream_t stream)
rmm::cuda_stream_view stream_view)
{
using vertex_t = typename std::iterator_traits<VertexIterator0>::value_type;
static_assert(
Expand All @@ -66,7 +66,7 @@ collect_values_for_keys(raft::comms::comms_t const &comm,
// 1. build a cuco::static_map object for the map k, v pairs.

auto poly_alloc = rmm::mr::polymorphic_allocator<char>(rmm::mr::get_current_device_resource());
auto stream_adapter = rmm::mr::make_stream_allocator_adaptor(poly_alloc, cudaStream_t{nullptr});
auto stream_adapter = rmm::mr::make_stream_allocator_adaptor(poly_alloc, stream_view);
auto kv_map_ptr = std::make_unique<
cuco::static_map<vertex_t, value_t, cuda::thread_scope_device, decltype(stream_adapter)>>(
// cuco::static_map requires at least one empty slot
Expand All @@ -84,45 +84,45 @@ collect_values_for_keys(raft::comms::comms_t const &comm,
// 2. collect values for the unique keys in [collect_key_first, collect_key_last)

rmm::device_uvector<vertex_t> unique_keys(thrust::distance(collect_key_first, collect_key_last),
stream);
stream_view);
thrust::copy(
rmm::exec_policy(stream)->on(stream), collect_key_first, collect_key_last, unique_keys.begin());
thrust::sort(rmm::exec_policy(stream)->on(stream), unique_keys.begin(), unique_keys.end());
rmm::exec_policy(stream_view), collect_key_first, collect_key_last, unique_keys.begin());
thrust::sort(rmm::exec_policy(stream_view), unique_keys.begin(), unique_keys.end());
unique_keys.resize(
thrust::distance(
unique_keys.begin(),
thrust::unique(rmm::exec_policy(stream)->on(stream), unique_keys.begin(), unique_keys.end())),
stream);
thrust::unique(rmm::exec_policy(stream_view), unique_keys.begin(), unique_keys.end())),
stream_view);

rmm::device_uvector<value_t> values_for_unique_keys(0, stream);
rmm::device_uvector<value_t> values_for_unique_keys(0, stream_view);
{
rmm::device_uvector<vertex_t> rx_unique_keys(0, stream);
rmm::device_uvector<vertex_t> rx_unique_keys(0, stream_view);
std::vector<size_t> rx_value_counts{};
std::tie(rx_unique_keys, rx_value_counts) = groupby_gpuid_and_shuffle_values(
comm,
unique_keys.begin(),
unique_keys.end(),
[key_to_gpu_id_op] __device__(auto val) { return key_to_gpu_id_op(val); },
stream);
stream_view);

rmm::device_uvector<value_t> values_for_rx_unique_keys(rx_unique_keys.size(), stream);
rmm::device_uvector<value_t> values_for_rx_unique_keys(rx_unique_keys.size(), stream_view);

CUDA_TRY(cudaStreamSynchronize(stream)); // cuco::static_map currently does not take stream
stream_view.synchronize(); // cuco::static_map currently does not take stream

kv_map_ptr->find(
rx_unique_keys.begin(), rx_unique_keys.end(), values_for_rx_unique_keys.begin());

rmm::device_uvector<value_t> rx_values_for_unique_keys(0, stream);
rmm::device_uvector<value_t> rx_values_for_unique_keys(0, stream_view);
std::tie(rx_values_for_unique_keys, std::ignore) =
shuffle_values(comm, values_for_rx_unique_keys.begin(), rx_value_counts, stream);
shuffle_values(comm, values_for_rx_unique_keys.begin(), rx_value_counts, stream_view);

values_for_unique_keys = std::move(rx_values_for_unique_keys);
}

// 3. re-build a cuco::static_map object for the k, v pairs in unique_keys,
// values_for_unique_keys.

CUDA_TRY(cudaStreamSynchronize(stream)); // cuco::static_map currently does not take stream
stream_view.synchronize(); // cuco::static_map currently does not take stream

kv_map_ptr.reset();

Expand All @@ -143,7 +143,7 @@ collect_values_for_keys(raft::comms::comms_t const &comm,
// 4. find values for [collect_key_first, collect_key_last)

auto value_buffer = allocate_dataframe_buffer<value_t>(
thrust::distance(collect_key_first, collect_key_last), stream);
thrust::distance(collect_key_first, collect_key_last), stream_view);
kv_map_ptr->find(
collect_key_first, collect_key_last, get_dataframe_buffer_begin<value_t>(value_buffer));

Expand All @@ -165,7 +165,7 @@ collect_values_for_unique_keys(raft::comms::comms_t const &comm,
VertexIterator1 collect_unique_key_first,
VertexIterator1 collect_unique_key_last,
KeyToGPUIdOp key_to_gpu_id_op,
cudaStream_t stream)
rmm::cuda_stream_view stream_view)
{
using vertex_t = typename std::iterator_traits<VertexIterator0>::value_type;
static_assert(
Expand All @@ -181,7 +181,7 @@ collect_values_for_unique_keys(raft::comms::comms_t const &comm,
// 1. build a cuco::static_map object for the map k, v pairs.

auto poly_alloc = rmm::mr::polymorphic_allocator<char>(rmm::mr::get_current_device_resource());
auto stream_adapter = rmm::mr::make_stream_allocator_adaptor(poly_alloc, cudaStream_t{nullptr});
auto stream_adapter = rmm::mr::make_stream_allocator_adaptor(poly_alloc, stream_view);
auto kv_map_ptr = std::make_unique<
cuco::static_map<vertex_t, value_t, cuda::thread_scope_device, decltype(stream_adapter)>>(
// cuco::static_map requires at least one empty slot
Expand All @@ -199,41 +199,41 @@ collect_values_for_unique_keys(raft::comms::comms_t const &comm,
// 2. collect values for the unique keys in [collect_unique_key_first, collect_unique_key_last)

rmm::device_uvector<vertex_t> unique_keys(
thrust::distance(collect_unique_key_first, collect_unique_key_last), stream);
thrust::copy(rmm::exec_policy(stream)->on(stream),
thrust::distance(collect_unique_key_first, collect_unique_key_last), stream_view);
thrust::copy(rmm::exec_policy(stream_view),
collect_unique_key_first,
collect_unique_key_last,
unique_keys.begin());

rmm::device_uvector<value_t> values_for_unique_keys(0, stream);
rmm::device_uvector<value_t> values_for_unique_keys(0, stream_view);
{
rmm::device_uvector<vertex_t> rx_unique_keys(0, stream);
rmm::device_uvector<vertex_t> rx_unique_keys(0, stream_view);
std::vector<size_t> rx_value_counts{};
std::tie(rx_unique_keys, rx_value_counts) = groupby_gpuid_and_shuffle_values(
comm,
unique_keys.begin(),
unique_keys.end(),
[key_to_gpu_id_op] __device__(auto val) { return key_to_gpu_id_op(val); },
stream);
stream_view);

rmm::device_uvector<value_t> values_for_rx_unique_keys(rx_unique_keys.size(), stream);
rmm::device_uvector<value_t> values_for_rx_unique_keys(rx_unique_keys.size(), stream_view);

CUDA_TRY(cudaStreamSynchronize(stream)); // cuco::static_map currently does not take stream
stream_view.synchronize(); // cuco::static_map currently does not take stream

kv_map_ptr->find(
rx_unique_keys.begin(), rx_unique_keys.end(), values_for_rx_unique_keys.begin());

rmm::device_uvector<value_t> rx_values_for_unique_keys(0, stream);
rmm::device_uvector<value_t> rx_values_for_unique_keys(0, stream_view);
std::tie(rx_values_for_unique_keys, std::ignore) =
shuffle_values(comm, values_for_rx_unique_keys.begin(), rx_value_counts, stream);
shuffle_values(comm, values_for_rx_unique_keys.begin(), rx_value_counts, stream_view);

values_for_unique_keys = std::move(rx_values_for_unique_keys);
}

// 3. re-build a cuco::static_map object for the k, v pairs in unique_keys,
// values_for_unique_keys.

CUDA_TRY(cudaStreamSynchronize(stream)); // cuco::static_map currently does not take stream
stream_view.synchronize(); // cuco::static_map currently does not take stream

kv_map_ptr.reset();

Expand All @@ -254,7 +254,7 @@ collect_values_for_unique_keys(raft::comms::comms_t const &comm,
// 4. find values for [collect_unique_key_first, collect_unique_key_last)

auto value_buffer = allocate_dataframe_buffer<value_t>(
thrust::distance(collect_unique_key_first, collect_unique_key_last), stream);
thrust::distance(collect_unique_key_first, collect_unique_key_last), stream_view);
kv_map_ptr->find(collect_unique_key_first,
collect_unique_key_last,
get_dataframe_buffer_begin<value_t>(value_buffer));
Expand Down
54 changes: 30 additions & 24 deletions cpp/include/cugraph/utilities/dataframe_buffer.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <cugraph/utilities/thrust_tuple_utils.cuh>

#include <raft/handle.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>

#include <thrust/iterator/zip_iterator.h>
Expand All @@ -31,49 +32,50 @@ namespace experimental {
namespace detail {

template <typename TupleType, size_t I>
auto allocate_dataframe_buffer_tuple_element_impl(size_t buffer_size, cudaStream_t stream)
auto allocate_dataframe_buffer_tuple_element_impl(size_t buffer_size,
rmm::cuda_stream_view stream_view)
{
using element_t = typename thrust::tuple_element<I, TupleType>::type;
return rmm::device_uvector<element_t>(buffer_size, stream);
return rmm::device_uvector<element_t>(buffer_size, stream_view);
}

template <typename TupleType, size_t... Is>
auto allocate_dataframe_buffer_tuple_impl(std::index_sequence<Is...>,
size_t buffer_size,
cudaStream_t stream)
rmm::cuda_stream_view stream_view)
{
return std::make_tuple(
allocate_dataframe_buffer_tuple_element_impl<TupleType, Is>(buffer_size, stream)...);
allocate_dataframe_buffer_tuple_element_impl<TupleType, Is>(buffer_size, stream_view)...);
}

// Recursive helper: resizes the I-th device_uvector of a tuple-of-vectors
// buffer on `stream_view`, then recurses to element I + 1; the I == N
// specialization below terminates the recursion. (Reconstructed post-change
// code; the scraped diff interleaved old cudaStream_t and new lines.)
template <typename TupleType, typename BufferType, size_t I, size_t N>
struct resize_dataframe_buffer_tuple_iterator_element_impl {
  void run(BufferType& buffer, size_t new_buffer_size, rmm::cuda_stream_view stream_view)
  {
    std::get<I>(buffer).resize(new_buffer_size, stream_view);
    resize_dataframe_buffer_tuple_iterator_element_impl<TupleType, BufferType, I + 1, N>().run(
      buffer, new_buffer_size, stream_view);
  }
};

// Recursion terminator (I == N): no tuple elements left to resize.
template <typename TupleType, typename BufferType, size_t I>
struct resize_dataframe_buffer_tuple_iterator_element_impl<TupleType, BufferType, I, I> {
  void run(BufferType& buffer, size_t new_buffer_size, rmm::cuda_stream_view stream_view) {}
};

// Recursive helper: calls shrink_to_fit on the I-th device_uvector of a
// tuple-of-vectors buffer on `stream_view`, then recurses to element I + 1;
// the I == N specialization below terminates the recursion. (Reconstructed
// post-change code; the scraped diff interleaved old and new lines.)
template <typename TupleType, typename BufferType, size_t I, size_t N>
struct shrink_to_fit_dataframe_buffer_tuple_iterator_element_impl {
  void run(BufferType& buffer, rmm::cuda_stream_view stream_view)
  {
    std::get<I>(buffer).shrink_to_fit(stream_view);
    shrink_to_fit_dataframe_buffer_tuple_iterator_element_impl<TupleType, BufferType, I + 1, N>()
      .run(buffer, stream_view);
  }
};

// Recursion terminator (I == N): no tuple elements left to shrink.
template <typename TupleType, typename BufferType, size_t I>
struct shrink_to_fit_dataframe_buffer_tuple_iterator_element_impl<TupleType, BufferType, I, I> {
  void run(BufferType& buffer, rmm::cuda_stream_view stream_view) {}
};

template <typename TupleType, size_t I, typename BufferType>
Expand Down Expand Up @@ -108,57 +110,61 @@ auto get_dataframe_buffer_end_tuple_impl(std::index_sequence<Is...>, BufferType&
} // namespace detail

template <typename T, typename std::enable_if_t<std::is_arithmetic<T>::value>* = nullptr>
auto allocate_dataframe_buffer(size_t buffer_size, cudaStream_t stream)
auto allocate_dataframe_buffer(size_t buffer_size, rmm::cuda_stream_view stream_view)
{
return rmm::device_uvector<T>(buffer_size, stream);
return rmm::device_uvector<T>(buffer_size, stream_view);
}

// Allocates a dataframe buffer for a thrust::tuple of arithmetic types: one
// device_uvector per tuple element, returned as a std::tuple (via the
// detail:: index-sequence expansion helper). (Reconstructed post-change code;
// the scraped diff interleaved old and new lines.)
template <typename T, typename std::enable_if_t<is_thrust_tuple_of_arithmetic<T>::value>* = nullptr>
auto allocate_dataframe_buffer(size_t buffer_size, rmm::cuda_stream_view stream_view)
{
  size_t constexpr tuple_size = thrust::tuple_size<T>::value;
  return detail::allocate_dataframe_buffer_tuple_impl<T>(
    std::make_index_sequence<tuple_size>(), buffer_size, stream_view);
}

template <typename T,
typename BufferType,
typename std::enable_if_t<std::is_arithmetic<T>::value>* = nullptr>
void resize_dataframe_buffer(BufferType& buffer, size_t new_buffer_size, cudaStream_t stream)
void resize_dataframe_buffer(BufferType& buffer,
size_t new_buffer_size,
rmm::cuda_stream_view stream_view)
{
buffer.resize(new_buffer_size, stream);
buffer.resize(new_buffer_size, stream_view);
}

// Resizes every device_uvector in a tuple-typed dataframe buffer to
// `new_buffer_size` on `stream_view`, iterating elements at compile time via
// the detail:: recursive helper. (Reconstructed post-change code; the scraped
// diff interleaved old and new lines.)
template <typename T,
          typename BufferType,
          typename std::enable_if_t<is_thrust_tuple_of_arithmetic<T>::value>* = nullptr>
void resize_dataframe_buffer(BufferType& buffer,
                             size_t new_buffer_size,
                             rmm::cuda_stream_view stream_view)
{
  size_t constexpr tuple_size = thrust::tuple_size<T>::value;
  detail::
    resize_dataframe_buffer_tuple_iterator_element_impl<T, BufferType, size_t{0}, tuple_size>()
      .run(buffer, new_buffer_size, stream_view);
}

template <typename T,
typename BufferType,
typename std::enable_if_t<std::is_arithmetic<T>::value>* = nullptr>
void shrink_to_fit_dataframe_buffer(BufferType& buffer, cudaStream_t stream)
void shrink_to_fit_dataframe_buffer(BufferType& buffer, rmm::cuda_stream_view stream_view)
{
buffer.shrink_to_fit(stream);
buffer.shrink_to_fit(stream_view);
}

// Releases unused capacity of every device_uvector in a tuple-typed
// dataframe buffer, iterating elements at compile time via the detail::
// recursive helper. (Reconstructed post-change code; the scraped diff
// interleaved old and new lines.)
template <typename T,
          typename BufferType,
          typename std::enable_if_t<is_thrust_tuple_of_arithmetic<T>::value>* = nullptr>
void shrink_to_fit_dataframe_buffer(BufferType& buffer, rmm::cuda_stream_view stream_view)
{
  size_t constexpr tuple_size = thrust::tuple_size<T>::value;
  detail::shrink_to_fit_dataframe_buffer_tuple_iterator_element_impl<T,
                                                                    BufferType,
                                                                    size_t{0},
                                                                    tuple_size>()
    .run(buffer, stream_view);
}

template <typename T,
Expand Down
Loading