Skip to content

Commit

Permalink
MS BFS python APIs + EgoNet updates (#1469)
Browse files Browse the repository at this point in the history
There are various things in this PR.

Multi-Seed (MS) BFS: 
- API tentative 
- Saving research on memory feasibility helper function (not in production)
- Saving research on running the current BFS concurrently with streams and threads for analysis perf comparison (not in production)

EgoNet:
- Multithreading in EgoNet, which removes the serialization of execution and comes with mild performance improvements on large sizes
- Some cleanup

Authors:
  - Alex Fender (@afender)

Approvers:
  - Chuck Hastings (@ChuckHastings)
  - @Iroy30
  - Brad Rees (@BradReesWork)

URL: #1469
  • Loading branch information
afender authored Mar 26, 2021
1 parent b85bd47 commit 1f0f14e
Show file tree
Hide file tree
Showing 10 changed files with 868 additions and 223 deletions.
19 changes: 11 additions & 8 deletions cpp/src/community/egonet.cu
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,26 @@ extract(

// Streams will allocate concurrently later
std::vector<rmm::device_uvector<vertex_t>> reached{};
reached.reserve(handle.get_num_internal_streams());
reached.reserve(n_subgraphs);
for (vertex_t i = 0; i < n_subgraphs; i++) {
// Allocations and operations are attached to the worker stream
rmm::device_uvector<vertex_t> local_reach(v, handle.get_internal_stream_view(i));
reached.push_back(std::move(local_reach));
}

// h_source_vertex[i] is used by other streams in the for loop
user_stream_view.synchronize();
#ifdef TIMING
HighResTimer hr_timer;
hr_timer.start("ego_neighbors");
#endif

#pragma omp parallel for
for (vertex_t i = 0; i < n_subgraphs; i++) {
// get light handle from worker pool
raft::handle_t light_handle(handle, i);
auto worker_stream_view = light_handle.get_stream_view();

// Allocations and operations are attached to the worker stream
rmm::device_uvector<vertex_t> local_reach(v, worker_stream_view);
reached.push_back(std::move(local_reach));

// BFS with cutoff
// consider adding a device API to BFS (ie. accept source on the device)
rmm::device_uvector<vertex_t> predecessors(v, worker_stream_view); // not used
Expand Down Expand Up @@ -149,10 +152,10 @@ extract(
neighbors.resize(h_neighbors_offsets[n_subgraphs]);
user_stream_view.synchronize();

// Construct the neighbors list concurrently
// Construct the neighbors list concurrently
#pragma omp parallel for
for (vertex_t i = 0; i < n_subgraphs; i++) {
raft::handle_t light_handle(handle, i);
auto worker_stream_view = light_handle.get_stream_view();
auto worker_stream_view = handle.get_internal_stream_view(i);
thrust::copy(rmm::exec_policy(worker_stream_view),
reached[i].begin(),
reached[i].end(),
Expand Down
16 changes: 16 additions & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,13 @@ set(MST_TEST_SRC

ConfigureTest(MST_TEST "${MST_TEST_SRC}")

###################################################################################################
# - Experimental stream tests -----------------------------------------------------

set(EXPERIMENTAL_STREAM_SRCS
"${CMAKE_CURRENT_SOURCE_DIR}/experimental/streams.cu")

ConfigureTest(EXPERIMENTAL_STREAM "${EXPERIMENTAL_STREAM_SRCS}" "")

###################################################################################################
# - Experimental R-mat graph generation tests -----------------------------------------------------
Expand Down Expand Up @@ -375,6 +382,15 @@ set(EXPERIMENTAL_BFS_TEST_SRCS

ConfigureTest(EXPERIMENTAL_BFS_TEST "${EXPERIMENTAL_BFS_TEST_SRCS}")

###################################################################################################
# - Experimental MS-BFS tests ---------------------------------------------------------------------

set(EXPERIMENTAL_MSBFS_TEST_SRCS
"${CMAKE_CURRENT_SOURCE_DIR}/experimental/ms_bfs_test.cpp")

ConfigureTest(EXPERIMENTAL_MSBFS_TEST "${EXPERIMENTAL_MSBFS_TEST_SRCS}")


###################################################################################################
# - Experimental SSSP tests -----------------------------------------------------------------------

Expand Down
283 changes: 137 additions & 146 deletions cpp/tests/community/egonet_test.cu

Large diffs are not rendered by default.

301 changes: 301 additions & 0 deletions cpp/tests/experimental/ms_bfs_test.cpp

Large diffs are not rendered by default.

44 changes: 44 additions & 0 deletions cpp/tests/experimental/streams.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <raft/cudart_utils.h>
#include <thrust/transform.h>
#include <raft/handle.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>
#include <rmm/mr/device/cuda_memory_resource.hpp>
#include "gtest/gtest.h"
// Empty gtest fixture used by the stream tests below; it adds no state or setup of its own.
struct StreamTest : ::testing::Test {};
// Exercises the handle's internal streams from an OpenMP parallel loop: each iteration takes
// the internal stream view matching its index, creates two device buffers on that stream and
// runs a thrust::transform on it. The test contains no assertions on the computed values.
TEST_F(StreamTest, basic_test)
{
  const int stream_count = 4;
  raft::handle_t handle(stream_count);

  const size_t input_size = 4096;

#pragma omp parallel for
  for (int stream_idx = 0; stream_idx < stream_count; stream_idx++) {
    // One stream view per iteration; every allocation and the transform below use it.
    const auto stream_view = handle.get_internal_stream_view(stream_idx);

    rmm::device_uvector<int> first(input_size, stream_view);
    rmm::device_uvector<int> second(input_size, stream_view);

    // second = 2 * first + second, written in place into the second buffer.
    thrust::transform(rmm::exec_policy(stream_view),
                      first.begin(),
                      first.end(),
                      second.begin(),
                      second.begin(),
                      2 * thrust::placeholders::_1 + thrust::placeholders::_2);
  }
}
4 changes: 3 additions & 1 deletion python/cugraph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@
shortest_path,
filter_unreachable,
shortest_path_length,
traveling_salesperson
traveling_salesperson,
concurrent_bfs,
multi_source_bfs,
)

from cugraph.tree import minimum_spanning_tree, maximum_spanning_tree
Expand Down
32 changes: 4 additions & 28 deletions python/cugraph/tests/test_egonet.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,29 +58,6 @@ def test_ego_graph_nx(graph_file, seed, radius):
@pytest.mark.parametrize("seeds", [[0, 5, 13]])
@pytest.mark.parametrize("radius", [1, 2, 3])
def test_batched_ego_graphs(graph_file, seeds, radius):
"""
Compute the induced subgraph of neighbors for each node in seeds
within a given radius.
Parameters
----------
G : cugraph.Graph, networkx.Graph, CuPy or SciPy sparse matrix
Graph or matrix object, which should contain the connectivity
information. Edge weights, if present, should be single or double
precision floating point values.
seeds : cudf.Series
Specifies the seeds of the induced egonet subgraphs
radius: integer, optional
Include all neighbors of distance<=radius from n.
Returns
-------
ego_edge_lists : cudf.DataFrame
GPU data frame containing all induced sources identifiers,
destination identifiers, edge weights
seeds_offsets: cudf.Series
Series containing the starting offset in the returned edge list
for each seed.
"""
gc.collect()

# Nx
Expand All @@ -93,9 +70,8 @@ def test_batched_ego_graphs(graph_file, seeds, radius):
df, offsets = cugraph.batched_ego_graphs(Gnx, seeds, radius=radius)
for i in range(len(seeds)):
ego_nx = nx.ego_graph(Gnx, seeds[i], radius=radius)
ego_df = df[offsets[i]:offsets[i+1]]
ego_cugraph = nx.from_pandas_edgelist(ego_df,
source="src",
target="dst",
edge_attr="weight")
ego_df = df[offsets[i]:offsets[i + 1]]
ego_cugraph = nx.from_pandas_edgelist(
ego_df, source="src", target="dst", edge_attr="weight"
)
assert nx.is_isomorphic(ego_nx, ego_cugraph)
4 changes: 3 additions & 1 deletion python/cugraph/traversal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
sssp,
shortest_path,
filter_unreachable,
shortest_path_length
shortest_path_length,
)
from cugraph.traversal.traveling_salesperson import traveling_salesperson

from cugraph.traversal.ms_bfs import concurrent_bfs, multi_source_bfs
Loading

0 comments on commit 1f0f14e

Please sign in to comment.