Migrate SG and MG BFS to pylibcugraph #2284

Merged
Changes from 20 commits
23 commits
fb2d21c
Merge pull request #1 from rapidsai/branch-22.06
alexbarghi-nv May 16, 2022
cfdb13b
Merge branch 'rapidsai:branch-22.06' into branch-22.06
alexbarghi-nv May 17, 2022
29f6a8b
sg and mg bfs plc migration work
alexbarghi-nv May 17, 2022
3c00af1
remove cufile
alexbarghi-nv May 17, 2022
47388f8
Merge remote-tracking branch 'origin/branch-22.06' into branch-22.06-…
alexbarghi-nv May 17, 2022
a151b63
Merge pull request #2 from rapidsai/branch-22.06
alexbarghi-nv May 17, 2022
764782a
Merge remote-tracking branch 'origin/branch-22.06' into branch-22.06-…
alexbarghi-nv May 17, 2022
42c9e16
add bfs mg c api test, small fixes
alexbarghi-nv May 18, 2022
af0eb2a
remove testing files
alexbarghi-nv May 18, 2022
97b76e7
update mg bfs test
alexbarghi-nv May 19, 2022
3b9914f
Add Alex B's mg_bfs_test, add mg_sssp_test, fix support for MG in bfs…
ChuckHastings May 19, 2022
0dbe0b5
Merge pull request #3 from ChuckHastings/fix_add_shuffle_to_capi_algo…
alexbarghi-nv May 19, 2022
59f3760
add updates
alexbarghi-nv May 20, 2022
9d61c35
fixes
alexbarghi-nv May 20, 2022
969a56a
various fixes
alexbarghi-nv May 20, 2022
d2323ab
Remove test files
alexbarghi-nv May 20, 2022
e743538
add line at end of file
alexbarghi-nv May 20, 2022
a7eaefc
fix style
alexbarghi-nv May 20, 2022
3c08809
fix style
alexbarghi-nv May 20, 2022
0651f13
remove experimental warning, fix build issue
alexbarghi-nv May 20, 2022
9cac9ed
Delete cufile.log
alexbarghi-nv May 20, 2022
9f074a3
remove old wrappers
alexbarghi-nv May 20, 2022
c5fc19a
Merge branch 'branch-22.06-pylibcugraph-bfs' of https://github.com/al…
alexbarghi-nv May 20, 2022
1 change: 1 addition & 0 deletions .gitignore
@@ -87,3 +87,4 @@ python/cugraph/cugraph/tests/dask-worker-space

# Sphinx docs & build artifacts
docs/cugraph/source/api_docs/api/*

9 changes: 9 additions & 0 deletions python/cufile.log
@@ -0,0 +1,9 @@
20-05-2022 17:09:25:265 [pid=71536 tid=71536] ERROR cufio-drv:632 nvidia-fs.ko driver not loaded
20-05-2022 17:09:25:461 [pid=71706 tid=72121] ERROR cufio-drv:632 nvidia-fs.ko driver not loaded
20-05-2022 17:09:25:462 [pid=71701 tid=72122] ERROR cufio-drv:632 nvidia-fs.ko driver not loaded
20-05-2022 17:09:25:464 [pid=71699 tid=72125] ERROR cufio-drv:632 nvidia-fs.ko driver not loaded
20-05-2022 17:09:25:464 [pid=71705 tid=72124] ERROR cufio-drv:632 nvidia-fs.ko driver not loaded
20-05-2022 17:09:25:464 [pid=71690 tid=72123] ERROR cufio-drv:632 nvidia-fs.ko driver not loaded
20-05-2022 17:09:25:464 [pid=71713 tid=72126] ERROR cufio-drv:632 nvidia-fs.ko driver not loaded
20-05-2022 17:09:25:464 [pid=71693 tid=72127] ERROR cufio-drv:632 nvidia-fs.ko driver not loaded
20-05-2022 17:09:25:465 [pid=71711 tid=72128] ERROR cufio-drv:632 nvidia-fs.ko driver not loaded
226 changes: 121 additions & 105 deletions python/cugraph/cugraph/dask/traversal/bfs.py
@@ -13,45 +13,75 @@
# limitations under the License.
#

from collections.abc import Iterable
from pylibcugraph.experimental import (MGGraph,
ResourceHandle,
GraphProperties,
)
from pylibcugraph import bfs as pylibcugraph_bfs

from dask.distributed import wait, default_client
from cugraph.dask.common.input_utils import (get_distributed_data,
get_vertex_partition_offsets)
from cugraph.dask.traversal import mg_bfs_wrapper as mg_bfs
from cugraph.dask.common.input_utils import get_distributed_data
import cugraph.dask.comms.comms as Comms
import cudf
import dask_cudf


def call_bfs(sID,
data,
src_col_name,
dst_col_name,
num_verts,
num_edges,
vertex_partition_offsets,
aggregate_segment_offsets,
start,
depth_limit,
return_distances):
wid = Comms.get_worker_id(sID)
handle = Comms.get_handle(sID)
local_size = len(aggregate_segment_offsets) // Comms.get_n_workers(sID)
segment_offsets = \
aggregate_segment_offsets[local_size * wid: local_size * (wid + 1)]
return mg_bfs.mg_bfs(data[0],
src_col_name,
dst_col_name,
num_verts,
num_edges,
vertex_partition_offsets,
wid,
handle,
segment_offsets,
start,
depth_limit,
return_distances)
def _call_plc_mg_bfs(
sID,
data,
sources,
depth_limit,
src_col_name,
dst_col_name,
graph_properties,
num_edges,
direction_optimizing=False,
do_expensive_check=False,
return_predecessors=True):
comms_handle = Comms.get_handle(sID)
resource_handle = ResourceHandle(comms_handle.getHandle())

srcs = cudf.Series(data[0][src_col_name], dtype='int32')
dsts = cudf.Series(data[0][dst_col_name], dtype='int32')
weights = cudf.Series(data[0]['value'], dtype='float32') \
if 'value' in data[0].columns \
else cudf.Series((srcs + 1) / (srcs + 1), dtype='float32')

mg = MGGraph(
resource_handle=resource_handle,
graph_properties=graph_properties,
src_array=srcs,
dst_array=dsts,
weight_array=weights,
store_transposed=False,
num_edges=num_edges,
do_expensive_check=do_expensive_check
)

res = \
pylibcugraph_bfs(
resource_handle,
mg,
cudf.Series(sources, dtype='int32'),
direction_optimizing,
depth_limit if depth_limit is not None else 0,
return_predecessors,
True
)

return res


def convert_to_cudf(cp_arrays):
"""
create a cudf DataFrame from cupy arrays
"""
cupy_distances, cupy_predecessors, cupy_vertices = cp_arrays
df = cudf.DataFrame()
df["vertex"] = cupy_vertices
df["distance"] = cupy_distances
df["predecessor"] = cupy_predecessors
return df


def bfs(input_graph,
@@ -115,95 +145,81 @@ def bfs(input_graph,

input_graph.compute_renumber_edge_list(transposed=False)
ddf = input_graph.edgelist.edgelist_df
vertex_partition_offsets = get_vertex_partition_offsets(input_graph)
num_verts = vertex_partition_offsets.iloc[-1]

graph_properties = GraphProperties(
is_multigraph=False)

num_edges = len(ddf)
data = get_distributed_data(ddf)

src_col_name = input_graph.renumber_map.renumbered_src_col_name
dst_col_name = input_graph.renumber_map.renumbered_dst_col_name
renumber_ddf = input_graph.renumber_map.implementation.ddf
col_names = input_graph.renumber_map.implementation.col_names

if isinstance(start, dask_cudf.DataFrame) \
or isinstance(start, cudf.DataFrame):
tmp_df = start
tmp_col_names = start.columns
else:
tmp_df = cudf.DataFrame()
tmp_df["0"] = cudf.Series(start)
tmp_col_names = ["0"]

original_start_len = len(tmp_df)

tmp_ddf = tmp_df[tmp_col_names].rename(
columns=dict(zip(tmp_col_names, col_names)))
for name in col_names:
tmp_ddf[name] = tmp_ddf[name].astype(renumber_ddf[name].dtype)
renumber_data = get_distributed_data(renumber_ddf)

def df_merge(df, tmp_df, tmp_col_names):
x = df[0].merge(tmp_df, on=tmp_col_names, how='inner')
return x['global_id']

if input_graph.renumbered:
src_col_name = input_graph.renumber_map.renumbered_src_col_name
dst_col_name = input_graph.renumber_map.renumbered_dst_col_name
renumber_ddf = input_graph.renumber_map.implementation.ddf
col_names = input_graph.renumber_map.implementation.col_names
if isinstance(start,
dask_cudf.DataFrame) or isinstance(start,
cudf.DataFrame):
tmp_df = start
tmp_col_names = start.columns
else:
tmp_df = cudf.DataFrame()
tmp_df["0"] = cudf.Series(start)
tmp_col_names = ["0"]

original_start_len = len(tmp_df)

tmp_ddf = tmp_df[tmp_col_names].rename(
columns=dict(zip(tmp_col_names, col_names)))
for name in col_names:
tmp_ddf[name] = tmp_ddf[name].astype(renumber_ddf[name].dtype)
renumber_data = get_distributed_data(renumber_ddf)
start = [client.submit(df_merge,
wf[1],
tmp_ddf,
col_names,
workers=[wf[0]])
for idx, wf in enumerate(renumber_data.worker_to_parts.items()
)
]

def count_src(df):
return len(df)

count_src_results = client.map(count_src, start)
cg = client.gather(count_src_results)
if sum(cg) < original_start_len:
raise ValueError('At least one start vertex provided was invalid')

else:
# If the input graph was created with renumbering disabled (Graph(...,
# renumber=False), the above compute_renumber_edge_list() call will not
# perform a renumber step and the renumber_map will not have src/dst
# col names. In that case, the src/dst values specified when reading
# the edgelist dataframe are to be used, but only if they were single
# string values (ie. not a list representing multi-columns).
if isinstance(input_graph.source_columns, Iterable):
raise RuntimeError("input_graph was not renumbered but has a "
"non-string source column name (got: "
f"{input_graph.source_columns}). Re-create "
"input_graph with either renumbering enabled "
"or a source column specified as a string.")
if isinstance(input_graph.destination_columns, Iterable):
raise RuntimeError("input_graph was not renumbered but has a "
"non-string destination column name (got: "
f"{input_graph.destination_columns}). "
"Re-create input_graph with either renumbering "
"enabled or a destination column specified as "
"a string.")
src_col_name = input_graph.source_columns
dst_col_name = input_graph.destination_columns

result = [client.submit(
call_bfs,
start = [
client.submit(
df_merge,
wf[1],
tmp_ddf,
col_names,
workers=[wf[0]]
)
for idx, wf in enumerate(renumber_data.worker_to_parts.items())
]

def count_src(df):
return len(df)

count_src_results = client.map(count_src, start)
cg = client.gather(count_src_results)
if sum(cg) < original_start_len:
raise ValueError('At least one start vertex provided was invalid')

cupy_result = [client.submit(
_call_plc_mg_bfs,
Comms.get_session_id(),
wf[1],
start[idx],
depth_limit,
src_col_name,
dst_col_name,
num_verts,
graph_properties,
num_edges,
vertex_partition_offsets,
input_graph.aggregate_segment_offsets,
start[idx],
depth_limit,
False,
True,
return_distances,
workers=[wf[0]])
for idx, wf in enumerate(data.worker_to_parts.items())]
wait(result)
ddf = dask_cudf.from_delayed(result)
wait(cupy_result)

cudf_result = [client.submit(convert_to_cudf,
cp_arrays)
for cp_arrays in cupy_result]
wait(cudf_result)

ddf = dask_cudf.from_delayed(cudf_result)

if input_graph.renumbered:
ddf = input_graph.unrenumber(ddf, 'vertex')
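Taken together, the bfs.py changes route the computation through pylibcugraph while the dask-level entry point is still called the same way. A hypothetical end-to-end usage sketch follows; the CSV path, column names, and start vertex are assumptions for illustration and are not taken from this PR:

    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster
    import dask_cudf
    import cugraph
    import cugraph.dask as dask_cugraph
    import cugraph.dask.comms.comms as Comms

    # Spin up a local multi-GPU cluster and initialize cugraph's comms layer.
    cluster = LocalCUDACluster()
    client = Client(cluster)
    Comms.initialize(p2p=True)

    # "edges.csv" and its column names are placeholders for a real edge list.
    ddf = dask_cudf.read_csv("edges.csv", delimiter=" ",
                             names=["src", "dst"], dtype=["int32", "int32"])

    G = cugraph.Graph(directed=True)
    G.from_dask_cudf_edgelist(ddf, source="src", destination="dst")

    # Dispatches to the migrated MG BFS path above and returns a
    # dask_cudf.DataFrame with vertex, distance, and predecessor columns.
    bfs_ddf = dask_cugraph.bfs(G, start=0, return_distances=True)
    print(bfs_ddf.compute().head())

    Comms.destroy()
    client.close()
    cluster.close()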
35 changes: 0 additions & 35 deletions python/cugraph/cugraph/traversal/bfs.pxd

This file was deleted.

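With bfs.pxd deleted and the mg_bfs_wrapper import dropped from the dask traversal module, each worker now calls the C API through pylibcugraph directly. The condensed sketch below mirrors _call_plc_mg_bfs in the diff above; the function name and parameters are illustrative, and an all-ones weight column stands in for unweighted input:

    from pylibcugraph.experimental import (MGGraph,
                                           ResourceHandle,
                                           GraphProperties)
    from pylibcugraph import bfs as pylibcugraph_bfs
    import cudf


    def plc_bfs_on_partition(comms_handle, part_df, src_col, dst_col,
                             num_edges, sources, depth_limit=None):
        # Wrap the handle obtained from cugraph's comms layer.
        resource_handle = ResourceHandle(comms_handle.getHandle())
        graph_properties = GraphProperties(is_multigraph=False)

        srcs = cudf.Series(part_df[src_col], dtype='int32')
        dsts = cudf.Series(part_df[dst_col], dtype='int32')
        # BFS does not use edge weights, so supply a dummy all-ones column.
        weights = cudf.Series([1.0] * len(srcs), dtype='float32')

        mg = MGGraph(resource_handle=resource_handle,
                     graph_properties=graph_properties,
                     src_array=srcs,
                     dst_array=dsts,
                     weight_array=weights,
                     store_transposed=False,
                     num_edges=num_edges,
                     do_expensive_check=False)

        # Returns (distances, predecessors, vertices) cupy arrays -- the tuple
        # order that convert_to_cudf unpacks.
        return pylibcugraph_bfs(resource_handle,
                                mg,
                                cudf.Series(sources, dtype='int32'),
                                False,  # direction_optimizing
                                depth_limit if depth_limit is not None else 0,
                                True,   # return_predecessors
                                True)   # final flag passed as True in the diff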