From b08a04409be749354a12f00cab3d2150677bf48d Mon Sep 17 00:00:00 2001 From: Joseph Nke Date: Tue, 3 Aug 2021 11:40:42 -0700 Subject: [PATCH 1/5] map partition to corresponding worker, ensure appropriate personalization dtype --- python/cugraph/dask/common/part_utils.py | 4 +++- python/cugraph/dask/link_analysis/pagerank.py | 6 ++++++ python/cugraph/link_analysis/pagerank.py | 6 ++++++ 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/python/cugraph/dask/common/part_utils.py b/python/cugraph/dask/common/part_utils.py index 2bff490d35c..22363dab4fd 100644 --- a/python/cugraph/dask/common/part_utils.py +++ b/python/cugraph/dask/common/part_utils.py @@ -88,7 +88,9 @@ async def _extract_partitions(dask_obj, client=None, batch_enabled=False): if batch_enabled: persisted = client.persist(dask_obj, workers=worker_list[0]) else: - persisted = client.persist(dask_obj) + persisted = [client.persist( + dask_obj.get_partition(p), workers=w) for p, w in enumerate( + worker_list)] parts = futures_of(persisted) # iterable of dask collections (need to colocate them) elif isinstance(dask_obj, collections.Sequence): diff --git a/python/cugraph/dask/link_analysis/pagerank.py b/python/cugraph/dask/link_analysis/pagerank.py index 87ef94d1600..f10372bfe70 100644 --- a/python/cugraph/dask/link_analysis/pagerank.py +++ b/python/cugraph/dask/link_analysis/pagerank.py @@ -19,6 +19,7 @@ from cugraph.dask.link_analysis import mg_pagerank_wrapper as mg_pagerank import cugraph.comms.comms as Comms import dask_cudf +import cudf def call_pagerank(sID, @@ -139,6 +140,11 @@ def pagerank(input_graph, data = get_distributed_data(ddf) if personalization is not None: + if not isinstance(personalization, cudf.DataFrame): + raise NotImplementedError( + "personalization other than a cudf dataframe " + "currently not supported" + ) null_check(personalization["vertex"]) null_check(personalization["values"]) if input_graph.renumbered is True: diff --git a/python/cugraph/link_analysis/pagerank.py b/python/cugraph/link_analysis/pagerank.py index 94b1491e944..b875246fb03 100644 --- a/python/cugraph/link_analysis/pagerank.py +++ b/python/cugraph/link_analysis/pagerank.py @@ -13,6 +13,7 @@ from cugraph.link_analysis import pagerank_wrapper import cugraph +import cudf def pagerank( @@ -97,6 +98,11 @@ def pagerank( G, isNx = cugraph.utilities.check_nx_graph(G, weight) if personalization is not None: + if not isinstance(personalization, cudf.DataFrame): + raise NotImplementedError( + "personalization other than a cudf dataframe " + "currently not supported" + ) if G.renumbered is True: if len(G.renumber_map.implementation.col_names) > 1: cols = personalization.columns[:-1].to_list() From 937315af285236c41ea9c09b26e9f708948f7ed3 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 6 Aug 2021 08:56:41 -0700 Subject: [PATCH 2/5] personalization fix --- python/cugraph/dask/link_analysis/pagerank.py | 38 +++++++++++++++---- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/python/cugraph/dask/link_analysis/pagerank.py b/python/cugraph/dask/link_analysis/pagerank.py index f10372bfe70..af136fc7522 100644 --- a/python/cugraph/dask/link_analysis/pagerank.py +++ b/python/cugraph/dask/link_analysis/pagerank.py @@ -20,6 +20,7 @@ import cugraph.comms.comms as Comms import dask_cudf import cudf +from dask.dataframe.shuffle import rearrange_by_column def call_pagerank(sID, @@ -38,6 +39,8 @@ def call_pagerank(sID, local_size = len(aggregate_segment_offsets) // Comms.get_n_workers(sID) segment_offsets = \ aggregate_segment_offsets[local_size * wid: local_size * (wid + 1)] + print(wid, personalization) + print(vertex_partition_offsets) return mg_pagerank.mg_pagerank(data[0], num_verts, num_edges, @@ -140,18 +143,37 @@ def pagerank(input_graph, data = get_distributed_data(ddf) if personalization is not None: - if not isinstance(personalization, cudf.DataFrame): - raise NotImplementedError( - "personalization other than a cudf dataframe " - "currently not supported" - ) - null_check(personalization["vertex"]) - null_check(personalization["values"]) if input_graph.renumbered is True: personalization = input_graph.add_internal_vertex_id( personalization, "vertex", "vertex" ) - p_data = get_distributed_data(personalization) + + def _set_partitions_pre(s, divisions): + partitions = divisions.searchsorted(s, side="right") - 1 + partitions[ + divisions.tail(1).searchsorted(s, side="right").astype("bool") + ] = (len(divisions) - 2) + return partitions + + df = personalization + by = ['vertex'] + meta = df._meta._constructor_sliced([0]) + divisions = vertex_partition_offsets + partitions = df[by].map_partitions( + _set_partitions_pre, divisions=divisions, meta=meta + ) + + df2 = df.assign(_partitions=partitions) + df3 = rearrange_by_column( + df2, + "_partitions", + max_branch=None, + npartitions=len(divisions) - 1, + shuffle="tasks", + ignore_index=False, + ).drop(columns=["_partitions"]) + + p_data = get_distributed_data(df3) result = [client.submit(call_pagerank, Comms.get_session_id(), From b1290da18e17b39483e596d5886a00bf172a9f58 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 6 Aug 2021 09:06:18 -0700 Subject: [PATCH 3/5] flake8 --- python/cugraph/dask/link_analysis/pagerank.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/python/cugraph/dask/link_analysis/pagerank.py b/python/cugraph/dask/link_analysis/pagerank.py index af136fc7522..0d809fd064f 100644 --- a/python/cugraph/dask/link_analysis/pagerank.py +++ b/python/cugraph/dask/link_analysis/pagerank.py @@ -19,7 +19,6 @@ from cugraph.dask.link_analysis import mg_pagerank_wrapper as mg_pagerank import cugraph.comms.comms as Comms import dask_cudf -import cudf from dask.dataframe.shuffle import rearrange_by_column @@ -39,8 +38,6 @@ def call_pagerank(sID, local_size = len(aggregate_segment_offsets) // Comms.get_n_workers(sID) segment_offsets = \ aggregate_segment_offsets[local_size * wid: local_size * (wid + 1)] - print(wid, personalization) - print(vertex_partition_offsets) return mg_pagerank.mg_pagerank(data[0], num_verts, num_edges, @@ -128,8 +125,6 @@ def pagerank(input_graph, edge_attr='value') >>> pr = dcg.pagerank(dg) """ - from cugraph.structure.graph_classes import null_check - nstart = None client = default_client() From c4386771313dd34aebbd1b4646df693ea7e956c9 Mon Sep 17 00:00:00 2001 From: Iroy30 <41401566+Iroy30@users.noreply.github.com> Date: Wed, 11 Aug 2021 12:39:21 -0500 Subject: [PATCH 4/5] Add comments --- python/cugraph/dask/link_analysis/pagerank.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/cugraph/dask/link_analysis/pagerank.py b/python/cugraph/dask/link_analysis/pagerank.py index 0d809fd064f..5fb5d6542f8 100644 --- a/python/cugraph/dask/link_analysis/pagerank.py +++ b/python/cugraph/dask/link_analysis/pagerank.py @@ -143,6 +143,7 @@ def pagerank(input_graph, personalization, "vertex", "vertex" ) + # Function to assign partition id to personalization dataframe def _set_partitions_pre(s, divisions): partitions = divisions.searchsorted(s, side="right") - 1 partitions[ @@ -150,6 +151,7 @@ def _set_partitions_pre(s, divisions): ] = (len(divisions) - 2) return partitions + # Assign partition id column as per vertex_partition_offsets df = personalization by = ['vertex'] meta = df._meta._constructor_sliced([0]) @@ -159,6 +161,8 @@ def _set_partitions_pre(s, divisions): ) df2 = df.assign(_partitions=partitions) + + # Shuffle personalization values according to the partition id df3 = rearrange_by_column( df2, "_partitions", From 4b5bd864f8947be78aa8294d990a8c929aff2a42 Mon Sep 17 00:00:00 2001 From: Iroy30 <41401566+Iroy30@users.noreply.github.com> Date: Wed, 11 Aug 2021 12:53:44 -0500 Subject: [PATCH 5/5] flake8 --- python/cugraph/dask/link_analysis/pagerank.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cugraph/dask/link_analysis/pagerank.py b/python/cugraph/dask/link_analysis/pagerank.py index 5fb5d6542f8..85d61233179 100644 --- a/python/cugraph/dask/link_analysis/pagerank.py +++ b/python/cugraph/dask/link_analysis/pagerank.py @@ -161,7 +161,7 @@ def _set_partitions_pre(s, divisions): ) df2 = df.assign(_partitions=partitions) - + # Shuffle personalization values according to the partition id df3 = rearrange_by_column( df2,