
[REVIEW] python 2D shuffling #1133

Merged 10 commits on Sep 25, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@
## Improvements
- PR #1081 MNMG Renumbering - sort partitions by degree
- PR #1115 Replace deprecated rmm::mr::get_default_resource with rmm::mr::get_current_device_resource
- PR #1133 added python 2D shuffling
- PR #1129 Refactored test to use common dataset and added additional doc pages
- PR #1135 SG Updates to Louvain et al.
- PR #1132 Upgrade Thrust to latest commit
1 change: 1 addition & 0 deletions python/cugraph/structure/__init__.py
@@ -16,3 +16,4 @@
from cugraph.structure.symmetrize import symmetrize, symmetrize_df
from cugraph.structure.convert_matrix import from_cudf_edgelist
from cugraph.structure.hypergraph import hypergraph
from cugraph.structure.shuffle import shuffle
21 changes: 21 additions & 0 deletions python/cugraph/structure/graph.py
@@ -347,6 +347,17 @@ def from_cudf_edgelist(
        if self.edgelist is not None or self.adjlist is not None:
            raise Exception("Graph already has values")

        s_col = source
        d_col = destination
        if not isinstance(s_col, list):
            s_col = [s_col]
        if not isinstance(d_col, list):
            d_col = [d_col]
        if not (set(s_col).issubset(set(input_df.columns)) and
                set(d_col).issubset(set(input_df.columns))):
            raise Exception('source column names and/or destination column '
                            'names not found in input. Recheck the source '
                            'and destination parameters')

        # Consolidation
        if isinstance(input_df, cudf.DataFrame):
            if len(input_df[source]) > 2147483100:
@@ -445,6 +456,16 @@ def from_dask_cudf_edgelist(self, input_ddf, source='source',
        if type(self) is Graph:
            raise Exception('Undirected distributed graph not supported')

        s_col = source
        d_col = destination
        if not isinstance(s_col, list):
            s_col = [s_col]
        if not isinstance(d_col, list):
            d_col = [d_col]
        if not (set(s_col).issubset(set(input_ddf.columns)) and
                set(d_col).issubset(set(input_ddf.columns))):
            raise Exception('source column names and/or destination column '
                            'names not found in input. Recheck the source '
                            'and destination parameters')
        #
        # Keep all of the original parameters so we can lazily
        # evaluate this function
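The checks added to both methods accept either a single column name or a list of column names for source and destination, and raise immediately when a name is missing from the input columns. A minimal usage sketch (the DataFrame and column names below are illustrative, not part of the diff):

import cudf
from cugraph import Graph

df = cudf.DataFrame({'src': [0, 1, 2], 'dst': [1, 2, 0]})

G = Graph()
G.from_cudf_edgelist(df, source='src', destination='dst')   # columns exist: accepted

G2 = Graph()
# A misspelled column name is now rejected up front by the new check:
# G2.from_cudf_edgelist(df, source='source', destination='dst')   # raises Exception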
93 changes: 93 additions & 0 deletions python/cugraph/structure/shuffle.py
@@ -0,0 +1,93 @@
import math
from dask.dataframe.shuffle import rearrange_by_column
import cudf


def get_n_workers():
    from dask.distributed import default_client
    client = default_client()
    return len(client.scheduler_info()['workers'])


def get_2D_div(ngpus):
    pcols = int(math.sqrt(ngpus))
    while ngpus % pcols != 0:
        pcols = pcols - 1
    return int(ngpus/pcols), pcols

Collaborator (review comment on the while loop above): Too lazy today to sit down and do math, but we should be able to directly compute this without a while loop.

Contributor Author (comment on lines +12 to +16): Python code to find prows and pcols
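For illustration, the divisor search can at least be collapsed into a single expression; the helper below (get_2D_div_alt is a hypothetical name, not part of the PR) produces the same prows/pcols split by taking the largest divisor of ngpus that does not exceed sqrt(ngpus):

import math

def get_2D_div_alt(ngpus):
    # Largest divisor of ngpus not exceeding sqrt(ngpus) becomes pcols.
    pcols = max(d for d in range(1, int(math.sqrt(ngpus)) + 1) if ngpus % d == 0)
    return ngpus // pcols, pcols

# get_2D_div_alt(8)  -> (4, 2)
# get_2D_div_alt(12) -> (4, 3)
# get_2D_div_alt(7)  -> (7, 1)   # prime GPU counts degenerate to a 1D (pcols=1) layout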



def _set_partitions_pre(df, vertex_row_partitions, vertex_col_partitions,
                        prows, pcols, transposed):
    if transposed:
        r = df['dst']
        c = df['src']
    else:
        r = df['src']
        c = df['dst']
    r_div = vertex_row_partitions.searchsorted(r, side='right')-1
    c_div = vertex_col_partitions.searchsorted(c, side='right')-1
    partitions = r_div % prows + c_div * prows
    return partitions
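As a worked example of the mapping above (a hedged sketch that mirrors searchsorted with plain Python; the offsets and the edge are made up): with 4 GPUs arranged as prows=2 x pcols=2 and per-GPU renumbered vertex ranges [0,4), [4,8), [8,12), [12,16), the edge (src=5, dst=13) is sent to partition 3:

from bisect import bisect_right

row_offsets = [0, 4, 8, 12, 16]   # P + 1 offsets, one renumbered vertex range per GPU
col_offsets = [0, 8, 16]          # pcols + 1 offsets: every prows-th row offset
prows, pcols = 2, 2

src, dst = 5, 13
r_div = bisect_right(row_offsets, src) - 1    # 1: src falls in [4, 8)
c_div = bisect_right(col_offsets, dst) - 1    # 1: dst falls in [8, 16)
partition = r_div % prows + c_div * prows     # 1 % 2 + 1 * 2 = 3
print(partition)                              # -> 3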


def shuffle(dg, transposed=False, prows=None, pcols=None):
    """
    Shuffles the renumbered input distributed graph edgelist into ngpus
    partitions. The number of processes/GPUs P = prows*pcols. The 2D
    partitioning divides the matrix into P*pcols rectangular partitions
    as per the vertex partitioning performed in renumbering, and then
    shuffles these partitions onto the P GPUs.
    """

    ddf = dg.edgelist.edgelist_df
    ngpus = get_n_workers()
    if prows is None and pcols is None:
        prows, pcols = get_2D_div(ngpus)
    else:
        if prows is not None and pcols is not None:
            if ngpus != prows*pcols:
                raise Exception('prows*pcols should be equal to the '
                                'number of processes')
        elif prows is not None:
            if ngpus % prows != 0:
                raise Exception('prows must be a factor of the number '
                                'of processes')
            pcols = int(ngpus/prows)
        elif pcols is not None:
            if ngpus % pcols != 0:
                raise Exception('pcols must be a factor of the number '
                                'of processes')
            prows = int(ngpus/pcols)

    renumber_vertex_count = dg.renumber_map.implementation.\
        ddf.map_partitions(len).compute()
    renumber_vertex_cumsum = renumber_vertex_count.cumsum()
    src_dtype = ddf['src'].dtype
    dst_dtype = ddf['dst'].dtype

    vertex_row_partitions = cudf.Series([0], dtype=src_dtype)
    vertex_row_partitions = vertex_row_partitions.append(cudf.Series(
        renumber_vertex_cumsum, dtype=src_dtype))
    num_verts = vertex_row_partitions.iloc[-1]
    vertex_col_partitions = []
    for i in range(pcols + 1):
        vertex_col_partitions.append(vertex_row_partitions.iloc[i*prows])
    vertex_col_partitions = cudf.Series(vertex_col_partitions, dtype=dst_dtype)

    meta = ddf._meta._constructor_sliced([0])
    partitions = ddf.map_partitions(
        _set_partitions_pre,
        vertex_row_partitions=vertex_row_partitions,
        vertex_col_partitions=vertex_col_partitions, prows=prows,
        pcols=pcols, transposed=transposed, meta=meta)
    ddf2 = ddf.assign(_partitions=partitions)
    ddf3 = rearrange_by_column(
        ddf2,
        "_partitions",
        max_branch=None,
        npartitions=ngpus,
        shuffle="tasks",
        ignore_index=True,
    ).drop(columns=["_partitions"])

    return ddf3, num_verts, vertex_row_partitions
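A hedged end-to-end sketch of how the new entry point might be driven (the CSV path, dtypes, and cluster setup are assumptions, not part of the PR; a dask.distributed client with one worker per GPU and initialized cuGraph comms are presumed to exist, and the edge list must have been renumbered before shuffling):

import cugraph
import dask_cudf
from cugraph.structure import shuffle

# Edge list with 'src'/'dst' columns (illustrative file name and dtypes).
ddf = dask_cudf.read_csv('edges.csv', names=['src', 'dst'],
                         dtype=['int32', 'int32'])

dg = cugraph.DiGraph()
dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst')
# Depending on the release, renumbering may be evaluated lazily and might need
# to be triggered before shuffling.

# Shuffle the renumbered edge list into a prows x pcols 2D layout; returns the
# shuffled dask_cudf DataFrame, the vertex count, and the per-row-partition
# vertex offsets.
ddf2, num_verts, partition_row_offsets = shuffle(dg, transposed=False)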