Bulk Loading Support for cuGraph-PyG (#3170)
Closes #3152 
Depends on #3148 and #3159 
This PR does support multiple trainers, although no example of that configuration is included.
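
As a rough sketch of the intended workflow (CuGraphStore and CuGraphNeighborLoader are the classes touched by this PR, but the constructor arguments and parameter names below are assumptions for illustration and may not match the experimental API exactly):

# Hypothetical usage sketch only: names and shapes are illustrative.
import numpy as np

from cugraph_pyg.data import CuGraphStore
from cugraph_pyg.loader import CuGraphNeighborLoader

# Node counts per type and COO edge indices per canonical edge type.
num_nodes_dict = {"author": 500, "paper": 1000}
edge_index_dict = {
    ("author", "writes", "paper"): [
        np.random.randint(0, 500, 2000),   # sources
        np.random.randint(0, 1000, 2000),  # destinations
    ],
}

store = CuGraphStore(
    {},                 # node features omitted in this sketch
    edge_index_dict,
    num_nodes_dict,
    backend="torch",    # default per the updated docstring
    multi_gpu=False,
)

# PyG-style (feature_store, graph_store) tuple plus seed vertices.
loader = CuGraphNeighborLoader(
    (store, store),
    input_nodes=np.arange(1000),
    num_neighbors=[10, 10],
    batch_size=512,
)

for batch in loader:
    ...  # train on each sampled mini-batch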

Authors:
  - Alex Barghi (https://github.com/alexbarghi-nv)
  - Chuck Hastings (https://github.com/ChuckHastings)
  - Vibhu Jawa (https://github.com/VibhuJawa)

Approvers:
  - Vibhu Jawa (https://github.com/VibhuJawa)
  - Rick Ratzel (https://github.com/rlratzel)

URL: #3170
alexbarghi-nv authored Jan 31, 2023
1 parent 9edc8f7 commit 867a0ac
Showing 20 changed files with 698 additions and 622 deletions.
2 changes: 1 addition & 1 deletion python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py
@@ -210,7 +210,7 @@ def __iter__(self):
batch_size=self._batch_size,
graph=self._cugraph_graph,
batches_per_partition=self._batches_per_partition,
- saturation_level=self._seeds_per_call,
+ seeds_per_call=self._seeds_per_call,
rank=rank,
fanout_vals=self.graph_sampler._reversed_fanout_vals,
with_replacement=self.graph_sampler.replace,
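
The renamed keyword suggests that seeds_per_call caps how many seed vertices are handed to a single sampling call. The snippet below is only a standalone illustration of that interpretation, which is an assumption; the actual chunking happens inside the bulk sampler, not in user code.

import numpy as np

seeds = np.arange(100_000)   # seed vertex ids for one epoch (made-up data)
seeds_per_call = 20_000      # what the renamed keyword presumably controls

# Split the seeds into chunks of at most seeds_per_call vertices; each
# chunk would correspond to one bulk-sampling call.
for start in range(0, len(seeds), seeds_per_call):
    chunk = seeds[start : start + seeds_per_call]
    print(f"sampling call with {len(chunk)} seeds")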
2 changes: 1 addition & 1 deletion python/cugraph-pyg/cugraph_pyg/data/__init__.py
@@ -11,7 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

- from cugraph_pyg.utilities.api_tools import experimental_warning_wrapper
+ from cugraph.utilities.api_tools import experimental_warning_wrapper

from cugraph_pyg.data.cugraph_store import EXPERIMENTAL__CuGraphStore

97 changes: 64 additions & 33 deletions python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py
@@ -11,26 +11,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.

- from typing import Optional, Tuple, Any, Union, List
+ from typing import Optional, Tuple, Any, Union, List, Dict

from enum import Enum

from dataclasses import dataclass
from collections import defaultdict
from itertools import chain
from functools import cached_property

# numpy is always available
import numpy as np
import cupy
import pandas


import cudf
import cugraph

from cugraph.utilities.utils import import_optional, MissingModule
import cudf

# FIXME drop cupy support and make torch the only backend (#2995)
import cupy

import dask.dataframe as dd
from dask.distributed import get_client
@@ -214,10 +210,10 @@ def __init__(
num_nodes_dict : dict (Required)
A dictionary mapping each node type to the count of nodes
of that type in the graph.
- backend : ('torch', 'cupy') (Required)
+ backend : ('torch', 'cupy') (Optional, default = 'torch')
The backend that manages tensors (default = 'torch')
Should usually be 'torch' ('torch', 'cupy' supported).
- multi_gpu : bool (Required)
+ multi_gpu : bool (Optional, default = False)
Whether the store should be backed by a multi-GPU graph.
Requires dask to have been set up.
"""
@@ -287,7 +283,11 @@ def __make_offsets(self, input_dict):

return offsets

def __infer_offsets(self, num_nodes_dict, num_edges_dict) -> None:
def __infer_offsets(
self,
num_nodes_dict: Dict[str, int],
num_edges_dict: Dict[Tuple[str, str, str], int],
) -> None:
"""
Sets the vertex offsets for this store.
"""
@@ -303,9 +303,35 @@ def __infer_offsets(self, num_nodes_dict, num_edges_dict) -> None:
}
)

def __construct_graph(self, edge_info, multi_gpu=False) -> cugraph.MultiGraph:
def __construct_graph(
self, edge_info: Dict[Tuple[str, str, str], List], multi_gpu: bool = False
) -> cugraph.MultiGraph:
"""
This function takes edge information and uses it to construct
a cugraph Graph. It determines the numerical edge type by
sorting the keys of the input dictionary
(the canonical edge types).
Parameters
----------
edge_info: Dict[Tuple[str, str, str], List] (Required)
Input edge info dictionary, where keys are the canonical
edge type and values are the edge index (src/dst).
multi_gpu: bool (Optional, default=False)
Whether to construct a single-GPU or multi-GPU cugraph Graph.
Defaults to a single-GPU graph.
Returns
-------
A newly-constructed directed cugraph.MultiGraph object.
"""
# Ensure the original dict is not modified.
edge_info_cg = {}

# Iterate over the keys in sorted order so that the created
# numerical types correspond to the lexicographic order
# of the keys, which is critical to converting the numeric
# keys back to canonical edge types later.
for pyg_can_edge_type in sorted(edge_info.keys()):
src_type, _, dst_type = pyg_can_edge_type
srcs, dsts = edge_info[pyg_can_edge_type]
@@ -332,15 +358,12 @@ def __construct_graph(self, edge_info, multi_gpu=False) -> cugraph.MultiGraph:
]
)

et_offsets = self.__edge_type_offsets
na_etp = np.concatenate(
[
np.array(
[i]
* int(
self.__edge_type_offsets["stop"][i]
- self.__edge_type_offsets["start"][i]
+ 1
),
np.full(
int(et_offsets["stop"][i] - et_offsets["start"][i] + 1),
i,
dtype="int32",
)
for i in range(len(self.__edge_type_offsets["start"]))
@@ -351,18 +374,15 @@ def __construct_graph(self, edge_info, multi_gpu=False) -> cugraph.MultiGraph:
{
"src": na_src,
"dst": na_dst,
# FIXME use the edge type property
# "w": np.zeros(len(na_src)),
"w": na_etp,
"w": np.zeros(len(na_src)),
"eid": np.arange(len(na_src)),
"etp": na_etp,
}
)

if multi_gpu:
nworkers = len(get_client().scheduler_info()["workers"])
npartitions = nworkers * 1
df = dd.from_pandas(df, npartitions=npartitions).persist()
df = dd.from_pandas(df, npartitions=nworkers).persist()
df = df.map_partitions(cudf.DataFrame.from_pandas)
else:
df = cudf.from_pandas(df)
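
For reference, a minimal standalone sketch of the numbering scheme the new docstring describes: canonical edge types are sorted lexicographically, assigned consecutive integer codes, and each code is repeated across that type's edge range (the edge counts below are made up):

import numpy as np

# Made-up edge counts per canonical edge type.
edge_counts = {
    ("paper", "cites", "paper"): 4,
    ("author", "writes", "paper"): 3,
}

# Sorting the canonical edge types fixes each type's numeric code.
codes = {etype: i for i, etype in enumerate(sorted(edge_counts))}
# {('author', 'writes', 'paper'): 0, ('paper', 'cites', 'paper'): 1}

# Repeat each code once per edge of that type, mirroring the np.full call above.
na_etp = np.concatenate(
    [
        np.full(edge_counts[etype], codes[etype], dtype="int32")
        for etype in sorted(edge_counts)
    ]
)
print(na_etp)  # [0 0 0 1 1 1 1]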
@@ -399,7 +419,7 @@ def backend(self) -> str:

@cached_property
def _is_delayed(self):
- return isinstance(self.__graph._plc_graph, dict)
+ return self.__graph.is_multi_gpu()

def get_vertex_index(self, vtypes) -> TensorType:
if isinstance(vtypes, str):
@@ -465,6 +485,14 @@ def _get_edge_index(self, attr: CuGraphEdgeAttr) -> Tuple[TensorType, TensorType
if attr.layout != EdgeLayout.COO:
raise TypeError("Only COO direct access is supported!")

# Currently, graph creation enforces that legacy_renum_only=True
# is always called, and the input vertex ids are always of integer
# type. Therefore, it is currently safe to assume that for MG
# graphs, the src/dst col names are renumbered_src/dst
# and for SG graphs, the src/dst col names are src/dst.
# This may change in the future if/when renumbering or the graph
# creation process is refactored.
# See Issue #3201 for more details.
if self._is_delayed:
src_col_name = self.__graph.renumber_map.renumbered_src_col_name
dst_col_name = self.__graph.renumber_map.renumbered_dst_col_name
@@ -561,7 +589,7 @@ def get_edge_index(self, *args, **kwargs) -> Tuple[TensorType, TensorType]:
raise KeyError(f"An edge corresponding to '{edge_attr}' was not " f"found")
return edge_index

- def _subgraph(self, edge_types: List[tuple]) -> cugraph.MultiGraph:
+ def _subgraph(self, edge_types: List[tuple] = None) -> cugraph.MultiGraph:
"""
Returns a subgraph with edges limited to those of a given type
Expand All @@ -577,10 +605,13 @@ def _subgraph(self, edge_types: List[tuple]) -> cugraph.MultiGraph:
if it has not already been extracted.
"""
if set(edge_types) != set(self.__edge_types_to_attrs.keys()):
if edge_types is not None and set(edge_types) != set(
self.__edge_types_to_attrs.keys()
):
raise ValueError(
"Subgraphing is currently unsupported, please"
" specify all edge types in the graph."
" specify all edge types in the graph or leave"
" this argument empty."
)

return self.__graph
@@ -645,7 +676,7 @@ def _get_renumbered_edge_groups_from_sample(
Example Input: Series({
'sources': [0, 5, 11, 3],
'destinations': [8, 2, 3, 5]},
- 'indices': [1, 3, 5, 14]
+ 'edge_type': [1, 3, 5, 14]
}),
{
'blue_vertex': [0, 5],
@@ -684,18 +715,18 @@ def _get_renumbered_edge_groups_from_sample(
# It needs to be converted to a tuple in the for loop below.
eoi_types = (
cudf.Series(self.__edge_type_offsets["type"])
- .iloc[sampling_results.indices.astype("int32")]
+ .iloc[sampling_results.edge_type.astype("int32")]
.reset_index(drop=True)
)
print("eoi_types:", eoi_types)

eoi_types = cudf.Series(eoi_types, name="t").groupby("t").groups

for pyg_can_edge_type_str, ix in eoi_types.items():
pyg_can_edge_type = tuple(pyg_can_edge_type_str.split("__"))
src_type, _, dst_type = pyg_can_edge_type

# Get the de-offsetted sources
- sources = self.asarray(sampling_results.sources.loc[ix])
+ sources = self.asarray(sampling_results.sources.iloc[ix])
sources_ix = self.searchsorted(
self.__vertex_type_offsets["stop"], sources
)
@@ -707,7 +738,7 @@ def _get_renumbered_edge_groups_from_sample(
row_dict[pyg_can_edge_type] = src

# Get the de-offsetted destinations
- destinations = self.asarray(sampling_results.destinations.loc[ix])
+ destinations = self.asarray(sampling_results.destinations.iloc[ix])
destinations_ix = self.searchsorted(
self.__vertex_type_offsets["stop"], destinations
)
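
A small standalone sketch of the offset lookup used above to map global vertex ids back to per-type ("de-offsetted") ids; the subtraction step is not visible in this hunk, so treat that part as an assumption about the surrounding code (the offset values are made up):

import numpy as np

# Global id ranges per vertex type, analogous to __vertex_type_offsets.
vertex_type_offsets = {
    "type": np.array(["author", "paper"]),
    "start": np.array([0, 500]),
    "stop": np.array([499, 1499]),
}

global_ids = np.array([3, 510, 1499])

# searchsorted against the "stop" column finds the type of each global id...
type_ix = np.searchsorted(vertex_type_offsets["stop"], global_ids)
print(vertex_type_offsets["type"][type_ix])  # ['author' 'paper' 'paper']

# ...and subtracting that type's "start" offset recovers the local id.
local_ids = global_ids - vertex_type_offsets["start"][type_ix]
print(local_ids)  # [  3  10 999]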
14 changes: 13 additions & 1 deletion python/cugraph-pyg/cugraph_pyg/loader/__init__.py
@@ -1,4 +1,4 @@
- # Copyright (c) 2019-2022, NVIDIA CORPORATION.
+ # Copyright (c) 2022-2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -10,3 +10,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from cugraph.utilities.api_tools import experimental_warning_wrapper

from cugraph_pyg.loader.cugraph_node_loader import EXPERIMENTAL__CuGraphNeighborLoader

CuGraphNeighborLoader = experimental_warning_wrapper(
EXPERIMENTAL__CuGraphNeighborLoader
)

from cugraph_pyg.loader.cugraph_node_loader import EXPERIMENTAL__BulkSampleLoader

BulkSampleLoader = experimental_warning_wrapper(EXPERIMENTAL__BulkSampleLoader)