Skip to content

Commit

Permalink
cuGraph-PyG Loader Improvements (#3795)
Browse files Browse the repository at this point in the history
Consolidates various speed improvements tested while running performance benchmarks.  Avoids copying batch data, removes redundant data loading code, simplifies and improves de-offsetting, even though that is now being bypassed entirely for homogeneous graphs.  Removes extra host to device copy.  Properly flips the src/dst columns in the returned `HeteroData` minibatch objects, avoid exposing this to the end user.

I've confirmed this cuts the MFG time by a factor of 4.

Closes #3807

Authors:
  - Alex Barghi (https://github.com/alexbarghi-nv)

Approvers:
  - Vibhu Jawa (https://github.com/VibhuJawa)
  - Don Acosta (https://github.com/acostadon)
  - Brad Rees (https://github.com/BradReesWork)

URL: #3795
  • Loading branch information
alexbarghi-nv authored Sep 25, 2023
1 parent f53bb56 commit fe17abc
Show file tree
Hide file tree
Showing 10 changed files with 548 additions and 341 deletions.
218 changes: 158 additions & 60 deletions python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import pandas
import cudf
import cugraph
import warnings

from cugraph.utilities.utils import import_optional, MissingModule

Expand Down Expand Up @@ -211,7 +212,9 @@ def __init__(
F: cugraph.gnn.FeatureStore,
G: Union[Dict[str, Tuple[TensorType]], Dict[str, int]],
num_nodes_dict: Dict[str, int],
*,
multi_gpu: bool = False,
order: str = "CSC",
):
"""
Constructs a new CuGraphStore from the provided
Expand Down Expand Up @@ -256,11 +259,20 @@ def __init__(
multi_gpu: bool (Optional, default = False)
Whether the store should be backed by a multi-GPU graph.
Requires dask to have been set up.
order: str (Optional ["CSR", "CSC"], default = CSC)
The order to use for sampling. Should nearly always be CSC
unless there is a specific expectation of "reverse" sampling.
It is also not uncommon to use CSR order for correctness
testing, which some cuGraph-PyG tests do.
"""

if None in G:
raise ValueError("Unspecified edge types not allowed in PyG")

if order != "CSR" and order != "CSC":
raise ValueError("invalid valid for order")

self.__vertex_dtype = torch.int64

self._tensor_attr_cls = CuGraphTensorAttr
Expand Down Expand Up @@ -289,6 +301,7 @@ def __init__(
self.__features = F
self.__graph = None
self.__is_graph_owner = False
self.__order = order

if construct_graph:
if multi_gpu:
Expand All @@ -297,7 +310,9 @@ def __init__(
)

if self.__graph is None:
self.__graph = self.__construct_graph(G, multi_gpu=multi_gpu)
self.__graph = self.__construct_graph(
G, multi_gpu=multi_gpu, order=order
)
self.__is_graph_owner = True

self.__subgraphs = {}
Expand Down Expand Up @@ -347,6 +362,7 @@ def __construct_graph(
self,
edge_info: Dict[Tuple[str, str, str], List[TensorType]],
multi_gpu: bool = False,
order: str = "CSC",
) -> cugraph.MultiGraph:
"""
This function takes edge information and uses it to construct
Expand All @@ -363,6 +379,14 @@ def __construct_graph(
multi_gpu: bool (Optional, default=False)
Whether to construct a single-GPU or multi-GPU cugraph Graph.
Defaults to a single-GPU graph.
order: str (CSC or CSR)
Essentially whether to reverse edges so that the cuGraph
sampling algorithm operates on the CSC matrix instead of
the CSR matrix. Should nearly always be CSC unless there
is a specific expectation of reverse sampling, or correctness
testing is being performed.
Returns
-------
A newly-constructed directed cugraph.MultiGraph object.
Expand All @@ -371,6 +395,9 @@ def __construct_graph(
# Ensure the original dict is not modified.
edge_info_cg = {}

if order != "CSR" and order != "CSC":
raise ValueError("Order must be either CSC (default) or CSR!")

# Iterate over the keys in sorted order so that the created
# numerical types correspond to the lexicographic order
# of the keys, which is critical to converting the numeric
Expand Down Expand Up @@ -430,20 +457,43 @@ def __construct_graph(

df = pandas.DataFrame(
{
"src": pandas.Series(na_src),
"dst": pandas.Series(na_dst),
"src": pandas.Series(na_dst)
if order == "CSC"
else pandas.Series(na_src),
"dst": pandas.Series(na_src)
if order == "CSC"
else pandas.Series(na_dst),
"etp": pandas.Series(na_etp),
}
)
vertex_dtype = df.src.dtype

if multi_gpu:
nworkers = len(distributed.get_client().scheduler_info()["workers"])
df = dd.from_pandas(df, npartitions=nworkers).persist()
df = df.map_partitions(cudf.DataFrame.from_pandas)
else:
df = cudf.from_pandas(df)
df = dd.from_pandas(df, npartitions=nworkers if len(df) > 32 else 1)

# Ensure the dataframe is constructed on each partition
# instead of adding additional synchronization head from potential
# host to device copies.
def get_empty_df():
return cudf.DataFrame(
{
"src": cudf.Series([], dtype=vertex_dtype),
"dst": cudf.Series([], dtype=vertex_dtype),
"etp": cudf.Series([], dtype="int32"),
}
)

df = df.reset_index(drop=True)
# Have to check for empty partitions and handle them appropriately
df = df.persist()
df = df.map_partitions(
lambda f: cudf.DataFrame.from_pandas(f)
if len(f) > 0
else get_empty_df(),
meta=get_empty_df(),
).reset_index(drop=True)
else:
df = cudf.from_pandas(df).reset_index(drop=True)

graph = cugraph.MultiGraph(directed=True)
if multi_gpu:
Expand All @@ -468,6 +518,10 @@ def __construct_graph(
def _edge_types_to_attrs(self) -> dict:
return dict(self.__edge_types_to_attrs)

@property
def order(self) -> str:
return self.__order

@property
def node_types(self) -> List[NodeType]:
return list(self.__vertex_type_offsets["type"])
Expand Down Expand Up @@ -557,6 +611,7 @@ def _get_edge_index(self, attr: CuGraphEdgeAttr) -> Tuple[TensorType, TensorType
raise ValueError("Graph is not in memory, cannot access edge index!")

if attr.layout != EdgeLayout.COO:
# TODO support returning CSR/CSC (Issue #3802)
raise TypeError("Only COO direct access is supported!")

# Currently, graph creation enforces that input vertex ids are always of
Expand All @@ -566,12 +621,14 @@ def _get_edge_index(self, attr: CuGraphEdgeAttr) -> Tuple[TensorType, TensorType
# This may change in the future if/when renumbering or the graph
# creation process is refactored.
# See Issue #3201 for more details.
# Also note src/dst are flipped so that cuGraph sampling is done in
# CSC format rather than CSR format.
if self._is_delayed:
src_col_name = self.__graph.renumber_map.renumbered_src_col_name
dst_col_name = self.__graph.renumber_map.renumbered_dst_col_name
dst_col_name = self.__graph.renumber_map.renumbered_src_col_name
src_col_name = self.__graph.renumber_map.renumbered_dst_col_name
else:
src_col_name = self.__graph.srcCol
dst_col_name = self.__graph.dstCol
dst_col_name = self.__graph.srcCol
src_col_name = self.__graph.dstCol

# If there is only one edge type (homogeneous graph) then
# bypass the edge filters for a significant speed improvement.
Expand Down Expand Up @@ -785,29 +842,73 @@ def _get_renumbered_edge_groups_from_sample(
"""
row_dict = {}
col_dict = {}
if len(self.__edge_types_to_attrs) == 1:
# If there is only 1 edge type (includes heterogeneous graphs)
if len(self.edge_types) == 1:
t_pyg_type = list(self.__edge_types_to_attrs.values())[0].edge_type
src_type, _, dst_type = t_pyg_type

dst_id_table = noi_index[dst_type]
dst_id_map = (
cudf.Series(cupy.asarray(dst_id_table), name="dst")
.reset_index()
.rename(columns={"index": "new_id"})
.set_index("dst")
)
dst = dst_id_map["new_id"].loc[sampling_results.destinations]
col_dict[t_pyg_type] = torch.as_tensor(dst.values, device="cuda")

src_id_table = noi_index[src_type]
src_id_map = (
cudf.Series(cupy.asarray(src_id_table), name="src")
.reset_index()
.rename(columns={"index": "new_id"})
.set_index("src")
)
src = src_id_map["new_id"].loc[sampling_results.sources]
row_dict[t_pyg_type] = torch.as_tensor(src.values, device="cuda")
# If there is only 1 node type (homogeneous)
# This should only occur if the cuGraph loader was
# not used. This logic is deprecated.
if len(self.node_types) == 1:
warnings.warn(
"Renumbering after sampling for homogeneous graphs is deprecated.",
FutureWarning,
)

# Create a dataframe mapping old ids to new ids.
vtype = src_type
id_table = noi_index[vtype]
id_map = cudf.Series(
cupy.arange(id_table.shape[0], dtype="int32"),
name="new_id",
index=cupy.asarray(id_table),
).sort_index()

# Renumber the sources using binary search
# Step 1: get the index of the new id
ix_r = torch.searchsorted(
torch.as_tensor(id_map.index.values, device="cuda"),
torch.as_tensor(sampling_results.sources.values, device="cuda"),
)
# Step 2: Go from id indices to actual ids
row_dict[t_pyg_type] = torch.as_tensor(id_map.values, device="cuda")[
ix_r
]

# Renumber the destinations using binary search
# Step 1: get the index of the new id
ix_c = torch.searchsorted(
torch.as_tensor(id_map.index.values, device="cuda"),
torch.as_tensor(
sampling_results.destinations.values, device="cuda"
),
)
# Step 2: Go from id indices to actual ids
col_dict[t_pyg_type] = torch.as_tensor(id_map.values, device="cuda")[
ix_c
]
else:
# Handle the heterogeneous case where there is only 1 edge type
dst_id_table = noi_index[dst_type]
dst_id_map = cudf.DataFrame(
{
"dst": cupy.asarray(dst_id_table),
"new_id": cupy.arange(dst_id_table.shape[0]),
}
).set_index("dst")
dst = dst_id_map["new_id"].loc[sampling_results.destinations]
col_dict[t_pyg_type] = torch.as_tensor(dst.values, device="cuda")

src_id_table = noi_index[src_type]
src_id_map = cudf.DataFrame(
{
"src": cupy.asarray(src_id_table),
"new_id": cupy.arange(src_id_table.shape[0]),
}
).set_index("src")
src = src_id_map["new_id"].loc[sampling_results.sources]
row_dict[t_pyg_type] = torch.as_tensor(src.values, device="cuda")

else:
# This will retrieve the single string representation.
Expand All @@ -822,36 +923,18 @@ def _get_renumbered_edge_groups_from_sample(

for pyg_can_edge_type_str, ix in eoi_types.items():
pyg_can_edge_type = tuple(pyg_can_edge_type_str.split("__"))
src_type, _, dst_type = pyg_can_edge_type

# Get the de-offsetted sources
sources = torch.as_tensor(
sampling_results.sources.iloc[ix].values, device="cuda"
)
sources_ix = torch.searchsorted(
self.__vertex_type_offsets["stop"], sources
)
sources -= self.__vertex_type_offsets["start"][sources_ix]

# Create the row entry for this type
src_id_table = noi_index[src_type]
src_id_map = (
cudf.Series(cupy.asarray(src_id_table), name="src")
.reset_index()
.rename(columns={"index": "new_id"})
.set_index("src")
)
src = src_id_map["new_id"].loc[cupy.asarray(sources)]
row_dict[pyg_can_edge_type] = torch.as_tensor(src.values, device="cuda")
if self.__order == "CSR":
src_type, _, dst_type = pyg_can_edge_type
else: # CSC
dst_type, _, src_type = pyg_can_edge_type

# Get the de-offsetted destinations
dst_num_type = self._numeric_vertex_type_from_name(dst_type)
destinations = torch.as_tensor(
sampling_results.destinations.iloc[ix].values, device="cuda"
)
destinations_ix = torch.searchsorted(
self.__vertex_type_offsets["stop"], destinations
)
destinations -= self.__vertex_type_offsets["start"][destinations_ix]
destinations -= self.__vertex_type_offsets["start"][dst_num_type]

# Create the col entry for this type
dst_id_table = noi_index[dst_type]
Expand All @@ -864,6 +947,24 @@ def _get_renumbered_edge_groups_from_sample(
dst = dst_id_map["new_id"].loc[cupy.asarray(destinations)]
col_dict[pyg_can_edge_type] = torch.as_tensor(dst.values, device="cuda")

# Get the de-offsetted sources
src_num_type = self._numeric_vertex_type_from_name(src_type)
sources = torch.as_tensor(
sampling_results.sources.iloc[ix].values, device="cuda"
)
sources -= self.__vertex_type_offsets["start"][src_num_type]

# Create the row entry for this type
src_id_table = noi_index[src_type]
src_id_map = (
cudf.Series(cupy.asarray(src_id_table), name="src")
.reset_index()
.rename(columns={"index": "new_id"})
.set_index("src")
)
src = src_id_map["new_id"].loc[cupy.asarray(sources)]
row_dict[pyg_can_edge_type] = torch.as_tensor(src.values, device="cuda")

return row_dict, col_dict

def put_tensor(self, tensor, attr) -> None:
Expand Down Expand Up @@ -959,9 +1060,7 @@ def _get_tensor(self, attr: CuGraphTensorAttr) -> TensorType:
t = t[-1]

if isinstance(t, np.ndarray):
t = torch.as_tensor(t, device="cuda")
else:
t = t.cuda()
t = torch.as_tensor(t, device="cpu")

return t

Expand All @@ -979,7 +1078,6 @@ def _get_tensor(self, attr: CuGraphTensorAttr) -> TensorType:

t = torch.concatenate([t, u])

t = t.cuda()
return t

def _multi_get_tensor(self, attrs: List[CuGraphTensorAttr]) -> List[TensorType]:
Expand Down
Loading

0 comments on commit fe17abc

Please sign in to comment.