diff --git a/python/cugraph/cugraph/dask/traversal/bfs.py b/python/cugraph/cugraph/dask/traversal/bfs.py index bbf1a5faabb..c6b9f8c8366 100644 --- a/python/cugraph/cugraph/dask/traversal/bfs.py +++ b/python/cugraph/cugraph/dask/traversal/bfs.py @@ -35,15 +35,23 @@ def convert_to_cudf(cp_arrays): return df -def _call_plc_bfs(sID, mg_graph_x, st_x, depth_limit=None, return_distances=True): +def _call_plc_bfs( + sID, + mg_graph_x, + st_x, + depth_limit=None, + direction_optimizing=False, + return_distances=True, + do_expensive_check=False, +): return pylibcugraph_bfs( ResourceHandle(Comms.get_handle(sID).getHandle()), - mg_graph_x, - cudf.Series(st_x, dtype="int32"), - False, - depth_limit if depth_limit is not None else 0, - return_distances, - True, + graph=mg_graph_x, + sources=st_x, + direction_optimizing=direction_optimizing, + depth_limit=depth_limit if depth_limit is not None else 0, + compute_predecessors=return_distances, + do_expensive_check=do_expensive_check, ) @@ -156,6 +164,10 @@ def bfs(input_graph, start, depth_limit=None, return_distances=True, check_start start = input_graph.lookup_internal_vertex_id(start, tmp_col_names) data_start = get_distributed_data(start) + do_expensive_check = False + # FIXME: Why is 'direction_optimizing' not part of the python cugraph API + # and why is it set to 'False' by default + direction_optimizing = False cupy_result = [ client.submit( @@ -164,7 +176,9 @@ def bfs(input_graph, start, depth_limit=None, return_distances=True, check_start input_graph._plc_graph[w], st[0], depth_limit, + direction_optimizing, return_distances, + do_expensive_check, workers=[w], allow_other_workers=False, ) diff --git a/python/cugraph/cugraph/structure/number_map.py b/python/cugraph/cugraph/structure/number_map.py index 2ae5bdcc326..128e479a227 100644 --- a/python/cugraph/cugraph/structure/number_map.py +++ b/python/cugraph/cugraph/structure/number_map.py @@ -258,15 +258,18 @@ def indirection_map(self, ddf, src_col_names, dst_col_names): # Set global index tmp_ddf = tmp_ddf.assign(idx=1) - tmp_ddf["global_id"] = tmp_ddf.idx.cumsum() - 1 + # ensure the original vertex and the 'global_id' columns are + # of the same type unless the original vertex type is 'string' + tmp_ddf["global_id"] = tmp_ddf.idx.cumsum().astype(self.id_type) - 1 tmp_ddf = tmp_ddf.drop(columns="idx") tmp_ddf = tmp_ddf.persist() self.ddf = tmp_ddf return tmp_ddf - def __init__(self, id_type=np.int32, renumber_type=None): + def __init__(self, renumber_id_type=np.int32, unrenumbered_id_type=np.int32): self.implementation = None - self.id_type = id_type + self.renumber_id_type = renumber_id_type + self.unrenumbered_id_type = unrenumbered_id_type # The default src/dst column names in the resulting renumbered # dataframe. These may be updated by the renumbering methods if the # input dataframe uses the default names. @@ -479,6 +482,21 @@ def renumber_and_segment( legacy_renum_only=False, ): renumbered = True + + # For columns with mismatch dtypes, set the renumbered + # id_type to either 'int32' or 'int64' + if df.dtypes.nunique() > 1: + # can't determine the edgelist input type + unrenumbered_id_type = None + else: + unrenumbered_id_type = df.dtypes[0] + + if np.int64 in list(df.dtypes): + renumber_id_type = np.int64 + else: + # renumber the edgelist to 'int32' + renumber_id_type = np.int32 + # FIXME: Drop the renumber_type 'experimental' once all the # algos follow the C/Pylibcugraph path @@ -486,6 +504,7 @@ def renumber_and_segment( # C++ renumbering. if isinstance(src_col_names, list): renumber_type = "legacy" + elif not ( df[src_col_names].dtype == np.int32 or df[src_col_names].dtype == np.int64 ): @@ -500,7 +519,7 @@ def renumber_and_segment( renumber_type = "skip_renumbering" renumbered = False - renumber_map = NumberMap() + renumber_map = NumberMap(renumber_id_type, unrenumbered_id_type) if not isinstance(src_col_names, list): src_col_names = [src_col_names] dst_col_names = [dst_col_names] @@ -512,11 +531,19 @@ def renumber_and_segment( if isinstance(df, cudf.DataFrame): renumber_map.implementation = NumberMap.SingleGPU( - df, src_col_names, dst_col_names, renumber_map.id_type, store_transposed + df, + src_col_names, + dst_col_names, + renumber_map.renumber_id_type, + store_transposed, ) elif isinstance(df, dask_cudf.DataFrame): renumber_map.implementation = NumberMap.MultiGPU( - df, src_col_names, dst_col_names, renumber_map.id_type, store_transposed + df, + src_col_names, + dst_col_names, + renumber_map.renumber_id_type, + store_transposed, ) else: raise TypeError("df must be cudf.DataFrame or dask_cudf.DataFrame") diff --git a/python/cugraph/cugraph/tests/mg/test_mg_renumber.py b/python/cugraph/cugraph/tests/mg/test_mg_renumber.py index 1340558732f..d273dacfd0c 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_renumber.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_renumber.py @@ -308,7 +308,7 @@ def test_pagerank_string_vertex_ids(dask_client): G.from_cudf_edgelist(df, source="src", destination="dst") sg_results = cugraph.pagerank(G) - sg_results = sg_results.sort_values("pagerank").reset_index(drop=True) + sg_results = sg_results.sort_values("vertex").reset_index(drop=True) # MG ddf = dask_cudf.from_cudf(df, npartitions=2) @@ -318,7 +318,28 @@ def test_pagerank_string_vertex_ids(dask_client): mg_results = dcg.pagerank(G_dask) # Organize results for easy comparison, this does not change the values. MG # Pagerank defaults to float64, so convert to float32 when comparing to SG - mg_results = mg_results.compute().sort_values("pagerank").reset_index(drop=True) + mg_results = mg_results.compute().sort_values("vertex").reset_index(drop=True) mg_results["pagerank"] = mg_results["pagerank"].astype("float32") assert_frame_equal(sg_results, mg_results) + + +@pytest.mark.parametrize("dtype", ["int32", "int64"]) +def test_mg_renumber_multi_column(dtype, dask_client): + df = cudf.DataFrame( + {"src_a": [i for i in range(0, 10)], "dst_a": [i for i in range(10, 20)]} + ).astype(dtype) + + df["src_b"] = df["src_a"] + 10 + df["dst_b"] = df["dst_a"] + 20 + src_col = ["src_a", "src_b"] + dst_col = ["dst_a", "dst_b"] + + ddf = dask_cudf.from_cudf(df, npartitions=2) + edgelist_type = list(ddf.dtypes) + G = cugraph.Graph() + G.from_dask_cudf_edgelist(ddf, source=src_col, destination=dst_col) + renumbered_ddf = G.edgelist.edgelist_df + renumbered_edgelist_type = list(renumbered_ddf.dtypes) + + assert set(renumbered_edgelist_type).issubset(set(edgelist_type))