Skip to content

Commit

Permalink
Merge 445d019 into e906c98
Browse files Browse the repository at this point in the history
  • Loading branch information
jnke2016 authored May 11, 2022
2 parents e906c98 + 445d019 commit 93c88e1
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 86 deletions.
15 changes: 12 additions & 3 deletions benchmarks/python_e2e/benchmark.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -88,9 +88,14 @@ def __init__(self,

#add starting node to algos: BFS and SSSP
for i, algo in enumerate (algo_func_param_list):
if benchmark(algo).name in ["bfs", "sssp"]:
if benchmark(algo).name in ["bfs", "sssp", "neighborhood_sampling"]:
param={}
param["start"]=self.input_dataframe['src'].head()[0]
if benchmark(algo).name in ["neighborhood_sampling"]:
start = [param.pop("start")]
labels = [0]
param["start_info_list"] = (start, labels)
param["fanout_vals"] = [1]
algo_func_param_list[i]=(algo,)+(param,)

self.algos = []
Expand Down Expand Up @@ -125,7 +130,8 @@ def run(self):
self.results.append(result)

#algos with transposed=True : PageRank, Katz
#algos with transposed=False: BFS, SSSP, Louvain
#algos with transposed=False: BFS, SSSP, Louvain, HITS, Neighborhood_sampling
#algos supporting the legacy_renum_only: HITS, Neighborhood_sampling
for i in range(len(self.algos)):
if self.algos[i][0].name in ["pagerank", "katz"]: #set transpose=True when renumbering
if self.algos[i][0].name == "katz" and self.construct_graph.name == "from_dask_cudf_edgelist":
Expand All @@ -141,6 +147,9 @@ def run(self):
self.algos[i][1]["alpha"] = katz_alpha
if hasattr(G, "compute_renumber_edge_list"):
G.compute_renumber_edge_list(transposed=True)
elif self.algos[i][0].name in ["neighborhood_sampling", "hits"]:
if hasattr(G, "compute_renumber_edge_list"):
G.compute_renumber_edge_list(transposed=False, legacy_renum_only=True)
else: #set transpose=False when renumbering
self.__log("running compute_renumber_edge_list...", end="")
if hasattr(G, "compute_renumber_edge_list"):
Expand Down
16 changes: 15 additions & 1 deletion benchmarks/python_e2e/cugraph_dask_funcs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -18,6 +18,8 @@
from cugraph.structure.symmetrize import symmetrize_ddf
from cugraph.dask.common.mg_utils import get_visible_devices
from dask_cuda.initialize import initialize
from cugraph.experimental.dask import uniform_neighborhood_sampling
import cudf

import cugraph
from cugraph.comms import comms as Comms
Expand Down Expand Up @@ -151,6 +153,18 @@ def katz(G, alpha=None):
print(alpha)
return cugraph.dask.katz_centrality(G, alpha)

def hits(G):
    """
    Benchmark entry point for HITS.

    Forwards the graph ``G`` to ``cugraph.dask.hits`` with that function's
    default arguments and hands back whatever it returns.
    """
    hits_result = cugraph.dask.hits(G)
    return hits_result

def neighborhood_sampling(G, start_info_list=None, fanout_vals=None):
    """
    Benchmark entry point for uniform neighborhood sampling.

    Parameters
    ----------
    G : graph
        Passed through to ``uniform_neighborhood_sampling``.
    start_info_list : pair of sequences
        ``start_info_list[0]`` holds the start vertices and
        ``start_info_list[1]`` the accompanying info/labels. Both are
        converted to int32 ``cudf.Series`` before the call.
    fanout_vals : optional
        Passed through unchanged.

    Returns
    -------
    Whatever ``uniform_neighborhood_sampling`` returns.
    """
    # The caller supplies plain Python sequences; build one int32
    # cudf.Series per element of the pair before handing them on.
    start_vertices = cudf.Series(start_info_list[0], dtype="int32")
    info_labels = cudf.Series(start_info_list[1], dtype="int32")

    return uniform_neighborhood_sampling(
        G,
        start_info_list=(start_vertices, info_labels),
        fanout_vals=fanout_vals,
    )

################################################################################
# Session-wide setup and teardown
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/python_e2e/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -87,6 +87,8 @@ def run(algos,
"wcc": funcs.wcc,
"katz": funcs.katz,
"wcc": funcs.wcc,
"hits": funcs.hits,
"neighborhood_sampling": funcs.neighborhood_sampling,
}

if algos:
Expand Down
10 changes: 7 additions & 3 deletions python/cugraph/cugraph/dask/link_analysis/hits.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,12 @@ def hits(input_graph, tol=1.0e-5, max_iter=100, nstart=None, normalized=True):

client = default_client()

# FIXME Still compute renumbering at this layer in case str
# vertex ID are passed
input_graph.compute_renumber_edge_list(transposed=False)
# FIXME: 'legacy_renum_only' will not trigger the C++ renumbering
# In the future, once all the algos follow the C/Pylibcugraph path,
# compute_renumber_edge_list will only be used for multicolumn and
# string vertices since the renumbering will be done in pylibcugraph
input_graph.compute_renumber_edge_list(
transposed=False, legacy_renum_only=True)
ddf = input_graph.edgelist.edgelist_df

graph_properties = GraphProperties(
Expand Down Expand Up @@ -205,6 +208,7 @@ def hits(input_graph, tol=1.0e-5, max_iter=100, nstart=None, normalized=True):
wait(cudf_result)

ddf = dask_cudf.from_delayed(cudf_result)

if input_graph.renumbered:
return input_graph.unrenumber(ddf, 'vertex')

Expand Down
14 changes: 10 additions & 4 deletions python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,12 @@ def EXPERIMENTAL__uniform_neighborhood(input_graph,
"""
# Initialize dask client
client = default_client()
# Important for handling renumbering
input_graph.compute_renumber_edge_list(transposed=False)
# FIXME: 'legacy_renum_only' will not trigger the C++ renumbering
# In the future, once all the algos follow the C/Pylibcugraph path,
# compute_renumber_edge_list will only be used for multicolumn and
# string vertices since the renumbering will be done in pylibcugraph
input_graph.compute_renumber_edge_list(
transposed=False, legacy_renum_only=True)

start_list, info_list = start_info_list

Expand Down Expand Up @@ -158,9 +162,11 @@ def EXPERIMENTAL__uniform_neighborhood(input_graph,
src_col_name = input_graph.renumber_map.renumbered_src_col_name
dst_col_name = input_graph.renumber_map.renumbered_dst_col_name

# start_list uses "external" vertex IDs, but since the graph has been
# start_list uses "external" vertex IDs, but if the graph has been
# renumbered, the start vertex IDs must also be renumbered.
start_list = input_graph.lookup_internal_vertex_id(start_list).compute()
if input_graph.renumbered:
start_list = input_graph.lookup_internal_vertex_id(
start_list).compute()
do_expensive_check = True

result = [client.submit(call_nbr_sampling,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,23 @@ def __from_edgelist(
#

# FIXME: Edge Attribute not handled
# FIXME: the parameter below is no longer used for unrenumbering
self.properties.renumbered = renumber
self.source_columns = source
self.destination_columns = destination

@property
def renumbered(self):
    """
    Tell whether the stored edge list carries the renumbered column names.

    Only renumbered dataframes have their vertex columns named
    'renumbered_src' and 'renumbered_dst', so the column names are used
    as the indicator.

    Returns
    -------
    bool
        False when an edge list dataframe is stored and is missing at
        least one of the two renumbered column names. True otherwise,
        which includes the case where the stored edge list dataframe
        is None.
    """
    edgelist_df = self.edgelist.edgelist_df
    if edgelist_df is None:
        # No dataframe to inspect: the property reports True here.
        return True

    expected_cols = {"renumbered_src", "renumbered_dst"}
    return expected_cols <= set(edgelist_df.columns)

def view_edge_list(self):
"""
Display the edge list. Compute it if needed.
Expand Down Expand Up @@ -464,7 +477,9 @@ def neighbors(self, n):
ddf = self.edgelist.edgelist_df
return ddf[ddf["src"] == n]["dst"].reset_index(drop=True)

def compute_renumber_edge_list(self, transposed=False):
def compute_renumber_edge_list(self,
transposed=False,
legacy_renum_only=False):
"""
Compute a renumbered edge list
This function works in the MNMG pipeline and will transform
Expand All @@ -486,13 +501,12 @@ def compute_renumber_edge_list(self, transposed=False):
If True, renumber with the intent to make a CSC-like
structure. If False, renumber with the intent to make
a CSR-like structure. Defaults to False.
"""
# FIXME: What to do about edge_attr???
# currently ignored for MNMG
# FIXME: this is confusing - in the code below,
# self.properties.renumbered needs to be interpreted as "needs to be
# renumbered", everywhere else it means "has been renumbered".
legacy_renum_only : (optional) bool
If True, the C++ renumbering will not be triggered.
This parameter is added for new algos following the
C/Pylibcugraph path
"""
if not self.properties.renumbered:
self.edgelist = self.EdgeList(self.input_df)
self.renumber_map = None
Expand All @@ -507,10 +521,13 @@ def compute_renumber_edge_list(self, transposed=False):
del self.edgelist

renumbered_ddf, number_map, aggregate_segment_offsets = \
NumberMap.renumber_and_segment(self.input_df,
self.source_columns,
self.destination_columns,
store_transposed=transposed)
NumberMap.renumber_and_segment(
self.input_df,
self.source_columns,
self.destination_columns,
store_transposed=transposed,
legacy_renum_only=legacy_renum_only)

self.edgelist = self.EdgeList(renumbered_ddf)
self.renumber_map = number_map
self.aggregate_segment_offsets = aggregate_segment_offsets
Expand Down
151 changes: 88 additions & 63 deletions python/cugraph/cugraph/structure/number_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,16 +499,27 @@ def from_internal_vertex_id(
@staticmethod
def renumber_and_segment(
df, src_col_names, dst_col_names, preserve_order=False,
store_transposed=False
store_transposed=False, legacy_renum_only=False
):
# FIXME: Drop the renumber_type 'experimental' once all the
# algos follow the C/Pylibcugraph path

# The renumber_type 'legacy' runs both the python and the
# C++ renumbering.
if isinstance(src_col_names, list):
renumber_type = 'legacy'
elif not (df[src_col_names].dtype == np.int32 or
df[src_col_names].dtype == np.int64):
renumber_type = 'legacy'
else:
# The renumber_type 'experimental' only runs the C++
# renumbering
renumber_type = 'experimental'

if legacy_renum_only and renumber_type == 'experimental':
# The original dataframe will be returned.
renumber_type = 'skip_renumbering'

renumber_map = NumberMap()
if not isinstance(src_col_names, list):
src_col_names = [src_col_names]
Expand Down Expand Up @@ -547,6 +558,12 @@ def renumber_and_segment(
df, renumber_map.renumbered_dst_col_name, dst_col_names,
drop=True, preserve_order=preserve_order
)
elif renumber_type == 'skip_renumbering':
# Update the renumbered source and destination column name
# with the original input's source and destination name
renumber_map.renumbered_src_col_name = src_col_names[0]
renumber_map.renumbered_dst_col_name = dst_col_names[0]

else:
df = df.rename(
columns={src_col_names[0]:
Expand All @@ -562,69 +579,77 @@ def renumber_and_segment(
is_mnmg = False

if is_mnmg:
client = default_client()
data = get_distributed_data(df)
result = [(client.submit(call_renumber,
Comms.get_session_id(),
wf[1],
renumber_map.renumbered_src_col_name,
renumber_map.renumbered_dst_col_name,
num_edges,
is_mnmg,
store_transposed,
workers=[wf[0]]), wf[0])
for idx, wf in enumerate(data.worker_to_parts.items())]
wait(result)

def get_renumber_map(id_type, data):
return data[0].astype(id_type)

def get_segment_offsets(data):
return data[1]

def get_renumbered_df(id_type, data):
data[2][renumber_map.renumbered_src_col_name] = \
data[2][renumber_map.renumbered_src_col_name]\
.astype(id_type)
data[2][renumber_map.renumbered_dst_col_name] = \
data[2][renumber_map.renumbered_dst_col_name]\
.astype(id_type)
return data[2]

renumbering_map = dask_cudf.from_delayed(
[client.submit(get_renumber_map,
id_type,
data,
workers=[wf])
for (data, wf) in result])

list_of_segment_offsets = client.gather(
[client.submit(get_segment_offsets,
data,
workers=[wf])
for (data, wf) in result])
aggregate_segment_offsets = []
for segment_offsets in list_of_segment_offsets:
aggregate_segment_offsets.extend(segment_offsets)

renumbered_df = dask_cudf.from_delayed(
[client.submit(get_renumbered_df,
id_type,
data,
workers=[wf])
for (data, wf) in result])
if renumber_type == 'legacy':
renumber_map.implementation.ddf = indirection_map.merge(
renumbering_map,
right_on='original_ids', left_on='global_id',
how='right').\
drop(columns=['global_id', 'original_ids'])\
.rename(columns={'new_ids': 'global_id'})
# Do not renumber the algos following the C/Pylibcugraph path
if renumber_type in ['legacy', 'experimental']:
client = default_client()
data = get_distributed_data(df)
result = [(client.submit(call_renumber,
Comms.get_session_id(),
wf[1],
renumber_map.renumbered_src_col_name,
renumber_map.renumbered_dst_col_name,
num_edges,
is_mnmg,
store_transposed,
workers=[wf[0]]), wf[0])
for idx, wf in enumerate(
data.worker_to_parts.items())]
wait(result)

def get_renumber_map(id_type, data):
return data[0].astype(id_type)

def get_segment_offsets(data):
return data[1]

def get_renumbered_df(id_type, data):
data[2][renumber_map.renumbered_src_col_name] = \
data[2][renumber_map.renumbered_src_col_name]\
.astype(id_type)
data[2][renumber_map.renumbered_dst_col_name] = \
data[2][renumber_map.renumbered_dst_col_name]\
.astype(id_type)
return data[2]

renumbering_map = dask_cudf.from_delayed(
[client.submit(get_renumber_map,
id_type,
data,
workers=[wf])
for (data, wf) in result])

list_of_segment_offsets = client.gather(
[client.submit(get_segment_offsets,
data,
workers=[wf])
for (data, wf) in result])
aggregate_segment_offsets = []
for segment_offsets in list_of_segment_offsets:
aggregate_segment_offsets.extend(segment_offsets)

renumbered_df = dask_cudf.from_delayed(
[client.submit(get_renumbered_df,
id_type,
data,
workers=[wf])
for (data, wf) in result])
if renumber_type == 'legacy':
renumber_map.implementation.ddf = indirection_map.merge(
renumbering_map,
right_on='original_ids', left_on='global_id',
how='right').\
drop(columns=['global_id', 'original_ids'])\
.rename(columns={'new_ids': 'global_id'})
else:
renumber_map.implementation.ddf = renumbering_map.rename(
columns={'original_ids': '0', 'new_ids': 'global_id'})
renumber_map.implementation.numbered = True
return renumbered_df, renumber_map, aggregate_segment_offsets

else:
renumber_map.implementation.ddf = renumbering_map.rename(
columns={'original_ids': '0', 'new_ids': 'global_id'})
renumber_map.implementation.numbered = True
return renumbered_df, renumber_map, aggregate_segment_offsets
# There is no aggregate_segment_offsets since the
# C++ renumbering is skipped
return df, renumber_map, None

else:
renumbering_map, segment_offsets, renumbered_df = \
Expand Down

0 comments on commit 93c88e1

Please sign in to comment.