Skip to content

Commit

Permalink
Migrate SG and MG SSSP to pylibcugraph (#2295)
Browse files Browse the repository at this point in the history
Uses the pylibcugraph C API for the SG and MG SSSP calls instead of the Cython wrapper functions, and removes those wrapper functions from the code.

Authors:
  - Alex Barghi (https://github.com/alexbarghi-nv)
  - Chuck Hastings (https://github.com/ChuckHastings)

Approvers:
  - Chuck Hastings (https://github.com/ChuckHastings)
  - Joseph Nke (https://github.com/jnke2016)
  - Rick Ratzel (https://github.com/rlratzel)

URL: #2295
  • Loading branch information
alexbarghi-nv authored Jun 1, 2022
1 parent 83f9f0c commit 861cef0
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 381 deletions.
28 changes: 0 additions & 28 deletions python/cugraph/cugraph/dask/traversal/mg_sssp.pxd

This file was deleted.

146 changes: 0 additions & 146 deletions python/cugraph/cugraph/dask/traversal/mg_sssp_wrapper.pyx

This file was deleted.

122 changes: 76 additions & 46 deletions python/cugraph/cugraph/dask/traversal/sssp.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,67 @@
from collections.abc import Iterable

from dask.distributed import wait, default_client
from cugraph.dask.common.input_utils import (get_distributed_data,
get_vertex_partition_offsets)
from cugraph.dask.traversal import mg_sssp_wrapper as mg_sssp
from cugraph.dask.common.input_utils import get_distributed_data
import cugraph.dask.comms.comms as Comms
import cupy
import cudf
import dask_cudf


def call_sssp(sID,
              data,
              src_col_name,
              dst_col_name,
              num_verts,
              num_edges,
              vertex_partition_offsets,
              aggregate_segment_offsets,
              start):
    """
    Run the wrapper-based multi-GPU SSSP on the calling worker.

    The worker id and comms handle are looked up from the session id
    ``sID``. ``aggregate_segment_offsets`` is divided into equally sized
    chunks (its length floor-divided by the number of workers) and only
    the chunk at this worker's id is forwarded to
    ``mg_sssp.mg_sssp``, together with ``data[0]`` and the remaining
    arguments, which are passed through unchanged.

    Returns
    -------
    Whatever ``mg_sssp.mg_sssp`` returns for this worker.
    """
    worker_rank = Comms.get_worker_id(sID)
    comms_handle = Comms.get_handle(sID)

    # Every worker owns one contiguous, equally sized slice of the
    # aggregated offsets; pick out the slice for this worker.
    chunk_len = len(aggregate_segment_offsets) // Comms.get_n_workers(sID)
    chunk_begin = chunk_len * worker_rank
    chunk_end = chunk_len * (worker_rank + 1)
    local_segment_offsets = aggregate_segment_offsets[chunk_begin:chunk_end]

    return mg_sssp.mg_sssp(
        data[0],
        src_col_name,
        dst_col_name,
        num_verts,
        num_edges,
        vertex_partition_offsets,
        worker_rank,
        comms_handle,
        local_segment_offsets,
        start,
    )


def sssp(input_graph, source):
from pylibcugraph import sssp as pylibcugraph_sssp
from pylibcugraph import (ResourceHandle,
GraphProperties,
MGGraph)


def _call_plc_sssp(
        sID,
        data,
        src_col_name,
        dst_col_name,
        num_edges,
        source,
        cutoff,
        compute_predecessors=True,
        do_expensive_check=False):
    """
    Build a pylibcugraph ``MGGraph`` from this worker's edge data and run
    pylibcugraph SSSP on it.

    Parameters
    ----------
    sID
        Comms session id used to look up the comms handle.
    data
        Sequence whose first element is indexed by column name and exposes
        ``.columns`` (presumably this worker's cudf edge-list partition;
        confirm against the caller).
    src_col_name, dst_col_name
        Names of the source and destination columns in ``data[0]``.
    num_edges
        Forwarded to ``MGGraph`` as ``num_edges``.
    source, cutoff, compute_predecessors, do_expensive_check
        Forwarded unchanged to the pylibcugraph ``sssp`` call
        (``do_expensive_check`` is also given to ``MGGraph``).

    Returns
    -------
    cudf.DataFrame
        Columns ``distance``, ``vertex`` and ``predecessor``, in that
        order.
    """
    plc_handle = ResourceHandle(Comms.get_handle(sID).getHandle())

    edge_frame = data[0]
    src_series = edge_frame[src_col_name]
    dst_series = edge_frame[dst_col_name]

    if 'value' in edge_frame.columns:
        edge_weights = edge_frame['value']
    else:
        # No weight column: x / x yields a weight of 1 for every edge.
        edge_weights = cudf.Series(
            (src_series + 1) / (src_series + 1), dtype='float32')

    # Any dtype other than float32/double is cast to double.
    if edge_weights.dtype not in ('float32', 'double'):
        edge_weights = edge_weights.astype('double')

    plc_graph = MGGraph(
        resource_handle=plc_handle,
        graph_properties=GraphProperties(is_multigraph=False),
        src_array=src_series,
        dst_array=dst_series,
        weight_array=edge_weights,
        store_transposed=False,
        num_edges=num_edges,
        do_expensive_check=do_expensive_check,
    )

    sssp_output = pylibcugraph_sssp(
        resource_handle=plc_handle,
        graph=plc_graph,
        source=source,
        cutoff=cutoff,
        compute_predecessors=compute_predecessors,
        do_expensive_check=do_expensive_check,
    )
    vertex_ids, dist_values, pred_ids = sssp_output

    result_columns = {
        'distance': cudf.Series(dist_values),
        'vertex': cudf.Series(vertex_ids),
        'predecessor': cudf.Series(pred_ids),
    }
    return cudf.DataFrame(result_columns)


def sssp(input_graph, source, cutoff=None):
"""
Compute the distance and predecessors for shortest paths from the specified
source to all the vertices in the input_graph. The distances column will
Expand All @@ -70,6 +96,9 @@ def sssp(input_graph, source):
source : Integer
Specify source vertex
cutoff : double, optional (default = None)
Maximum edge weight sum considered by the algorithm
Returns
-------
df : dask_cudf.DataFrame
Expand Down Expand Up @@ -104,8 +133,6 @@ def sssp(input_graph, source):

input_graph.compute_renumber_edge_list(transposed=False)
ddf = input_graph.edgelist.edgelist_df
vertex_partition_offsets = get_vertex_partition_offsets(input_graph)
num_verts = vertex_partition_offsets.iloc[-1]
num_edges = len(ddf)
data = get_distributed_data(ddf)

Expand Down Expand Up @@ -142,19 +169,22 @@ def sssp(input_graph, source):
src_col_name = input_graph.source_columns
dst_col_name = input_graph.destination_columns

if cutoff is None:
cutoff = cupy.inf

result = [client.submit(
call_sssp,
Comms.get_session_id(),
wf[1],
src_col_name,
dst_col_name,
num_verts,
num_edges,
vertex_partition_offsets,
input_graph.aggregate_segment_offsets,
source,
workers=[wf[0]])
for idx, wf in enumerate(data.worker_to_parts.items())]
_call_plc_sssp,
Comms.get_session_id(),
wf[1],
src_col_name,
dst_col_name,
num_edges,
source,
cutoff,
True,
False,
workers=[wf[0]])
for idx, wf in enumerate(data.worker_to_parts.items())]
wait(result)
ddf = dask_cudf.from_delayed(result)

Expand Down
29 changes: 0 additions & 29 deletions python/cugraph/cugraph/traversal/sssp.pxd

This file was deleted.

Loading

0 comments on commit 861cef0

Please sign in to comment.