diff --git a/benchmarks/python_e2e/benchmark.py b/benchmarks/python_e2e/benchmark.py index 080260679c6..0ce7597aa8d 100644 --- a/benchmarks/python_e2e/benchmark.py +++ b/benchmarks/python_e2e/benchmark.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -88,9 +88,14 @@ def __init__(self, #add starting node to algos: BFS and SSSP for i, algo in enumerate (algo_func_param_list): - if benchmark(algo).name in ["bfs", "sssp"]: + if benchmark(algo).name in ["bfs", "sssp", "neighborhood_sampling"]: param={} param["start"]=self.input_dataframe['src'].head()[0] + if benchmark(algo).name in ["neighborhood_sampling"]: + start = [param.pop("start")] + labels = [0] + param["start_info_list"] = (start, labels) + param["fanout_vals"] = [1] algo_func_param_list[i]=(algo,)+(param,) self.algos = [] @@ -125,7 +130,8 @@ def run(self): self.results.append(result) #algos with transposed=True : PageRank, Katz - #algos with transposed=False: BFS, SSSP, Louvain + #algos with transposed=False: BFS, SSSP, Louvain, HITS, Neighborhood_sampling + #algos supporting the legacy_renum_only: HITS, Neighborhood_sampling for i in range(len(self.algos)): if self.algos[i][0].name in ["pagerank", "katz"]: #set transpose=True when renumbering if self.algos[i][0].name == "katz" and self.construct_graph.name == "from_dask_cudf_edgelist": @@ -141,6 +147,9 @@ def run(self): self.algos[i][1]["alpha"] = katz_alpha if hasattr(G, "compute_renumber_edge_list"): G.compute_renumber_edge_list(transposed=True) + elif self.algos[i][0].name in ["neighborhood_sampling", "hits"]: + if hasattr(G, "compute_renumber_edge_list"): + G.compute_renumber_edge_list(transposed=False, legacy_renum_only=True) else: #set transpose=False when renumbering self.__log("running compute_renumber_edge_list...", end="") if hasattr(G, "compute_renumber_edge_list"): diff --git a/benchmarks/python_e2e/cugraph_dask_funcs.py b/benchmarks/python_e2e/cugraph_dask_funcs.py index 9ff22a9f248..4497034e998 100644 --- a/benchmarks/python_e2e/cugraph_dask_funcs.py +++ b/benchmarks/python_e2e/cugraph_dask_funcs.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -18,6 +18,8 @@ from cugraph.structure.symmetrize import symmetrize_ddf from cugraph.dask.common.mg_utils import get_visible_devices from dask_cuda.initialize import initialize +from cugraph.experimental.dask import uniform_neighborhood_sampling +import cudf import cugraph from cugraph.comms import comms as Comms @@ -151,6 +153,18 @@ def katz(G, alpha=None): print(alpha) return cugraph.dask.katz_centrality(G, alpha) +def hits(G): + return cugraph.dask.hits(G) + +def neighborhood_sampling(G, start_info_list=None, fanout_vals=None): + # convert list to cudf.Series + start_info_list = ( + cudf.Series(start_info_list[0], dtype="int32"), + cudf.Series(start_info_list[1], dtype="int32"), + ) + + return uniform_neighborhood_sampling( + G, start_info_list=start_info_list, fanout_vals=fanout_vals) ################################################################################ # Session-wide setup and teardown diff --git a/benchmarks/python_e2e/main.py b/benchmarks/python_e2e/main.py index 0c49374fe52..bae0ce39b23 100644 --- a/benchmarks/python_e2e/main.py +++ b/benchmarks/python_e2e/main.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -87,6 +87,8 @@ def run(algos, "wcc": funcs.wcc, "katz": funcs.katz, "wcc": funcs.wcc, + "hits": funcs.hits, + "neighborhood_sampling": funcs.neighborhood_sampling, } if algos: diff --git a/python/cugraph/cugraph/dask/link_analysis/hits.py b/python/cugraph/cugraph/dask/link_analysis/hits.py index 14194ac2851..ffb75269ab3 100644 --- a/python/cugraph/cugraph/dask/link_analysis/hits.py +++ b/python/cugraph/cugraph/dask/link_analysis/hits.py @@ -154,9 +154,12 @@ def hits(input_graph, tol=1.0e-5, max_iter=100, nstart=None, normalized=True): client = default_client() - # FIXME Still compute renumbering at this layer in case str - # vertex ID are passed - input_graph.compute_renumber_edge_list(transposed=False) + # FIXME: 'legacy_renum_only' will not trigger the C++ renumbering + # In the future, once all the algos follow the C/Pylibcugraph path, + # compute_renumber_edge_list will only be used for multicolumn and + # string vertices since the renumbering will be done in pylibcugraph + input_graph.compute_renumber_edge_list( + transposed=False, legacy_renum_only=True) ddf = input_graph.edgelist.edgelist_df graph_properties = GraphProperties( @@ -205,6 +208,7 @@ def hits(input_graph, tol=1.0e-5, max_iter=100, nstart=None, normalized=True): wait(cudf_result) ddf = dask_cudf.from_delayed(cudf_result) + if input_graph.renumbered: return input_graph.unrenumber(ddf, 'vertex') diff --git a/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py b/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py index 1e62659d22f..20bd6571c14 100644 --- a/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py +++ b/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py @@ -128,8 +128,12 @@ def EXPERIMENTAL__uniform_neighborhood(input_graph, """ # Initialize dask client client = default_client() - # Important for handling renumbering - input_graph.compute_renumber_edge_list(transposed=False) + # FIXME: 'legacy_renum_only' will not trigger the C++ renumbering + # In the future, once all the algos follow the C/Pylibcugraph path, + # compute_renumber_edge_list will only be used for multicolumn and + # string vertices since the renumbering will be done in pylibcugraph + input_graph.compute_renumber_edge_list( + transposed=False, legacy_renum_only=True) start_list, info_list = start_info_list @@ -158,9 +162,11 @@ def EXPERIMENTAL__uniform_neighborhood(input_graph, src_col_name = input_graph.renumber_map.renumbered_src_col_name dst_col_name = input_graph.renumber_map.renumbered_dst_col_name - # start_list uses "external" vertex IDs, but since the graph has been + # start_list uses "external" vertex IDs, but if the graph has been # renumbered, the start vertex IDs must also be renumbered. - start_list = input_graph.lookup_internal_vertex_id(start_list).compute() + if input_graph.renumbered: + start_list = input_graph.lookup_internal_vertex_id( + start_list).compute() do_expensive_check = True result = [client.submit(call_nbr_sampling, diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py index e09094bd638..afed5ad97f8 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py @@ -124,10 +124,23 @@ def __from_edgelist( # # FIXME: Edge Attribute not handled + # FIXME: the parameter below is no longer used for unrenumbering self.properties.renumbered = renumber self.source_columns = source self.destination_columns = destination + @property + def renumbered(self): + # This property is now used to determine if a dataframe was renumbered + # by checking the column name. Only the renumbered dataframes will have + # their column names renamed to 'renumbered_src' and 'renumbered_dst' + renumbered_vertex_col_names = ["renumbered_src", "renumbered_dst"] + if self.edgelist.edgelist_df is not None and not ( + set(renumbered_vertex_col_names).issubset( + set(self.edgelist.edgelist_df.columns))): + return False + return True + def view_edge_list(self): """ Display the edge list. Compute it if needed. @@ -464,7 +477,9 @@ def neighbors(self, n): ddf = self.edgelist.edgelist_df return ddf[ddf["src"] == n]["dst"].reset_index(drop=True) - def compute_renumber_edge_list(self, transposed=False): + def compute_renumber_edge_list(self, + transposed=False, + legacy_renum_only=False): """ Compute a renumbered edge list This function works in the MNMG pipeline and will transform @@ -486,13 +501,12 @@ def compute_renumber_edge_list(self, transposed=False): If True, renumber with the intent to make a CSC-like structure. If False, renumber with the intent to make a CSR-like structure. Defaults to False. - """ - # FIXME: What to do about edge_attr??? - # currently ignored for MNMG - # FIXME: this is confusing - in the code below, - # self.properties.renumbered needs to be interpreted as "needs to be - # renumbered", everywhere else it means "has been renumbered". + legacy_renum_only : (optional) bool + if True, The C++ renumbering will not be triggered. + This parameter is added for new algos following the + C/Pylibcugraph path + """ if not self.properties.renumbered: self.edgelist = self.EdgeList(self.input_df) self.renumber_map = None @@ -507,10 +521,13 @@ def compute_renumber_edge_list(self, transposed=False): del self.edgelist renumbered_ddf, number_map, aggregate_segment_offsets = \ - NumberMap.renumber_and_segment(self.input_df, - self.source_columns, - self.destination_columns, - store_transposed=transposed) + NumberMap.renumber_and_segment( + self.input_df, + self.source_columns, + self.destination_columns, + store_transposed=transposed, + legacy_renum_only=legacy_renum_only) + self.edgelist = self.EdgeList(renumbered_ddf) self.renumber_map = number_map self.aggregate_segment_offsets = aggregate_segment_offsets diff --git a/python/cugraph/cugraph/structure/number_map.py b/python/cugraph/cugraph/structure/number_map.py index af1fabeaafd..10de74cd744 100644 --- a/python/cugraph/cugraph/structure/number_map.py +++ b/python/cugraph/cugraph/structure/number_map.py @@ -499,16 +499,27 @@ def from_internal_vertex_id( @staticmethod def renumber_and_segment( df, src_col_names, dst_col_names, preserve_order=False, - store_transposed=False + store_transposed=False, legacy_renum_only=False ): + # FIXME: Drop the renumber_type 'experimental' once all the + # algos follow the C/Pylibcugraph path + + # The renumber_type 'legacy' runs both the python and the + # C++ renumbering. if isinstance(src_col_names, list): renumber_type = 'legacy' elif not (df[src_col_names].dtype == np.int32 or df[src_col_names].dtype == np.int64): renumber_type = 'legacy' else: + # The renumber_type 'experimental' only runs the C++ + # renumbering renumber_type = 'experimental' + if legacy_renum_only and renumber_type == 'experimental': + # The original dataframe will be returned. + renumber_type = 'skip_renumbering' + renumber_map = NumberMap() if not isinstance(src_col_names, list): src_col_names = [src_col_names] @@ -547,6 +558,12 @@ def renumber_and_segment( df, renumber_map.renumbered_dst_col_name, dst_col_names, drop=True, preserve_order=preserve_order ) + elif renumber_type == 'skip_renumbering': + # Update the renumbered source and destination column name + # with the original input's source and destination name + renumber_map.renumbered_src_col_name = src_col_names[0] + renumber_map.renumbered_dst_col_name = dst_col_names[0] + else: df = df.rename( columns={src_col_names[0]: @@ -562,69 +579,77 @@ def renumber_and_segment( is_mnmg = False if is_mnmg: - client = default_client() - data = get_distributed_data(df) - result = [(client.submit(call_renumber, - Comms.get_session_id(), - wf[1], - renumber_map.renumbered_src_col_name, - renumber_map.renumbered_dst_col_name, - num_edges, - is_mnmg, - store_transposed, - workers=[wf[0]]), wf[0]) - for idx, wf in enumerate(data.worker_to_parts.items())] - wait(result) - - def get_renumber_map(id_type, data): - return data[0].astype(id_type) - - def get_segment_offsets(data): - return data[1] - - def get_renumbered_df(id_type, data): - data[2][renumber_map.renumbered_src_col_name] = \ - data[2][renumber_map.renumbered_src_col_name]\ - .astype(id_type) - data[2][renumber_map.renumbered_dst_col_name] = \ - data[2][renumber_map.renumbered_dst_col_name]\ - .astype(id_type) - return data[2] - - renumbering_map = dask_cudf.from_delayed( - [client.submit(get_renumber_map, - id_type, - data, - workers=[wf]) - for (data, wf) in result]) - - list_of_segment_offsets = client.gather( - [client.submit(get_segment_offsets, - data, - workers=[wf]) - for (data, wf) in result]) - aggregate_segment_offsets = [] - for segment_offsets in list_of_segment_offsets: - aggregate_segment_offsets.extend(segment_offsets) - - renumbered_df = dask_cudf.from_delayed( - [client.submit(get_renumbered_df, - id_type, - data, - workers=[wf]) - for (data, wf) in result]) - if renumber_type == 'legacy': - renumber_map.implementation.ddf = indirection_map.merge( - renumbering_map, - right_on='original_ids', left_on='global_id', - how='right').\ - drop(columns=['global_id', 'original_ids'])\ - .rename(columns={'new_ids': 'global_id'}) + # Do not renumber the algos following the C/Pylibcugraph path + if renumber_type in ['legacy', 'experimental']: + client = default_client() + data = get_distributed_data(df) + result = [(client.submit(call_renumber, + Comms.get_session_id(), + wf[1], + renumber_map.renumbered_src_col_name, + renumber_map.renumbered_dst_col_name, + num_edges, + is_mnmg, + store_transposed, + workers=[wf[0]]), wf[0]) + for idx, wf in enumerate( + data.worker_to_parts.items())] + wait(result) + + def get_renumber_map(id_type, data): + return data[0].astype(id_type) + + def get_segment_offsets(data): + return data[1] + + def get_renumbered_df(id_type, data): + data[2][renumber_map.renumbered_src_col_name] = \ + data[2][renumber_map.renumbered_src_col_name]\ + .astype(id_type) + data[2][renumber_map.renumbered_dst_col_name] = \ + data[2][renumber_map.renumbered_dst_col_name]\ + .astype(id_type) + return data[2] + + renumbering_map = dask_cudf.from_delayed( + [client.submit(get_renumber_map, + id_type, + data, + workers=[wf]) + for (data, wf) in result]) + + list_of_segment_offsets = client.gather( + [client.submit(get_segment_offsets, + data, + workers=[wf]) + for (data, wf) in result]) + aggregate_segment_offsets = [] + for segment_offsets in list_of_segment_offsets: + aggregate_segment_offsets.extend(segment_offsets) + + renumbered_df = dask_cudf.from_delayed( + [client.submit(get_renumbered_df, + id_type, + data, + workers=[wf]) + for (data, wf) in result]) + if renumber_type == 'legacy': + renumber_map.implementation.ddf = indirection_map.merge( + renumbering_map, + right_on='original_ids', left_on='global_id', + how='right').\ + drop(columns=['global_id', 'original_ids'])\ + .rename(columns={'new_ids': 'global_id'}) + else: + renumber_map.implementation.ddf = renumbering_map.rename( + columns={'original_ids': '0', 'new_ids': 'global_id'}) + renumber_map.implementation.numbered = True + return renumbered_df, renumber_map, aggregate_segment_offsets + else: - renumber_map.implementation.ddf = renumbering_map.rename( - columns={'original_ids': '0', 'new_ids': 'global_id'}) - renumber_map.implementation.numbered = True - return renumbered_df, renumber_map, aggregate_segment_offsets + # There is no aggregate_segment_offsets since the + # C++ renumbering is skipped + return df, renumber_map, None else: renumbering_map, segment_offsets, renumbered_df = \