Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[gpuCI] Forward-merge branch-21.06 to branch-21.08 [skip ci] #1626

Merged
merged 1 commit into from
May 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# limitations under the License.

from cugraph.structure import graph_primtypes_wrapper
from cugraph.structure.graph_primtypes_wrapper import Direction
from cugraph.structure.number_map import NumberMap
import cudf
import dask_cudf
Expand Down Expand Up @@ -211,7 +212,7 @@ def in_degree(self, vertex_subset=None):
>>> G.from_cudf_edgelist(M, '0', '1')
>>> df = G.in_degree([0,9,12])
"""
return self._degree(vertex_subset, x=1)
return self._degree(vertex_subset, direction=Direction.IN)

def out_degree(self, vertex_subset=None):
"""
Expand Down Expand Up @@ -245,8 +246,7 @@ def out_degree(self, vertex_subset=None):
>>> G.from_cudf_edgelist(M, '0', '1')
>>> df = G.out_degree([0,9,12])
"""
# TODO: Add support
raise Exception("Not supported for distributed graph")
return self._degree(vertex_subset, direction=Direction.OUT)

def degree(self, vertex_subset=None):
"""
Expand Down Expand Up @@ -319,14 +319,15 @@ def degrees(self, vertex_subset=None):
"""
raise Exception("Not supported for distributed graph")

def _degree(self, vertex_subset, x=0):
vertex_col, degree_col = graph_primtypes_wrapper._degree(self, x)
def _degree(self, vertex_subset, direction=Direction.ALL):
vertex_col, degree_col = graph_primtypes_wrapper._mg_degree(self,
direction)
df = cudf.DataFrame()
df["vertex"] = vertex_col
df["degree"] = degree_col

if self.renumbered is True:
df = self.unrenumber(df, "vertex")
if self.properties.renumbered is True:
df = self.renumber_map.unrenumber(df, "vertex")

if vertex_subset is not None:
df = df[df['vertex'].isin(vertex_subset)]
Expand Down
10 changes: 6 additions & 4 deletions python/cugraph/structure/graph_implementation/simpleGraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# limitations under the License.

from cugraph.structure import graph_primtypes_wrapper
from cugraph.structure.graph_primtypes_wrapper import Direction
from cugraph.structure.symmetrize import symmetrize
from cugraph.structure.number_map import NumberMap
import cugraph.dask.common.mg_utils as mg_utils
Expand Down Expand Up @@ -566,7 +567,7 @@ def in_degree(self, vertex_subset=None):
>>> G.from_cudf_edgelist(M, '0', '1')
>>> df = G.in_degree([0,9,12])
"""
return self._degree(vertex_subset, x=1)
return self._degree(vertex_subset, direction=Direction.IN)

def out_degree(self, vertex_subset=None):
"""
Expand Down Expand Up @@ -600,7 +601,7 @@ def out_degree(self, vertex_subset=None):
>>> G.from_cudf_edgelist(M, '0', '1')
>>> df = G.out_degree([0,9,12])
"""
return self._degree(vertex_subset, x=2)
return self._degree(vertex_subset, direction=Direction.OUT)

def degree(self, vertex_subset=None):
"""
Expand Down Expand Up @@ -690,8 +691,9 @@ def degrees(self, vertex_subset=None):

return df

def _degree(self, vertex_subset, x=0):
vertex_col, degree_col = graph_primtypes_wrapper._degree(self, x)
def _degree(self, vertex_subset, direction=Direction.ALL):
vertex_col, degree_col = graph_primtypes_wrapper._degree(self,
direction)
df = cudf.DataFrame()
df["vertex"] = vertex_col
df["degree"] = degree_col
Expand Down
75 changes: 43 additions & 32 deletions python/cugraph/structure/graph_primtypes_wrapper.pyx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2020, NVIDIA CORPORATION.
# Copyright (c) 2019-2021, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -21,6 +21,7 @@ from cugraph.structure.graph_primtypes cimport get_two_hop_neighbors as c_get_tw
from cugraph.structure.graph_primtypes cimport renumber_vertices as c_renumber_vertices
from cugraph.structure.utils_wrapper import *
from libcpp cimport bool
import enum
from libc.stdint cimport uintptr_t

from rmm._lib.device_buffer cimport device_buffer, DeviceBuffer
Expand All @@ -45,6 +46,12 @@ def datatype_cast(cols, dtypes):
return cols_out


@enum.unique
class Direction(enum.Enum):
    """
    Edge direction selector used by the degree helpers in this module.

    The numeric values match the integer codes (0, 1, 2) that the degree
    helpers accepted before this enum was introduced.
    """
    # Count both incoming and outgoing edges.
    ALL = 0
    # Count incoming edges only.
    IN = 1
    # Count outgoing edges only.
    OUT = 2


def renumber(source_col, dest_col):
num_edges = len(source_col)

Expand Down Expand Up @@ -137,7 +144,7 @@ def view_edge_list(input_graph):
return src_indices, indices, weights


def _degree_coo(edgelist_df, src_name, dst_name, x=0, num_verts=None, sID=None):
def _degree_coo(edgelist_df, src_name, dst_name, direction=Direction.ALL, num_verts=None, sID=None):
#
# Computing the degree of the input graph from COO
#
Expand All @@ -146,11 +153,11 @@ def _degree_coo(edgelist_df, src_name, dst_name, x=0, num_verts=None, sID=None):
src = edgelist_df[src_name]
dst = edgelist_df[dst_name]

if x == 0:
if direction == Direction.ALL:
dir = DIRECTION_IN_PLUS_OUT
elif x == 1:
elif direction == Direction.IN:
dir = DIRECTION_IN
elif x == 2:
elif direction == Direction.OUT:
dir = DIRECTION_OUT
else:
raise Exception("x should be 0, 1 or 2")
Expand Down Expand Up @@ -185,17 +192,17 @@ def _degree_coo(edgelist_df, src_name, dst_name, x=0, num_verts=None, sID=None):
return vertex_col, degree_col


def _degree_csr(offsets, indices, x=0):
def _degree_csr(offsets, indices, direction=Direction.ALL):
cdef DegreeDirection dir

if x == 0:
if direction == Direction.ALL:
dir = DIRECTION_IN_PLUS_OUT
elif x == 1:
elif direction == Direction.IN:
dir = DIRECTION_IN
elif x == 2:
elif direction == Direction.OUT:
dir = DIRECTION_OUT
else:
raise Exception("x should be 0, 1 or 2")
raise Exception("direction should be 0, 1 or 2")

[offsets, indices] = datatype_cast([offsets, indices], [np.int32])

Expand All @@ -220,44 +227,48 @@ def _degree_csr(offsets, indices, x=0):
return vertex_col, degree_col


def _degree(input_graph, x=0):
transpose_x = { 0: 0,
2: 1,
1: 2 }
def _mg_degree(input_graph, direction=Direction.ALL):
    """
    Compute vertex degrees for a distributed (dask-backed) graph.

    Parameters
    ----------
    input_graph : graph object
        Must expose ``edgelist`` (whose ``edgelist_df`` has 'src' and 'dst'
        columns) and ``compute_renumber_edge_list``.
    direction : Direction
        ``Direction.IN`` or ``Direction.OUT``. Any other value, including
        the default ``Direction.ALL``, is rejected.

    Returns
    -------
    The result of the first submitted ``_degree_coo`` task, i.e. a
    ``(vertex_col, degree_col)`` pair.

    Raises
    ------
    NotImplementedError
        If ``direction`` is neither ``Direction.IN`` nor ``Direction.OUT``.
    """
    # Validate before doing any distributed work. Previously an unsupported
    # direction fell through both branches and failed later with an
    # UnboundLocalError on ``degree_ddf``.
    if direction == Direction.IN:
        src_name, dst_name = 'src', 'dst'
    elif direction == Direction.OUT:
        # Out-degree is obtained by swapping the source/destination columns
        # and asking _degree_coo for the in-degree.
        src_name, dst_name = 'dst', 'src'
    else:
        raise NotImplementedError(
            "distributed degree supports only Direction.IN and "
            "Direction.OUT, got {!r}".format(direction))

    if input_graph.edgelist is None:
        input_graph.compute_renumber_edge_list(transposed=False)
    input_ddf = input_graph.edgelist.edgelist_df
    # Largest vertex id appearing in either column, plus one.
    num_verts = input_ddf[['src', 'dst']].max().max().compute() + 1
    data = DistributedDataHandler.create(data=input_ddf)
    comms = Comms.get_comms()
    client = default_client()
    data.calculate_parts_to_sizes(comms)

    # One task per worker, pinned to that worker and fed the first part it
    # holds.
    degree_ddf = [client.submit(_degree_coo,
                                parts[0],
                                src_name,
                                dst_name,
                                Direction.IN,
                                num_verts,
                                comms.sessionId,
                                workers=[worker])
                  for worker, parts in data.worker_to_parts.items()]
    wait(degree_ddf)
    # NOTE(review): only the first task's result is returned; presumably
    # every worker produces the complete answer -- confirm.
    return degree_ddf[0].result()


def _degree(input_graph, direction=Direction.ALL):
    """
    Compute vertex degrees from whichever representation the graph holds.

    The representations are tried in this order: ``adjlist``, then
    ``transposedadjlist``, then ``edgelist``.

    Parameters
    ----------
    input_graph : graph object
        Must expose ``adjlist``, ``transposedadjlist`` and ``edgelist``
        attributes; at least one must not be None.
    direction : Direction
        Which edges to count (``Direction.ALL``, ``Direction.IN`` or
        ``Direction.OUT``).

    Returns
    -------
    Whatever ``_degree_csr`` / ``_degree_coo`` return, i.e. a
    ``(vertex_col, degree_col)`` pair.

    Raises
    ------
    Exception
        If the graph has none of the three representations.
    """
    if input_graph.adjlist is not None:
        return _degree_csr(input_graph.adjlist.offsets,
                           input_graph.adjlist.indices,
                           direction)

    if input_graph.transposedadjlist is not None:
        # In the transposed structure the roles of incoming and outgoing
        # edges are exchanged, so IN and OUT are swapped before delegating.
        transpose_direction = {Direction.ALL: Direction.ALL,
                               Direction.IN: Direction.OUT,
                               Direction.OUT: Direction.IN}
        return _degree_csr(input_graph.transposedadjlist.offsets,
                           input_graph.transposedadjlist.indices,
                           transpose_direction[direction])

    if input_graph.edgelist is not None:
        return _degree_coo(input_graph.edgelist.edgelist_df,
                           'src', 'dst', direction)

    raise Exception("input_graph not COO, CSR or CSC")


def _degrees(input_graph):
    """
    Compute both in-degrees and out-degrees of ``input_graph``.

    Returns
    -------
    tuple
        ``(verts, indegrees, outdegrees)``. ``verts`` is the vertex column
        produced by the out-degree call.
    """
    # Directions are passed as Direction members; the degree helpers compare
    # against the enum, so the old integer codes 1 and 2 would be rejected.
    verts, indegrees = _degree(input_graph, Direction.IN)
    verts, outdegrees = _degree(input_graph, Direction.OUT)

    return verts, indegrees, outdegrees

Expand Down
14 changes: 11 additions & 3 deletions python/cugraph/tests/dask/test_mg_degree.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_dask_mg_degree(client_connection):

# FIXME: update this to allow dataset to be parameterized and have dataset
# part of test param id (see other tests)
input_data_path = r"../datasets/karate.csv"
input_data_path = r"../datasets/karate-asymmetric.csv"
print(f"dataset={input_data_path}")

chunksize = cugraph.dask.get_chunksize(input_data_path)
Expand All @@ -62,10 +62,18 @@ def test_dask_mg_degree(client_connection):
g = cugraph.DiGraph()
g.from_cudf_edgelist(df, "src", "dst")

merge_df = (
merge_df_in = (
dg.in_degree()
.merge(g.in_degree(), on="vertex", suffixes=["_dg", "_g"])
.compute()
)

assert merge_df["degree_dg"].equals(merge_df["degree_g"])
merge_df_out = (
dg.out_degree()
.merge(g.out_degree(), on="vertex", suffixes=["_dg", "_g"])
.compute()
)

assert merge_df_in["degree_dg"].equals(merge_df_in["degree_g"])
assert merge_df_out["degree_dg"].equals(
merge_df_out["degree_g"])
15 changes: 3 additions & 12 deletions python/cugraph/tests/dask/test_mg_katz_centrality.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,12 @@ def test_dask_katz_centrality(client_connection):
dtype=["int32", "int32", "float32"],
)

df = cudf.read_csv(
input_data_path,
delimiter=" ",
names=["src", "dst", "value"],
dtype=["int32", "int32", "float32"],
)

g = cugraph.DiGraph()
g.from_cudf_edgelist(df, "src", "dst")

dg = cugraph.DiGraph()
dg.from_dask_cudf_edgelist(ddf, "src", "dst")

largest_out_degree = g.degrees().nlargest(n=1, columns="out_degree")
largest_out_degree = largest_out_degree["out_degree"].iloc[0]
largest_out_degree = dg.out_degree().compute().\
nlargest(n=1, columns="degree")
largest_out_degree = largest_out_degree["degree"].iloc[0]
katz_alpha = 1 / (largest_out_degree + 1)

mg_res = dcg.katz_centrality(dg, alpha=katz_alpha, tol=1e-6)
Expand Down