Skip to content

Commit

Permalink
Add MG Hits and MG Neighborhood_sampling to benchmarks (#2254)
Browse files Browse the repository at this point in the history
This PR 

1. Adds MG HITS and MG Neighborhood sampling to our list of algos to benchmark in the nightlies
2. Drops the legacy renumbering, which is not needed in the `C/Pylibcugraph` path

Authors:
  - Joseph Nke (https://github.com/jnke2016)

Approvers:
  - Rick Ratzel (https://github.com/rlratzel)

URL: #2254
  • Loading branch information
jnke2016 authored May 17, 2022
1 parent 1489712 commit d9ec8f7
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 86 deletions.
15 changes: 12 additions & 3 deletions benchmarks/python_e2e/benchmark.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -88,9 +88,14 @@ def __init__(self,

#add starting node to algos: BFS and SSSP
for i, algo in enumerate (algo_func_param_list):
if benchmark(algo).name in ["bfs", "sssp"]:
if benchmark(algo).name in ["bfs", "sssp", "neighborhood_sampling"]:
param={}
param["start"]=self.input_dataframe['src'].head()[0]
if benchmark(algo).name in ["neighborhood_sampling"]:
start = [param.pop("start")]
labels = [0]
param["start_info_list"] = (start, labels)
param["fanout_vals"] = [1]
algo_func_param_list[i]=(algo,)+(param,)

self.algos = []
Expand Down Expand Up @@ -125,7 +130,8 @@ def run(self):
self.results.append(result)

#algos with transposed=True : PageRank, Katz
#algos with transposed=False: BFS, SSSP, Louvain
#algos with transposed=False: BFS, SSSP, Louvain, HITS, Neighborhood_sampling
#algos supporting the legacy_renum_only: HITS, Neighborhood_sampling
for i in range(len(self.algos)):
if self.algos[i][0].name in ["pagerank", "katz"]: #set transpose=True when renumbering
if self.algos[i][0].name == "katz" and self.construct_graph.name == "from_dask_cudf_edgelist":
Expand All @@ -141,6 +147,9 @@ def run(self):
self.algos[i][1]["alpha"] = katz_alpha
if hasattr(G, "compute_renumber_edge_list"):
G.compute_renumber_edge_list(transposed=True)
elif self.algos[i][0].name in ["neighborhood_sampling", "hits"]:
if hasattr(G, "compute_renumber_edge_list"):
G.compute_renumber_edge_list(transposed=False, legacy_renum_only=True)
else: #set transpose=False when renumbering
self.__log("running compute_renumber_edge_list...", end="")
if hasattr(G, "compute_renumber_edge_list"):
Expand Down
16 changes: 15 additions & 1 deletion benchmarks/python_e2e/cugraph_dask_funcs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -18,6 +18,8 @@
from cugraph.structure.symmetrize import symmetrize_ddf
from cugraph.dask.common.mg_utils import get_visible_devices
from dask_cuda.initialize import initialize
from cugraph.experimental.dask import uniform_neighborhood_sampling
import cudf

import cugraph
from cugraph.comms import comms as Comms
Expand Down Expand Up @@ -151,6 +153,18 @@ def katz(G, alpha=None):
print(alpha)
return cugraph.dask.katz_centrality(G, alpha)

def hits(G):
    """Benchmark entry point for HITS on the dask-based cugraph API.

    Calls ``cugraph.dask.hits`` with only the graph ``G`` (every other
    argument is left at that function's default) and returns its result
    unchanged.
    """
    hits_result = cugraph.dask.hits(G)
    return hits_result

def neighborhood_sampling(G, start_info_list=None, fanout_vals=None):
    """Benchmark entry point for ``uniform_neighborhood_sampling``.

    ``start_info_list`` is indexed as a pair: items 0 and 1 are each
    converted to an int32 ``cudf.Series`` and passed on as a 2-tuple.
    ``fanout_vals`` is forwarded untouched. The return value is whatever
    ``uniform_neighborhood_sampling`` produces.
    """
    # The sampling call is given cudf.Series objects, not plain Python lists.
    series_pair = tuple(
        cudf.Series(start_info_list[position], dtype="int32")
        for position in (0, 1)
    )
    return uniform_neighborhood_sampling(
        G, start_info_list=series_pair, fanout_vals=fanout_vals)

################################################################################
# Session-wide setup and teardown
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/python_e2e/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -87,6 +87,8 @@ def run(algos,
"wcc": funcs.wcc,
"katz": funcs.katz,
"wcc": funcs.wcc,
"hits": funcs.hits,
"neighborhood_sampling": funcs.neighborhood_sampling,
}

if algos:
Expand Down
10 changes: 7 additions & 3 deletions python/cugraph/cugraph/dask/link_analysis/hits.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,12 @@ def hits(input_graph, tol=1.0e-5, max_iter=100, nstart=None, normalized=True):

client = default_client()

# FIXME Still compute renumbering at this layer in case str
# vertex ID are passed
input_graph.compute_renumber_edge_list(transposed=False)
# FIXME: 'legacy_renum_only' will not trigger the C++ renumbering
# In the future, once all the algos follow the C/Pylibcugraph path,
# compute_renumber_edge_list will only be used for multicolumn and
# string vertices since the renumbering will be done in pylibcugraph
input_graph.compute_renumber_edge_list(
transposed=False, legacy_renum_only=True)
ddf = input_graph.edgelist.edgelist_df

graph_properties = GraphProperties(
Expand Down Expand Up @@ -205,6 +208,7 @@ def hits(input_graph, tol=1.0e-5, max_iter=100, nstart=None, normalized=True):
wait(cudf_result)

ddf = dask_cudf.from_delayed(cudf_result)

if input_graph.renumbered:
return input_graph.unrenumber(ddf, 'vertex')

Expand Down
14 changes: 10 additions & 4 deletions python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,12 @@ def EXPERIMENTAL__uniform_neighborhood(input_graph,
"""
# Initialize dask client
client = default_client()
# Important for handling renumbering
input_graph.compute_renumber_edge_list(transposed=False)
# FIXME: 'legacy_renum_only' will not trigger the C++ renumbering
# In the future, once all the algos follow the C/Pylibcugraph path,
# compute_renumber_edge_list will only be used for multicolumn and
# string vertices since the renumbering will be done in pylibcugraph
input_graph.compute_renumber_edge_list(
transposed=False, legacy_renum_only=True)

start_list, info_list = start_info_list

Expand Down Expand Up @@ -158,9 +162,11 @@ def EXPERIMENTAL__uniform_neighborhood(input_graph,
src_col_name = input_graph.renumber_map.renumbered_src_col_name
dst_col_name = input_graph.renumber_map.renumbered_dst_col_name

# start_list uses "external" vertex IDs, but since the graph has been
# start_list uses "external" vertex IDs, but if the graph has been
# renumbered, the start vertex IDs must also be renumbered.
start_list = input_graph.lookup_internal_vertex_id(start_list).compute()
if input_graph.renumbered:
start_list = input_graph.lookup_internal_vertex_id(
start_list).compute()
do_expensive_check = True

result = [client.submit(call_nbr_sampling,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,23 @@ def __from_edgelist(
#

# FIXME: Edge Attribute not handled
# FIXME: the parameter below is no longer used for unrenumbering
self.properties.renumbered = renumber
self.source_columns = source
self.destination_columns = destination

@property
def renumbered(self):
    """Tell whether the stored edge list dataframe has been renumbered.

    The decision is made purely from column names: a renumbered dataframe
    carries both 'renumbered_src' and 'renumbered_dst'. When no edge list
    dataframe is stored (``edgelist_df`` is None) the answer is True.
    """
    edge_df = self.edgelist.edgelist_df
    if edge_df is None:
        return True

    expected_cols = {"renumbered_src", "renumbered_dst"}
    present_cols = set(edge_df.columns)
    return expected_cols.issubset(present_cols)

def view_edge_list(self):
"""
Display the edge list. Compute it if needed.
Expand Down Expand Up @@ -464,7 +477,9 @@ def neighbors(self, n):
ddf = self.edgelist.edgelist_df
return ddf[ddf["src"] == n]["dst"].reset_index(drop=True)

def compute_renumber_edge_list(self, transposed=False):
def compute_renumber_edge_list(self,
transposed=False,
legacy_renum_only=False):
"""
Compute a renumbered edge list
This function works in the MNMG pipeline and will transform
Expand All @@ -486,13 +501,12 @@ def compute_renumber_edge_list(self, transposed=False):
If True, renumber with the intent to make a CSC-like
structure. If False, renumber with the intent to make
a CSR-like structure. Defaults to False.
"""
# FIXME: What to do about edge_attr???
# currently ignored for MNMG
# FIXME: this is confusing - in the code below,
# self.properties.renumbered needs to be interpreted as "needs to be
# renumbered", everywhere else it means "has been renumbered".
legacy_renum_only : (optional) bool
If True, the C++ renumbering will not be triggered.
This parameter was added for new algos following the
C/Pylibcugraph path.
"""
if not self.properties.renumbered:
self.edgelist = self.EdgeList(self.input_df)
self.renumber_map = None
Expand All @@ -507,10 +521,13 @@ def compute_renumber_edge_list(self, transposed=False):
del self.edgelist

renumbered_ddf, number_map, aggregate_segment_offsets = \
NumberMap.renumber_and_segment(self.input_df,
self.source_columns,
self.destination_columns,
store_transposed=transposed)
NumberMap.renumber_and_segment(
self.input_df,
self.source_columns,
self.destination_columns,
store_transposed=transposed,
legacy_renum_only=legacy_renum_only)

self.edgelist = self.EdgeList(renumbered_ddf)
self.renumber_map = number_map
self.aggregate_segment_offsets = aggregate_segment_offsets
Expand Down
151 changes: 88 additions & 63 deletions python/cugraph/cugraph/structure/number_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,16 +499,27 @@ def from_internal_vertex_id(
@staticmethod
def renumber_and_segment(
df, src_col_names, dst_col_names, preserve_order=False,
store_transposed=False
store_transposed=False, legacy_renum_only=False
):
# FIXME: Drop the renumber_type 'experimental' once all the
# algos follow the C/Pylibcugraph path

# The renumber_type 'legacy' runs both the python and the
# C++ renumbering.
if isinstance(src_col_names, list):
renumber_type = 'legacy'
elif not (df[src_col_names].dtype == np.int32 or
df[src_col_names].dtype == np.int64):
renumber_type = 'legacy'
else:
# The renumber_type 'experimental' only runs the C++
# renumbering
renumber_type = 'experimental'

if legacy_renum_only and renumber_type == 'experimental':
# The original dataframe will be returned.
renumber_type = 'skip_renumbering'

renumber_map = NumberMap()
if not isinstance(src_col_names, list):
src_col_names = [src_col_names]
Expand Down Expand Up @@ -547,6 +558,12 @@ def renumber_and_segment(
df, renumber_map.renumbered_dst_col_name, dst_col_names,
drop=True, preserve_order=preserve_order
)
elif renumber_type == 'skip_renumbering':
# Update the renumbered source and destination column name
# with the original input's source and destination name
renumber_map.renumbered_src_col_name = src_col_names[0]
renumber_map.renumbered_dst_col_name = dst_col_names[0]

else:
df = df.rename(
columns={src_col_names[0]:
Expand All @@ -562,69 +579,77 @@ def renumber_and_segment(
is_mnmg = False

if is_mnmg:
client = default_client()
data = get_distributed_data(df)
result = [(client.submit(call_renumber,
Comms.get_session_id(),
wf[1],
renumber_map.renumbered_src_col_name,
renumber_map.renumbered_dst_col_name,
num_edges,
is_mnmg,
store_transposed,
workers=[wf[0]]), wf[0])
for idx, wf in enumerate(data.worker_to_parts.items())]
wait(result)

def get_renumber_map(id_type, data):
return data[0].astype(id_type)

def get_segment_offsets(data):
return data[1]

def get_renumbered_df(id_type, data):
data[2][renumber_map.renumbered_src_col_name] = \
data[2][renumber_map.renumbered_src_col_name]\
.astype(id_type)
data[2][renumber_map.renumbered_dst_col_name] = \
data[2][renumber_map.renumbered_dst_col_name]\
.astype(id_type)
return data[2]

renumbering_map = dask_cudf.from_delayed(
[client.submit(get_renumber_map,
id_type,
data,
workers=[wf])
for (data, wf) in result])

list_of_segment_offsets = client.gather(
[client.submit(get_segment_offsets,
data,
workers=[wf])
for (data, wf) in result])
aggregate_segment_offsets = []
for segment_offsets in list_of_segment_offsets:
aggregate_segment_offsets.extend(segment_offsets)

renumbered_df = dask_cudf.from_delayed(
[client.submit(get_renumbered_df,
id_type,
data,
workers=[wf])
for (data, wf) in result])
if renumber_type == 'legacy':
renumber_map.implementation.ddf = indirection_map.merge(
renumbering_map,
right_on='original_ids', left_on='global_id',
how='right').\
drop(columns=['global_id', 'original_ids'])\
.rename(columns={'new_ids': 'global_id'})
# Do not renumber the algos following the C/Pylibcugraph path
if renumber_type in ['legacy', 'experimental']:
client = default_client()
data = get_distributed_data(df)
result = [(client.submit(call_renumber,
Comms.get_session_id(),
wf[1],
renumber_map.renumbered_src_col_name,
renumber_map.renumbered_dst_col_name,
num_edges,
is_mnmg,
store_transposed,
workers=[wf[0]]), wf[0])
for idx, wf in enumerate(
data.worker_to_parts.items())]
wait(result)

def get_renumber_map(id_type, data):
return data[0].astype(id_type)

def get_segment_offsets(data):
return data[1]

def get_renumbered_df(id_type, data):
data[2][renumber_map.renumbered_src_col_name] = \
data[2][renumber_map.renumbered_src_col_name]\
.astype(id_type)
data[2][renumber_map.renumbered_dst_col_name] = \
data[2][renumber_map.renumbered_dst_col_name]\
.astype(id_type)
return data[2]

renumbering_map = dask_cudf.from_delayed(
[client.submit(get_renumber_map,
id_type,
data,
workers=[wf])
for (data, wf) in result])

list_of_segment_offsets = client.gather(
[client.submit(get_segment_offsets,
data,
workers=[wf])
for (data, wf) in result])
aggregate_segment_offsets = []
for segment_offsets in list_of_segment_offsets:
aggregate_segment_offsets.extend(segment_offsets)

renumbered_df = dask_cudf.from_delayed(
[client.submit(get_renumbered_df,
id_type,
data,
workers=[wf])
for (data, wf) in result])
if renumber_type == 'legacy':
renumber_map.implementation.ddf = indirection_map.merge(
renumbering_map,
right_on='original_ids', left_on='global_id',
how='right').\
drop(columns=['global_id', 'original_ids'])\
.rename(columns={'new_ids': 'global_id'})
else:
renumber_map.implementation.ddf = renumbering_map.rename(
columns={'original_ids': '0', 'new_ids': 'global_id'})
renumber_map.implementation.numbered = True
return renumbered_df, renumber_map, aggregate_segment_offsets

else:
renumber_map.implementation.ddf = renumbering_map.rename(
columns={'original_ids': '0', 'new_ids': 'global_id'})
renumber_map.implementation.numbered = True
return renumbered_df, renumber_map, aggregate_segment_offsets
# There is no aggregate_segment_offsets since the
# C++ renumbering is skipped
return df, renumber_map, None

else:
renumbering_map, segment_offsets, renumbered_df = \
Expand Down

0 comments on commit d9ec8f7

Please sign in to comment.