diff --git a/python/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/structure/graph_implementation/simpleDistributedGraph.py index e85f3b6ab6c..21eff99d2bf 100644 --- a/python/cugraph/structure/graph_implementation/simpleDistributedGraph.py +++ b/python/cugraph/structure/graph_implementation/simpleDistributedGraph.py @@ -12,6 +12,7 @@ # limitations under the License. from cugraph.structure import graph_primtypes_wrapper +from cugraph.structure.graph_primtypes_wrapper import Direction from cugraph.structure.number_map import NumberMap import cudf import dask_cudf @@ -211,7 +212,7 @@ def in_degree(self, vertex_subset=None): >>> G.from_cudf_edgelist(M, '0', '1') >>> df = G.in_degree([0,9,12]) """ - return self._degree(vertex_subset, x=1) + return self._degree(vertex_subset, direction=Direction.IN) def out_degree(self, vertex_subset=None): """ @@ -245,8 +246,7 @@ def out_degree(self, vertex_subset=None): >>> G.from_cudf_edgelist(M, '0', '1') >>> df = G.out_degree([0,9,12]) """ - # TODO: Add support - raise Exception("Not supported for distributed graph") + return self._degree(vertex_subset, direction=Direction.OUT) def degree(self, vertex_subset=None): """ @@ -319,14 +319,15 @@ def degrees(self, vertex_subset=None): """ raise Exception("Not supported for distributed graph") - def _degree(self, vertex_subset, x=0): - vertex_col, degree_col = graph_primtypes_wrapper._degree(self, x) + def _degree(self, vertex_subset, direction=Direction.ALL): + vertex_col, degree_col = graph_primtypes_wrapper._mg_degree(self, + direction) df = cudf.DataFrame() df["vertex"] = vertex_col df["degree"] = degree_col - if self.renumbered is True: - df = self.unrenumber(df, "vertex") + if self.properties.renumbered is True: + df = self.renumber_map.unrenumber(df, "vertex") if vertex_subset is not None: df = df[df['vertex'].isin(vertex_subset)] diff --git a/python/cugraph/structure/graph_implementation/simpleGraph.py b/python/cugraph/structure/graph_implementation/simpleGraph.py index 4e632a72231..3fa65fd8de6 100644 --- a/python/cugraph/structure/graph_implementation/simpleGraph.py +++ b/python/cugraph/structure/graph_implementation/simpleGraph.py @@ -12,6 +12,7 @@ # limitations under the License. from cugraph.structure import graph_primtypes_wrapper +from cugraph.structure.graph_primtypes_wrapper import Direction from cugraph.structure.symmetrize import symmetrize from cugraph.structure.number_map import NumberMap import cugraph.dask.common.mg_utils as mg_utils @@ -566,7 +567,7 @@ def in_degree(self, vertex_subset=None): >>> G.from_cudf_edgelist(M, '0', '1') >>> df = G.in_degree([0,9,12]) """ - return self._degree(vertex_subset, x=1) + return self._degree(vertex_subset, direction=Direction.IN) def out_degree(self, vertex_subset=None): """ @@ -600,7 +601,7 @@ def out_degree(self, vertex_subset=None): >>> G.from_cudf_edgelist(M, '0', '1') >>> df = G.out_degree([0,9,12]) """ - return self._degree(vertex_subset, x=2) + return self._degree(vertex_subset, direction=Direction.OUT) def degree(self, vertex_subset=None): """ @@ -690,8 +691,9 @@ def degrees(self, vertex_subset=None): return df - def _degree(self, vertex_subset, x=0): - vertex_col, degree_col = graph_primtypes_wrapper._degree(self, x) + def _degree(self, vertex_subset, direction=Direction.ALL): + vertex_col, degree_col = graph_primtypes_wrapper._degree(self, + direction) df = cudf.DataFrame() df["vertex"] = vertex_col df["degree"] = degree_col diff --git a/python/cugraph/structure/graph_primtypes_wrapper.pyx b/python/cugraph/structure/graph_primtypes_wrapper.pyx index 7bc62b9a1af..91af28380c3 100644 --- a/python/cugraph/structure/graph_primtypes_wrapper.pyx +++ b/python/cugraph/structure/graph_primtypes_wrapper.pyx @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2020, NVIDIA CORPORATION. +# Copyright (c) 2019-2021, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -21,6 +21,7 @@ from cugraph.structure.graph_primtypes cimport get_two_hop_neighbors as c_get_tw from cugraph.structure.graph_primtypes cimport renumber_vertices as c_renumber_vertices from cugraph.structure.utils_wrapper import * from libcpp cimport bool +import enum from libc.stdint cimport uintptr_t from rmm._lib.device_buffer cimport device_buffer, DeviceBuffer @@ -45,6 +46,12 @@ def datatype_cast(cols, dtypes): return cols_out +class Direction(enum.Enum): + ALL = 0 + IN = 1 + OUT = 2 + + def renumber(source_col, dest_col): num_edges = len(source_col) @@ -137,7 +144,7 @@ def view_edge_list(input_graph): return src_indices, indices, weights -def _degree_coo(edgelist_df, src_name, dst_name, x=0, num_verts=None, sID=None): +def _degree_coo(edgelist_df, src_name, dst_name, direction=Direction.ALL, num_verts=None, sID=None): # # Computing the degree of the input graph from COO # @@ -146,11 +153,11 @@ def _degree_coo(edgelist_df, src_name, dst_name, x=0, num_verts=None, sID=None): src = edgelist_df[src_name] dst = edgelist_df[dst_name] - if x == 0: + if direction == Direction.ALL: dir = DIRECTION_IN_PLUS_OUT - elif x == 1: + elif direction == Direction.IN: dir = DIRECTION_IN - elif x == 2: + elif direction == Direction.OUT: dir = DIRECTION_OUT else: raise Exception("x should be 0, 1 or 2") @@ -185,17 +192,17 @@ def _degree_coo(edgelist_df, src_name, dst_name, x=0, num_verts=None, sID=None): return vertex_col, degree_col -def _degree_csr(offsets, indices, x=0): +def _degree_csr(offsets, indices, direction=Direction.ALL): cdef DegreeDirection dir - if x == 0: + if direction == Direction.ALL: dir = DIRECTION_IN_PLUS_OUT - elif x == 1: + elif direction == Direction.IN: dir = DIRECTION_IN - elif x == 2: + elif direction == Direction.OUT: dir = DIRECTION_OUT else: - raise Exception("x should be 0, 1 or 2") + raise Exception("direction should be 0, 1 or 2") [offsets, indices] = datatype_cast([offsets, indices], [np.int32]) @@ -220,44 +227,48 @@ def _degree_csr(offsets, indices, x=0): return vertex_col, degree_col -def _degree(input_graph, x=0): - transpose_x = { 0: 0, - 2: 1, - 1: 2 } +def _mg_degree(input_graph, direction=Direction.ALL): + if input_graph.edgelist is None: + input_graph.compute_renumber_edge_list(transposed=False) + input_ddf = input_graph.edgelist.edgelist_df + num_verts = input_ddf[['src', 'dst']].max().max().compute() + 1 + data = DistributedDataHandler.create(data=input_ddf) + comms = Comms.get_comms() + client = default_client() + data.calculate_parts_to_sizes(comms) + if direction==Direction.IN: + degree_ddf = [client.submit(_degree_coo, wf[1][0], 'src', 'dst', Direction.IN, num_verts, comms.sessionId, workers=[wf[0]]) for idx, wf in enumerate(data.worker_to_parts.items())] + if direction==Direction.OUT: + degree_ddf = [client.submit(_degree_coo, wf[1][0], 'dst', 'src', Direction.IN, num_verts, comms.sessionId, workers=[wf[0]]) for idx, wf in enumerate(data.worker_to_parts.items())] + wait(degree_ddf) + return degree_ddf[0].result() + + +def _degree(input_graph, direction=Direction.ALL): + transpose_direction = { Direction.ALL: Direction.ALL, + Direction.IN: Direction.OUT, + Direction.OUT: Direction.IN } if input_graph.adjlist is not None: return _degree_csr(input_graph.adjlist.offsets, input_graph.adjlist.indices, - x) + direction) if input_graph.transposedadjlist is not None: return _degree_csr(input_graph.transposedadjlist.offsets, input_graph.transposedadjlist.indices, - transpose_x[x]) - - if input_graph.edgelist is None and input_graph.distributed: - input_graph.compute_renumber_edge_list(transposed=False) + transpose_direction[direction]) if input_graph.edgelist is not None: - if isinstance(input_graph.edgelist.edgelist_df, dc.DataFrame): - input_ddf = input_graph.edgelist.edgelist_df - num_verts = input_ddf[['src', 'dst']].max().max().compute() + 1 - data = DistributedDataHandler.create(data=input_ddf) - comms = Comms.get_comms() - client = default_client() - data.calculate_parts_to_sizes(comms) - degree_ddf = [client.submit(_degree_coo, wf[1][0], 'src', 'dst', x, num_verts, comms.sessionId, workers=[wf[0]]) for idx, wf in enumerate(data.worker_to_parts.items())] - wait(degree_ddf) - return degree_ddf[0].result() return _degree_coo(input_graph.edgelist.edgelist_df, - 'src', 'dst', x) + 'src', 'dst', direction) raise Exception("input_graph not COO, CSR or CSC") def _degrees(input_graph): - verts, indegrees = _degree(input_graph,1) - verts, outdegrees = _degree(input_graph, 2) + verts, indegrees = _degree(input_graph, Direction.IN) + verts, outdegrees = _degree(input_graph, Direction.OUT) return verts, indegrees, outdegrees diff --git a/python/cugraph/tests/dask/test_mg_degree.py b/python/cugraph/tests/dask/test_mg_degree.py index 93e8a365dea..bad55df1ca9 100644 --- a/python/cugraph/tests/dask/test_mg_degree.py +++ b/python/cugraph/tests/dask/test_mg_degree.py @@ -36,7 +36,7 @@ def test_dask_mg_degree(client_connection): # FIXME: update this to allow dataset to be parameterized and have dataset # part of test param id (see other tests) - input_data_path = r"../datasets/karate.csv" + input_data_path = r"../datasets/karate-asymmetric.csv" print(f"dataset={input_data_path}") chunksize = cugraph.dask.get_chunksize(input_data_path) @@ -62,10 +62,18 @@ def test_dask_mg_degree(client_connection): g = cugraph.DiGraph() g.from_cudf_edgelist(df, "src", "dst") - merge_df = ( + merge_df_in = ( dg.in_degree() .merge(g.in_degree(), on="vertex", suffixes=["_dg", "_g"]) .compute() ) - assert merge_df["degree_dg"].equals(merge_df["degree_g"]) + merge_df_out = ( + dg.out_degree() + .merge(g.out_degree(), on="vertex", suffixes=["_dg", "_g"]) + .compute() + ) + + assert merge_df_in["degree_dg"].equals(merge_df_in["degree_g"]) + assert merge_df_out["degree_dg"].equals( + merge_df_out["degree_g"]) diff --git a/python/cugraph/tests/dask/test_mg_katz_centrality.py b/python/cugraph/tests/dask/test_mg_katz_centrality.py index eadf0f662d4..8ed604954f4 100644 --- a/python/cugraph/tests/dask/test_mg_katz_centrality.py +++ b/python/cugraph/tests/dask/test_mg_katz_centrality.py @@ -50,21 +50,12 @@ def test_dask_katz_centrality(client_connection): dtype=["int32", "int32", "float32"], ) - df = cudf.read_csv( - input_data_path, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - g = cugraph.DiGraph() - g.from_cudf_edgelist(df, "src", "dst") - dg = cugraph.DiGraph() dg.from_dask_cudf_edgelist(ddf, "src", "dst") - largest_out_degree = g.degrees().nlargest(n=1, columns="out_degree") - largest_out_degree = largest_out_degree["out_degree"].iloc[0] + largest_out_degree = dg.out_degree().compute().\ + nlargest(n=1, columns="degree") + largest_out_degree = largest_out_degree["degree"].iloc[0] katz_alpha = 1 / (largest_out_degree + 1) mg_res = dcg.katz_centrality(dg, alpha=katz_alpha, tol=1e-6)