From 861cef04ea864270d792640bb4b720b72d08ff52 Mon Sep 17 00:00:00 2001 From: Alex Barghi <105237337+alexbarghi-nv@users.noreply.github.com> Date: Wed, 1 Jun 2022 10:08:04 -0400 Subject: [PATCH] Migrate SG and MG SSSP to pylibcugraph (#2295) Uses the pylibcugraph C API to make these calls instead of the wrapper functions. Removes wrapper functions from the code. Authors: - Alex Barghi (https://github.com/alexbarghi-nv) - Chuck Hastings (https://github.com/ChuckHastings) Approvers: - Chuck Hastings (https://github.com/ChuckHastings) - Joseph Nke (https://github.com/jnke2016) - Rick Ratzel (https://github.com/rlratzel) URL: https://github.com/rapidsai/cugraph/pull/2295 --- .../cugraph/dask/traversal/mg_sssp.pxd | 28 ---- .../dask/traversal/mg_sssp_wrapper.pyx | 146 ------------------ python/cugraph/cugraph/dask/traversal/sssp.py | 122 +++++++++------ python/cugraph/cugraph/traversal/sssp.pxd | 29 ---- python/cugraph/cugraph/traversal/sssp.py | 66 +++++++- .../cugraph/traversal/sssp_wrapper.pyx | 129 ---------------- 6 files changed, 139 insertions(+), 381 deletions(-) delete mode 100644 python/cugraph/cugraph/dask/traversal/mg_sssp.pxd delete mode 100644 python/cugraph/cugraph/dask/traversal/mg_sssp_wrapper.pyx delete mode 100644 python/cugraph/cugraph/traversal/sssp.pxd delete mode 100644 python/cugraph/cugraph/traversal/sssp_wrapper.pyx diff --git a/python/cugraph/cugraph/dask/traversal/mg_sssp.pxd b/python/cugraph/cugraph/dask/traversal/mg_sssp.pxd deleted file mode 100644 index 937b42147e6..00000000000 --- a/python/cugraph/cugraph/dask/traversal/mg_sssp.pxd +++ /dev/null @@ -1,28 +0,0 @@ -# -# Copyright (c) 2020-2021, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -from cugraph.structure.graph_utilities cimport * -from libcpp cimport bool - - -cdef extern from "cugraph/utilities/cython.hpp" namespace "cugraph::cython": - - cdef void call_sssp[vertex_t, weight_t]( - const handle_t &handle, - const graph_container_t &g, - vertex_t *identifiers, - weight_t *distances, - vertex_t *predecessors, - const vertex_t start_vertex) diff --git a/python/cugraph/cugraph/dask/traversal/mg_sssp_wrapper.pyx b/python/cugraph/cugraph/dask/traversal/mg_sssp_wrapper.pyx deleted file mode 100644 index e0a4c82b346..00000000000 --- a/python/cugraph/cugraph/dask/traversal/mg_sssp_wrapper.pyx +++ /dev/null @@ -1,146 +0,0 @@ -# -# Copyright (c) 2020-2022, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from cugraph.structure.utils_wrapper import * -from cugraph.dask.traversal cimport mg_sssp as c_sssp -import cudf -from cugraph.structure.graph_utilities cimport * -import cugraph.structure.graph_primtypes_wrapper as graph_primtypes_wrapper -from libc.stdint cimport uintptr_t - -def mg_sssp(input_df, - src_col_name, - dst_col_name, - num_global_verts, - num_global_edges, - vertex_partition_offsets, - rank, - handle, - segment_offsets, - start): - """ - Call sssp - """ - - cdef size_t handle_size_t = handle.getHandle() - handle_ = handle_size_t - - # Local COO information - src = input_df[src_col_name] - dst = input_df[dst_col_name] - vertex_t = src.dtype - if num_global_edges > (2**31 - 1): - edge_t = np.dtype("int64") - else: - edge_t = vertex_t - if "value" in input_df.columns: - weights = input_df['value'] - weight_t = weights.dtype - is_weighted = True - else: - weights = None - weight_t = np.dtype("float32") - is_weighted = False - - # FIXME: Offsets and indices are currently hardcoded to int, but this may - # not be acceptable in the future. - numberTypeMap = {np.dtype("int32") : numberTypeEnum.int32Type, - np.dtype("int64") : numberTypeEnum.int64Type, - np.dtype("float32") : numberTypeEnum.floatType, - np.dtype("double") : numberTypeEnum.doubleType} - - # FIXME: needs to be edge_t type not int - cdef int num_local_edges = len(src) - - cdef uintptr_t c_src_vertices = src.__cuda_array_interface__['data'][0] - cdef uintptr_t c_dst_vertices = dst.__cuda_array_interface__['data'][0] - cdef uintptr_t c_edge_weights = NULL - if weights is not None: - c_edge_weights = weights.__cuda_array_interface__['data'][0] - - # FIXME: data is on device, move to host (to_pandas()), convert to np array and access pointer to pass to C - vertex_partition_offsets_host = vertex_partition_offsets.values_host - cdef uintptr_t c_vertex_partition_offsets = vertex_partition_offsets_host.__array_interface__['data'][0] - - cdef vector[int] v_segment_offsets_32 - cdef vector[long] v_segment_offsets_64 - cdef uintptr_t c_segment_offsets - if (vertex_t == np.dtype("int32")): - v_segment_offsets_32 = segment_offsets - c_segment_offsets = v_segment_offsets_32.data() - else: - v_segment_offsets_64 = segment_offsets - c_segment_offsets = v_segment_offsets_64.data() - - cdef graph_container_t graph_container - - populate_graph_container(graph_container, - handle_[0], - c_src_vertices, c_dst_vertices, c_edge_weights, - c_vertex_partition_offsets, - c_segment_offsets, - len(segment_offsets) - 1, - ((numberTypeMap[vertex_t])), - ((numberTypeMap[edge_t])), - ((numberTypeMap[weight_t])), - num_local_edges, - num_global_verts, num_global_edges, - is_weighted, - False, - False, True) - - # Generate the cudf.DataFrame result - df = cudf.DataFrame() - df['vertex'] = cudf.Series(np.arange(vertex_partition_offsets.iloc[rank], vertex_partition_offsets.iloc[rank+1]), dtype=vertex_t) - df['predecessor'] = cudf.Series(np.zeros(len(df['vertex']), dtype=vertex_t)) - df['distance'] = cudf.Series(np.zeros(len(df['vertex']), dtype=weight_t)) - - # Associate to cudf Series - cdef uintptr_t c_predecessor_ptr = df['predecessor'].__cuda_array_interface__['data'][0] - cdef uintptr_t c_distance_ptr = df['distance'].__cuda_array_interface__['data'][0] - - # MG BFS path assumes directed is true - if vertex_t == np.int32: - if weight_t == np.float32: - c_sssp.call_sssp[int, float](handle_[0], - graph_container, - NULL, - c_distance_ptr, - c_predecessor_ptr, - start) - elif weight_t == np.float64: - c_sssp.call_sssp[int, double](handle_[0], - graph_container, - NULL, - c_distance_ptr, - c_predecessor_ptr, - start) - else: - if weight_t == np.float32: - c_sssp.call_sssp[long, float](handle_[0], - graph_container, - NULL, - c_distance_ptr, - c_predecessor_ptr, - start) - elif weight_t == np.float64: - c_sssp.call_sssp[long, double](handle_[0], - graph_container, - NULL, - c_distance_ptr, - c_predecessor_ptr, - start) - return df diff --git a/python/cugraph/cugraph/dask/traversal/sssp.py b/python/cugraph/cugraph/dask/traversal/sssp.py index 66b25ca73dd..bcee10fa377 100644 --- a/python/cugraph/cugraph/dask/traversal/sssp.py +++ b/python/cugraph/cugraph/dask/traversal/sssp.py @@ -16,41 +16,67 @@ from collections.abc import Iterable from dask.distributed import wait, default_client -from cugraph.dask.common.input_utils import (get_distributed_data, - get_vertex_partition_offsets) -from cugraph.dask.traversal import mg_sssp_wrapper as mg_sssp +from cugraph.dask.common.input_utils import get_distributed_data import cugraph.dask.comms.comms as Comms +import cupy import cudf import dask_cudf - - -def call_sssp(sID, - data, - src_col_name, - dst_col_name, - num_verts, - num_edges, - vertex_partition_offsets, - aggregate_segment_offsets, - start): - wid = Comms.get_worker_id(sID) - handle = Comms.get_handle(sID) - local_size = len(aggregate_segment_offsets) // Comms.get_n_workers(sID) - segment_offsets = \ - aggregate_segment_offsets[local_size * wid: local_size * (wid + 1)] - return mg_sssp.mg_sssp(data[0], - src_col_name, - dst_col_name, - num_verts, - num_edges, - vertex_partition_offsets, - wid, - handle, - segment_offsets, - start) - - -def sssp(input_graph, source): +from pylibcugraph import sssp as pylibcugraph_sssp +from pylibcugraph import (ResourceHandle, + GraphProperties, + MGGraph) + + +def _call_plc_sssp( + sID, + data, + src_col_name, + dst_col_name, + num_edges, + source, + cutoff, + compute_predecessors=True, + do_expensive_check=False): + + comms_handle = Comms.get_handle(sID) + resource_handle = ResourceHandle(comms_handle.getHandle()) + + srcs = data[0][src_col_name] + dsts = data[0][dst_col_name] + weights = data[0]['value'] \ + if 'value' in data[0].columns \ + else cudf.Series((srcs + 1) / (srcs + 1), dtype='float32') + if weights.dtype not in ('float32', 'double'): + weights = weights.astype('double') + + mg = MGGraph( + resource_handle=resource_handle, + graph_properties=GraphProperties(is_multigraph=False), + src_array=srcs, + dst_array=dsts, + weight_array=weights, + store_transposed=False, + num_edges=num_edges, + do_expensive_check=do_expensive_check + ) + + vertices, distances, predecessors = pylibcugraph_sssp( + resource_handle=resource_handle, + graph=mg, + source=source, + cutoff=cutoff, + compute_predecessors=compute_predecessors, + do_expensive_check=do_expensive_check + ) + + return cudf.DataFrame({ + 'distance': cudf.Series(distances), + 'vertex': cudf.Series(vertices), + 'predecessor': cudf.Series(predecessors), + }) + + +def sssp(input_graph, source, cutoff=None): """ Compute the distance and predecessors for shortest paths from the specified source to all the vertices in the input_graph. The distances column will @@ -70,6 +96,9 @@ def sssp(input_graph, source): source : Integer Specify source vertex + cutoff : double, optional (default = None) + Maximum edge weight sum considered by the algorithm + Returns ------- df : dask_cudf.DataFrame @@ -104,8 +133,6 @@ def sssp(input_graph, source): input_graph.compute_renumber_edge_list(transposed=False) ddf = input_graph.edgelist.edgelist_df - vertex_partition_offsets = get_vertex_partition_offsets(input_graph) - num_verts = vertex_partition_offsets.iloc[-1] num_edges = len(ddf) data = get_distributed_data(ddf) @@ -142,19 +169,22 @@ def sssp(input_graph, source): src_col_name = input_graph.source_columns dst_col_name = input_graph.destination_columns + if cutoff is None: + cutoff = cupy.inf + result = [client.submit( - call_sssp, - Comms.get_session_id(), - wf[1], - src_col_name, - dst_col_name, - num_verts, - num_edges, - vertex_partition_offsets, - input_graph.aggregate_segment_offsets, - source, - workers=[wf[0]]) - for idx, wf in enumerate(data.worker_to_parts.items())] + _call_plc_sssp, + Comms.get_session_id(), + wf[1], + src_col_name, + dst_col_name, + num_edges, + source, + cutoff, + True, + False, + workers=[wf[0]]) + for idx, wf in enumerate(data.worker_to_parts.items())] wait(result) ddf = dask_cudf.from_delayed(result) diff --git a/python/cugraph/cugraph/traversal/sssp.pxd b/python/cugraph/cugraph/traversal/sssp.pxd deleted file mode 100644 index 3109668d747..00000000000 --- a/python/cugraph/cugraph/traversal/sssp.pxd +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright (c) 2019-2021, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# cython: profile=False -# distutils: language = c++ -# cython: embedsignature = True -# cython: language_level = 3 - -from cugraph.structure.graph_utilities cimport * - -cdef extern from "cugraph/utilities/cython.hpp" namespace "cugraph::cython": - - cdef void call_sssp[vertex_t, weight_t]( - const handle_t &handle, - const graph_container_t &g, - vertex_t *identifiers, - weight_t *distances, - vertex_t *predecessors, - vertex_t start_vertex) except + diff --git a/python/cugraph/cugraph/traversal/sssp.py b/python/cugraph/cugraph/traversal/sssp.py index ae6a69cae7d..4533a834952 100644 --- a/python/cugraph/cugraph/traversal/sssp.py +++ b/python/cugraph/cugraph/traversal/sssp.py @@ -15,13 +15,16 @@ import cudf from cugraph.structure import Graph, DiGraph, MultiGraph, MultiDiGraph -from cugraph.traversal import sssp_wrapper from cugraph.utilities import (ensure_cugraph_obj, is_matrix_type, is_cp_matrix_type, is_nx_graph_type, cupy_package as cp, ) +from pylibcugraph import sssp as pylibcugraph_sssp +from pylibcugraph import (ResourceHandle, + GraphProperties, + SGGraph) def _ensure_args(G, source, method, directed, @@ -122,6 +125,51 @@ def _convert_df_to_output_type(df, input_type, return_predecessors): raise TypeError(f"input type {input_type} is not a supported type.") +def _call_plc_sssp( + G, + source, + cutoff, + compute_predecessors=True, + do_expensive_check=False): + srcs = G.edgelist.edgelist_df['src'] + dsts = G.edgelist.edgelist_df['dst'] + weights = G.edgelist.edgelist_df['weights'] \ + if 'weights' in G.edgelist.edgelist_df \ + else cudf.Series((srcs + 1) / (srcs + 1), dtype='float32') + if weights.dtype not in ('float32', 'double'): + weights = weights.astype('double') + + handle = ResourceHandle() + + sg = SGGraph( + resource_handle=handle, + graph_properties=GraphProperties(is_multigraph=G.is_multigraph()), + src_array=srcs, + dst_array=dsts, + weight_array=weights, + store_transposed=False, + renumber=False, + do_expensive_check=do_expensive_check + ) + + vertices, distances, predecessors = pylibcugraph_sssp( + resource_handle=handle, + graph=sg, + source=source, + cutoff=cutoff, + compute_predecessors=compute_predecessors, + do_expensive_check=do_expensive_check + ) + + df = cudf.DataFrame({ + 'distance': cudf.Series(distances), + 'vertex': cudf.Series(vertices), + 'predecessor': cudf.Series(predecessors), + }) + + return df + + # FIXME: if G is a Nx type, the weight attribute is assumed to be "weight", if # set. An additional optional parameter for the weight attr name when accepting # Nx graphs may be needed. From the Nx docs: @@ -134,7 +182,8 @@ def sssp(G, return_predecessors=None, unweighted=None, overwrite=None, - indices=None): + indices=None, + cutoff=None): """ Compute the distance and predecessors for shortest paths from the specified source to all the vertices in the graph. The distances column will store @@ -153,6 +202,8 @@ def sssp(G, point values. source : int Index of the source vertex. + cutoff : double, optional (default = None) + Maximum edge weight sum considered by the algorithm Returns ------- @@ -192,6 +243,11 @@ def sssp(G, >>> G = cugraph.Graph() >>> G.from_cudf_edgelist(M, source='0', destination='1') >>> distances = cugraph.sssp(G, 0) + >>> distances + distance vertex predecessor + ... ... ... ... + ... ... ... ... + ... ... ... ... """ (source, directed, return_predecessors) = _ensure_args( @@ -214,7 +270,11 @@ def sssp(G, raise ValueError( "Starting vertex should be between 0 to number of vertices") - df = sssp_wrapper.sssp(G, source) + if cutoff is None: + cutoff = np.inf + + # compute_predecessors MUST be true in the current version of sssp + df = _call_plc_sssp(G, source, cutoff, compute_predecessors=True) if G.renumbered: df = G.unrenumber(df, "vertex") diff --git a/python/cugraph/cugraph/traversal/sssp_wrapper.pyx b/python/cugraph/cugraph/traversal/sssp_wrapper.pyx deleted file mode 100644 index b83f0ab7a7b..00000000000 --- a/python/cugraph/cugraph/traversal/sssp_wrapper.pyx +++ /dev/null @@ -1,129 +0,0 @@ -# Copyright (c) 2019-2022, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# cython: profile=False -# distutils: language = c++ -# cython: embedsignature = True -# cython: language_level = 3 - -cimport cugraph.traversal.sssp as c_sssp -from cugraph.structure.graph_utilities cimport * -from cugraph.structure import graph_primtypes_wrapper -from libcpp cimport bool -from libc.stdint cimport uintptr_t -import cudf -import numpy as np - - -def sssp(input_graph, source): - """ - Call sssp - """ - # Step 1: Declare the different variables - cdef graph_container_t graph_container - # FIXME: Offsets and indices are currently hardcoded to int, but this may - # not be acceptable in the future. - numberTypeMap = {np.dtype("int32") : numberTypeEnum.int32Type, - np.dtype("int64") : numberTypeEnum.int64Type, - np.dtype("float32") : numberTypeEnum.floatType, - np.dtype("double") : numberTypeEnum.doubleType} - - # Pointers required for CSR Graph - cdef uintptr_t c_offsets_ptr = NULL # Pointer to the CSR offsets - cdef uintptr_t c_indices_ptr = NULL # Pointer to the CSR indices - cdef uintptr_t c_weights_ptr = NULL # Pointer to the CSR weights - cdef uintptr_t c_local_verts = NULL; - cdef uintptr_t c_local_edges = NULL; - cdef uintptr_t c_local_offsets = NULL; - weight_t = np.dtype("float32") - - # Pointers for SSSP / BFS - cdef uintptr_t c_identifier_ptr = NULL # Pointer to the DataFrame 'vertex' Series - cdef uintptr_t c_distance_ptr = NULL # Pointer to the DataFrame 'distance' Series - cdef uintptr_t c_predecessor_ptr = NULL # Pointer to the DataFrame 'predecessor' Series - - cdef unique_ptr[handle_t] handle_ptr - handle_ptr.reset(new handle_t()) - handle_ = handle_ptr.get(); - - # Step 2: Verify that input_graph has the expected format - # the SSSP implementation expects CSR format - if not input_graph.adjlist: - input_graph.view_adj_list() - - # Step 3: Extract CSR offsets, indices and indices - # - offsets: int (signed, 32-bit) - # - indices: int (signed, 32-bit) - # - weights: float / double - # Extract data_type from weights (not None: float / double, None: signed int 32-bit) - [offsets, indices] = graph_primtypes_wrapper.datatype_cast([input_graph.adjlist.offsets, input_graph.adjlist.indices], [np.int32]) - [weights] = graph_primtypes_wrapper.datatype_cast([input_graph.adjlist.weights], [np.float32, np.float64]) - c_offsets_ptr = offsets.__cuda_array_interface__['data'][0] - c_indices_ptr = indices.__cuda_array_interface__['data'][0] - - if weights is not None: - weight_t = weights.dtype - c_weights_ptr = weights.__cuda_array_interface__['data'][0] - - # Step 4: Setup number of vertices and number of edges - num_verts = input_graph.number_of_vertices() - num_edges = input_graph.number_of_edges(directed_edges=True) - - # Step 5: Check if source index is valid - if not 0 <= source < num_verts: - raise ValueError("Starting vertex should be between 0 to number of vertices") - - # Step 6: Generation of the result cudf.DataFrame - # Distances depends on data_type (c.f. Step 3) - df = cudf.DataFrame() - - df['vertex'] = cudf.Series(np.zeros(num_verts, dtype=np.int32)) - df['distance'] = cudf.Series(np.zeros(num_verts, dtype=weight_t)) - df['predecessor'] = cudf.Series(np.zeros(num_verts, dtype=np.int32)) - - # Step 7: Associate to cudf Series - c_identifier_ptr = df['vertex'].__cuda_array_interface__['data'][0] - c_distance_ptr = df['distance'].__cuda_array_interface__['data'][0] - c_predecessor_ptr = df['predecessor'].__cuda_array_interface__['data'][0] - - # Step 8: Dispatch to SSSP / BFS Based on weights - # - weights is not None: SSSP float or SSSP double - # - weights is None: BFS - populate_graph_container_legacy(graph_container, - ((graphTypeEnum.LegacyCSR)), - handle_[0], - c_offsets_ptr, c_indices_ptr, c_weights_ptr, - ((numberTypeEnum.int32Type)), - ((numberTypeEnum.int32Type)), - ((numberTypeMap[weight_t])), - num_verts, num_edges, - c_local_verts, c_local_edges, c_local_offsets) - - if weight_t == np.float32: - c_sssp.call_sssp[int, float](handle_[0], - graph_container, - c_identifier_ptr, - c_distance_ptr, - c_predecessor_ptr, - source) - elif weight_t == np.float64: - c_sssp.call_sssp[int, double](handle_[0], - graph_container, - c_identifier_ptr, - c_distance_ptr, - c_predecessor_ptr, - source) - else: # This case should not happen - raise NotImplementedError - - return df