diff --git a/.gitignore b/.gitignore index 485b71cac68..215dfb2922a 100644 --- a/.gitignore +++ b/.gitignore @@ -87,3 +87,4 @@ python/cugraph/cugraph/tests/dask-worker-space # Sphinx docs & build artifacts docs/cugraph/source/api_docs/api/* + diff --git a/python/cugraph/cugraph/dask/traversal/bfs.py b/python/cugraph/cugraph/dask/traversal/bfs.py index 644394df7d0..c7b847b4457 100644 --- a/python/cugraph/cugraph/dask/traversal/bfs.py +++ b/python/cugraph/cugraph/dask/traversal/bfs.py @@ -13,45 +13,75 @@ # limitations under the License. # -from collections.abc import Iterable +from pylibcugraph.experimental import (MGGraph, + ResourceHandle, + GraphProperties, + ) +from pylibcugraph import bfs as pylibcugraph_bfs from dask.distributed import wait, default_client -from cugraph.dask.common.input_utils import (get_distributed_data, - get_vertex_partition_offsets) -from cugraph.dask.traversal import mg_bfs_wrapper as mg_bfs +from cugraph.dask.common.input_utils import get_distributed_data import cugraph.dask.comms.comms as Comms import cudf import dask_cudf -def call_bfs(sID, - data, - src_col_name, - dst_col_name, - num_verts, - num_edges, - vertex_partition_offsets, - aggregate_segment_offsets, - start, - depth_limit, - return_distances): - wid = Comms.get_worker_id(sID) - handle = Comms.get_handle(sID) - local_size = len(aggregate_segment_offsets) // Comms.get_n_workers(sID) - segment_offsets = \ - aggregate_segment_offsets[local_size * wid: local_size * (wid + 1)] - return mg_bfs.mg_bfs(data[0], - src_col_name, - dst_col_name, - num_verts, - num_edges, - vertex_partition_offsets, - wid, - handle, - segment_offsets, - start, - depth_limit, - return_distances) +def _call_plc_mg_bfs( + sID, + data, + sources, + depth_limit, + src_col_name, + dst_col_name, + graph_properties, + num_edges, + direction_optimizing=False, + do_expensive_check=False, + return_predecessors=True): + comms_handle = Comms.get_handle(sID) + resource_handle = 
ResourceHandle(comms_handle.getHandle()) + + srcs = cudf.Series(data[0][src_col_name], dtype='int32') + dsts = cudf.Series(data[0][dst_col_name], dtype='int32') + weights = cudf.Series(data[0]['value'], dtype='float32') \ + if 'value' in data[0].columns \ + else cudf.Series((srcs + 1) / (srcs + 1), dtype='float32') + + mg = MGGraph( + resource_handle=resource_handle, + graph_properties=graph_properties, + src_array=srcs, + dst_array=dsts, + weight_array=weights, + store_transposed=False, + num_edges=num_edges, + do_expensive_check=do_expensive_check + ) + + res = \ + pylibcugraph_bfs( + resource_handle, + mg, + cudf.Series(sources, dtype='int32'), + direction_optimizing, + depth_limit if depth_limit is not None else 0, + return_predecessors, + True + ) + + return res + + +def convert_to_cudf(cp_arrays): + """ + create a cudf DataFrame from cupy arrays + """ + cupy_distances, cupy_predecessors, cupy_vertices = cp_arrays + df = cudf.DataFrame() + df["vertex"] = cupy_vertices + df["distance"] = cupy_distances + df["predecessor"] = cupy_predecessors + return df def bfs(input_graph, @@ -115,95 +145,81 @@ def bfs(input_graph, input_graph.compute_renumber_edge_list(transposed=False) ddf = input_graph.edgelist.edgelist_df - vertex_partition_offsets = get_vertex_partition_offsets(input_graph) - num_verts = vertex_partition_offsets.iloc[-1] + + graph_properties = GraphProperties( + is_multigraph=False) num_edges = len(ddf) data = get_distributed_data(ddf) + src_col_name = input_graph.renumber_map.renumbered_src_col_name + dst_col_name = input_graph.renumber_map.renumbered_dst_col_name + renumber_ddf = input_graph.renumber_map.implementation.ddf + col_names = input_graph.renumber_map.implementation.col_names + + if isinstance(start, dask_cudf.DataFrame) \ + or isinstance(start, cudf.DataFrame): + tmp_df = start + tmp_col_names = start.columns + else: + tmp_df = cudf.DataFrame() + tmp_df["0"] = cudf.Series(start) + tmp_col_names = ["0"] + + original_start_len = len(tmp_df) + + 
tmp_ddf = tmp_df[tmp_col_names].rename( + columns=dict(zip(tmp_col_names, col_names))) + for name in col_names: + tmp_ddf[name] = tmp_ddf[name].astype(renumber_ddf[name].dtype) + renumber_data = get_distributed_data(renumber_ddf) + def df_merge(df, tmp_df, tmp_col_names): x = df[0].merge(tmp_df, on=tmp_col_names, how='inner') return x['global_id'] - if input_graph.renumbered: - src_col_name = input_graph.renumber_map.renumbered_src_col_name - dst_col_name = input_graph.renumber_map.renumbered_dst_col_name - renumber_ddf = input_graph.renumber_map.implementation.ddf - col_names = input_graph.renumber_map.implementation.col_names - if isinstance(start, - dask_cudf.DataFrame) or isinstance(start, - cudf.DataFrame): - tmp_df = start - tmp_col_names = start.columns - else: - tmp_df = cudf.DataFrame() - tmp_df["0"] = cudf.Series(start) - tmp_col_names = ["0"] - - original_start_len = len(tmp_df) - - tmp_ddf = tmp_df[tmp_col_names].rename( - columns=dict(zip(tmp_col_names, col_names))) - for name in col_names: - tmp_ddf[name] = tmp_ddf[name].astype(renumber_ddf[name].dtype) - renumber_data = get_distributed_data(renumber_ddf) - start = [client.submit(df_merge, - wf[1], - tmp_ddf, - col_names, - workers=[wf[0]]) - for idx, wf in enumerate(renumber_data.worker_to_parts.items() - ) - ] - - def count_src(df): - return len(df) - - count_src_results = client.map(count_src, start) - cg = client.gather(count_src_results) - if sum(cg) < original_start_len: - raise ValueError('At least one start vertex provided was invalid') - - else: - # If the input graph was created with renumbering disabled (Graph(..., - # renumber=False), the above compute_renumber_edge_list() call will not - # perform a renumber step and the renumber_map will not have src/dst - # col names. In that case, the src/dst values specified when reading - # the edgelist dataframe are to be used, but only if they were single - # string values (ie. not a list representing multi-columns). 
- if isinstance(input_graph.source_columns, Iterable): - raise RuntimeError("input_graph was not renumbered but has a " - "non-string source column name (got: " - f"{input_graph.source_columns}). Re-create " - "input_graph with either renumbering enabled " - "or a source column specified as a string.") - if isinstance(input_graph.destination_columns, Iterable): - raise RuntimeError("input_graph was not renumbered but has a " - "non-string destination column name (got: " - f"{input_graph.destination_columns}). " - "Re-create input_graph with either renumbering " - "enabled or a destination column specified as " - "a string.") - src_col_name = input_graph.source_columns - dst_col_name = input_graph.destination_columns - - result = [client.submit( - call_bfs, + start = [ + client.submit( + df_merge, + wf[1], + tmp_ddf, + col_names, + workers=[wf[0]] + ) + for idx, wf in enumerate(renumber_data.worker_to_parts.items()) + ] + + def count_src(df): + return len(df) + + count_src_results = client.map(count_src, start) + cg = client.gather(count_src_results) + if sum(cg) < original_start_len: + raise ValueError('At least one start vertex provided was invalid') + + cupy_result = [client.submit( + _call_plc_mg_bfs, Comms.get_session_id(), wf[1], + start[idx], + depth_limit, src_col_name, dst_col_name, - num_verts, + graph_properties, num_edges, - vertex_partition_offsets, - input_graph.aggregate_segment_offsets, - start[idx], - depth_limit, + False, + True, return_distances, workers=[wf[0]]) for idx, wf in enumerate(data.worker_to_parts.items())] - wait(result) - ddf = dask_cudf.from_delayed(result) + wait(cupy_result) + + cudf_result = [client.submit(convert_to_cudf, + cp_arrays) + for cp_arrays in cupy_result] + wait(cudf_result) + + ddf = dask_cudf.from_delayed(cudf_result) if input_graph.renumbered: ddf = input_graph.unrenumber(ddf, 'vertex') diff --git a/python/cugraph/cugraph/dask/traversal/mg_bfs.pxd b/python/cugraph/cugraph/dask/traversal/mg_bfs.pxd deleted file mode 
100644 index 4434ae104ef..00000000000 --- a/python/cugraph/cugraph/dask/traversal/mg_bfs.pxd +++ /dev/null @@ -1,35 +0,0 @@ -# -# Copyright (c) 2020-2021, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from cugraph.structure.graph_utilities cimport * -from libcpp cimport bool - -cdef extern from "limits.h": - cdef int INT_MAX - cdef long LONG_MAX - -cdef extern from "cugraph/utilities/cython.hpp" namespace "cugraph::cython": - - cdef void call_bfs[vertex_t, weight_t]( - const handle_t &handle, - const graph_container_t &g, - vertex_t *identifiers, - vertex_t *distances, - vertex_t *predecessors, - vertex_t depth_limit, - const vertex_t *start_vertex, - size_t n_sources, - bool direction_optimizing) except + diff --git a/python/cugraph/cugraph/dask/traversal/mg_bfs_wrapper.pyx b/python/cugraph/cugraph/dask/traversal/mg_bfs_wrapper.pyx deleted file mode 100644 index 87f0eab1008..00000000000 --- a/python/cugraph/cugraph/dask/traversal/mg_bfs_wrapper.pyx +++ /dev/null @@ -1,142 +0,0 @@ -# -# Copyright (c) 2020-2022, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from cugraph.structure.utils_wrapper import * -from cugraph.dask.traversal cimport mg_bfs as c_bfs -import cudf -from cugraph.structure.graph_utilities cimport * -import cugraph.structure.graph_primtypes_wrapper as graph_primtypes_wrapper -from libc.stdint cimport uintptr_t - -def mg_bfs(input_df, - src_col_name, - dst_col_name, - num_global_verts, - num_global_edges, - vertex_partition_offsets, - rank, - handle, - segment_offsets, - start, - depth_limit, - return_distances=False): - """ - Call BFS - """ - cdef size_t handle_size_t = handle.getHandle() - handle_ = handle_size_t - - # Local COO information - src = input_df[src_col_name] - dst = input_df[dst_col_name] - vertex_t = src.dtype - if num_global_edges > (2**31 - 1): - edge_t = np.dtype("int64") - else: - edge_t = vertex_t - if "value" in input_df.columns: - weights = input_df['value'] - weight_t = weights.dtype - else: - weight_t = np.dtype("float32") - - - # FIXME: Offsets and indices are currently hardcoded to int, but this may - # not be acceptable in the future. 
- numberTypeMap = {np.dtype("int32") : numberTypeEnum.int32Type, - np.dtype("int64") : numberTypeEnum.int64Type, - np.dtype("float32") : numberTypeEnum.floatType, - np.dtype("double") : numberTypeEnum.doubleType} - - # FIXME: needs to be edge_t type not int - cdef int num_local_edges = len(src) - - cdef uintptr_t c_src_vertices = src.__cuda_array_interface__['data'][0] - cdef uintptr_t c_dst_vertices = dst.__cuda_array_interface__['data'][0] - cdef uintptr_t c_edge_weights = NULL - - # FIXME: data is on device, move to host (to_pandas()), convert to np array and access pointer to pass to C - vertex_partition_offsets_host = vertex_partition_offsets.values_host - cdef uintptr_t c_vertex_partition_offsets = vertex_partition_offsets_host.__array_interface__['data'][0] - - cdef vector[int] v_segment_offsets_32 - cdef vector[long] v_segment_offsets_64 - cdef uintptr_t c_segment_offsets - if (vertex_t == np.dtype("int32")): - v_segment_offsets_32 = segment_offsets - c_segment_offsets = v_segment_offsets_32.data() - else: - v_segment_offsets_64 = segment_offsets - c_segment_offsets = v_segment_offsets_64.data() - - cdef graph_container_t graph_container - - populate_graph_container(graph_container, - handle_[0], - c_src_vertices, c_dst_vertices, c_edge_weights, - c_vertex_partition_offsets, - c_segment_offsets, - len(segment_offsets) - 1, - ((numberTypeMap[vertex_t])), - ((numberTypeMap[edge_t])), - ((numberTypeMap[weight_t])), - num_local_edges, - num_global_verts, num_global_edges, - False, # BFS runs on unweighted graphs - False, - False, True) - - # Generate the cudf.DataFrame result - df = cudf.DataFrame() - df['vertex'] = cudf.Series(np.arange(vertex_partition_offsets.iloc[rank], vertex_partition_offsets.iloc[rank+1]), dtype=vertex_t) - df['predecessor'] = cudf.Series(np.zeros(len(df['vertex']), dtype=vertex_t)) - if (return_distances): - df['distance'] = cudf.Series(np.zeros(len(df['vertex']), dtype=vertex_t)) - - # Associate to cudf Series - cdef uintptr_t 
c_distance_ptr = NULL # Pointer to the DataFrame 'distance' Series - cdef uintptr_t c_predecessor_ptr = df['predecessor'].__cuda_array_interface__['data'][0] - if (return_distances): - c_distance_ptr = df['distance'].__cuda_array_interface__['data'][0] - cdef uintptr_t c_start_ptr = start.__cuda_array_interface__['data'][0] - - cdef bool direction_optimizing = 0 - - if vertex_t == np.int32: - if depth_limit is None: - depth_limit = c_bfs.INT_MAX - c_bfs.call_bfs[int, float](handle_[0], - graph_container, - NULL, - c_distance_ptr, - c_predecessor_ptr, - depth_limit, - c_start_ptr, - len(start), - direction_optimizing) - else: - if depth_limit is None: - depth_limit = c_bfs.LONG_MAX - c_bfs.call_bfs[long, float](handle_[0], - graph_container, - NULL, - c_distance_ptr, - c_predecessor_ptr, - depth_limit, - c_start_ptr, - len(start), - direction_optimizing) - return df diff --git a/python/cugraph/cugraph/traversal/bfs.pxd b/python/cugraph/cugraph/traversal/bfs.pxd deleted file mode 100644 index c4dbe6dddbc..00000000000 --- a/python/cugraph/cugraph/traversal/bfs.pxd +++ /dev/null @@ -1,35 +0,0 @@ -# Copyright (c) 2019-2021, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -# cython: profile=False -# distutils: language = c++ -# cython: embedsignature = True -# cython: language_level = 3 - -from cugraph.structure.graph_utilities cimport * -from libcpp cimport bool - -cdef extern from "limits.h": - cdef int INT_MAX - -cdef extern from "cugraph/utilities/cython.hpp" namespace "cugraph::cython": - cdef void call_bfs[vertex_t, weight_t]( - const handle_t &handle, - const graph_container_t &g, - vertex_t *identifiers, - vertex_t *distances, - vertex_t *predecessors, - vertex_t depth_limit, - const vertex_t *start_vertex, - size_t n_sources, - bool direction_optimizing) except + diff --git a/python/cugraph/cugraph/traversal/bfs.py b/python/cugraph/cugraph/traversal/bfs.py index 5a48bf0f688..77fa02cad76 100644 --- a/python/cugraph/cugraph/traversal/bfs.py +++ b/python/cugraph/cugraph/traversal/bfs.py @@ -14,7 +14,12 @@ import cudf import dask_cudf -from cugraph.traversal import bfs_wrapper +from pylibcugraph.experimental import (ResourceHandle, + GraphProperties, + SGGraph, + ) +from pylibcugraph import bfs as pylibcugraph_bfs + from cugraph.structure.graph_classes import Graph, DiGraph from cugraph.utilities import (ensure_cugraph_obj, is_matrix_type, @@ -122,12 +127,51 @@ def _convert_df_to_output_type(df, input_type): raise TypeError(f"input type {input_type} is not a supported type.") +def _call_plc_bfs(G, sources, depth_limit, do_expensive_check=False, + direction_optimizing=False, return_predecessors=True): + handle = ResourceHandle() + + srcs = G.edgelist.edgelist_df['src'] + dsts = G.edgelist.edgelist_df['dst'] + weights = G.edgelist.edgelist_df['weights'] \ + if 'weights' in G.edgelist.edgelist_df \ + else cudf.Series((srcs + 1) / (srcs + 1), dtype='float32') + + sg = SGGraph( + resource_handle=handle, + graph_properties=GraphProperties(is_multigraph=G.is_multigraph()), + src_array=srcs, + dst_array=dsts, + weight_array=weights, + store_transposed=False, + renumber=False, + do_expensive_check=do_expensive_check + ) + + distances, 
predecessors, vertices = \ + pylibcugraph_bfs( + handle, + sg, + sources, + direction_optimizing, + depth_limit if depth_limit is not None else -1, + return_predecessors, + do_expensive_check + ) + + return cudf.DataFrame({ + 'distance': cudf.Series(distances), + 'vertex': cudf.Series(vertices), + 'predecessor': cudf.Series(predecessors), + }) + + def bfs(G, start=None, depth_limit=None, i_start=None, directed=None, - return_predecessors=None): + return_predecessors=True): """ Find the distances and predecessors for a breadth first traversal of a graph. @@ -157,8 +201,9 @@ def bfs(G, If True, then convert the input matrix to a directed cugraph.Graph, otherwise an undirected cugraph.Graph object will be used. - return_predecessors : - + return_predecessors : bool, optional (default=True) + Whether to return the predecessors for each vertex (returns -1 + for each vertex otherwise) Returns ------- @@ -229,7 +274,12 @@ def bfs(G, else: start = cudf.Series(start, name='starts') - df = bfs_wrapper.bfs(G, start, depth_limit) + df = _call_plc_bfs( + G, + start, + depth_limit, + return_predecessors=return_predecessors + ) if G.renumbered: df = G.unrenumber(df, "vertex") df = G.unrenumber(df, "predecessor") diff --git a/python/cugraph/cugraph/traversal/bfs_wrapper.pyx b/python/cugraph/cugraph/traversal/bfs_wrapper.pyx deleted file mode 100644 index a67cedc50cb..00000000000 --- a/python/cugraph/cugraph/traversal/bfs_wrapper.pyx +++ /dev/null @@ -1,113 +0,0 @@ -# Copyright (c) 2019-2021, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - -# cython: profile=False -# distutils: language = c++ -# cython: embedsignature = True -# cython: language_level = 3 - -cimport cugraph.traversal.bfs as c_bfs -from cugraph.structure.graph_utilities cimport * -from cugraph.structure import graph_primtypes_wrapper -from libcpp cimport bool -from libc.stdint cimport uintptr_t -import cudf -import numpy as np - -def bfs(input_graph, start, depth_limit, direction_optimizing=False): - """ - Call bfs - """ - # Step 1: Declare the different varibales - cdef graph_container_t graph_container - - numberTypeMap = {np.dtype("int32") : numberTypeEnum.int32Type, - np.dtype("int64") : numberTypeEnum.int64Type, - np.dtype("float32") : numberTypeEnum.floatType, - np.dtype("double") : numberTypeEnum.doubleType} - - weight_t = np.dtype("float32") - [src, dst] = graph_primtypes_wrapper.datatype_cast([input_graph.edgelist.edgelist_df['src'], input_graph.edgelist.edgelist_df['dst']], [np.int32]) - weights = None - - # Pointers for SSSP / BFS - cdef uintptr_t c_identifier_ptr = NULL # Pointer to the DataFrame 'vertex' Series - cdef uintptr_t c_distance_ptr = NULL # Pointer to the DataFrame 'distance' Series - cdef uintptr_t c_predecessor_ptr = NULL # Pointer to the DataFrame 'predecessor' Series - if depth_limit is None: - depth_limit = c_bfs.INT_MAX - - # Step 2: Verifiy input_graph has the expected format - - cdef unique_ptr[handle_t] handle_ptr - handle_ptr.reset(new handle_t()) - handle_ = handle_ptr.get(); - - # Step 3: Setup number of vertices and edges - num_verts = input_graph.number_of_vertices() - num_edges = input_graph.number_of_edges(directed_edges=True) - - # Step 4: Extract COO - cdef uintptr_t c_src_vertices = src.__cuda_array_interface__['data'][0] - cdef uintptr_t c_dst_vertices = dst.__cuda_array_interface__['data'][0] - cdef uintptr_t c_edge_weights = NULL - - # Step 5: Check if source index is valid - # FIXME: 
Updates to multi-seed BFS support disabled this check. Re-enable ASAP. - #if not 0 <= start < num_verts: - # raise ValueError("Starting vertex should be between 0 to number of vertices") - - # Step 6: Generate the cudf.DataFrame result - # Current implementation expects int (signed 32-bit) for distance - df = cudf.DataFrame() - df['vertex'] = cudf.Series(np.arange(num_verts), dtype=np.int32) - df['distance'] = cudf.Series(np.zeros(num_verts, dtype=np.int32)) - df['predecessor'] = cudf.Series(np.zeros(num_verts, dtype=np.int32)) - - # Step 7: Associate to cudf Series - c_identifier_ptr = df['vertex'].__cuda_array_interface__['data'][0] - c_distance_ptr = df['distance'].__cuda_array_interface__['data'][0] - c_predecessor_ptr = df['predecessor'].__cuda_array_interface__['data'][0] - cdef uintptr_t c_start_ptr = start.__cuda_array_interface__['data'][0] - - is_symmetric = not input_graph.is_directed() - - # Step 8: Proceed to BFS - populate_graph_container(graph_container, - handle_[0], - c_src_vertices, c_dst_vertices, c_edge_weights, - NULL, - NULL, - 0, - ((numberTypeEnum.int32Type)), - ((numberTypeEnum.int32Type)), - ((numberTypeMap[weight_t])), - num_edges, - num_verts, num_edges, - False, - is_symmetric, - False, - False) - - # Different pathing wether shortest_path_counting is required or not - c_bfs.call_bfs[int, float](handle_ptr.get()[0], - graph_container, - c_identifier_ptr, - c_distance_ptr, - c_predecessor_ptr, - depth_limit, - c_start_ptr, - len(start), - direction_optimizing) - - return df diff --git a/python/cugraph/cugraph/traversal/sssp_wrapper.pyx b/python/cugraph/cugraph/traversal/sssp_wrapper.pyx index 46966cd3e99..b83f0ab7a7b 100644 --- a/python/cugraph/cugraph/traversal/sssp_wrapper.pyx +++ b/python/cugraph/cugraph/traversal/sssp_wrapper.pyx @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2021, NVIDIA CORPORATION. +# Copyright (c) 2019-2022, NVIDIA CORPORATION. 
# Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -17,7 +17,6 @@ # cython: language_level = 3 cimport cugraph.traversal.sssp as c_sssp -cimport cugraph.traversal.bfs as c_bfs from cugraph.structure.graph_utilities cimport * from cugraph.structure import graph_primtypes_wrapper from libcpp cimport bool diff --git a/python/pylibcugraph/pylibcugraph/__init__.py b/python/pylibcugraph/pylibcugraph/__init__.py index 96cee8e43c1..0a5bdc7cb66 100644 --- a/python/pylibcugraph/pylibcugraph/__init__.py +++ b/python/pylibcugraph/pylibcugraph/__init__.py @@ -34,3 +34,5 @@ from pylibcugraph.hits import hits from pylibcugraph.node2vec import node2vec + +from pylibcugraph.bfs import bfs diff --git a/python/pylibcugraph/pylibcugraph/bfs.pyx b/python/pylibcugraph/pylibcugraph/bfs.pyx new file mode 100644 index 00000000000..6886e6b059a --- /dev/null +++ b/python/pylibcugraph/pylibcugraph/bfs.pyx @@ -0,0 +1,200 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# Have cython use python 3 syntax +# cython: language_level = 3 + +from libc.stdint cimport uintptr_t +from libc.stdint cimport int32_t +from libc.limits cimport INT_MAX + +from pylibcugraph.resource_handle cimport ResourceHandle +from pylibcugraph._cugraph_c.algorithms cimport ( + cugraph_bfs, + cugraph_paths_result_t, + cugraph_paths_result_get_vertices, + cugraph_paths_result_get_predecessors, + cugraph_paths_result_get_distances, + cugraph_paths_result_free, +) +from pylibcugraph._cugraph_c.array cimport ( + cugraph_type_erased_device_array_view, + cugraph_type_erased_device_array_t, + cugraph_type_erased_device_array_view_t, + cugraph_type_erased_device_array_view_create, + cugraph_type_erased_device_array_view_free, +) +from pylibcugraph._cugraph_c.resource_handle cimport ( + bool_t, + data_type_id_t, + cugraph_resource_handle_t, +) +from pylibcugraph._cugraph_c.error cimport ( + cugraph_error_code_t, + cugraph_error_t, +) +from pylibcugraph.utils cimport ( + assert_success, + copy_to_cupy_array, + assert_CAI_type, + get_c_type_from_numpy_type, +) +from pylibcugraph._cugraph_c.graph cimport ( + cugraph_graph_t, +) +from pylibcugraph.graphs cimport ( + _GPUGraph, +) + +def bfs(ResourceHandle handle, _GPUGraph graph, + sources, bool_t direction_optimizing, int32_t depth_limit, + bool_t compute_predecessors, bool_t do_expensive_check): + """ + Performs a Breadth-first search starting from the provided sources. + Returns the distances, and predecessors if requested. + + Parameters + ---------- + handle: ResourceHandle + The resource handle responsible for managing device resources + that this algorithm will use + graph: SGGraph or MGGraph + The graph to operate upon + sources: cudf.Series + The vertices to start the breadth-first search from. Should + match the numbering of the provided graph. All workers must + have a unique set of sources. Empty sets are allowed as long + as at least one worker has a source. 
+ direction_optimizing: bool_t + Whether to treat the graph as undirected (should only be called + on a symmetric graph) + depth_limit: int32_t + The depth limit at which the traversal will be stopped. If this + is zero or negative, the traversal will run without a depth limit. + compute_predecessors: bool_t + Whether to compute the predecessors. If False, -1 will be + returned instead of the correct predecessor of each vertex. + do_expensive_check : bool_t + If True, performs more extensive tests on the inputs to ensure + validity, at the expense of increased run time. + + Returns + ------- + A tuple of device arrays (cupy arrays) of the form + (distances, predecessors, vertices) + + Examples + -------- + + M = cudf.read_csv(datasets_path / 'karate.csv', delimiter=' ', + dtype=['int32', 'int32', 'float32'], header=None) + G = cugraph.Graph() + G.from_cudf_edgelist(M, source='0', destination='1', edge_attr='2') + + handle = ResourceHandle() + + srcs = G.edgelist.edgelist_df['src'] + dsts = G.edgelist.edgelist_df['dst'] + weights = G.edgelist.edgelist_df['weights'] + + sg = SGGraph( + resource_handle = handle, + graph_properties = GraphProperties(is_multigraph=G.is_multigraph()), + src_array = srcs, + dst_array = dsts, + weight_array = weights, + store_transposed=False, + renumber=False, + do_expensive_check=do_expensive_check + ) + + res = pylibcugraph_bfs( + handle, + sg, + cudf.Series([0], dtype='int32'), + False, + 10, + True, + False + ) + + distances, predecessors, vertices = res + + final_results = cudf.DataFrame({ + 'distance': cudf.Series(distances), + 'vertex': cudf.Series(vertices), + 'predecessor': cudf.Series(predecessors), + }) + + """ + + try: + import cupy + except ModuleNotFoundError: + raise RuntimeError("bfs requires the cupy package, which could not " + "be imported") + assert_CAI_type(sources, "sources") + + if depth_limit <= 0: + depth_limit = INT_MAX - 1 + + cdef cugraph_resource_handle_t* c_resource_handle_ptr = \ +
handle.c_resource_handle_ptr + cdef cugraph_graph_t* c_graph_ptr = graph.c_graph_ptr + + cdef cugraph_error_code_t error_code + cdef cugraph_error_t* error_ptr + + cdef uintptr_t cai_sources_ptr = \ + sources.__cuda_array_interface__["data"][0] + + cdef cugraph_type_erased_device_array_view_t* sources_view_ptr = \ + cugraph_type_erased_device_array_view_create( + cai_sources_ptr, + len(sources), + get_c_type_from_numpy_type(sources.dtype)) + + cdef cugraph_paths_result_t* result_ptr + + error_code = cugraph_bfs( + c_resource_handle_ptr, + c_graph_ptr, + sources_view_ptr, + direction_optimizing, + depth_limit, + compute_predecessors, + do_expensive_check, + &result_ptr, + &error_ptr + ) + assert_success(error_code, error_ptr, "cugraph_bfs") + + # Extract individual device array pointers from result + cdef cugraph_type_erased_device_array_view_t* distances_ptr = \ + cugraph_paths_result_get_distances(result_ptr) + + cdef cugraph_type_erased_device_array_view_t* predecessors_ptr = \ + cugraph_paths_result_get_predecessors(result_ptr) + + cdef cugraph_type_erased_device_array_view_t* vertices_ptr = \ + cugraph_paths_result_get_vertices(result_ptr) + + # copy to cupy arrays + cupy_distances = copy_to_cupy_array(c_resource_handle_ptr, distances_ptr) + cupy_predecessors = copy_to_cupy_array(c_resource_handle_ptr, predecessors_ptr) + cupy_vertices = copy_to_cupy_array(c_resource_handle_ptr, vertices_ptr) + + # deallocate the no-longer needed result struct + cugraph_paths_result_free(result_ptr) + + return (cupy_distances, cupy_predecessors, cupy_vertices)