Skip to content

Commit

Permalink
Add mnmg out degree (#1592)
Browse files Browse the repository at this point in the history
  • Loading branch information
Iroy30 authored May 25, 2021
1 parent d1f3fb9 commit 0859228
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# limitations under the License.

from cugraph.structure import graph_primtypes_wrapper
from cugraph.structure.graph_primtypes_wrapper import Direction
from cugraph.structure.number_map import NumberMap
import cudf
import dask_cudf
Expand Down Expand Up @@ -211,7 +212,7 @@ def in_degree(self, vertex_subset=None):
>>> G.from_cudf_edgelist(M, '0', '1')
>>> df = G.in_degree([0,9,12])
"""
return self._degree(vertex_subset, x=1)
return self._degree(vertex_subset, direction=Direction.IN)

def out_degree(self, vertex_subset=None):
"""
Expand Down Expand Up @@ -245,8 +246,7 @@ def out_degree(self, vertex_subset=None):
>>> G.from_cudf_edgelist(M, '0', '1')
>>> df = G.out_degree([0,9,12])
"""
# TODO: Add support
raise Exception("Not supported for distributed graph")
return self._degree(vertex_subset, direction=Direction.OUT)

def degree(self, vertex_subset=None):
"""
Expand Down Expand Up @@ -319,14 +319,15 @@ def degrees(self, vertex_subset=None):
"""
raise Exception("Not supported for distributed graph")

def _degree(self, vertex_subset, x=0):
vertex_col, degree_col = graph_primtypes_wrapper._degree(self, x)
def _degree(self, vertex_subset, direction=Direction.ALL):
vertex_col, degree_col = graph_primtypes_wrapper._mg_degree(self,
direction)
df = cudf.DataFrame()
df["vertex"] = vertex_col
df["degree"] = degree_col

if self.renumbered is True:
df = self.unrenumber(df, "vertex")
if self.properties.renumbered is True:
df = self.renumber_map.unrenumber(df, "vertex")

if vertex_subset is not None:
df = df[df['vertex'].isin(vertex_subset)]
Expand Down
10 changes: 6 additions & 4 deletions python/cugraph/structure/graph_implementation/simpleGraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# limitations under the License.

from cugraph.structure import graph_primtypes_wrapper
from cugraph.structure.graph_primtypes_wrapper import Direction
from cugraph.structure.symmetrize import symmetrize
from cugraph.structure.number_map import NumberMap
import cugraph.dask.common.mg_utils as mg_utils
Expand Down Expand Up @@ -566,7 +567,7 @@ def in_degree(self, vertex_subset=None):
>>> G.from_cudf_edgelist(M, '0', '1')
>>> df = G.in_degree([0,9,12])
"""
return self._degree(vertex_subset, x=1)
return self._degree(vertex_subset, direction=Direction.IN)

def out_degree(self, vertex_subset=None):
"""
Expand Down Expand Up @@ -600,7 +601,7 @@ def out_degree(self, vertex_subset=None):
>>> G.from_cudf_edgelist(M, '0', '1')
>>> df = G.out_degree([0,9,12])
"""
return self._degree(vertex_subset, x=2)
return self._degree(vertex_subset, direction=Direction.OUT)

def degree(self, vertex_subset=None):
"""
Expand Down Expand Up @@ -690,8 +691,9 @@ def degrees(self, vertex_subset=None):

return df

def _degree(self, vertex_subset, x=0):
vertex_col, degree_col = graph_primtypes_wrapper._degree(self, x)
def _degree(self, vertex_subset, direction=Direction.ALL):
vertex_col, degree_col = graph_primtypes_wrapper._degree(self,
direction)
df = cudf.DataFrame()
df["vertex"] = vertex_col
df["degree"] = degree_col
Expand Down
75 changes: 43 additions & 32 deletions python/cugraph/structure/graph_primtypes_wrapper.pyx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2020, NVIDIA CORPORATION.
# Copyright (c) 2019-2021, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -21,6 +21,7 @@ from cugraph.structure.graph_primtypes cimport get_two_hop_neighbors as c_get_tw
from cugraph.structure.graph_primtypes cimport renumber_vertices as c_renumber_vertices
from cugraph.structure.utils_wrapper import *
from libcpp cimport bool
import enum
from libc.stdint cimport uintptr_t

from rmm._lib.device_buffer cimport device_buffer, DeviceBuffer
Expand All @@ -45,6 +46,12 @@ def datatype_cast(cols, dtypes):
return cols_out


class Direction(enum.Enum):
ALL = 0
IN = 1
OUT = 2


def renumber(source_col, dest_col):
num_edges = len(source_col)

Expand Down Expand Up @@ -137,7 +144,7 @@ def view_edge_list(input_graph):
return src_indices, indices, weights


def _degree_coo(edgelist_df, src_name, dst_name, x=0, num_verts=None, sID=None):
def _degree_coo(edgelist_df, src_name, dst_name, direction=Direction.ALL, num_verts=None, sID=None):
#
# Computing the degree of the input graph from COO
#
Expand All @@ -146,11 +153,11 @@ def _degree_coo(edgelist_df, src_name, dst_name, x=0, num_verts=None, sID=None):
src = edgelist_df[src_name]
dst = edgelist_df[dst_name]

if x == 0:
if direction == Direction.ALL:
dir = DIRECTION_IN_PLUS_OUT
elif x == 1:
elif direction == Direction.IN:
dir = DIRECTION_IN
elif x == 2:
elif direction == Direction.OUT:
dir = DIRECTION_OUT
else:
raise Exception("x should be 0, 1 or 2")
Expand Down Expand Up @@ -185,17 +192,17 @@ def _degree_coo(edgelist_df, src_name, dst_name, x=0, num_verts=None, sID=None):
return vertex_col, degree_col


def _degree_csr(offsets, indices, x=0):
def _degree_csr(offsets, indices, direction=Direction.ALL):
cdef DegreeDirection dir

if x == 0:
if direction == Direction.ALL:
dir = DIRECTION_IN_PLUS_OUT
elif x == 1:
elif direction == Direction.IN:
dir = DIRECTION_IN
elif x == 2:
elif direction == Direction.OUT:
dir = DIRECTION_OUT
else:
raise Exception("x should be 0, 1 or 2")
raise Exception("direction should be 0, 1 or 2")

[offsets, indices] = datatype_cast([offsets, indices], [np.int32])

Expand All @@ -220,44 +227,48 @@ def _degree_csr(offsets, indices, x=0):
return vertex_col, degree_col


def _degree(input_graph, x=0):
transpose_x = { 0: 0,
2: 1,
1: 2 }
def _mg_degree(input_graph, direction=Direction.ALL):
if input_graph.edgelist is None:
input_graph.compute_renumber_edge_list(transposed=False)
input_ddf = input_graph.edgelist.edgelist_df
num_verts = input_ddf[['src', 'dst']].max().max().compute() + 1
data = DistributedDataHandler.create(data=input_ddf)
comms = Comms.get_comms()
client = default_client()
data.calculate_parts_to_sizes(comms)
if direction==Direction.IN:
degree_ddf = [client.submit(_degree_coo, wf[1][0], 'src', 'dst', Direction.IN, num_verts, comms.sessionId, workers=[wf[0]]) for idx, wf in enumerate(data.worker_to_parts.items())]
if direction==Direction.OUT:
degree_ddf = [client.submit(_degree_coo, wf[1][0], 'dst', 'src', Direction.IN, num_verts, comms.sessionId, workers=[wf[0]]) for idx, wf in enumerate(data.worker_to_parts.items())]
wait(degree_ddf)
return degree_ddf[0].result()


def _degree(input_graph, direction=Direction.ALL):
transpose_direction = { Direction.ALL: Direction.ALL,
Direction.IN: Direction.OUT,
Direction.OUT: Direction.IN }

if input_graph.adjlist is not None:
return _degree_csr(input_graph.adjlist.offsets,
input_graph.adjlist.indices,
x)
direction)

if input_graph.transposedadjlist is not None:
return _degree_csr(input_graph.transposedadjlist.offsets,
input_graph.transposedadjlist.indices,
transpose_x[x])

if input_graph.edgelist is None and input_graph.distributed:
input_graph.compute_renumber_edge_list(transposed=False)
transpose_direction[direction])

if input_graph.edgelist is not None:
if isinstance(input_graph.edgelist.edgelist_df, dc.DataFrame):
input_ddf = input_graph.edgelist.edgelist_df
num_verts = input_ddf[['src', 'dst']].max().max().compute() + 1
data = DistributedDataHandler.create(data=input_ddf)
comms = Comms.get_comms()
client = default_client()
data.calculate_parts_to_sizes(comms)
degree_ddf = [client.submit(_degree_coo, wf[1][0], 'src', 'dst', x, num_verts, comms.sessionId, workers=[wf[0]]) for idx, wf in enumerate(data.worker_to_parts.items())]
wait(degree_ddf)
return degree_ddf[0].result()
return _degree_coo(input_graph.edgelist.edgelist_df,
'src', 'dst', x)
'src', 'dst', direction)

raise Exception("input_graph not COO, CSR or CSC")


def _degrees(input_graph):
verts, indegrees = _degree(input_graph,1)
verts, outdegrees = _degree(input_graph, 2)
verts, indegrees = _degree(input_graph, Direction.IN)
verts, outdegrees = _degree(input_graph, Direction.OUT)

return verts, indegrees, outdegrees

Expand Down
14 changes: 11 additions & 3 deletions python/cugraph/tests/dask/test_mg_degree.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_dask_mg_degree(client_connection):

# FIXME: update this to allow dataset to be parameterized and have dataset
# part of test param id (see other tests)
input_data_path = r"../datasets/karate.csv"
input_data_path = r"../datasets/karate-asymmetric.csv"
print(f"dataset={input_data_path}")

chunksize = cugraph.dask.get_chunksize(input_data_path)
Expand All @@ -62,10 +62,18 @@ def test_dask_mg_degree(client_connection):
g = cugraph.DiGraph()
g.from_cudf_edgelist(df, "src", "dst")

merge_df = (
merge_df_in = (
dg.in_degree()
.merge(g.in_degree(), on="vertex", suffixes=["_dg", "_g"])
.compute()
)

assert merge_df["degree_dg"].equals(merge_df["degree_g"])
merge_df_out = (
dg.out_degree()
.merge(g.out_degree(), on="vertex", suffixes=["_dg", "_g"])
.compute()
)

assert merge_df_in["degree_dg"].equals(merge_df_in["degree_g"])
assert merge_df_out["degree_dg"].equals(
merge_df_out["degree_g"])
15 changes: 3 additions & 12 deletions python/cugraph/tests/dask/test_mg_katz_centrality.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,12 @@ def test_dask_katz_centrality(client_connection):
dtype=["int32", "int32", "float32"],
)

df = cudf.read_csv(
input_data_path,
delimiter=" ",
names=["src", "dst", "value"],
dtype=["int32", "int32", "float32"],
)

g = cugraph.DiGraph()
g.from_cudf_edgelist(df, "src", "dst")

dg = cugraph.DiGraph()
dg.from_dask_cudf_edgelist(ddf, "src", "dst")

largest_out_degree = g.degrees().nlargest(n=1, columns="out_degree")
largest_out_degree = largest_out_degree["out_degree"].iloc[0]
largest_out_degree = dg.out_degree().compute().\
nlargest(n=1, columns="degree")
largest_out_degree = largest_out_degree["degree"].iloc[0]
katz_alpha = 1 / (largest_out_degree + 1)

mg_res = dcg.katz_centrality(dg, alpha=katz_alpha, tol=1e-6)
Expand Down

0 comments on commit 0859228

Please sign in to comment.