From c375bf065702a5dda9a0e17d96a821d8f7c547a9 Mon Sep 17 00:00:00 2001 From: Joseph Nke Date: Thu, 28 Apr 2022 12:46:03 -0700 Subject: [PATCH 1/9] Add MG Hits and MG Neighborhood_sampling to benchmarks --- benchmarks/python_e2e/benchmark.py | 7 ++++++- benchmarks/python_e2e/cugraph_dask_funcs.py | 14 ++++++++++++++ benchmarks/python_e2e/main.py | 2 ++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/benchmarks/python_e2e/benchmark.py b/benchmarks/python_e2e/benchmark.py index 080260679c6..329c5ba18d8 100644 --- a/benchmarks/python_e2e/benchmark.py +++ b/benchmarks/python_e2e/benchmark.py @@ -88,9 +88,14 @@ def __init__(self, #add starting node to algos: BFS and SSSP for i, algo in enumerate (algo_func_param_list): - if benchmark(algo).name in ["bfs", "sssp"]: + if benchmark(algo).name in ["bfs", "sssp", "neighborhood_sampling"]: param={} param["start"]=self.input_dataframe['src'].head()[0] + if benchmark(algo).name in ["neighborhood_sampling"]: + start = [param.pop("start")] + labels = [0] + param["start_info_list"] = (start, labels) + param["fanout_vals"] = [1] algo_func_param_list[i]=(algo,)+(param,) self.algos = [] diff --git a/benchmarks/python_e2e/cugraph_dask_funcs.py b/benchmarks/python_e2e/cugraph_dask_funcs.py index 9ff22a9f248..fbef7a41144 100644 --- a/benchmarks/python_e2e/cugraph_dask_funcs.py +++ b/benchmarks/python_e2e/cugraph_dask_funcs.py @@ -18,6 +18,8 @@ from cugraph.structure.symmetrize import symmetrize_ddf from cugraph.dask.common.mg_utils import get_visible_devices from dask_cuda.initialize import initialize +from cugraph.experimental.dask import uniform_neighborhood_sampling +import cudf import cugraph from cugraph.comms import comms as Comms @@ -151,6 +153,18 @@ def katz(G, alpha=None): print(alpha) return cugraph.dask.katz_centrality(G, alpha) +def hits(G): + return cugraph.dask.hits(G) + +def neighborhood_sampling(G, start_info_list=None, fanout_vals=None): + # convert list to cudf.Series + start_info_list = ( + cudf.Series(start_info_list[0], dtype="int32"), + cudf.Series(start_info_list[1], dtype="int32"), + ) + + return uniform_neighborhood_sampling( + G, start_info_list=start_info_list, fanout_vals=fanout_vals) ################################################################################ # Session-wide setup and teardown diff --git a/benchmarks/python_e2e/main.py b/benchmarks/python_e2e/main.py index 0c49374fe52..1821bad67b4 100644 --- a/benchmarks/python_e2e/main.py +++ b/benchmarks/python_e2e/main.py @@ -87,6 +87,8 @@ def run(algos, "wcc": funcs.wcc, "katz": funcs.katz, "wcc": funcs.wcc, + "hits": funcs.hits, + "neighborhood_sampling": funcs.neighborhood_sampling, } if algos: From 880be9a68d6f2790a6ab43f52d46cd083cfd1f88 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 5 May 2022 13:39:46 +0000 Subject: [PATCH 2/9] skip the C++ renumbering when running the new MG algos following the C/Pylibcugraph path --- benchmarks/python_e2e/benchmark.py | 6 +- .../cugraph/dask/link_analysis/hits.py | 6 +- .../dask/sampling/neighborhood_sampling.py | 9 +- .../simpleDistributedGraph.py | 31 ++-- .../cugraph/cugraph/structure/number_map.py | 150 ++++++++++-------- 5 files changed, 122 insertions(+), 80 deletions(-) diff --git a/benchmarks/python_e2e/benchmark.py b/benchmarks/python_e2e/benchmark.py index 329c5ba18d8..d75971586a0 100644 --- a/benchmarks/python_e2e/benchmark.py +++ b/benchmarks/python_e2e/benchmark.py @@ -130,7 +130,8 @@ def run(self): self.results.append(result) #algos with transposed=True : PageRank, Katz - #algos with transposed=False: BFS, 
SSSP, Louvain + #algos with transposed=False: BFS, SSSP, Louvain, HITS, Neighborhood_sampling + #algos supporting the legacy_renum_only: HITS, Neighborhood_sampling for i in range(len(self.algos)): if self.algos[i][0].name in ["pagerank", "katz"]: #set transpose=True when renumbering if self.algos[i][0].name == "katz" and self.construct_graph.name == "from_dask_cudf_edgelist": @@ -146,6 +147,9 @@ def run(self): self.algos[i][1]["alpha"] = katz_alpha if hasattr(G, "compute_renumber_edge_list"): G.compute_renumber_edge_list(transposed=True) + elif self.algos[i][0].name in ["neighborhood_sampling", "hits"]: + if hasattr(G, "compute_renumber_edge_list"): + G.compute_renumber_edge_list(transposed=False, legacy_renum_only=True) else: #set transpose=False when renumbering self.__log("running compute_renumber_edge_list...", end="") if hasattr(G, "compute_renumber_edge_list"): diff --git a/python/cugraph/cugraph/dask/link_analysis/hits.py b/python/cugraph/cugraph/dask/link_analysis/hits.py index aee00eab228..f502e06af6a 100644 --- a/python/cugraph/cugraph/dask/link_analysis/hits.py +++ b/python/cugraph/cugraph/dask/link_analysis/hits.py @@ -154,9 +154,8 @@ def hits(input_graph, tol=1.0e-5, max_iter=100, nstart=None, normalized=True): client = default_client() - # FIXME Still compute renumbering at this layer in case str - # vertex ID are passed - input_graph.compute_renumber_edge_list(transposed=False) + # FIXME: 'legacy_renum_only' will not trigger the C++ renumbering + input_graph.compute_renumber_edge_list(transposed=False, legacy_renum_only=True) ddf = input_graph.edgelist.edgelist_df graph_properties = GraphProperties( @@ -211,6 +210,7 @@ def hits(input_graph, tol=1.0e-5, max_iter=100, nstart=None, normalized=True): wait(cudf_result) ddf = dask_cudf.from_delayed(cudf_result) + if input_graph.renumbered: return input_graph.unrenumber(ddf, 'vertex') diff --git a/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py b/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py index b7e842c6f31..6cfd24222d5 100644 --- a/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py +++ b/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py @@ -128,8 +128,8 @@ def EXPERIMENTAL__uniform_neighborhood(input_graph, """ # Initialize dask client client = default_client() - # Important for handling renumbering - input_graph.compute_renumber_edge_list(transposed=False) + # FIXME: 'legacy_renum_only' will not trigger the C++ renumbering + input_graph.compute_renumber_edge_list(transposed=False, legacy_renum_only=True) start_list, info_list = start_info_list @@ -152,9 +152,10 @@ def EXPERIMENTAL__uniform_neighborhood(input_graph, src_col_name = input_graph.renumber_map.renumbered_src_col_name dst_col_name = input_graph.renumber_map.renumbered_dst_col_name - # start_list uses "external" vertex IDs, but since the graph has been + # start_list uses "external" vertex IDs, but if the graph has been # renumbered, the start vertex IDs must also be renumbered. 
- start_list = input_graph.lookup_internal_vertex_id(start_list).compute() + if input_graph.renumbered: + start_list = input_graph.lookup_internal_vertex_id(start_list).compute() do_expensive_check = True result = [client.submit(call_nbr_sampling, diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py index 01616e397cf..709429d4558 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py @@ -101,11 +101,27 @@ def __from_edgelist( # # FIXME: Edge Attribute not handled + # FIXME: the parameter below is no longer used for unrenumbering self.properties.renumbered = renumber self.input_df = input_ddf self.source_columns = source self.destination_columns = destination + + @property + def renumbered(self): + # This property is now used to determine if a dataframe was renumbered + # by checking the column name. Only the renumbered dataframes will have + # their column names renamed to 'renumbered_src' and 'renumbered_dst' + renumbered_vertex_col_names = ["renumbered_src", "renumbered_dst"] + if self.edgelist.edgelist_df is not None and not ( + set(renumbered_vertex_col_names).issubset( + set(self.edgelist.edgelist_df.columns))): + return False + return True + + + def view_edge_list(self): """ Display the edge list. Compute it if needed. @@ -442,7 +458,7 @@ def neighbors(self, n): ddf = self.edgelist.edgelist_df return ddf[ddf["src"] == n]["dst"].reset_index(drop=True) - def compute_renumber_edge_list(self, transposed=False): + def compute_renumber_edge_list(self, transposed=False, legacy_renum_only=False): """ Compute a renumbered edge list This function works in the MNMG pipeline and will transform @@ -464,13 +480,7 @@ def compute_renumber_edge_list(self, transposed=False): If True, renumber with the intent to make a CSC-like structure. If False, renumber with the intent to make a CSR-like structure. Defaults to False. - """ - # FIXME: What to do about edge_attr??? - # currently ignored for MNMG - - # FIXME: this is confusing - in the code below, - # self.properties.renumbered needs to be interpreted as "needs to be - # renumbered", everywhere else it means "has been renumbered". + """ if not self.properties.renumbered: self.edgelist = self.EdgeList(self.input_df) self.renumber_map = None @@ -488,7 +498,10 @@ def compute_renumber_edge_list(self, transposed=False): NumberMap.renumber_and_segment(self.input_df, self.source_columns, self.destination_columns, - store_transposed=transposed) + store_transposed=transposed, + legacy_renum_only=legacy_renum_only) + + self.edgelist = self.EdgeList(renumbered_ddf) self.renumber_map = number_map self.aggregate_segment_offsets = aggregate_segment_offsets diff --git a/python/cugraph/cugraph/structure/number_map.py b/python/cugraph/cugraph/structure/number_map.py index d587bf92263..0d16294c809 100644 --- a/python/cugraph/cugraph/structure/number_map.py +++ b/python/cugraph/cugraph/structure/number_map.py @@ -500,15 +500,26 @@ def from_internal_vertex_id( @staticmethod def renumber_and_segment( df, src_col_names, dst_col_names, preserve_order=False, - store_transposed=False + store_transposed=False, legacy_renum_only=False ): + # FIXME: Drop the renumber_type 'experimental' once all the + # algos follow the C/Pylibcugraph path + + # The renumber_type 'legacy' runs both the python and the + # C++ renumbering. 
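+        # (i.e. multi-column or non-integer vertex IDs are first mapped
+        # to integer IDs in Python before the C++ renumbering is called)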
if isinstance(src_col_names, list): renumber_type = 'legacy' elif not (df[src_col_names].dtype == np.int32 or df[src_col_names].dtype == np.int64): renumber_type = 'legacy' else: + # The renumber_type 'experimental' only runs the C++ + # renumbering renumber_type = 'experimental' + + if legacy_renum_only and renumber_type == 'experimental': + # The original dataframe will be returned. + renumber_type = 'skip_renumbering' renumber_map = NumberMap() if not isinstance(src_col_names, list): @@ -548,12 +559,18 @@ def renumber_and_segment( df, renumber_map.renumbered_dst_col_name, dst_col_names, drop=True, preserve_order=preserve_order ) + elif renumber_type == 'skip_renumbering': + # Update the renumbered source and destination column name + # with the original input's source and destination name + renumber_map.renumbered_src_col_name = src_col_names[0] + renumber_map.renumbered_dst_col_name = dst_col_names[0] + else: df = df.rename( columns={src_col_names[0]: - renumber_map.renumbered_src_col_name, - dst_col_names[0]: - renumber_map.renumbered_dst_col_name} + renumber_map.renumbered_src_col_name, + dst_col_names[0]: + renumber_map.renumbered_dst_col_name} ) num_edges = len(df) @@ -563,69 +580,76 @@ def renumber_and_segment( is_mnmg = False if is_mnmg: - client = default_client() - data = get_distributed_data(df) - result = [(client.submit(call_renumber, - Comms.get_session_id(), - wf[1], - renumber_map.renumbered_src_col_name, - renumber_map.renumbered_dst_col_name, - num_edges, - is_mnmg, - store_transposed, - workers=[wf[0]]), wf[0]) - for idx, wf in enumerate(data.worker_to_parts.items())] - wait(result) - - def get_renumber_map(id_type, data): - return data[0].astype(id_type) - - def get_segment_offsets(data): - return data[1] - - def get_renumbered_df(id_type, data): - data[2][renumber_map.renumbered_src_col_name] = \ - data[2][renumber_map.renumbered_src_col_name]\ - .astype(id_type) - data[2][renumber_map.renumbered_dst_col_name] = \ - data[2][renumber_map.renumbered_dst_col_name]\ - .astype(id_type) - return data[2] - - renumbering_map = dask_cudf.from_delayed( - [client.submit(get_renumber_map, + # Do not renumber the algos following the C/Pylibcugraph path + if renumber_type in ['legacy', 'experimental']: + client = default_client() + data = get_distributed_data(df) + result = [(client.submit(call_renumber, + Comms.get_session_id(), + wf[1], + renumber_map.renumbered_src_col_name, + renumber_map.renumbered_dst_col_name, + num_edges, + is_mnmg, + store_transposed, + workers=[wf[0]]), wf[0]) + for idx, wf in enumerate(data.worker_to_parts.items())] + wait(result) + + def get_renumber_map(id_type, data): + return data[0].astype(id_type) + + def get_segment_offsets(data): + return data[1] + + def get_renumbered_df(id_type, data): + data[2][renumber_map.renumbered_src_col_name] = \ + data[2][renumber_map.renumbered_src_col_name]\ + .astype(id_type) + data[2][renumber_map.renumbered_dst_col_name] = \ + data[2][renumber_map.renumbered_dst_col_name]\ + .astype(id_type) + return data[2] + + renumbering_map = dask_cudf.from_delayed( + [client.submit(get_renumber_map, + id_type, + data, + workers=[wf]) + for (data, wf) in result]) + + list_of_segment_offsets = client.gather( + [client.submit(get_segment_offsets, + data, + workers=[wf]) + for (data, wf) in result]) + aggregate_segment_offsets = [] + for segment_offsets in list_of_segment_offsets: + aggregate_segment_offsets.extend(segment_offsets) + + renumbered_df = dask_cudf.from_delayed( + [client.submit(get_renumbered_df, id_type, data, 
workers=[wf]) - for (data, wf) in result]) - - list_of_segment_offsets = client.gather( - [client.submit(get_segment_offsets, - data, - workers=[wf]) - for (data, wf) in result]) - aggregate_segment_offsets = [] - for segment_offsets in list_of_segment_offsets: - aggregate_segment_offsets.extend(segment_offsets) - - renumbered_df = dask_cudf.from_delayed( - [client.submit(get_renumbered_df, - id_type, - data, - workers=[wf]) - for (data, wf) in result]) - if renumber_type == 'legacy': - renumber_map.implementation.ddf = indirection_map.merge( - renumbering_map, - right_on='original_ids', left_on='global_id', - how='right').\ - drop(columns=['global_id', 'original_ids'])\ - .rename(columns={'new_ids': 'global_id'}) + for (data, wf) in result]) + if renumber_type == 'legacy': + renumber_map.implementation.ddf = indirection_map.merge( + renumbering_map, + right_on='original_ids', left_on='global_id', + how='right').\ + drop(columns=['global_id', 'original_ids'])\ + .rename(columns={'new_ids': 'global_id'}) + else: + renumber_map.implementation.ddf = renumbering_map.rename( + columns={'original_ids': '0', 'new_ids': 'global_id'}) + renumber_map.implementation.numbered = True + return renumbered_df, renumber_map, aggregate_segment_offsets + else: - renumber_map.implementation.ddf = renumbering_map.rename( - columns={'original_ids': '0', 'new_ids': 'global_id'}) - renumber_map.implementation.numbered = True - return renumbered_df, renumber_map, aggregate_segment_offsets + # There is no aggregate_segment_offsets since the C++ renumbering + # is skipped + return df, renumber_map, None else: renumbering_map, segment_offsets, renumbered_df = \ From 7a98967908d2e72e6eb6ca756e535f17ab971026 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 5 May 2022 14:07:11 +0000 Subject: [PATCH 3/9] fix flake8 errors --- .../cugraph/dask/link_analysis/hits.py | 3 +- .../dask/sampling/neighborhood_sampling.py | 6 ++- .../simpleDistributedGraph.py | 23 ++++----- .../cugraph/cugraph/structure/number_map.py | 51 ++++++++++--------- 4 files changed, 43 insertions(+), 40 deletions(-) diff --git a/python/cugraph/cugraph/dask/link_analysis/hits.py b/python/cugraph/cugraph/dask/link_analysis/hits.py index f502e06af6a..0099b583eed 100644 --- a/python/cugraph/cugraph/dask/link_analysis/hits.py +++ b/python/cugraph/cugraph/dask/link_analysis/hits.py @@ -155,7 +155,8 @@ def hits(input_graph, tol=1.0e-5, max_iter=100, nstart=None, normalized=True): client = default_client() # FIXME: 'legacy_renum_only' will not trigger the C++ renumbering - input_graph.compute_renumber_edge_list(transposed=False, legacy_renum_only=True) + input_graph.compute_renumber_edge_list( + transposed=False, legacy_renum_only=True) ddf = input_graph.edgelist.edgelist_df graph_properties = GraphProperties( diff --git a/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py b/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py index 6cfd24222d5..3446f2e2d9b 100644 --- a/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py +++ b/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py @@ -129,7 +129,8 @@ def EXPERIMENTAL__uniform_neighborhood(input_graph, # Initialize dask client client = default_client() # FIXME: 'legacy_renum_only' will not trigger the C++ renumbering - input_graph.compute_renumber_edge_list(transposed=False, legacy_renum_only=True) + input_graph.compute_renumber_edge_list( + transposed=False, legacy_renum_only=True) start_list, info_list = start_info_list @@ -155,7 +156,8 @@ def 
EXPERIMENTAL__uniform_neighborhood(input_graph, # start_list uses "external" vertex IDs, but if the graph has been # renumbered, the start vertex IDs must also be renumbered. if input_graph.renumbered: - start_list = input_graph.lookup_internal_vertex_id(start_list).compute() + start_list = input_graph.lookup_internal_vertex_id( + start_list).compute() do_expensive_check = True result = [client.submit(call_nbr_sampling, diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py index 709429d4558..70068d83548 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py @@ -107,7 +107,6 @@ def __from_edgelist( self.source_columns = source self.destination_columns = destination - @property def renumbered(self): # This property is now used to determine if a dataframe was renumbered @@ -116,12 +115,10 @@ def renumbered(self): renumbered_vertex_col_names = ["renumbered_src", "renumbered_dst"] if self.edgelist.edgelist_df is not None and not ( set(renumbered_vertex_col_names).issubset( - set(self.edgelist.edgelist_df.columns))): + set(self.edgelist.edgelist_df.columns))): return False return True - - def view_edge_list(self): """ Display the edge list. Compute it if needed. @@ -458,7 +455,9 @@ def neighbors(self, n): ddf = self.edgelist.edgelist_df return ddf[ddf["src"] == n]["dst"].reset_index(drop=True) - def compute_renumber_edge_list(self, transposed=False, legacy_renum_only=False): + def compute_renumber_edge_list(self, + transposed=False, + legacy_renum_only=False): """ Compute a renumbered edge list This function works in the MNMG pipeline and will transform @@ -480,7 +479,7 @@ def compute_renumber_edge_list(self, transposed=False, legacy_renum_only=False): If True, renumber with the intent to make a CSC-like structure. If False, renumber with the intent to make a CSR-like structure. Defaults to False. - """ + """ if not self.properties.renumbered: self.edgelist = self.EdgeList(self.input_df) self.renumber_map = None @@ -495,12 +494,12 @@ def compute_renumber_edge_list(self, transposed=False, legacy_renum_only=False): del self.edgelist renumbered_ddf, number_map, aggregate_segment_offsets = \ - NumberMap.renumber_and_segment(self.input_df, - self.source_columns, - self.destination_columns, - store_transposed=transposed, - legacy_renum_only=legacy_renum_only) - + NumberMap.renumber_and_segment( + self.input_df, + self.source_columns, + self.destination_columns, + store_transposed=transposed, + legacy_renum_only=legacy_renum_only) self.edgelist = self.EdgeList(renumbered_ddf) self.renumber_map = number_map diff --git a/python/cugraph/cugraph/structure/number_map.py b/python/cugraph/cugraph/structure/number_map.py index 0d16294c809..d29323250e3 100644 --- a/python/cugraph/cugraph/structure/number_map.py +++ b/python/cugraph/cugraph/structure/number_map.py @@ -516,9 +516,9 @@ def renumber_and_segment( # The renumber_type 'experimental' only runs the C++ # renumbering renumber_type = 'experimental' - + if legacy_renum_only and renumber_type == 'experimental': - # The original dataframe will be returned. + # The original dataframe will be returned. 
renumber_type = 'skip_renumbering' renumber_map = NumberMap() @@ -568,9 +568,9 @@ def renumber_and_segment( else: df = df.rename( columns={src_col_names[0]: - renumber_map.renumbered_src_col_name, - dst_col_names[0]: - renumber_map.renumbered_dst_col_name} + renumber_map.renumbered_src_col_name, + dst_col_names[0]: + renumber_map.renumbered_dst_col_name} ) num_edges = len(df) @@ -585,15 +585,16 @@ def renumber_and_segment( client = default_client() data = get_distributed_data(df) result = [(client.submit(call_renumber, - Comms.get_session_id(), - wf[1], - renumber_map.renumbered_src_col_name, - renumber_map.renumbered_dst_col_name, - num_edges, - is_mnmg, - store_transposed, - workers=[wf[0]]), wf[0]) - for idx, wf in enumerate(data.worker_to_parts.items())] + Comms.get_session_id(), + wf[1], + renumber_map.renumbered_src_col_name, + renumber_map.renumbered_dst_col_name, + num_edges, + is_mnmg, + store_transposed, + workers=[wf[0]]), wf[0]) + for idx, wf in enumerate( + data.worker_to_parts.items())] wait(result) def get_renumber_map(id_type, data): @@ -613,15 +614,15 @@ def get_renumbered_df(id_type, data): renumbering_map = dask_cudf.from_delayed( [client.submit(get_renumber_map, - id_type, - data, - workers=[wf]) + id_type, + data, + workers=[wf]) for (data, wf) in result]) list_of_segment_offsets = client.gather( [client.submit(get_segment_offsets, - data, - workers=[wf]) + data, + workers=[wf]) for (data, wf) in result]) aggregate_segment_offsets = [] for segment_offsets in list_of_segment_offsets: @@ -629,9 +630,9 @@ def get_renumbered_df(id_type, data): renumbered_df = dask_cudf.from_delayed( [client.submit(get_renumbered_df, - id_type, - data, - workers=[wf]) + id_type, + data, + workers=[wf]) for (data, wf) in result]) if renumber_type == 'legacy': renumber_map.implementation.ddf = indirection_map.merge( @@ -645,10 +646,10 @@ def get_renumbered_df(id_type, data): columns={'original_ids': '0', 'new_ids': 'global_id'}) renumber_map.implementation.numbered = True return renumbered_df, renumber_map, aggregate_segment_offsets - + else: - # There is no aggregate_segment_offsets since the C++ renumbering - # is skipped + # There is no aggregate_segment_offsets since the + # C++ renumbering is skipped return df, renumber_map, None else: From 2df6dc82fc6dd3863a00afc23da1b9327a428cee Mon Sep 17 00:00:00 2001 From: root Date: Thu, 5 May 2022 14:12:25 +0000 Subject: [PATCH 4/9] update branch by fetching the latest cugraph --- README.md | 2 +- ci/test.sh | 3 +- docs/cugraph/source/api_docs/dask-cugraph.rst | 2 +- .../source/api_docs/helper_functions.rst | 20 +- .../link_prediction/Overlap-Similarity.ipynb | 33 +-- python/cugraph/cugraph/__init__.py | 2 +- .../betweenness_centrality_wrapper.pyx | 4 +- .../edge_betweenness_centrality_wrapper.pyx | 4 +- .../dask/centrality/katz_centrality.py | 4 +- .../cugraph/dask/common/input_utils.py | 13 +- .../cugraph/cugraph/dask/common/mg_utils.py | 13 +- .../cugraph/cugraph/dask/common/part_utils.py | 2 +- .../cugraph/cugraph/dask/common/read_utils.py | 6 + .../cugraph/{ => dask}/comms/__init__.py | 0 .../cugraph/{ => dask}/comms/comms.pxd | 0 .../cugraph/cugraph/{ => dask}/comms/comms.py | 18 +- .../{ => dask}/comms/comms_wrapper.pyx | 2 +- .../cugraph/cugraph/dask/community/louvain.py | 2 +- .../cugraph/dask/components/connectivity.py | 2 +- .../cugraph/dask/link_analysis/hits.py | 2 +- .../cugraph/dask/link_analysis/pagerank.py | 4 +- .../dask/sampling/neighborhood_sampling.py | 4 +- .../cugraph/dask/structure/replication.pyx | 4 +- 
python/cugraph/cugraph/dask/traversal/bfs.py | 6 +- python/cugraph/cugraph/dask/traversal/sssp.py | 5 +- python/cugraph/cugraph/generators/rmat.py | 2 +- .../cugraph/link_prediction/jaccard.py | 6 +- .../simpleDistributedGraph.py | 2 - .../graph_implementation/simpleGraph.py | 2 +- .../structure/graph_primtypes_wrapper.pyx | 2 +- .../cugraph/cugraph/structure/number_map.py | 2 +- python/cugraph/cugraph/structure/shuffle.py | 2 +- .../cugraph/cugraph/structure/symmetrize.py | 2 +- python/cugraph/cugraph/tests/conftest.py | 4 +- .../cugraph/tests/{dask => mg}/__init__.py | 2 +- .../cugraph/tests/{dask => mg}/mg_context.py | 4 +- .../test_mg_batch_betweenness_centrality.py | 2 +- ...st_mg_batch_edge_betweenness_centrality.py | 2 +- .../cugraph/tests/{dask => mg}/test_mg_bfs.py | 27 ++- .../tests/{dask => mg}/test_mg_comms.py | 24 ++- .../{dask => mg}/test_mg_connectivity.py | 20 +- .../tests/{dask => mg}/test_mg_degree.py | 19 +- .../tests/{dask => mg}/test_mg_doctests.py | 2 +- .../tests/{dask => mg}/test_mg_hits.py | 16 +- .../{dask => mg}/test_mg_katz_centrality.py | 27 ++- .../tests/{dask => mg}/test_mg_louvain.py | 51 ++++- .../test_mg_neighborhood_sampling.py | 21 +- .../tests/{dask => mg}/test_mg_pagerank.py | 8 +- .../tests/{dask => mg}/test_mg_renumber.py | 22 +- .../tests/{dask => mg}/test_mg_replication.py | 0 .../tests/{dask => mg}/test_mg_sssp.py | 20 +- .../cugraph/tests/mg/test_mg_symmetrize.py | 201 ++++++++++++++++++ .../tests/{dask => mg}/test_mg_utility.py | 10 +- python/cugraph/cugraph/tests/test_jaccard.py | 20 +- .../tests/test_maximum_spanning_tree.py | 32 ++- .../tests/test_minimum_spanning_tree.py | 32 ++- .../cugraph/cugraph/tests/test_symmetrize.py | 88 +------- .../cugraph/tree/minimum_spanning_tree.py | 25 ++- 58 files changed, 603 insertions(+), 253 deletions(-) rename python/cugraph/cugraph/{ => dask}/comms/__init__.py (100%) rename python/cugraph/cugraph/{ => dask}/comms/comms.pxd (100%) rename python/cugraph/cugraph/{ => dask}/comms/comms.py (92%) rename python/cugraph/cugraph/{ => dask}/comms/comms_wrapper.pyx (92%) rename python/cugraph/cugraph/tests/{dask => mg}/__init__.py (92%) rename python/cugraph/cugraph/tests/{dask => mg}/mg_context.py (97%) rename python/cugraph/cugraph/tests/{dask => mg}/test_mg_batch_betweenness_centrality.py (98%) rename python/cugraph/cugraph/tests/{dask => mg}/test_mg_batch_edge_betweenness_centrality.py (98%) rename python/cugraph/cugraph/tests/{dask => mg}/test_mg_bfs.py (86%) rename python/cugraph/cugraph/tests/{dask => mg}/test_mg_comms.py (86%) rename python/cugraph/cugraph/tests/{dask => mg}/test_mg_connectivity.py (82%) rename python/cugraph/cugraph/tests/{dask => mg}/test_mg_degree.py (82%) rename python/cugraph/cugraph/tests/{dask => mg}/test_mg_doctests.py (99%) rename python/cugraph/cugraph/tests/{dask => mg}/test_mg_hits.py (92%) rename python/cugraph/cugraph/tests/{dask => mg}/test_mg_katz_centrality.py (79%) rename python/cugraph/cugraph/tests/{dask => mg}/test_mg_louvain.py (63%) rename python/cugraph/cugraph/tests/{dask => mg}/test_mg_neighborhood_sampling.py (89%) rename python/cugraph/cugraph/tests/{dask => mg}/test_mg_pagerank.py (93%) rename python/cugraph/cugraph/tests/{dask => mg}/test_mg_renumber.py (93%) rename python/cugraph/cugraph/tests/{dask => mg}/test_mg_replication.py (100%) rename python/cugraph/cugraph/tests/{dask => mg}/test_mg_sssp.py (81%) create mode 100644 python/cugraph/cugraph/tests/mg/test_mg_symmetrize.py rename python/cugraph/cugraph/tests/{dask => mg}/test_mg_utility.py (94%) diff 
--git a/README.md b/README.md index 7594e359c74..194ad0a3f0f 100644 --- a/README.md +++ b/README.md @@ -153,7 +153,7 @@ cuGraph tries to match the return type based on the input type. So a NetworkX i ## cuGraph Notice -Vertex IDs are expected to be contiguous integers starting from 0. If your data doesn't match that restriction, we have a solution. cuGraph provides the renumber function, which is by default automatically called when data is addted to a graph. Input vertex IDs for the renumber function can be any type, can be non-contiguous, can be multiple columns, and can start from an arbitrary number. The renumber function maps the provided input vertex IDs to either 32- or 64-bit contiguous integers starting from 0. +Vertex IDs are expected to be contiguous integers starting from 0. If your data doesn't match that restriction, we have a solution. cuGraph provides the renumber function, which is by default automatically called when data is added to a graph. Input vertex IDs for the renumber function can be any type, can be non-contiguous, can be multiple columns, and can start from an arbitrary number. The renumber function maps the provided input vertex IDs to either 32- or 64-bit contiguous integers starting from 0. Additionally, when using the auto-renumbering feature, vertices are automatically un-renumbered in results. diff --git a/ci/test.sh b/ci/test.sh index 821ac8cd313..51f12b81c7e 100755 --- a/ci/test.sh +++ b/ci/test.sh @@ -91,7 +91,8 @@ echo "Ran Python pytest for pylibcugraph : return code was: $?, test script exit echo "Python pytest for cuGraph (single-GPU only)..." cd ${CUGRAPH_ROOT}/python/cugraph/cugraph -pytest --cache-clear --junitxml=${CUGRAPH_ROOT}/junit-cugraph-pytests.xml -v --cov-config=.coveragerc --cov=cugraph --cov-report=xml:${WORKSPACE}/python/cugraph/cugraph-coverage.xml --cov-report term --ignore=raft --ignore=tests/dask --benchmark-disable +# rmat is not tested because of MG testing +pytest --cache-clear --junitxml=${CUGRAPH_ROOT}/junit-cugraph-pytests.xml -v --cov-config=.coveragerc --cov=cugraph --cov-report=xml:${WORKSPACE}/python/cugraph/cugraph-coverage.xml --cov-report term -k "not mg and not rmat" --benchmark-disable echo "Ran Python pytest for cugraph : return code was: $?, test script exit code is now: $EXITCODE" echo "Python benchmarks for cuGraph (running as tests)..." diff --git a/docs/cugraph/source/api_docs/dask-cugraph.rst b/docs/cugraph/source/api_docs/dask-cugraph.rst index 51487bfbf05..9ccc429a780 100644 --- a/docs/cugraph/source/api_docs/dask-cugraph.rst +++ b/docs/cugraph/source/api_docs/dask-cugraph.rst @@ -20,7 +20,7 @@ Example from dask.distributed import Client, wait from dask_cuda import LocalCUDACluster - import cugraph.comms as Comms + import cugraph.dask.comms as Comms import cugraph.dask as dask_cugraph cluster = LocalCUDACluster() diff --git a/docs/cugraph/source/api_docs/helper_functions.rst b/docs/cugraph/source/api_docs/helper_functions.rst index 249ea9c1457..08585d264e1 100644 --- a/docs/cugraph/source/api_docs/helper_functions.rst +++ b/docs/cugraph/source/api_docs/helper_functions.rst @@ -10,15 +10,15 @@ Methods .. 
autosummary:: :toctree: api/ - cugraph.comms.comms.initialize - cugraph.comms.comms.destroy - cugraph.comms.comms.is_initialized - cugraph.comms.comms.get_comms - cugraph.comms.comms.get_workers - cugraph.comms.comms.get_session_id - cugraph.comms.comms.get_2D_partition - cugraph.comms.comms.get_default_handle - cugraph.comms.comms.get_handle - cugraph.comms.comms.get_worker_id + cugraph.dask.comms.comms.initialize + cugraph.dask.comms.comms.destroy + cugraph.dask.comms.comms.is_initialized + cugraph.dask.comms.comms.get_comms + cugraph.dask.comms.comms.get_workers + cugraph.dask.comms.comms.get_session_id + cugraph.dask.comms.comms.get_2D_partition + cugraph.dask.comms.comms.get_default_handle + cugraph.dask.comms.comms.get_handle + cugraph.dask.comms.comms.get_worker_id cugraph.dask.common.read_utils.get_chunksize diff --git a/notebooks/link_prediction/Overlap-Similarity.ipynb b/notebooks/link_prediction/Overlap-Similarity.ipynb index b8733ce4d80..6e04835b978 100755 --- a/notebooks/link_prediction/Overlap-Similarity.ipynb +++ b/notebooks/link_prediction/Overlap-Similarity.ipynb @@ -12,14 +12,11 @@ "\n", "Notebook Credits\n", "\n", - " Original Authors: Brad Rees\n", - " Created: 10/14/2019\n", - " Last Edit: 08/16/2020\n", - "\n", - "RAPIDS Versions: 0.12.0a\n", - "\n", - "Test Hardware\n", - "* GV100 32G, CUDA 10.0\n" + "| Author Credit | Date | Update | cuGraph Version | Test Hardware |\n", + "| --------------|------------|------------------|-----------------|--------------------|\n", + "| Brad Rees | 10/14/2019 | created | 0.08 | GV100, CUDA 10.0 |\n", + "| | 08/16/2020 | upadted | 0.12 | GV100, CUDA 10.0 |\n", + "| | 08/05/2021 | tested / updated | 21.10 nightly | RTX 3090 CUDA 11.4 |\n" ] }, { @@ -110,7 +107,7 @@ "metadata": {}, "source": [ "### Some notes about vertex IDs...\n", - "* The current version of cuGraph requires that vertex IDs be representable as 32-bit integers, meaning graphs currently can contain at most 2^32 unique vertex IDs. However, this limitation is being actively addressed and a version of cuGraph that accommodates more than 2^32 vertices will be available in the near future.\n", + "\n", "* cuGraph will automatically renumber graphs to an internal format consisting of a contiguous series of integers starting from 0, and convert back to the original IDs when returning data to the caller. If the vertex IDs of the data are already a contiguous series of integers starting from 0, the auto-renumbering step can be skipped for faster graph creation times.\n", " * To skip auto-renumbering, set the `renumber` boolean arg to `False` when calling the appropriate graph creation API (eg. `G.from_cudf_edgelist(gdf_r, source='src', destination='dst', renumber=False)`).\n", " * For more advanced renumbering support, see the examples in `structure/renumber.ipynb` and `structure/renumber-2.ipynb`\n" @@ -149,8 +146,7 @@ "source": [ "# Import needed libraries\n", "import cugraph\n", - "import cudf\n", - "from collections import OrderedDict" + "import cudf" ] }, { @@ -303,21 +299,10 @@ "metadata": {}, "outputs": [], "source": [ - "# How many vertices are in the graph? Remember that Graph is zero based\n", + "# How many vertices are in the graph? \n", "G.number_of_vertices()" ] }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "_The test graph has only 34 vertices, so why is the Graph listing 35?_\n", - "\n", - "As mentioned above, cuGraph vertex numbering is zero-based, meaning that the first vertex ID starts at zero. The test dataset is 1-based. 
Because of that, the Graph object adds an extra isolated vertex with an ID of zero. Hence the difference in vertex count. \n", - "We could have run _renumbering_ on the data, or updated the value of each element _gdf['src'] = gdf['src'] - 1_ \n", - "for now, we will just state that vertex 0 is not part of the dataset and can be ignored" - ] - }, { "cell_type": "markdown", "metadata": {}, @@ -611,7 +596,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.7.6" + "version": "3.8.13" } }, "nbformat": 4, diff --git a/python/cugraph/cugraph/__init__.py b/python/cugraph/cugraph/__init__.py index 5578b071474..134915f1333 100644 --- a/python/cugraph/cugraph/__init__.py +++ b/python/cugraph/cugraph/__init__.py @@ -105,7 +105,7 @@ from cugraph.linear_assignment import hungarian, dense_hungarian from cugraph.layout import force_atlas2 from raft import raft_include_test -from cugraph.comms import comms +from cugraph.dask.comms import comms from cugraph.sampling import random_walks, rw_path, node2vec diff --git a/python/cugraph/cugraph/centrality/betweenness_centrality_wrapper.pyx b/python/cugraph/cugraph/centrality/betweenness_centrality_wrapper.pyx index 4984f70822a..3d34304ff13 100644 --- a/python/cugraph/cugraph/centrality/betweenness_centrality_wrapper.pyx +++ b/python/cugraph/cugraph/centrality/betweenness_centrality_wrapper.pyx @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -22,7 +22,7 @@ from libc.stdint cimport uintptr_t from libcpp cimport bool import cudf import numpy as np -import cugraph.comms.comms as Comms +import cugraph.dask.comms.comms as Comms from cugraph.dask.common.mg_utils import get_client import dask.distributed diff --git a/python/cugraph/cugraph/centrality/edge_betweenness_centrality_wrapper.pyx b/python/cugraph/cugraph/centrality/edge_betweenness_centrality_wrapper.pyx index f6c868de6e9..bf4a80701ff 100644 --- a/python/cugraph/cugraph/centrality/edge_betweenness_centrality_wrapper.pyx +++ b/python/cugraph/cugraph/centrality/edge_betweenness_centrality_wrapper.pyx @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -24,7 +24,7 @@ from libcpp cimport bool import cudf import numpy as np from cugraph.dask.common.mg_utils import get_client -import cugraph.comms.comms as Comms +import cugraph.dask.comms.comms as Comms import dask.distributed diff --git a/python/cugraph/cugraph/dask/centrality/katz_centrality.py b/python/cugraph/cugraph/dask/centrality/katz_centrality.py index 86070f11bb2..03f8b0bc4be 100644 --- a/python/cugraph/cugraph/dask/centrality/katz_centrality.py +++ b/python/cugraph/cugraph/dask/centrality/katz_centrality.py @@ -18,7 +18,7 @@ get_vertex_partition_offsets) from cugraph.dask.centrality import\ mg_katz_centrality_wrapper as mg_katz_centrality -import cugraph.comms.comms as Comms +import cugraph.dask.comms.comms as Comms import dask_cudf @@ -72,7 +72,7 @@ def katz_centrality(input_graph, ---------- input_graph : cuGraph.Graph cuGraph graph descriptor with connectivity information. 
The graph can - contain either directed (DiGraph) or undirected edges (Graph). + contain either directed or undirected edges. alpha : float, optional (default=None) Attenuation factor. If alpha is not specified then diff --git a/python/cugraph/cugraph/dask/common/input_utils.py b/python/cugraph/cugraph/dask/common/input_utils.py index edb200f29ed..ca2b45a0a26 100644 --- a/python/cugraph/cugraph/dask/common/input_utils.py +++ b/python/cugraph/cugraph/dask/common/input_utils.py @@ -19,8 +19,17 @@ from dask_cudf.core import DataFrame as dcDataFrame from dask_cudf.core import Series as daskSeries -import cugraph.comms.comms as Comms -from raft.dask.common.utils import get_client +import cugraph.dask.comms.comms as Comms +# FIXME: this raft import breaks the library if ucx-py is +# not available. They are necessary only when doing MG work. +from cugraph.dask.common.read_utils import MissingUCXPy +try: + from raft.dask.common.utils import get_client +except ModuleNotFoundError as err: + if err.name == "ucp": + get_client = MissingUCXPy() + else: + raise from cugraph.dask.common.part_utils import _extract_partitions from dask.distributed import default_client from toolz import first diff --git a/python/cugraph/cugraph/dask/common/mg_utils.py b/python/cugraph/cugraph/dask/common/mg_utils.py index 20024cc00fa..4ac472e36e5 100644 --- a/python/cugraph/cugraph/dask/common/mg_utils.py +++ b/python/cugraph/cugraph/dask/common/mg_utils.py @@ -18,12 +18,21 @@ from dask_cuda import LocalCUDACluster from dask.distributed import Client -from raft.dask.common.utils import default_client +# FIXME: this raft import breaks the library if ucx-py is +# not available. They are necessary only when doing MG work. +from cugraph.dask.common.read_utils import MissingUCXPy +try: + from raft.dask.common.utils import default_client +except ModuleNotFoundError as err: + if err.name == "ucp": + default_client = MissingUCXPy() + else: + raise # FIXME: cugraph/__init__.py also imports the comms module, but # depending on the import environment, cugraph/comms/__init__.py # may be imported instead. 
The following imports the comms.py # module directly -from cugraph.comms import comms as Comms +from cugraph.dask.comms import comms as Comms # FIXME: We currently look for the default client from dask, as such is the diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index bc15e0a7da9..30d252aebe6 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -21,7 +21,7 @@ from dask_cudf.core import DataFrame as daskDataFrame from dask_cudf.core import Series as daskSeries from functools import reduce -import cugraph.comms.comms as Comms +import cugraph.dask.comms.comms as Comms from dask.delayed import delayed import cudf diff --git a/python/cugraph/cugraph/dask/common/read_utils.py b/python/cugraph/cugraph/dask/common/read_utils.py index b41a85008d5..bd943b47fb9 100644 --- a/python/cugraph/cugraph/dask/common/read_utils.py +++ b/python/cugraph/cugraph/dask/common/read_utils.py @@ -43,3 +43,9 @@ def get_chunksize(input_path): size = [os.path.getsize(_file) for _file in input_files] chunksize = max(size) return chunksize + + +class MissingUCXPy: + def __getattr__(self, *args, **kwargs): + raise ModuleNotFoundError("ucx-py could not be imported but is" + " required for MG operations") diff --git a/python/cugraph/cugraph/comms/__init__.py b/python/cugraph/cugraph/dask/comms/__init__.py similarity index 100% rename from python/cugraph/cugraph/comms/__init__.py rename to python/cugraph/cugraph/dask/comms/__init__.py diff --git a/python/cugraph/cugraph/comms/comms.pxd b/python/cugraph/cugraph/dask/comms/comms.pxd similarity index 100% rename from python/cugraph/cugraph/comms/comms.pxd rename to python/cugraph/cugraph/dask/comms/comms.pxd diff --git a/python/cugraph/cugraph/comms/comms.py b/python/cugraph/cugraph/dask/comms/comms.py similarity index 92% rename from python/cugraph/cugraph/comms/comms.py rename to python/cugraph/cugraph/dask/comms/comms.py index 735a82d3136..4837628dec6 100644 --- a/python/cugraph/cugraph/comms/comms.py +++ b/python/cugraph/cugraph/dask/comms/comms.py @@ -11,10 +11,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -from raft.dask.common.comms import Comms as raftComms -from raft.dask.common.comms import get_raft_comm_state +# FIXME: these raft imports break the library if ucx-py is +# not available. They are necessary only when doing MG work. 
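+# MissingUCXPy is a placeholder that raises ModuleNotFoundError on first
+# attribute access, so the failure happens at use time rather than at import.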
+from cugraph.dask.common.read_utils import MissingUCXPy +try: + from raft.dask.common.comms import Comms as raftComms + from raft.dask.common.comms import get_raft_comm_state +except ModuleNotFoundError as err: + if err.name == "ucp": + raftComms = MissingUCXPy() + get_raft_comm_state = MissingUCXPy() + else: + raise from raft.common.handle import Handle -from cugraph.comms.comms_wrapper import init_subcomms as c_init_subcomms +from cugraph.dask.comms.comms_wrapper import init_subcomms as c_init_subcomms from dask.distributed import default_client from cugraph.dask.common import read_utils import math @@ -113,7 +123,7 @@ def initialize(comms=None, -------- >>> from dask.distributed import Client >>> from dask_cuda import LocalCUDACluster - >>> import cugraph.comms as Comms + >>> import cugraph.dask.comms as Comms >>> cluster = LocalCUDACluster() >>> client = Client(cluster) >>> Comms.initialize(p2p=True) diff --git a/python/cugraph/cugraph/comms/comms_wrapper.pyx b/python/cugraph/cugraph/dask/comms/comms_wrapper.pyx similarity index 92% rename from python/cugraph/cugraph/comms/comms_wrapper.pyx rename to python/cugraph/cugraph/dask/comms/comms_wrapper.pyx index 64b094b5ad3..cbed6b1b449 100644 --- a/python/cugraph/cugraph/comms/comms_wrapper.pyx +++ b/python/cugraph/cugraph/dask/comms/comms_wrapper.pyx @@ -18,7 +18,7 @@ from raft.common.handle cimport * -from cugraph.comms.comms cimport init_subcomms as c_init_subcomms +from cugraph.dask.comms.comms cimport init_subcomms as c_init_subcomms def init_subcomms(handle, row_comm_size): diff --git a/python/cugraph/cugraph/dask/community/louvain.py b/python/cugraph/cugraph/dask/community/louvain.py index c3a8cb61ab2..58b4c21168c 100644 --- a/python/cugraph/cugraph/dask/community/louvain.py +++ b/python/cugraph/cugraph/dask/community/louvain.py @@ -15,7 +15,7 @@ from dask.distributed import wait, default_client -import cugraph.comms.comms as Comms +import cugraph.dask.comms.comms as Comms from cugraph.dask.common.input_utils import (get_distributed_data, get_vertex_partition_offsets) from cugraph.dask.community import louvain_wrapper as c_mg_louvain diff --git a/python/cugraph/cugraph/dask/components/connectivity.py b/python/cugraph/cugraph/dask/components/connectivity.py index 77f9e4c1cb2..a4336c5b9b5 100644 --- a/python/cugraph/cugraph/dask/components/connectivity.py +++ b/python/cugraph/cugraph/dask/components/connectivity.py @@ -15,7 +15,7 @@ from cugraph.dask.common.input_utils import (get_distributed_data, get_vertex_partition_offsets) from cugraph.dask.components import mg_connectivity_wrapper as mg_connectivity -import cugraph.comms.comms as Comms +import cugraph.dask.comms.comms as Comms import dask_cudf diff --git a/python/cugraph/cugraph/dask/link_analysis/hits.py b/python/cugraph/cugraph/dask/link_analysis/hits.py index 0099b583eed..450a1b74ae9 100644 --- a/python/cugraph/cugraph/dask/link_analysis/hits.py +++ b/python/cugraph/cugraph/dask/link_analysis/hits.py @@ -16,7 +16,7 @@ from dask.distributed import wait, default_client from cugraph.dask.common.input_utils import get_distributed_data -import cugraph.comms.comms as Comms +import cugraph.dask.comms.comms as Comms import dask_cudf import cudf diff --git a/python/cugraph/cugraph/dask/link_analysis/pagerank.py b/python/cugraph/cugraph/dask/link_analysis/pagerank.py index 7e9d2ffd564..04ee580a34f 100644 --- a/python/cugraph/cugraph/dask/link_analysis/pagerank.py +++ b/python/cugraph/cugraph/dask/link_analysis/pagerank.py @@ -17,7 +17,7 @@ from cugraph.dask.common.input_utils import 
(get_distributed_data, get_vertex_partition_offsets) from cugraph.dask.link_analysis import mg_pagerank_wrapper as mg_pagerank -import cugraph.comms.comms as Comms +import cugraph.dask.comms.comms as Comms import dask_cudf from dask.dataframe.shuffle import rearrange_by_column @@ -74,7 +74,7 @@ def pagerank(input_graph, input_graph : cugraph.DiGraph cuGraph graph descriptor, should contain the connectivity information as dask cudf edge list dataframe(edge weights are not used for this - algorithm). Undirected Graph not currently supported. + algorithm). alpha : float, optional (default=0.85) The damping factor alpha represents the probability to follow an diff --git a/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py b/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py index 3446f2e2d9b..5e73b01f6d3 100644 --- a/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py +++ b/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py @@ -24,7 +24,7 @@ uniform_neighborhood_sampling, ) from cugraph.dask.common.input_utils import get_distributed_data -from cugraph.comms import comms as Comms +from cugraph.dask.comms import comms as Comms def call_nbr_sampling(sID, @@ -95,7 +95,7 @@ def EXPERIMENTAL__uniform_neighborhood(input_graph, Parameters ---------- - input_graph : cugraph.DiGraph + input_graph : cugraph.Graph cuGraph graph, which contains connectivity information as dask cudf edge list dataframe diff --git a/python/cugraph/cugraph/dask/structure/replication.pyx b/python/cugraph/cugraph/dask/structure/replication.pyx index 417300f806f..64f43663517 100644 --- a/python/cugraph/cugraph/dask/structure/replication.pyx +++ b/python/cugraph/cugraph/dask/structure/replication.pyx @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -25,7 +25,7 @@ import cudf import dask.distributed as dd from cugraph.dask.common.input_utils import get_mg_batch_data import dask_cudf -import cugraph.comms.comms as Comms +import cugraph.dask.comms.comms as Comms import cugraph.dask.common.mg_utils as mg_utils import numpy as np diff --git a/python/cugraph/cugraph/dask/traversal/bfs.py b/python/cugraph/cugraph/dask/traversal/bfs.py index 2c4632f509b..ab9dbde1091 100644 --- a/python/cugraph/cugraph/dask/traversal/bfs.py +++ b/python/cugraph/cugraph/dask/traversal/bfs.py @@ -19,7 +19,7 @@ from cugraph.dask.common.input_utils import (get_distributed_data, get_vertex_partition_offsets) from cugraph.dask.traversal import mg_bfs_wrapper as mg_bfs -import cugraph.comms.comms as Comms +import cugraph.dask.comms.comms as Comms import cudf import dask_cudf @@ -66,10 +66,10 @@ def bfs(input_graph, Parameters ---------- - input_graph : directed cugraph.Graph + input_graph : cugraph.Graph cuGraph graph instance, should contain the connectivity information as dask cudf edge list dataframe(edge weights are not used for this - algorithm). Undirected Graph not currently supported. + algorithm). 
start : Integer Specify starting vertex for breadth-first search; this function diff --git a/python/cugraph/cugraph/dask/traversal/sssp.py b/python/cugraph/cugraph/dask/traversal/sssp.py index 513410bb133..d1007faf196 100644 --- a/python/cugraph/cugraph/dask/traversal/sssp.py +++ b/python/cugraph/cugraph/dask/traversal/sssp.py @@ -19,7 +19,7 @@ from cugraph.dask.common.input_utils import (get_distributed_data, get_vertex_partition_offsets) from cugraph.dask.traversal import mg_sssp_wrapper as mg_sssp -import cugraph.comms.comms as Comms +import cugraph.dask.comms.comms as Comms import cudf import dask_cudf @@ -63,10 +63,9 @@ def sssp(input_graph, source): Parameters ---------- - input_graph : directed cugraph.Graph + input_graph : cugraph.Graph cuGraph graph descriptor, should contain the connectivity information as dask cudf edge list dataframe. - Undirected Graph not currently supported. source : Integer Specify source vertex diff --git a/python/cugraph/cugraph/generators/rmat.py b/python/cugraph/cugraph/generators/rmat.py index c6d243f9eb1..4417e5a351f 100644 --- a/python/cugraph/cugraph/generators/rmat.py +++ b/python/cugraph/cugraph/generators/rmat.py @@ -15,7 +15,7 @@ import dask_cudf from cugraph.generators import rmat_wrapper -from cugraph.comms import comms as Comms +from cugraph.dask.comms import comms as Comms import cugraph _graph_types = [cugraph.Graph, cugraph.MultiGraph] diff --git a/python/cugraph/cugraph/link_prediction/jaccard.py b/python/cugraph/cugraph/link_prediction/jaccard.py index 8c9ad6754db..10bfd35f252 100644 --- a/python/cugraph/cugraph/link_prediction/jaccard.py +++ b/python/cugraph/cugraph/link_prediction/jaccard.py @@ -12,7 +12,6 @@ # limitations under the License. import cudf -from cugraph.structure.graph_classes import Graph from cugraph.link_prediction import jaccard_wrapper from cugraph.utilities import (ensure_cugraph_obj_for_nx, df_edge_score_to_dictionary, @@ -108,9 +107,8 @@ def jaccard(input_graph, vertex_pair=None): >>> df = cugraph.jaccard(G) """ - if type(input_graph) is not Graph: - raise TypeError("input graph must a Graph") - + if input_graph.is_directed(): + raise ValueError("Input must be an undirected Graph.") if type(vertex_pair) == cudf.DataFrame: vertex_pair = renumber_vertex_pair(input_graph, vertex_pair) elif vertex_pair is not None: diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py index 70068d83548..42889d36635 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py @@ -64,8 +64,6 @@ def __from_edgelist( ): if not isinstance(input_ddf, dask_cudf.DataFrame): raise TypeError("input should be a dask_cudf dataFrame") - if self.properties.directed is False: - raise TypeError("Undirected distributed graph not supported") s_col = source d_col = destination diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py index 23054151d1a..6bd597d7185 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py @@ -18,7 +18,7 @@ import cugraph.dask.common.mg_utils as mg_utils import cudf import dask_cudf -import cugraph.comms.comms as Comms +import cugraph.dask.comms.comms as Comms import pandas as pd import numpy as 
np from cugraph.dask.structure import replication diff --git a/python/cugraph/cugraph/structure/graph_primtypes_wrapper.pyx b/python/cugraph/cugraph/structure/graph_primtypes_wrapper.pyx index 7a2c0e57e82..b680be143c9 100644 --- a/python/cugraph/cugraph/structure/graph_primtypes_wrapper.pyx +++ b/python/cugraph/cugraph/structure/graph_primtypes_wrapper.pyx @@ -24,7 +24,7 @@ import enum from libc.stdint cimport uintptr_t import dask_cudf as dc -import cugraph.comms.comms as Comms +import cugraph.dask.comms.comms as Comms from dask.distributed import wait, default_client from cugraph.dask.common.input_utils import DistributedDataHandler diff --git a/python/cugraph/cugraph/structure/number_map.py b/python/cugraph/cugraph/structure/number_map.py index d29323250e3..d6d70cdd02d 100644 --- a/python/cugraph/cugraph/structure/number_map.py +++ b/python/cugraph/cugraph/structure/number_map.py @@ -22,7 +22,7 @@ from cugraph.dask.common.input_utils import get_distributed_data from cugraph.structure import renumber_wrapper as c_renumber -import cugraph.comms.comms as Comms +import cugraph.dask.comms.comms as Comms def call_renumber(sID, diff --git a/python/cugraph/cugraph/structure/shuffle.py b/python/cugraph/cugraph/structure/shuffle.py index 120d24e2cc5..4f50d37f5c3 100644 --- a/python/cugraph/cugraph/structure/shuffle.py +++ b/python/cugraph/cugraph/structure/shuffle.py @@ -13,7 +13,7 @@ from dask.dataframe.shuffle import rearrange_by_column import cudf -import cugraph.comms.comms as Comms +import cugraph.dask.comms.comms as Comms def _set_partitions_pre(df, vertex_row_partitions, vertex_col_partitions, diff --git a/python/cugraph/cugraph/structure/symmetrize.py b/python/cugraph/cugraph/structure/symmetrize.py index 311d4ca99f5..477805e3a8b 100644 --- a/python/cugraph/cugraph/structure/symmetrize.py +++ b/python/cugraph/cugraph/structure/symmetrize.py @@ -14,7 +14,7 @@ from cugraph.structure import graph_classes as csg import cudf import dask_cudf -from cugraph.comms import comms as Comms +from cugraph.dask.comms import comms as Comms def symmetrize_df(df, src_name, dst_name, multi=False, symmetrize=True): diff --git a/python/cugraph/cugraph/tests/conftest.py b/python/cugraph/cugraph/tests/conftest.py index 046aa629286..f5bcb35995e 100644 --- a/python/cugraph/cugraph/tests/conftest.py +++ b/python/cugraph/cugraph/tests/conftest.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -20,7 +20,7 @@ from dask_cuda import LocalCUDACluster from dask_cuda.initialize import initialize -from cugraph.comms import comms as Comms +from cugraph.dask.comms import comms as Comms from cugraph.dask.common.mg_utils import get_visible_devices diff --git a/python/cugraph/cugraph/tests/dask/__init__.py b/python/cugraph/cugraph/tests/mg/__init__.py similarity index 92% rename from python/cugraph/cugraph/tests/dask/__init__.py rename to python/cugraph/cugraph/tests/mg/__init__.py index 61cad32a731..30e611b870c 100644 --- a/python/cugraph/cugraph/tests/dask/__init__.py +++ b/python/cugraph/cugraph/tests/mg/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018-2021, NVIDIA CORPORATION. +# Copyright (c) 2018-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at diff --git a/python/cugraph/cugraph/tests/dask/mg_context.py b/python/cugraph/cugraph/tests/mg/mg_context.py similarity index 97% rename from python/cugraph/cugraph/tests/dask/mg_context.py rename to python/cugraph/cugraph/tests/mg/mg_context.py index cf2909ad029..fd93668b4b0 100644 --- a/python/cugraph/cugraph/tests/dask/mg_context.py +++ b/python/cugraph/cugraph/tests/mg/mg_context.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -20,7 +20,7 @@ from cugraph.dask.common.mg_utils import get_visible_devices from dask_cuda import LocalCUDACluster as CUDACluster -import cugraph.comms as Comms +import cugraph.dask.comms as Comms # Maximal number of verifications of the number of workers diff --git a/python/cugraph/cugraph/tests/dask/test_mg_batch_betweenness_centrality.py b/python/cugraph/cugraph/tests/mg/test_mg_batch_betweenness_centrality.py similarity index 98% rename from python/cugraph/cugraph/tests/dask/test_mg_batch_betweenness_centrality.py rename to python/cugraph/cugraph/tests/mg/test_mg_batch_betweenness_centrality.py index 834192e360f..6501ac6a04b 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_batch_betweenness_centrality.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_batch_betweenness_centrality.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at diff --git a/python/cugraph/cugraph/tests/dask/test_mg_batch_edge_betweenness_centrality.py b/python/cugraph/cugraph/tests/mg/test_mg_batch_edge_betweenness_centrality.py similarity index 98% rename from python/cugraph/cugraph/tests/dask/test_mg_batch_edge_betweenness_centrality.py rename to python/cugraph/cugraph/tests/mg/test_mg_batch_edge_betweenness_centrality.py index 50015279d7c..ca17bd75bed 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_batch_edge_betweenness_centrality.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_batch_edge_betweenness_centrality.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2021, NVIDIA CORPORATION. +# Copyright (c) 2019-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at diff --git a/python/cugraph/cugraph/tests/dask/test_mg_bfs.py b/python/cugraph/cugraph/tests/mg/test_mg_bfs.py similarity index 86% rename from python/cugraph/cugraph/tests/dask/test_mg_bfs.py rename to python/cugraph/cugraph/tests/mg/test_mg_bfs.py index 58b18a02cf5..f5aa1a05b98 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_bfs.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_bfs.py @@ -11,6 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import pytest import cugraph.dask as dcg import gc # import pytest @@ -20,12 +21,23 @@ # from cugraph.dask.common.mg_utils import is_single_gpu from cugraph.tests.utils import RAPIDS_DATASET_ROOT_DIR_PATH +# ============================================================================= +# Pytest Setup / Teardown - called for each test function +# ============================================================================= + + +def setup_function(): + gc.collect() + + +IS_DIRECTED = [True, False] + # @pytest.mark.skipif( # is_single_gpu(), reason="skipping MG testing on Single GPU system" # ) -def test_dask_bfs(dask_client): - gc.collect() +@pytest.mark.parametrize("directed", IS_DIRECTED) +def test_dask_bfs(dask_client, directed): input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "netscience.csv").as_posix() @@ -60,10 +72,10 @@ def modify_dataset(df): df = modify_dataset(df) - g = cugraph.Graph(directed=True) + g = cugraph.Graph(directed=directed) g.from_cudf_edgelist(df, "src", "dst") - dg = cugraph.Graph(directed=True) + dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist(ddf, "src", "dst") expected_dist = cugraph.bfs(g, [0, 1000]) @@ -88,7 +100,8 @@ def modify_dataset(df): # @pytest.mark.skipif( # is_single_gpu(), reason="skipping MG testing on Single GPU system" # ) -def test_dask_bfs_multi_column_depthlimit(dask_client): +@pytest.mark.parametrize("directed", IS_DIRECTED) +def test_dask_bfs_multi_column_depthlimit(dask_client, directed): gc.collect() input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / @@ -115,10 +128,10 @@ def test_dask_bfs_multi_column_depthlimit(dask_client): df['src_b'] = df['src_a'] + 1000 df['dst_b'] = df['dst_a'] + 1000 - g = cugraph.Graph(directed=True) + g = cugraph.Graph(directed=directed) g.from_cudf_edgelist(df, ["src_a", "src_b"], ["dst_a", "dst_b"]) - dg = cugraph.Graph(directed=True) + dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist(ddf, ["src_a", "src_b"], ["dst_a", "dst_b"]) start = cudf.DataFrame() diff --git a/python/cugraph/cugraph/tests/dask/test_mg_comms.py b/python/cugraph/cugraph/tests/mg/test_mg_comms.py similarity index 86% rename from python/cugraph/cugraph/tests/dask/test_mg_comms.py rename to python/cugraph/cugraph/tests/mg/test_mg_comms.py index 2c02779f199..cb3dfdc3eb7 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_comms.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_comms.py @@ -11,6 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import pytest import cugraph.dask as dcg import gc # import pytest @@ -20,12 +21,23 @@ # from cugraph.dask.common.mg_utils import is_single_gpu from cugraph.tests.utils import RAPIDS_DATASET_ROOT_DIR_PATH +# ============================================================================= +# Pytest Setup / Teardown - called for each test function +# ============================================================================= + + +def setup_function(): + gc.collect() + + +IS_DIRECTED = [True, False] + # @pytest.mark.skipif( # is_single_gpu(), reason="skipping MG testing on Single GPU system" # ) -def test_dask_pagerank(dask_client): - gc.collect() +@pytest.mark.parametrize("directed", IS_DIRECTED) +def test_dask_pagerank(dask_client, directed): # Initialize and run pagerank on two distributed graphs # with same communicator @@ -48,7 +60,7 @@ def test_dask_pagerank(dask_client): dtype=["int32", "int32", "float32"], ) - dg1 = cugraph.Graph(directed=True) + dg1 = cugraph.Graph(directed=directed) dg1.from_dask_cudf_edgelist(ddf1, "src", "dst") result_pr1 = dcg.pagerank(dg1).compute() @@ -61,7 +73,7 @@ def test_dask_pagerank(dask_client): dtype=["int32", "int32", "float32"], ) - dg2 = cugraph.Graph(directed=True) + dg2 = cugraph.Graph(directed=directed) dg2.from_dask_cudf_edgelist(ddf2, "src", "dst") result_pr2 = dcg.pagerank(dg2).compute() @@ -74,7 +86,7 @@ def test_dask_pagerank(dask_client): dtype=["int32", "int32", "float32"], ) - g1 = cugraph.Graph(directed=True) + g1 = cugraph.Graph(directed=directed) g1.from_cudf_edgelist(df1, "src", "dst") expected_pr1 = cugraph.pagerank(g1) @@ -85,7 +97,7 @@ def test_dask_pagerank(dask_client): dtype=["int32", "int32", "float32"], ) - g2 = cugraph.Graph(directed=True) + g2 = cugraph.Graph(directed=directed) g2.from_cudf_edgelist(df2, "src", "dst") expected_pr2 = cugraph.pagerank(g2) diff --git a/python/cugraph/cugraph/tests/dask/test_mg_connectivity.py b/python/cugraph/cugraph/tests/mg/test_mg_connectivity.py similarity index 82% rename from python/cugraph/cugraph/tests/dask/test_mg_connectivity.py rename to python/cugraph/cugraph/tests/mg/test_mg_connectivity.py index 9427b18aa92..fbe04da6348 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_connectivity.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_connectivity.py @@ -11,6 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import pytest import cugraph.dask as dcg import gc # import pytest @@ -20,12 +21,23 @@ # from cugraph.dask.common.mg_utils import is_single_gpu from cugraph.tests.utils import RAPIDS_DATASET_ROOT_DIR_PATH +# ============================================================================= +# Pytest Setup / Teardown - called for each test function +# ============================================================================= + + +def setup_function(): + gc.collect() + + +IS_DIRECTED = [True, False] + # @pytest.mark.skipif( # is_single_gpu(), reason="skipping MG testing on Single GPU system" # ) -def test_dask_wcc(dask_client): - gc.collect() +@pytest.mark.parametrize("directed", IS_DIRECTED) +def test_dask_wcc(dask_client, directed): input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "netscience.csv").as_posix() @@ -47,10 +59,10 @@ def test_dask_wcc(dask_client): dtype=["int32", "int32", "float32"], ) - g = cugraph.Graph(directed=True) + g = cugraph.Graph(directed=directed) g.from_cudf_edgelist(df, "src", "dst", renumber=True) - dg = cugraph.Graph(directed=True) + dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist(ddf, "src", "dst") expected_dist = cugraph.weakly_connected_components(g) diff --git a/python/cugraph/cugraph/tests/dask/test_mg_degree.py b/python/cugraph/cugraph/tests/mg/test_mg_degree.py similarity index 82% rename from python/cugraph/cugraph/tests/dask/test_mg_degree.py rename to python/cugraph/cugraph/tests/mg/test_mg_degree.py index b0b46e93d10..ce27c6e6c95 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_degree.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_degree.py @@ -21,12 +21,23 @@ from cugraph.dask.common.mg_utils import is_single_gpu from cugraph.tests.utils import RAPIDS_DATASET_ROOT_DIR_PATH +# ============================================================================= +# Pytest Setup / Teardown - called for each test function +# ============================================================================= + + +def setup_function(): + gc.collect() + + +IS_DIRECTED = [True, False] + @pytest.mark.skipif( is_single_gpu(), reason="skipping MG testing on Single GPU system" ) -def test_dask_mg_degree(dask_client): - gc.collect() +@pytest.mark.parametrize("directed", IS_DIRECTED) +def test_dask_mg_degree(dask_client, directed): input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "karate-asymmetric.csv").as_posix() @@ -49,10 +60,10 @@ def test_dask_mg_degree(dask_client): dtype=["int32", "int32", "float32"], ) - dg = cugraph.Graph(directed=True) + dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist(ddf, "src", "dst") - g = cugraph.Graph(directed=True) + g = cugraph.Graph(directed=directed) g.from_cudf_edgelist(df, "src", "dst") merge_df_in = ( diff --git a/python/cugraph/cugraph/tests/dask/test_mg_doctests.py b/python/cugraph/cugraph/tests/mg/test_mg_doctests.py similarity index 99% rename from python/cugraph/cugraph/tests/dask/test_mg_doctests.py rename to python/cugraph/cugraph/tests/mg/test_mg_doctests.py index 547d0bc770b..1ed387095e3 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_doctests.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_doctests.py @@ -28,7 +28,7 @@ from dask.distributed import Client from dask_cuda import LocalCUDACluster -import cugraph.comms as Comms +import cugraph.dask.comms.comms as Comms datasets = utils.RAPIDS_DATASET_ROOT_DIR_PATH diff --git a/python/cugraph/cugraph/tests/dask/test_mg_hits.py b/python/cugraph/cugraph/tests/mg/test_mg_hits.py similarity index 92% rename from 
python/cugraph/cugraph/tests/dask/test_mg_hits.py rename to python/cugraph/cugraph/tests/mg/test_mg_hits.py index 124bc5066cb..dde91690b7a 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_hits.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_hits.py @@ -19,6 +19,7 @@ # from cugraph.dask.common.mg_utils import is_single_gpu from cugraph.tests import utils + # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= @@ -28,6 +29,9 @@ def setup_function(): gc.collect() +IS_DIRECTED = [True, False] + + # ============================================================================= # Pytest fixtures # ============================================================================= @@ -38,6 +42,7 @@ def setup_function(): fixture_params = utils.genFixtureParamsProduct((datasets, "graph_file"), ([50], "max_iter"), ([1.0e-6], "tol"), + (IS_DIRECTED, "directed") ) @@ -47,7 +52,10 @@ def input_combo(request): Simply return the current combination of params as a dictionary for use in tests or other parameterized fixtures. """ - parameters = dict(zip(("graph_file", "max_iter", "tol"), request.param)) + parameters = dict(zip(("graph_file", + "max_iter", + "tol", + "directed"), request.param)) return parameters @@ -60,9 +68,9 @@ def input_expected_output(input_combo): """ input_data_path = input_combo["graph_file"] - + directed = input_combo["directed"] G = utils.generate_cugraph_graph_from_file( - input_data_path) + input_data_path, directed=directed) sg_cugraph_hits = cugraph.hits( G, input_combo["max_iter"], @@ -83,7 +91,7 @@ def input_expected_output(input_combo): dtype=["int32", "int32", "float32"], ) - dg = cugraph.Graph(directed=True) + dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist( ddf, source='src', destination='dst', edge_attr='value', renumber=True) diff --git a/python/cugraph/cugraph/tests/dask/test_mg_katz_centrality.py b/python/cugraph/cugraph/tests/mg/test_mg_katz_centrality.py similarity index 79% rename from python/cugraph/cugraph/tests/dask/test_mg_katz_centrality.py rename to python/cugraph/cugraph/tests/mg/test_mg_katz_centrality.py index 1b0f89c1fd7..97b5b56fe9a 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_katz_centrality.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_katz_centrality.py @@ -22,11 +22,23 @@ from cugraph.tests.utils import RAPIDS_DATASET_ROOT_DIR_PATH +# ============================================================================= +# Pytest Setup / Teardown - called for each test function +# ============================================================================= + + +def setup_function(): + gc.collect() + + +IS_DIRECTED = [True, False] + + @pytest.mark.skipif( is_single_gpu(), reason="skipping MG testing on Single GPU system" ) -def test_dask_katz_centrality(dask_client): - gc.collect() +@pytest.mark.parametrize("directed", IS_DIRECTED) +def test_dask_katz_centrality(dask_client, directed): input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "karate.csv").as_posix() @@ -55,9 +67,14 @@ def test_dask_katz_centrality(dask_client): import networkx as nx from cugraph.tests import utils NM = utils.read_csv_for_nx(input_data_path) - Gnx = nx.from_pandas_edgelist( - NM, create_using=nx.DiGraph(), source="0", target="1" - ) + if directed: + Gnx = nx.from_pandas_edgelist( + NM, create_using=nx.DiGraph(), source="0", target="1" + ) + else: + Gnx = nx.from_pandas_edgelist( + NM, 
create_using=nx.Graph(), source="0", target="1" + ) nk = nx.katz_centrality(Gnx, alpha=katz_alpha) import pandas as pd pdf = pd.DataFrame(nk.items(), columns=['vertex', 'katz_centrality']) diff --git a/python/cugraph/cugraph/tests/dask/test_mg_louvain.py b/python/cugraph/cugraph/tests/mg/test_mg_louvain.py similarity index 63% rename from python/cugraph/cugraph/tests/dask/test_mg_louvain.py rename to python/cugraph/cugraph/tests/mg/test_mg_louvain.py index fbf3dab90e2..43389c0f679 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_louvain.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_louvain.py @@ -49,6 +49,7 @@ def setFixtureParamNames(*args, **kwargs): def daskGraphFromDataset(request, dask_client): """ Returns a new dask dataframe created from the dataset file param. + This creates un undirected Graph. """ # Since parameterized fixtures do not assign param names to param values, # manually call the helper to do so. @@ -69,15 +70,43 @@ def daskGraphFromDataset(request, dask_client): return dg +@pytest.fixture(scope="module", + params=utils.DATASETS_UNDIRECTED, + ids=[f"dataset={d.as_posix()}" + for d in utils.DATASETS_UNDIRECTED]) +def uddaskGraphFromDataset(request, dask_client): + """ + Returns a new dask dataframe created from the dataset file param. + This creates un undirected Graph. + """ + # Since parameterized fixtures do not assign param names to param + # values, manually call the helper to do so. + setFixtureParamNames(request, ["dataset"]) + dataset = request.param + + chunksize = dcg.get_chunksize(dataset) + ddf = dask_cudf.read_csv( + dataset, + chunksize=chunksize, + delimiter=" ", + names=["src", "dst", "value"], + dtype=["int32", "int32", "float32"], + ) + + dg = cugraph.Graph(directed=False) + dg.from_dask_cudf_edgelist(ddf, "src", "dst") + return dg + + ############################################################################### # Tests # @pytest.mark.skipif( # is_single_gpu(), reason="skipping MG testing on Single GPU system" # ) def test_mg_louvain_with_edgevals(daskGraphFromDataset): - # FIXME: daskGraphFromDataset returns a DiGraph, which Louvain is currently - # accepting. In the future, an MNMG symmeterize will need to be called to - # create a Graph for Louvain. + # FIXME: daskGraphFromDataset returns a Directed graph, which Louvain is + # currently accepting. In the future, an MNMG symmeterize will need to + # be called to create a Graph for Louvain. parts, mod = dcg.louvain(daskGraphFromDataset) # FIXME: either call Nx with the same dataset and compare results, or @@ -86,3 +115,19 @@ def test_mg_louvain_with_edgevals(daskGraphFromDataset): print(parts.compute()) print(mod) print() + + +############################################################################### +# Tests +# @pytest.mark.skipif( +# is_single_gpu(), reason="skipping MG testing on Single GPU system" +# ) +def test_mg_udlouvain_with_edgevals(uddaskGraphFromDataset): + parts, mod = dcg.louvain(uddaskGraphFromDataset) + + # FIXME: either call Nx with the same dataset and compare results, or + # hardcode golden results to compare to. 
+ print() + print(parts.compute()) + print(mod) + print() diff --git a/python/cugraph/cugraph/tests/dask/test_mg_neighborhood_sampling.py b/python/cugraph/cugraph/tests/mg/test_mg_neighborhood_sampling.py similarity index 89% rename from python/cugraph/cugraph/tests/dask/test_mg_neighborhood_sampling.py rename to python/cugraph/cugraph/tests/mg/test_mg_neighborhood_sampling.py index 25838445e11..4dcba8d6ee9 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_neighborhood_sampling.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_neighborhood_sampling.py @@ -28,6 +28,9 @@ def setup_function(): gc.collect() +IS_DIRECTED = [True, False] + + # datasets = utils.RAPIDS_DATASET_ROOT_DIR_PATH/"karate.csv" datasets = utils.DATASETS_SMALL fixture_params = utils.genFixtureParamsProduct((datasets, "graph_file")) @@ -43,10 +46,12 @@ def _get_param_args(param_name, param_values): [pytest.param(v, id=f"{param_name}={v}") for v in param_values]) -@pytest.mark.skipif( - is_single_gpu(), reason="skipping MG testing on Single GPU system" -) -def test_mg_neighborhood_sampling_simple(dask_client): +# @pytest.mark.skipif( +# is_single_gpu(), reason="skipping MG testing on Single GPU system" +# ) +@pytest.mark.skip(reason="Currently hangs, awaiting fix in algo") +@pytest.mark.parametrize("directed", IS_DIRECTED) +def test_mg_neighborhood_sampling_simple(dask_client, directed): from cugraph.experimental.dask import uniform_neighborhood_sampling @@ -60,7 +65,7 @@ def test_mg_neighborhood_sampling_simple(dask_client): }) ddf = dask_cudf.from_cudf(df, npartitions=2) - G = cugraph.Graph(directed=True) + G = cugraph.Graph(directed=directed) G.from_dask_cudf_edgelist(ddf, "src", "dst", "value") # TODO: Incomplete, include more testing for tree graph as well as @@ -90,7 +95,9 @@ def test_mg_neighborhood_sampling_simple(dask_client): @pytest.mark.skipif( is_single_gpu(), reason="skipping MG testing on Single GPU system" ) -def test_mg_neighborhood_sampling_tree(dask_client): +@pytest.mark.parametrize("directed", IS_DIRECTED) +@pytest.mark.skip(reason="Currently hangs, awaiting fix in algo") +def test_mg_neighborhood_sampling_tree(dask_client, directed): from cugraph.experimental.dask import uniform_neighborhood_sampling @@ -106,7 +113,7 @@ def test_mg_neighborhood_sampling_tree(dask_client): dtype=["int32", "int32", "float32"], ) - G = cugraph.Graph(directed=True) + G = cugraph.Graph(directed=directed) G.from_dask_cudf_edgelist(ddf, "src", "dst", "value") # TODO: Incomplete, include more testing for tree graph as well as diff --git a/python/cugraph/cugraph/tests/dask/test_mg_pagerank.py b/python/cugraph/cugraph/tests/mg/test_mg_pagerank.py similarity index 93% rename from python/cugraph/cugraph/tests/dask/test_mg_pagerank.py rename to python/cugraph/cugraph/tests/mg/test_mg_pagerank.py index 957e9a7747e..f03a77a46f6 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_pagerank.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_pagerank.py @@ -48,13 +48,15 @@ def personalize(vertices, personalization_perc): PERSONALIZATION_PERC = [0, 10, 50] +IS_DIRECTED = [True, False] # @pytest.mark.skipif( # is_single_gpu(), reason="skipping MG testing on Single GPU system" # ) @pytest.mark.parametrize("personalization_perc", PERSONALIZATION_PERC) -def test_dask_pagerank(dask_client, personalization_perc): +@pytest.mark.parametrize("directed", IS_DIRECTED) +def test_dask_pagerank(dask_client, personalization_perc, directed): gc.collect() input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / @@ -77,10 +79,10 @@ def 
test_dask_pagerank(dask_client, personalization_perc): dtype=["int32", "int32", "float32"], ) - g = cugraph.Graph(directed=True) + g = cugraph.Graph(directed=directed) g.from_cudf_edgelist(df, "src", "dst") - dg = cugraph.Graph(directed=True) + dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist(ddf, "src", "dst") personalization = None diff --git a/python/cugraph/cugraph/tests/dask/test_mg_renumber.py b/python/cugraph/cugraph/tests/mg/test_mg_renumber.py similarity index 93% rename from python/cugraph/cugraph/tests/dask/test_mg_renumber.py rename to python/cugraph/cugraph/tests/mg/test_mg_renumber.py index a0e428d1d65..d21b96cf4c5 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_renumber.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_renumber.py @@ -38,6 +38,9 @@ def setup_function(): gc.collect() +IS_DIRECTED = [True, False] + + @pytest.mark.skipif( is_single_gpu(), reason="skipping MG testing on Single GPU system" ) @@ -130,7 +133,8 @@ def test_mg_renumber_add_internal_vertex_id(graph_file, dask_client): @pytest.mark.skipif( is_single_gpu(), reason="skipping MG testing on Single GPU system" ) -def test_dask_pagerank(dask_client): +@pytest.mark.parametrize("directed", IS_DIRECTED) +def test_dask_pagerank(dask_client, directed): pandas.set_option("display.max_rows", 10000) input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / @@ -152,10 +156,10 @@ def test_dask_pagerank(dask_client): dtype=["int32", "int32", "float32"], ) - g = cugraph.Graph(directed=True) + g = cugraph.Graph(directed=directed) g.from_cudf_edgelist(df, "src", "dst") - dg = cugraph.Graph(directed=True) + dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist(ddf, "src", "dst") expected_pr = cugraph.pagerank(g) @@ -185,7 +189,8 @@ def test_dask_pagerank(dask_client): is_single_gpu(), reason="skipping MG testing on Single GPU system" ) @pytest.mark.parametrize("renumber", [False]) -def test_directed_graph_renumber_false(renumber, dask_client): +@pytest.mark.parametrize("directed", IS_DIRECTED) +def test_graph_renumber_false(renumber, dask_client, directed): input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "karate.csv").as_posix() chunksize = dcg.get_chunksize(input_data_path) @@ -197,7 +202,7 @@ def test_directed_graph_renumber_false(renumber, dask_client): names=["src", "dst", "value"], dtype=["int32", "int32", "float32"], ) - dg = cugraph.Graph(directed=True) + dg = cugraph.Graph(directed=directed) with pytest.raises(ValueError): dg.from_dask_cudf_edgelist(ddf, "src", "dst", renumber=renumber) @@ -207,7 +212,8 @@ def test_directed_graph_renumber_false(renumber, dask_client): is_single_gpu(), reason="skipping MG testing on Single GPU system" ) @pytest.mark.parametrize("renumber", [False]) -def test_multi_directed_graph_renumber_false(renumber, dask_client): +@pytest.mark.parametrize("directed", IS_DIRECTED) +def test_multi_graph_renumber_false(renumber, dask_client, directed): input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "karate_multi_edge.csv").as_posix() chunksize = dcg.get_chunksize(input_data_path) @@ -219,8 +225,10 @@ def test_multi_directed_graph_renumber_false(renumber, dask_client): names=["src", "dst", "value"], dtype=["int32", "int32", "float32"], ) - dg = cugraph.MultiGraph(directed=True) + dg = cugraph.MultiGraph(directed=directed) + # ValueError always thrown since renumber must be True with + # MNMG algorithms with pytest.raises(ValueError): dg.from_dask_cudf_edgelist(ddf, "src", "dst", renumber=renumber) diff --git a/python/cugraph/cugraph/tests/dask/test_mg_replication.py 
b/python/cugraph/cugraph/tests/mg/test_mg_replication.py similarity index 100% rename from python/cugraph/cugraph/tests/dask/test_mg_replication.py rename to python/cugraph/cugraph/tests/mg/test_mg_replication.py diff --git a/python/cugraph/cugraph/tests/dask/test_mg_sssp.py b/python/cugraph/cugraph/tests/mg/test_mg_sssp.py similarity index 81% rename from python/cugraph/cugraph/tests/dask/test_mg_sssp.py rename to python/cugraph/cugraph/tests/mg/test_mg_sssp.py index 656c91d1754..c8a2361b6ad 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_sssp.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_sssp.py @@ -11,6 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import pytest import cugraph.dask as dcg import gc # import pytest @@ -20,12 +21,23 @@ # from cugraph.dask.common.mg_utils import is_single_gpu from cugraph.tests.utils import RAPIDS_DATASET_ROOT_DIR_PATH +# ============================================================================= +# Pytest Setup / Teardown - called for each test function +# ============================================================================= + + +def setup_function(): + gc.collect() + + +IS_DIRECTED = [True, False] + # @pytest.mark.skipif( # is_single_gpu(), reason="skipping MG testing on Single GPU system" # ) -def test_dask_sssp(dask_client): - gc.collect() +@pytest.mark.parametrize("directed", IS_DIRECTED) +def test_dask_sssp(dask_client, directed): input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "netscience.csv").as_posix() @@ -47,10 +59,10 @@ def test_dask_sssp(dask_client): dtype=["int32", "int32", "float32"], ) - g = cugraph.Graph(directed=True) + g = cugraph.Graph(directed=directed) g.from_cudf_edgelist(df, "src", "dst", "value", renumber=True) - dg = cugraph.Graph(directed=True) + dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist(ddf, "src", "dst", "value") expected_dist = cugraph.sssp(g, 0) diff --git a/python/cugraph/cugraph/tests/mg/test_mg_symmetrize.py b/python/cugraph/cugraph/tests/mg/test_mg_symmetrize.py new file mode 100644 index 00000000000..2749069b59a --- /dev/null +++ b/python/cugraph/cugraph/tests/mg/test_mg_symmetrize.py @@ -0,0 +1,201 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gc + +import pytest + +import pandas as pd +import cudf +import cugraph +from cugraph.tests import utils +from cugraph.dask.common.mg_utils import (is_single_gpu, + setup_local_dask_cluster, + teardown_local_dask_cluster) + + +def test_version(): + gc.collect() + cugraph.__version__ + + +def compare(src1, dst1, val1, src2, dst2, val2): + # + # We will do comparison computations by using dataframe + # merge functions (essentially doing fast joins). 
We + # start by making two data frames + # + df1 = cudf.DataFrame() + df1["src1"] = src1 + df1["dst1"] = dst1 + if val1 is not None: + df1["val1"] = val1 + + df2 = cudf.DataFrame() + df2["src2"] = src2 + df2["dst2"] = dst2 + if val2 is not None: + df2["val2"] = val2 + + # + # Check to see if all pairs in the original data frame + # still exist in the new data frame. If we join (merge) + # the data frames where (src1[i]=src2[i]) and (dst1[i]=dst2[i]) + # then we should get exactly the same number of entries in + # the data frame if we did not lose any data. + # + join = df1.merge(df2, left_on=["src1", "dst1"], right_on=["src2", "dst2"]) + + if len(df1) != len(join): + join2 = df1.merge(df2, how='left', + left_on=["src1", "dst1"], right_on=["src2", "dst2"]) + pd.set_option('display.max_rows', 500) + print('df1 = \n', df1.sort_values(["src1", "dst1"])) + print('df2 = \n', df2.sort_values(["src2", "dst2"])) + print('join2 = \n', join2.sort_values(["src1", "dst1"]) + .to_pandas().query('src2.isnull()', engine='python')) + + assert len(df1) == len(join) + + if val1 is not None: + # + # Check the values. In this join, if val1 and val2 are + # the same then we are good. If they are different then + # we need to check if the value is selected from the opposite + # direction, so we'll merge with the edges reversed and + # check to make sure that the values all match + # + diffs = join.query("val1 != val2") + diffs_check = diffs.merge( + df1, left_on=["src1", "dst1"], right_on=["dst1", "src1"] + ) + query = diffs_check.query("val1_y != val2") + if len(query) > 0: + print("differences: ") + print(query) + assert 0 == len(query) + + # + # Now check the symmetrized edges are present. If the original + # data contains (u,v) we want to make sure that (v,u) is present + # in the new data frame. + # + # We can accomplish this by doing the join (merge) where + # (src1[i] = dst2[i]) and (dst1[i] = src2[i]), and verifying + # that we get exactly the same number of entries in the data frame. + # + join = df1.merge(df2, left_on=["src1", "dst1"], right_on=["dst2", "src2"]) + assert len(df1) == len(join) + + if val1 is not None: + # + # Check the values. In this join, if val1 and val2 are + # the same then we are good. If they are different then + # we need to check if the value is selected from the opposite + # direction, so we'll merge with the edges reversed and + # check to make sure that the values all match + # + diffs = join.query("val1 != val2") + diffs_check = diffs.merge( + df1, left_on=["src2", "dst2"], right_on=["src1", "dst1"] + ) + query = diffs_check.query("val1_y != val2") + if len(query) > 0: + print("differences: ") + print(query) + assert 0 == len(query) + + # + # Finally, let's check (in both directions) backwards. + # We want to make sure that no edges were created in + # the symmetrize logic that didn't already exist in one + # direction or the other. This is a bit more complicated. + # + # The complication here is that the original data could, + # for some edge (u,v) ALREADY contain the edge (v,u). The + # symmetrized graph will not duplicate any edges, so the edge + # (u,v) will only be present once. So we can't simply check + # counts of df2 joined with df1. 
+ # + # join1 will contain the join (merge) of df2 to df1 in the + # forward direction + # join2 will contain the join (merge) of df2 to df1 in the + # reverse direction + # + # Finally, we'll do an outer join of join1 and join2, which + # will combine any (u,v)/(v,u) pairs that might exist into + # a joined row while keeping any (u,v) pairs that don't exist + # in both data frames as single rows. This gives us a data frame + # with the same number of rows as the symmetrized data. + # + join1 = df2.merge(df1, left_on=["src2", "dst2"], right_on=["src1", "dst1"]) + join2 = df2.merge(df1, left_on=["src2", "dst2"], right_on=["dst1", "src1"]) + joinM = join1.merge(join2, how="outer", on=["src2", "dst2"]) + + assert len(df2) == len(joinM) + + # + # Note, we don't need to check the reverse values... we checked + # them in both directions earlier. + # + + +@pytest.fixture(scope="module") +def client_connection(): + (cluster, client) = setup_local_dask_cluster(p2p=True) + yield client + teardown_local_dask_cluster(cluster, client) + + +@pytest.mark.skipif( + is_single_gpu(), reason="skipping MG testing on Single GPU system" +) +@pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED) +def test_mg_symmetrize(graph_file, client_connection): + gc.collect() + + ddf = utils.read_dask_cudf_csv_file(graph_file) + sym_src, sym_dst = cugraph.symmetrize(ddf["src"], ddf["dst"]) + + # convert to regular cudf to facilitate comparison + df = ddf.compute() + + compare( + df["src"], df["dst"], None, sym_src.compute(), sym_dst.compute(), None + ) + + +@pytest.mark.skipif( + is_single_gpu(), reason="skipping MG testing on Single GPU system" +) +@pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED) +def test_mg_symmetrize_df(graph_file, client_connection): + gc.collect() + + pd.set_option('display.max_rows', 500) + + ddf = utils.read_dask_cudf_csv_file(graph_file) + sym_ddf = cugraph.symmetrize_ddf(ddf, "src", "dst", "weight") + + # convert to regular cudf to facilitate comparison + df = ddf.compute() + sym_df = sym_ddf.compute() + + compare( + df["src"], + df["dst"], + df["weight"], + sym_df["src"], + sym_df["dst"], + sym_df["weight"], + ) diff --git a/python/cugraph/cugraph/tests/dask/test_mg_utility.py b/python/cugraph/cugraph/tests/mg/test_mg_utility.py similarity index 94% rename from python/cugraph/cugraph/tests/dask/test_mg_utility.py rename to python/cugraph/cugraph/tests/mg/test_mg_utility.py index 732c785df68..4aec0ba93c1 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_utility.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_utility.py @@ -35,10 +35,14 @@ def setup_function(): gc.collect() +IS_DIRECTED = [True, False] + + # @pytest.mark.skipif( # is_single_gpu(), reason="skipping MG testing on Single GPU system" # ) -def test_from_edgelist(dask_client): +@pytest.mark.parametrize("directed", IS_DIRECTED) +def test_from_edgelist(dask_client, directed): input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "karate.csv").as_posix() print(f"dataset={input_data_path}") @@ -53,9 +57,9 @@ def test_from_edgelist(dask_client): dg1 = cugraph.from_edgelist( ddf, source="src", destination="dst", edge_attr="value", - create_using=cugraph.Graph(directed=True)) + create_using=cugraph.Graph(directed=directed)) - dg2 = cugraph.Graph(directed=True) + dg2 = cugraph.Graph(directed=directed) dg2.from_dask_cudf_edgelist( ddf, source="src", destination="dst", edge_attr="value" ) diff --git a/python/cugraph/cugraph/tests/test_jaccard.py b/python/cugraph/cugraph/tests/test_jaccard.py index 
50bffe71fff..b7ce514d5f9 100644 --- a/python/cugraph/cugraph/tests/test_jaccard.py +++ b/python/cugraph/cugraph/tests/test_jaccard.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -155,6 +155,24 @@ def test_jaccard(read_csv, gpubenchmark): assert err == 0 +def test_directed_graph_check(read_csv): + M, _ = read_csv + + cu_M = cudf.DataFrame() + cu_M["src_0"] = cudf.Series(M["0"]) + cu_M["dst_0"] = cudf.Series(M["1"]) + cu_M["src_1"] = cu_M["src_0"] + 1000 + cu_M["dst_1"] = cu_M["dst_0"] + 1000 + G1 = cugraph.Graph(directed=True) + G1.from_cudf_edgelist(cu_M, source=["src_0", "src_1"], + destination=["dst_0", "dst_1"]) + + vertex_pair = cu_M[["src_0", "src_1", "dst_0", "dst_1"]] + vertex_pair = vertex_pair[:5] + with pytest.raises(ValueError): + cugraph.jaccard(G1, vertex_pair) + + def test_nx_jaccard_time(read_csv, gpubenchmark): M, _ = read_csv diff --git a/python/cugraph/cugraph/tests/test_maximum_spanning_tree.py b/python/cugraph/cugraph/tests/test_maximum_spanning_tree.py index 311f28bd6f8..341fc1b26d6 100644 --- a/python/cugraph/cugraph/tests/test_maximum_spanning_tree.py +++ b/python/cugraph/cugraph/tests/test_maximum_spanning_tree.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2021, NVIDIA CORPORATION. +# Copyright (c) 2019-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -37,9 +37,25 @@ print("Networkx version : {} ".format(nx.__version__)) +# ============================================================================= +# Pytest Setup / Teardown - called for each test function +# ============================================================================= +def setup_function(): + gc.collect() + + +def _get_param_args(param_name, param_values): + """ + Returns a tuple of (, ) which can be applied + as the args to pytest.mark.parametrize(). The pytest.param list also + contains param id string formed from the param name and values. 
+ """ + return (param_name, + [pytest.param(v, id=f"{param_name}={v}") for v in param_values]) + + @pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED_WEIGHTS) def test_maximum_spanning_tree_nx(graph_file): - gc.collect() # cugraph cuG = utils.read_csv_file(graph_file, read_weights_in_sp=True) G = cugraph.Graph() @@ -64,6 +80,17 @@ def test_maximum_spanning_tree_nx(graph_file): utils.compare_mst(cugraph_mst, mst_nx) +@pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED_WEIGHTS) +@pytest.mark.parametrize(*_get_param_args("use_adjlist", [True, False])) +def test_maximum_spanning_tree_graph_repr_compat(graph_file, use_adjlist): + cuG = utils.read_csv_file(graph_file, read_weights_in_sp=True) + G = cugraph.Graph() + G.from_cudf_edgelist(cuG, source="0", destination="1", edge_attr="2") + if use_adjlist: + G.view_adj_list() + cugraph.maximum_spanning_tree(G) + + DATASETS_SIZES = [ 100000, 1000000, @@ -75,7 +102,6 @@ def test_maximum_spanning_tree_nx(graph_file): @pytest.mark.skip(reason="Skipping large tests") @pytest.mark.parametrize("graph_size", DATASETS_SIZES) def test_random_maximum_spanning_tree_nx(graph_size): - gc.collect() rmm.reinitialize(managed_memory=True) df = utils.random_edgelist( e=graph_size, diff --git a/python/cugraph/cugraph/tests/test_minimum_spanning_tree.py b/python/cugraph/cugraph/tests/test_minimum_spanning_tree.py index d1588507bce..97b5630eca0 100644 --- a/python/cugraph/cugraph/tests/test_minimum_spanning_tree.py +++ b/python/cugraph/cugraph/tests/test_minimum_spanning_tree.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2021, NVIDIA CORPORATION. +# Copyright (c) 2019-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -37,9 +37,25 @@ print("Networkx version : {} ".format(nx.__version__)) +# ============================================================================= +# Pytest Setup / Teardown - called for each test function +# ============================================================================= +def setup_function(): + gc.collect() + + +def _get_param_args(param_name, param_values): + """ + Returns a tuple of (, ) which can be applied + as the args to pytest.mark.parametrize(). The pytest.param list also + contains param id string formed from the param name and values. 
+ """ + return (param_name, + [pytest.param(v, id=f"{param_name}={v}") for v in param_values]) + + @pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED_WEIGHTS) def test_minimum_spanning_tree_nx(graph_file): - gc.collect() # cugraph cuG = utils.read_csv_file(graph_file, read_weights_in_sp=True) G = cugraph.Graph() @@ -64,6 +80,17 @@ def test_minimum_spanning_tree_nx(graph_file): utils.compare_mst(cugraph_mst, mst_nx) +@pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED_WEIGHTS) +@pytest.mark.parametrize(*_get_param_args("use_adjlist", [True, False])) +def test_minimum_spanning_tree_graph_repr_compat(graph_file, use_adjlist): + cuG = utils.read_csv_file(graph_file, read_weights_in_sp=True) + G = cugraph.Graph() + G.from_cudf_edgelist(cuG, source="0", destination="1", edge_attr="2") + if use_adjlist: + G.view_adj_list() + cugraph.minimum_spanning_tree(G) + + DATASETS_SIZES = [ 100000, 1000000, @@ -75,7 +102,6 @@ def test_minimum_spanning_tree_nx(graph_file): @pytest.mark.skip(reason="Skipping large tests") @pytest.mark.parametrize("graph_size", DATASETS_SIZES) def test_random_minimum_spanning_tree_nx(graph_size): - gc.collect() rmm.reinitialize(managed_memory=True) df = utils.random_edgelist( e=graph_size, diff --git a/python/cugraph/cugraph/tests/test_symmetrize.py b/python/cugraph/cugraph/tests/test_symmetrize.py index e4e42df1fda..9d740b585e4 100644 --- a/python/cugraph/cugraph/tests/test_symmetrize.py +++ b/python/cugraph/cugraph/tests/test_symmetrize.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2021, NVIDIA CORPORATION. +# Copyright (c) 2019-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -19,9 +19,6 @@ import cudf import cugraph from cugraph.tests import utils -from cugraph.dask.common.mg_utils import (is_single_gpu, - setup_local_dask_cluster, - teardown_local_dask_cluster) def test_version(): @@ -185,86 +182,3 @@ def test_symmetrize_weighted(graph_file): ) compare(cu_M["0"], cu_M["1"], cu_M["2"], sym_src, sym_dst, sym_w) - - -@pytest.fixture(scope="module") -def client_connection(): - (cluster, client) = setup_local_dask_cluster(p2p=True) - yield client - teardown_local_dask_cluster(cluster, client) - - -@pytest.mark.skipif( - is_single_gpu(), reason="skipping MG testing on Single GPU system" -) -@pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED) -def test_mg_symmetrize(graph_file, client_connection): - gc.collect() - - ddf = utils.read_dask_cudf_csv_file(graph_file) - sym_src, sym_dst = cugraph.symmetrize(ddf["src"], ddf["dst"]) - - # convert to regular cudf to facilitate comparison - df = ddf.compute() - - compare( - df["src"], df["dst"], None, sym_src.compute(), sym_dst.compute(), None - ) - - -@pytest.mark.skipif( - is_single_gpu(), reason="skipping MG testing on Single GPU system" -) -@pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED) -def test_mg_symmetrize_df(graph_file, client_connection): - gc.collect() - - pd.set_option('display.max_rows', 500) - - ddf = utils.read_dask_cudf_csv_file(graph_file) - sym_ddf = cugraph.symmetrize_ddf(ddf, "src", "dst", "weight") - - # convert to regular cudf to facilitate comparison - df = ddf.compute() - sym_df = sym_ddf.compute() - - compare( - df["src"], - df["dst"], - df["weight"], - sym_df["src"], - sym_df["dst"], - sym_df["weight"], - ) - - -@pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED) -def 
test_symmetrize_df(graph_file): - gc.collect() - - cu_M = utils.read_csv_file(graph_file) - sym_df = cugraph.symmetrize_df(cu_M, "0", "1") - - compare( - cu_M["0"], cu_M["1"], cu_M["2"], sym_df["0"], sym_df["1"], sym_df["2"] - ) - - -def test_symmetrize_bad_weights(): - src = [0, 0, 0, 0, 1, 2] - dst = [1, 2, 3, 4, 0, 3] - val = [1.0, 1.0, 1.0, 1.0, 2.0, 1.0] - - df = pd.DataFrame({"src": src, "dst": dst, "val": val}) - - gdf = cudf.DataFrame.from_pandas(df[["src", "dst", "val"]]) - sym_df = cugraph.symmetrize_df(gdf, "src", "dst") - - compare( - gdf["src"], - gdf["dst"], - gdf["val"], - sym_df["src"], - sym_df["dst"], - sym_df["val"], - ) diff --git a/python/cugraph/cugraph/tree/minimum_spanning_tree.py b/python/cugraph/cugraph/tree/minimum_spanning_tree.py index 2cc0233f2af..8ad1af0f704 100644 --- a/python/cugraph/cugraph/tree/minimum_spanning_tree.py +++ b/python/cugraph/cugraph/tree/minimum_spanning_tree.py @@ -20,8 +20,8 @@ def _minimum_spanning_tree_subgraph(G): mst_subgraph = Graph() - if type(G) is not Graph: - raise Exception("input graph must be undirected") + if G.is_directed(): + raise ValueError("input graph must be undirected") mst_df = minimum_spanning_tree_wrapper.minimum_spanning_tree(G) if G.renumbered: mst_df = G.unrenumber(mst_df, "src") @@ -35,8 +35,11 @@ def _minimum_spanning_tree_subgraph(G): def _maximum_spanning_tree_subgraph(G): mst_subgraph = Graph() - if type(G) is not Graph: - raise Exception("input graph must be undirected") + if G.is_directed(): + raise ValueError("input graph must be undirected") + + if not G.adjlist: + G.view_adj_list() if G.adjlist.weights is not None: G.adjlist.weights = G.adjlist.weights.mul(-1) @@ -89,15 +92,13 @@ def minimum_spanning_tree( Examples -------- - >>> M = cudf.read_csv(datasets_path / 'netscience.csv', delimiter='\t', + >>> M = cudf.read_csv(datasets_path / 'netscience.csv', delimiter=' ', ... dtype=['int32', 'int32', 'float32'], header=None) >>> G = cugraph.Graph() >>> G.from_cudf_edgelist(M, source='0', destination='1') - >>> # cugraph.minimum_spanning_tree(G) + >>> G_mst = cugraph.minimum_spanning_tree(G) """ - # FIXME: Uncomment out the above example - G, isNx = ensure_cugraph_obj_for_nx(G) if isNx is True: @@ -112,7 +113,7 @@ def maximum_spanning_tree( ): """ Returns a maximum spanning tree (MST) or forest (MSF) on an undirected - graph + graph. Also computes the adjacency list if G does not have one. Parameters ---------- @@ -138,15 +139,13 @@ def maximum_spanning_tree( Examples -------- - >>> M = cudf.read_csv(datasets_path / 'netscience.csv', delimiter='\t', + >>> M = cudf.read_csv(datasets_path / 'netscience.csv', delimiter=' ', ... dtype=['int32', 'int32', 'float32'], header=None) >>> G = cugraph.Graph() >>> G.from_cudf_edgelist(M, source='0', destination='1') - >>> # cugraph.maximum_spanning_tree(G) + >>> G_mst = cugraph.maximum_spanning_tree(G) """ - # FIXME: Uncomment out the above (broken) example - G, isNx = ensure_cugraph_obj_for_nx(G) if isNx is True: From cc5ac352c9402747fc6cb1e316af78e8c99492fd Mon Sep 17 00:00:00 2001 From: root Date: Thu, 5 May 2022 14:19:06 +0000 Subject: [PATCH 5/9] update copyright --- benchmarks/python_e2e/benchmark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/python_e2e/benchmark.py b/benchmarks/python_e2e/benchmark.py index d75971586a0..0ce7597aa8d 100644 --- a/benchmarks/python_e2e/benchmark.py +++ b/benchmarks/python_e2e/benchmark.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. 
# Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at From 17e03f3cfb2fb63daa98f5a2a0e3260171c5a8ed Mon Sep 17 00:00:00 2001 From: root Date: Thu, 5 May 2022 14:22:43 +0000 Subject: [PATCH 6/9] update copyright --- benchmarks/python_e2e/cugraph_dask_funcs.py | 2 +- benchmarks/python_e2e/main.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/python_e2e/cugraph_dask_funcs.py b/benchmarks/python_e2e/cugraph_dask_funcs.py index fbef7a41144..4497034e998 100644 --- a/benchmarks/python_e2e/cugraph_dask_funcs.py +++ b/benchmarks/python_e2e/cugraph_dask_funcs.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at diff --git a/benchmarks/python_e2e/main.py b/benchmarks/python_e2e/main.py index 1821bad67b4..bae0ce39b23 100644 --- a/benchmarks/python_e2e/main.py +++ b/benchmarks/python_e2e/main.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at From 5da156e9e696d012d031b5f72ca999de137f7797 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 11 May 2022 17:27:44 +0000 Subject: [PATCH 7/9] update docstrings --- .../structure/graph_implementation/simpleDistributedGraph.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py index 42889d36635..3f8005d0df5 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py @@ -477,6 +477,11 @@ def compute_renumber_edge_list(self, If True, renumber with the intent to make a CSC-like structure. If False, renumber with the intent to make a CSR-like structure. Defaults to False. + + legacy_renum_only : (optional) bool + if True, The C++ renumbering will not be triggered. + This parameter is added for new algos following the + C/Pylibcugraph path """ if not self.properties.renumbered: self.edgelist = self.EdgeList(self.input_df) From 2b47100519639c41d7363e78ef7d908805c32e80 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 11 May 2022 17:29:49 +0000 Subject: [PATCH 8/9] fix flake8 errors --- .../structure/graph_implementation/simpleDistributedGraph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py index 3f8005d0df5..35fd9f47c97 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py @@ -477,7 +477,7 @@ def compute_renumber_edge_list(self, If True, renumber with the intent to make a CSC-like structure. If False, renumber with the intent to make a CSR-like structure. Defaults to False. - + legacy_renum_only : (optional) bool if True, The C++ renumbering will not be triggered. 
This parameter is added for new algos following the From 445d0192d628b433889b2d3de4a19905233b1ad7 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 11 May 2022 21:01:37 +0000 Subject: [PATCH 9/9] update FIXME with more detail --- python/cugraph/cugraph/dask/link_analysis/hits.py | 3 +++ python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/python/cugraph/cugraph/dask/link_analysis/hits.py b/python/cugraph/cugraph/dask/link_analysis/hits.py index 450a1b74ae9..323aab1f5ad 100644 --- a/python/cugraph/cugraph/dask/link_analysis/hits.py +++ b/python/cugraph/cugraph/dask/link_analysis/hits.py @@ -155,6 +155,9 @@ def hits(input_graph, tol=1.0e-5, max_iter=100, nstart=None, normalized=True): client = default_client() # FIXME: 'legacy_renum_only' will not trigger the C++ renumbering + # In the future, once all the algos follow the C/Pylibcugraph path, + # compute_renumber_edge_list will only be used for multicolumn and + # string vertices since the renumbering will be done in pylibcugraph input_graph.compute_renumber_edge_list( transposed=False, legacy_renum_only=True) ddf = input_graph.edgelist.edgelist_df diff --git a/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py b/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py index 5e73b01f6d3..b8b720ef1c4 100644 --- a/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py +++ b/python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py @@ -129,6 +129,9 @@ def EXPERIMENTAL__uniform_neighborhood(input_graph, # Initialize dask client client = default_client() # FIXME: 'legacy_renum_only' will not trigger the C++ renumbering + # In the future, once all the algos follow the C/Pylibcugraph path, + # compute_renumber_edge_list will only be used for multicolumn and + # string vertices since the renumbering will be done in pylibcugraph input_graph.compute_renumber_edge_list( transposed=False, legacy_renum_only=True)
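
For illustration, a minimal sketch of how a caller might use the `legacy_renum_only` flag documented in the patches above: the Python-level renumbering machinery is still set up, but the C++ renumbering is not triggered, since algorithms on the C/pylibcugraph path renumber internally and `compute_renumber_edge_list` is then only needed for cases such as multi-column or string vertex IDs. The helper name `prepare_mg_graph_for_plc` below is hypothetical and not part of cuGraph; only the `compute_renumber_edge_list(transposed=False, legacy_renum_only=True)` call reflects the actual change.

    # Hypothetical helper (not part of the patch series); assumes `input_graph`
    # is a distributed cugraph.Graph built with from_dask_cudf_edgelist().
    def prepare_mg_graph_for_plc(input_graph, transposed=False):
        # Set up the Python-level renumbering data (used e.g. for multi-column
        # or string vertex IDs) without triggering the C++ renumbering, which
        # algorithms following the C/pylibcugraph path perform internally.
        input_graph.compute_renumber_edge_list(
            transposed=transposed, legacy_renum_only=True)
        return input_graph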