Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update vertex_frontier_t to take unsorted (tagged-)vertex list with possible duplicates #2584

Merged
merged 7 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions cpp/src/components/weakly_connected_components_impl.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,8 @@ void weakly_connected_components_impl(raft::handle_t const& handle,
// 2-3. initialize vertex frontier, edge_buffer, and edge_dst_components (if
// multi-gpu)

vertex_frontier_t<vertex_t, vertex_t, GraphViewType::is_multi_gpu> vertex_frontier(handle,
num_buckets);
vertex_frontier_t<vertex_t, vertex_t, GraphViewType::is_multi_gpu, true> vertex_frontier(
handle, num_buckets);
vertex_t next_candidate_offset{0};
edge_t edge_count{0};

Expand Down Expand Up @@ -533,7 +533,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle,

auto max_pushes = GraphViewType::is_multi_gpu
? compute_num_out_nbrs_from_frontier(
handle, level_graph_view, vertex_frontier, bucket_idx_cur)
handle, level_graph_view, vertex_frontier.bucket(bucket_idx_cur))
: edge_count;

// FIXME: if we use cuco::static_map (no duplicates, ideally we need static_set), edge_buffer
Expand All @@ -545,8 +545,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle,
auto new_frontier_tagged_vertex_buffer = transform_reduce_v_frontier_outgoing_e_by_dst(
handle,
level_graph_view,
vertex_frontier,
bucket_idx_cur,
vertex_frontier.bucket(bucket_idx_cur),
edge_src_dummy_property_t{}.view(),
edge_dst_dummy_property_t{}.view(),
[col_components =
Expand Down
5 changes: 2 additions & 3 deletions cpp/src/cores/core_number_impl.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void core_number(raft::handle_t const& handle,
constexpr size_t bucket_idx_next = 1;
constexpr size_t num_buckets = 2;

vertex_frontier_t<vertex_t, void, multi_gpu> vertex_frontier(handle, num_buckets);
vertex_frontier_t<vertex_t, void, multi_gpu, true> vertex_frontier(handle, num_buckets);

edge_dst_property_t<graph_view_t<vertex_t, edge_t, weight_t, false, multi_gpu>, edge_t>
dst_core_numbers(handle, graph_view);
Expand Down Expand Up @@ -213,8 +213,7 @@ void core_number(raft::handle_t const& handle,
transform_reduce_v_frontier_outgoing_e_by_dst(
handle,
graph_view,
vertex_frontier,
bucket_idx_cur,
vertex_frontier.bucket(bucket_idx_cur),
edge_src_dummy_property_t{}.view(),
dst_core_numbers.view(),
[k, delta] __device__(vertex_t src, vertex_t dst, auto, auto dst_val) {
Expand Down
60 changes: 28 additions & 32 deletions cpp/src/prims/transform_reduce_v_frontier_outgoing_e_by_dst.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -792,38 +792,35 @@ auto sort_and_reduce_buffer_elements(

} // namespace detail

template <typename GraphViewType, typename VertexFrontierType>
template <typename GraphViewType, typename VertexFrontierBucketType>
typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier(
raft::handle_t const& handle,
GraphViewType const& graph_view,
VertexFrontierType const& frontier,
size_t cur_frontier_bucket_idx)
VertexFrontierBucketType const& frontier)
{
static_assert(!GraphViewType::is_storage_transposed,
"GraphViewType should support the push model.");

using vertex_t = typename GraphViewType::vertex_type;
using edge_t = typename GraphViewType::edge_type;
using weight_t = typename GraphViewType::weight_type;
using key_t = typename VertexFrontierType::key_type;
using key_t = typename VertexFrontierBucketType::key_type;

edge_t ret{0};

auto const& cur_frontier_bucket = frontier.bucket(cur_frontier_bucket_idx);
vertex_t const* local_frontier_vertex_first{nullptr};
if constexpr (std::is_same_v<key_t, vertex_t>) {
local_frontier_vertex_first = cur_frontier_bucket.begin();
local_frontier_vertex_first = frontier.begin();
} else {
local_frontier_vertex_first = thrust::get<0>(cur_frontier_bucket.begin().get_iterator_tuple());
local_frontier_vertex_first = thrust::get<0>(frontier.begin().get_iterator_tuple());
}

std::vector<size_t> local_frontier_sizes{};
if constexpr (GraphViewType::is_multi_gpu) {
auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name());
local_frontier_sizes =
host_scalar_allgather(col_comm, cur_frontier_bucket.size(), handle.get_stream());
auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name());
local_frontier_sizes = host_scalar_allgather(col_comm, frontier.size(), handle.get_stream());
} else {
local_frontier_sizes = std::vector<size_t>{static_cast<size_t>(cur_frontier_bucket.size())};
local_frontier_sizes = std::vector<size_t>{static_cast<size_t>(frontier.size())};
}
for (size_t i = 0; i < graph_view.number_of_local_edge_partitions(); ++i) {
auto edge_partition =
Expand Down Expand Up @@ -882,7 +879,7 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier(
ret += thrust::transform_reduce(
handle.get_thrust_policy(),
local_frontier_vertex_first,
local_frontier_vertex_first + cur_frontier_bucket.size(),
local_frontier_vertex_first + frontier.size(),
[edge_partition] __device__(auto major) {
auto major_offset = edge_partition.major_offset_from_major_nocheck(major);
return edge_partition.local_degree(major_offset);
Expand All @@ -900,23 +897,20 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier(
* outputs by (tagged-)destination ID.
*
* Edge functor outputs are thrust::optional objects and invalid if thrust::nullopt. Vertices are
* assumed to be tagged if VertexFrontierType::key_type is a tuple of a vertex type and a tag type
* (VertexFrontierType::key_type is identical to a vertex type otherwise).
* assumed to be tagged if VertexFrontierBucketType::key_type is a tuple of a vertex type and a tag
* type (VertexFrontierBucketType::key_type is identical to a vertex type otherwise).
*
* @tparam GraphViewType Type of the passed non-owning graph object.
* @tparam VertexFrontierType Type of the vertex frontier class which abstracts vertex frontier
* managements.
* @tparam VertexFrontierBucketType Type of the vertex frontier bucket class which abstracts the
* current (tagged-)vertex frontier.
* @tparam EdgeSrcValueInputWrapper Type of the wrapper for edge source property values.
* @tparam EdgeDstValueInputWrapper Type of the wrapper for edge destination property values.
* @tparam EdgeOp Type of the quaternary (or quinary) edge operator.
* @tparam ReduceOp Type of the binary reduction operator.
* @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and
* handles to various CUDA libraries) to run graph algorithms.
* @param graph_view Non-owning graph object.
* @param frontier VertexFrontierType class object for vertex frontier managements. This object
* includes multiple bucket objects.
* @param cur_frontier_bucket_idx Index of the vertex frontier bucket holding vertices for the
* current iteration.
* @param frontier VertexFrontierBucketType class object for the current vertex frontier.
* @param edge_src_value_input Wrapper used to access source input property values (for the edge
* sources assigned to this process in multi-GPU). Use either cugraph::edge_src_property_t::view()
* (if @p e_op needs to access source property values) or cugraph::edge_src_dummy_property_t::view()
Expand Down Expand Up @@ -945,23 +939,22 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier(
* values (if ReduceOp::value_type is void).
*/
template <typename GraphViewType,
typename VertexFrontierType,
typename VertexFrontierBucketType,
typename EdgeSrcValueInputWrapper,
typename EdgeDstValueInputWrapper,
typename EdgeOp,
typename ReduceOp>
std::conditional_t<
!std::is_same_v<typename ReduceOp::value_type, void>,
std::tuple<decltype(allocate_dataframe_buffer<typename VertexFrontierType::key_type>(
std::tuple<decltype(allocate_dataframe_buffer<typename VertexFrontierBucketType::key_type>(
0, rmm::cuda_stream_view{})),
decltype(detail::allocate_optional_payload_buffer<typename ReduceOp::value_type>(
0, rmm::cuda_stream_view{}))>,
decltype(
allocate_dataframe_buffer<typename VertexFrontierType::key_type>(0, rmm::cuda_stream_view{}))>
decltype(allocate_dataframe_buffer<typename VertexFrontierBucketType::key_type>(
0, rmm::cuda_stream_view{}))>
transform_reduce_v_frontier_outgoing_e_by_dst(raft::handle_t const& handle,
GraphViewType const& graph_view,
VertexFrontierType const& frontier,
size_t cur_frontier_bucket_idx,
VertexFrontierBucketType const& frontier,
EdgeSrcValueInputWrapper edge_src_value_input,
EdgeDstValueInputWrapper edge_dst_value_input,
EdgeOp e_op,
Expand All @@ -974,7 +967,7 @@ transform_reduce_v_frontier_outgoing_e_by_dst(raft::handle_t const& handle,
using vertex_t = typename GraphViewType::vertex_type;
using edge_t = typename GraphViewType::edge_type;
using weight_t = typename GraphViewType::weight_type;
using key_t = typename VertexFrontierType::key_type;
using key_t = typename VertexFrontierBucketType::key_type;
using payload_t = typename ReduceOp::value_type;

using edge_partition_src_input_device_view_t = std::conditional_t<
Expand All @@ -998,15 +991,12 @@ transform_reduce_v_frontier_outgoing_e_by_dst(raft::handle_t const& handle,
vertex_t,
typename EdgeDstValueInputWrapper::value_iterator>>>;

CUGRAPH_EXPECTS(cur_frontier_bucket_idx < frontier.num_buckets(),
"Invalid input argument: invalid current bucket index.");

if (do_expensive_check) {
// currently, nothing to do
}

auto frontier_key_first = frontier.bucket(cur_frontier_bucket_idx).begin();
auto frontier_key_last = frontier.bucket(cur_frontier_bucket_idx).end();
auto frontier_key_first = frontier.begin();
auto frontier_key_last = frontier.end();

// 1. fill the buffer

Expand Down Expand Up @@ -1124,6 +1114,12 @@ transform_reduce_v_frontier_outgoing_e_by_dst(raft::handle_t const& handle,
}

if (segment_offsets) {
if constexpr (!VertexFrontierBucketType::is_sorted_unique) {
thrust::sort(handle.get_thrust_policy(),
edge_partition_frontier_src_first,
edge_partition_frontier_src_last);
}

static_assert(detail::num_sparse_segments_per_vertex_partition == 3);
std::vector<vertex_t> h_thresholds(detail::num_sparse_segments_per_vertex_partition +
(graph_view.use_dcs() ? 1 : 0) - 1);
Expand Down
12 changes: 6 additions & 6 deletions cpp/src/prims/update_v_frontier.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ void update_v_frontier(raft::handle_t const& handle,
VertexFrontierType& frontier,
std::vector<size_t> const& next_frontier_bucket_indices,
VertexValueInputIterator vertex_value_input_first,
// FIXME: currently, it is undefined behavior if vertices in the frontier are
// tagged and the same vertex property is updated by multiple v_op
// invocations with the same vertex but with different tags.
// FIXME: currently, it is undefined behavior if there is more than one @p
// key_buffer element with the same vertex ID and the same vertex property
// value is updated by multiple @p v_op invocations with the same vertex ID.
VertexValueOutputIterator vertex_value_output_first,
VertexOp v_op,
bool do_expensive_check = false)
Expand Down Expand Up @@ -313,9 +313,9 @@ void update_v_frontier(raft::handle_t const& handle,
VertexFrontierType& frontier,
std::vector<size_t> const& next_frontier_bucket_indices,
VertexValueInputIterator vertex_value_input_first,
// FIXME: currently, it is undefined behavior if vertices in the frontier are
// tagged and the same vertex property is updated by multiple v_op
// invocations with the same vertex but with different tags.
// FIXME: currently, it is undefined behavior if there is more than one @p
// key_buffer element with the same vertex ID and the same vertex property
// value is updated by multiple @p v_op invocations with the same vertex ID.
VertexValueOutputIterator vertex_value_output_first,
VertexOp v_op,
bool do_expensive_check = false)
Expand Down
Loading