[REVIEW] Allow passing a dict in feat_name for add_edge_data and add_node_data #2795

Merged Oct 19, 2022 · 17 commits · Changes from 9 commits
190 changes: 108 additions & 82 deletions python/cugraph/cugraph/gnn/graph_store.py
@@ -54,9 +54,9 @@ def add_node_data(
        self,
        df,
        node_col_name,
-       feat_name=None,
        ntype=None,
-       is_single_vector_feature=True,
+       feat_name=None,
+       contains_vector_features=False,
    ):
Comment on lines +58 to 60 (Member Author):
Changed to this to keep the API cleaner and more intuitive, since you would want to provide feat_name together with contains_vector_features.
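
For context, a hypothetical usage sketch of the new signature (the store `gs` and the DataFrame below are illustrative, not from this PR):

```python
import cudf

df = cudf.DataFrame({"id": [0, 1], "f1": [0.1, 0.2], "f2": [1.0, 2.0]})

# Wrap both feature columns under one 2d feature name:
gs.add_node_data(
    df, node_col_name="id", ntype="users",
    feat_name="feat", contains_vector_features=True,
)

# Or map feature names to column groups with a dict:
gs.add_node_data(
    df, node_col_name="id", ntype="users",
    feat_name={"feat": ["f1", "f2"]}, contains_vector_features=True,
)
```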

"""
Add a dataframe describing node properties to the PropertyGraph.
@@ -68,54 +68,41 @@ def add_node_data(
interface.
node_col_name : string
The column name that contains the values to be used as vertex IDs.
-        feat_name : string
-            The feature name under which we should save the added properties
-            (ignored if is_single_vector_feature=False and the col names of
-            the dataframe are treated as corresponding feature names)
ntype : string
The node type to be added.
For example, if dataframe contains data about users, ntype
might be "users".
If not specified, the type of properties will be added as
an empty string.
-        is_single_vector_feature : True
-            Whether to treat all the columns of the dataframe being added as
-            a single 2d feature
+        feat_name : string or dict
+            A feature name, or a map of feature names to the columns to save
+            under them, e.g. {"feat_1": [f1, f2], "feat_2": [f3, f4]}
+            (ignored if contains_vector_features=False, in which case the
+            column names of the dataframe are treated as the feature names)
+        contains_vector_features : bool, default False
+            Whether to treat the columns of the dataframe being added as
+            2d features
Returns
-------
None
"""
self.gdata.add_vertex_data(df, vertex_col_name=node_col_name, type_name=ntype)
columns = [col for col in list(df.columns) if col != node_col_name]

-        if is_single_vector_feature:
-            if feat_name is None:
-                raise ValueError(
-                    "feature name must be provided when wrapping"
-                    + " multiple columns under a single feature name"
-                )
-
-        elif feat_name:
-            raise ValueError(
-                "feat_name is only valid when wrapping"
-                + " multiple columns under a single feature name"
-            )
-
-        if is_single_vector_feature:
-            self.ndata_feat_col_d[feat_name] = columns
-        else:
-            for col in columns:
-                self.ndata_feat_col_d[col] = [col]
+        _update_feature_map(
+            self.ndata_feat_col_d, feat_name, contains_vector_features, columns
+        )
# Clear properties if set as data has changed

self.__clear_cached_properties()

    def add_edge_data(
        self,
        df,
        node_col_names,
+       canonical_etype=None,
        feat_name=None,
-       etype=None,
-       is_single_vector_feature=True,
+       contains_vector_features=False,
    ):
"""
Add a dataframe describing edge properties to the PropertyGraph.
@@ -128,47 +115,34 @@ def add_edge_data(
node_col_names : string
The column names that contain the values to be used as the source
and destination vertex IDs for the edges.
-        feat_name : string
+        feat_name : string or dict
            The feature name under which we should save the added properties
-            (ignored if is_single_vector_feature=False and the col names of
+            (ignored if contains_vector_features=False and the col names of
            the dataframe are treated as corresponding feature names)
-        etype : string
+        canonical_etype : string
            The edge type to be added. This should follow the string format
            '(src_type),(edge_type),(dst_type)'
            If not specified, the type of properties will be added as
            an empty string.
-        is_single_vector_feature : True
-            Whether to treat all the columns of the dataframe being
-            added as a single 2d feature
+        contains_vector_features : bool, default False
+            Whether to treat the columns of the dataframe being added as
+            2d features
Returns
-------
None
"""
-        self.gdata.add_edge_data(df, vertex_col_names=node_col_names, type_name=etype)
+        self.gdata.add_edge_data(
+            df, vertex_col_names=node_col_names, type_name=canonical_etype
+        )
columns = [col for col in list(df.columns) if col not in node_col_names]
-        if is_single_vector_feature:
-            if feat_name is None:
-                raise ValueError(
-                    "feature name must be provided when wrapping"
-                    + " multiple columns under a single feature name"
-                )
-
-        elif feat_name:
-            raise ValueError(
-                "feat_name is only valid when wrapping"
-                + " multiple columns under a single feature name"
-            )
-
-        if is_single_vector_feature:
-            self.edata_feat_col_d[feat_name] = columns
-        else:
-            for col in columns:
-                self.edata_feat_col_d[col] = [col]
+        _update_feature_map(
+            self.edata_feat_col_d, feat_name, contains_vector_features, columns
+        )

# Clear properties if set as data has changed
self.__clear_cached_properties()
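
The dict form works the same way for edges; a hypothetical sketch (`gs` and the DataFrame are assumptions for illustration):

```python
import cudf

edge_df = cudf.DataFrame(
    {"src": [0, 1], "dst": [1, 2], "w1": [0.5, 0.7], "w2": [1.0, 2.0]}
)
gs.add_edge_data(
    edge_df,
    node_col_names=["src", "dst"],
    canonical_etype="(user),(rates),(item)",
    feat_name={"weights": ["w1", "w2"]},
    contains_vector_features=True,
)
```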

-    def get_node_storage(self, feat_name, ntype=None):
+    def get_node_storage(self, key, ntype=None, indices_offset=0):
Comment (Member Author): Changed to key to match DGL.
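
A hypothetical usage sketch of the renamed parameter (assuming a store `gs` built as above):

```python
# Features are looked up by name, mirroring DGL's g.ndata["feat"] style:
storage = gs.get_node_storage(key="feat", ntype="users")
feats = storage.fetch([0, 1, 2])  # feature rows for vertices 0, 1, 2
```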

if ntype is None:
ntypes = self.ntypes
if len(self.ntypes) > 1:
@@ -179,22 +153,23 @@ def get_node_storage(self, feat_name, ntype=None):
)
)
ntype = ntypes[0]
-        if key not in self.ndata_feat_col_d:
+        if key not in self.ndata_feat_col_d:
            raise ValueError(
-                f"feat_name {feat_name} not found in CuGraphStore" " node features",
+                f"key {key} not found in CuGraphStore" " node features",
f" {list(self.ndata_feat_col_d.keys())}",
)

-        columns = self.ndata_feat_col_d[feat_name]
+        columns = self.ndata_feat_col_d[key]

return CuFeatureStorage(
pg=self.gdata,
columns=columns,
storage_type="node",
+            indices_offset=indices_offset,
backend_lib=self.backend_lib,
)

-    def get_edge_storage(self, feat_name, etype=None):
+    def get_edge_storage(self, key, etype=None, indices_offset=0):
Comment (Member Author): Changed to key to match DGL.

if etype is None:
etypes = self.etypes
if len(self.etypes) > 1:
@@ -206,18 +181,19 @@ def get_edge_storage(self, feat_name, etype=None):
)

etype = etypes[0]
-        if feat_name not in self.edata_feat_col_d:
+        if key not in self.edata_feat_col_d:
            raise ValueError(
-                f"feat_name {feat_name} not found in CuGraphStore" " edge features",
+                f"key {key} not found in CuGraphStore" " edge features",
                f" {list(self.edata_feat_col_d.keys())}",
            )
-        columns = self.edata_feat_col_d[feat_name]
+        columns = self.edata_feat_col_d[key]

return CuFeatureStorage(
pg=self.gdata,
columns=columns,
storage_type="edge",
backend_lib=self.backend_lib,
+            indices_offset=indices_offset,
)

def num_nodes(self, ntype=None):
@@ -500,25 +476,27 @@ def node_subgraph(
return _subg

    def __clear_cached_properties(self):
-        if hasattr(self, "has_multiple_etypes"):
+        # hasattr would access the attribute, forcing computation
+        if "has_multiple_etypes" in self.__dict__:
            del self.has_multiple_etypes

-        if hasattr(self, "num_nodes_dict"):
+        if "num_nodes_dict" in self.__dict__:
            del self.num_nodes_dict

-        if hasattr(self, "num_edges_dict"):
+        if "num_edges_dict" in self.__dict__:
            del self.num_edges_dict

-        if hasattr(self, "extracted_subgraph"):
+        if "extracted_subgraph" in self.__dict__:
            del self.extracted_subgraph

-        if hasattr(self, "extracted_reverse_subgraph"):
+        if "extracted_reverse_subgraph" in self.__dict__:
            del self.extracted_reverse_subgraph

-        if hasattr(self, "extracted_subgraphs_per_type"):
+        if "extracted_subgraphs_per_type" in self.__dict__:
            del self.extracted_subgraphs_per_type

-        if hasattr(self, "extracted_reverse_subgraphs_per_type"):
+        if "extracted_reverse_subgraphs_per_type" in self.__dict__:
            del self.extracted_reverse_subgraphs_per_type
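
The `in self.__dict__` test matters because these are cached properties; a standalone sketch (illustrative, not from this PR) of why `hasattr` is the wrong check:

```python
from functools import cached_property

class Demo:
    @cached_property
    def expensive(self):
        print("computing...")
        return 42

d = Demo()
# hasattr(d, "expensive") is always True, and evaluating it runs the
# property once, caching the very value we are trying to invalidate.
print("expensive" in d.__dict__)  # False: no computation triggered
d.expensive                       # prints "computing...", caches 42
print("expensive" in d.__dict__)  # True: safe to `del d.expensive` now
```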


@@ -529,7 +507,9 @@ class CuFeatureStorage:
is fine. DGL simply uses duck-typing to implement its sampling pipeline.
"""

def __init__(self, pg, columns, storage_type, backend_lib="torch"):
def __init__(
self, pg, columns, storage_type, backend_lib="torch", indices_offset=0
):
self.pg = pg
self.columns = columns
if backend_lib == "torch":
@@ -548,6 +528,7 @@ def __init__(self, pg, columns, storage_type, backend_lib="torch"):
self.storage_type = storage_type

self.from_dlpack = from_dlpack
+        self.indices_offset = indices_offset
Comment (Member Author): Setting offset makes upstream code cleaner.
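
A hypothetical sketch of what the offset enables (the ID layout below is an assumption for illustration):

```python
# Suppose the PropertyGraph assigns "users" vertex IDs 0-99 and "items"
# IDs 100-199, while the caller indexes each type locally from 0. A
# storage built for "items" can then translate IDs with a fixed offset:
item_storage = CuFeatureStorage(
    pg=pg,
    columns=["f1", "f2"],
    storage_type="node",
    backend_lib="torch",
    indices_offset=100,
)
item_storage.fetch([0, 1])  # reads PropertyGraph vertices 100 and 101
```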


def fetch(self, indices, device=None, pin_memory=False, **kwargs):
"""Fetch the features of the given node/edge IDs to the
@@ -575,6 +556,9 @@ def fetch(self, indices, device=None, pin_memory=False, **kwargs):
indices = indices.get()
else:
indices = cudf.Series(indices)

+        indices = indices + self.indices_offset

if self.storage_type == "node":
subset_df = self.pg.get_vertex_data(
vertex_ids=indices, columns=self.columns
Expand All @@ -586,20 +570,17 @@ def fetch(self, indices, device=None, pin_memory=False, **kwargs):

if isinstance(subset_df, dask_cudf.DataFrame):
subset_df = subset_df.compute()
-        if len(subset_df) == 0:
-            raise ValueError(f"{indices=} not found in FeatureStorage")
-        else:
-            tensor = self.from_dlpack(subset_df.to_dlpack())
-
-        if isinstance(tensor, cp.ndarray):
-            # can not transfer to
-            # a different device for cupy
-            return tensor
-        else:
-            if device:
-                tensor = tensor.to(device)
-            else:
-                return tensor
+        if len(subset_df) == 0:
+            raise ValueError(f"indices = {indices} not found in FeatureStorage")
+        cap = subset_df.to_dlpack()
+        tensor = self.from_dlpack(cap)
+        del cap
+        if device:
+            if not isinstance(tensor, cp.ndarray):
+                # Can't transfer to a different device for cupy
+                tensor = tensor.to(device)
+        return tensor
Comment (Member Author): Fixed an error here when a user provides device; can't test this as it needs PyTorch installed here.
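
A minimal standalone sketch of the control-flow bug being fixed (simplified shapes, not the real fetch):

```python
def old_fetch(tensor, device=None):
    if device:
        tensor = tensor.to(device)  # bug: falls through and returns None
    else:
        return tensor

def new_fetch(tensor, device=None):
    if device:
        tensor = tensor.to(device)
    return tensor  # always returns the (possibly moved) tensor
```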



def return_dlpack_d(d):
@@ -721,3 +702,48 @@ def get_subgraph_from_edgelist(edge_list, is_mg, reverse_edges=False):
)

return subgraph


def _update_feature_map(
pg_feature_map, feat_name_obj, contains_vector_features, columns
):
if contains_vector_features:
if feat_name_obj is None:
raise ValueError(
"feature name must be provided when wrapping"
+ " multiple columns under a single feature name"
+ " or a feature map"
)

if isinstance(feat_name_obj, str):
pg_feature_map[feat_name_obj] = columns

elif isinstance(feat_name_obj, dict):
covered_columns = []
for col in feat_name_obj.keys():
current_cols = feat_name_obj[col]
# Handle strings too
if isinstance(current_cols, str):
current_cols = [current_cols]
covered_columns = covered_columns + current_cols

            if set(covered_columns) != set(columns):
                raise ValueError(
                    f"All the columns {columns} are not covered in "
                    f"{covered_columns}. "
                    f"Please check the feature_map {feat_name_obj} provided"
                )

for key, cols in feat_name_obj.items():
if isinstance(cols, str):
cols = [cols]
pg_feature_map[key] = cols
else:
raise ValueError(f"{feat_name_obj} should be str or dict")
else:
if feat_name_obj:
raise ValueError(
f"feat_name {feat_name_obj} is only valid when "
"wrapping multiple columns under feature names"
)
for col in columns:
pg_feature_map[col] = [col]
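
A quick behavioral sketch of the new helper (standalone; assumes only the function above):

```python
# Single feature name wrapping every column as one 2d feature:
fmap = {}
_update_feature_map(fmap, "feat", True, ["f1", "f2"])
assert fmap == {"feat": ["f1", "f2"]}

# Dict form: feature names map to column groups (strings allowed too):
fmap = {}
_update_feature_map(
    fmap, {"feat_1": ["f1", "f2"], "feat_2": "f3"}, True, ["f1", "f2", "f3"]
)
assert fmap == {"feat_1": ["f1", "f2"], "feat_2": ["f3"]}

# No vector features: every column becomes its own feature name:
fmap = {}
_update_feature_map(fmap, None, False, ["f1", "f2"])
assert fmap == {"f1": ["f1"], "f2": ["f2"]}
```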