[REVIEW] Allow passing a dict in feat_name for add_edge_data and add_node_data #2795
Changes from 16 commits
@@ -54,9 +54,9 @@ def add_node_data(
         self,
         df,
         node_col_name,
-        feat_name=None,
         ntype=None,
-        is_single_vector_feature=True,
+        feat_name=None,
+        contains_vector_features=False,
     ):
         """
         Add a dataframe describing node properties to the PropertyGraph.
@@ -68,54 +68,41 @@ def add_node_data(
             interface.
         node_col_name : string
             The column name that contains the values to be used as vertex IDs.
-        feat_name : string
-            The feature name under which we should save the added properties
-            (ignored if is_single_vector_feature=False and the col names of
-            the dataframe are treated as corresponding feature names)
         ntype : string
             The node type to be added.
             For example, if dataframe contains data about users, ntype
             might be "users".
             If not specified, the type of properties will be added as
             an empty string.
-        is_single_vector_feature : True
-            Whether to treat all the columns of the dataframe being added as
-            a single 2d feature
+        feat_name : string or dict
+            A map of feature names under which we should save the added
+            properties, like {"feat_1": [f1, f2], "feat_2": [f3, f4]}
+            (ignored if contains_vector_features=False, in which case the col
+            names of the dataframe are treated as corresponding feature names)
+        contains_vector_features : False
+            Whether to treat the columns of the dataframe being added
+            as 2d features

         Returns
         -------
         None
         """
         self.gdata.add_vertex_data(df, vertex_col_name=node_col_name, type_name=ntype)
         columns = [col for col in list(df.columns) if col != node_col_name]

-        if is_single_vector_feature:
-            if feat_name is None:
-                raise ValueError(
-                    "feature name must be provided when wrapping"
-                    + " multiple columns under a single feature name"
-                )
-
-        elif feat_name:
-            raise ValueError(
-                "feat_name is only valid when wrapping"
-                + " multiple columns under a single feature name"
-            )
-
-        if is_single_vector_feature:
-            self.ndata_feat_col_d[feat_name] = columns
-        else:
-            for col in columns:
-                self.ndata_feat_col_d[col] = [col]
+        _update_feature_map(
+            self.ndata_feat_col_d, feat_name, contains_vector_features, columns
+        )
         # Clear properties if set as data has changed
         self.__clear_cached_properties()

     def add_edge_data(
         self,
         df,
         node_col_names,
+        canonical_etype=None,
         feat_name=None,
-        etype=None,
-        is_single_vector_feature=True,
+        contains_vector_features=False,
     ):
         """
         Add a dataframe describing edge properties to the PropertyGraph.
@@ -128,47 +115,34 @@ def add_edge_data(
         node_col_names : string
             The column names that contain the values to be used as the source
             and destination vertex IDs for the edges.
-        feat_name : string
-            The feature name under which we should save the added properties
-            (ignored if is_single_vector_feature=False and the col names of
-            the dataframe are treated as corresponding feature names)
-        etype : string
+        canonical_etype : string
             The edge type to be added. This should follow the string format
             '(src_type),(edge_type),(dst_type)'
             If not specified, the type of properties will be added as
             an empty string.
-        is_single_vector_feature : True
-            Wether to treat all the columns of the dataframe being
-            added as a single 2d feature
+        feat_name : string or dict
+            The feature name under which we should save the added properties
+            (ignored if contains_vector_features=False, in which case the col
+            names of the dataframe are treated as corresponding feature names)
+        contains_vector_features : False
+            Whether to treat the columns of the dataframe being added
+            as 2d features

         Returns
         -------
         None
         """
-        self.gdata.add_edge_data(df, vertex_col_names=node_col_names, type_name=etype)
+        self.gdata.add_edge_data(
+            df, vertex_col_names=node_col_names, type_name=canonical_etype
+        )
         columns = [col for col in list(df.columns) if col not in node_col_names]
-        if is_single_vector_feature:
-            if feat_name is None:
-                raise ValueError(
-                    "feature name must be provided when wrapping"
-                    + " multiple columns under a single feature name"
-                )
-
-        elif feat_name:
-            raise ValueError(
-                "feat_name is only valid when wrapping"
-                + " multiple columns under a single feature name"
-            )
-
-        if is_single_vector_feature:
-            self.edata_feat_col_d[feat_name] = columns
-        else:
-            for col in columns:
-                self.edata_feat_col_d[col] = [col]
+        _update_feature_map(
+            self.edata_feat_col_d, feat_name, contains_vector_features, columns
+        )

         # Clear properties if set as data has changed
         self.__clear_cached_properties()

-    def get_node_storage(self, feat_name, ntype=None):
+    def get_node_storage(self, key, ntype=None, indices_offset=0):

Review comment: Changed to key to match DGL.

         if ntype is None:
             ntypes = self.ntypes
             if len(self.ntypes) > 1:
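For orientation, here is a minimal sketch of how the reworked feat_name argument might be used. This is not code from the PR: store is assumed to be an initialized CuGraphStore, and the dataframe, column, and feature names are hypothetical.

    import cudf

    # Hypothetical node dataframe; "user_id" holds the vertex IDs.
    df = cudf.DataFrame(
        {"user_id": [0, 1, 2], "f1": [0.1, 0.2, 0.3], "f2": [1.0, 2.0, 3.0]}
    )

    # All property columns wrapped under a single 2d feature name:
    store.add_node_data(
        df, node_col_name="user_id", feat_name="feat",
        contains_vector_features=True,
    )

    # Alternatively, split the columns across feature names with a dict
    # (string values are accepted for single columns):
    store.add_node_data(
        df, node_col_name="user_id",
        feat_name={"feat_a": ["f1"], "feat_b": "f2"},
        contains_vector_features=True,
    )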
@@ -179,22 +153,23 @@ def get_node_storage(self, feat_name, ntype=None):
                 )
             )
             ntype = ntypes[0]
-        if feat_name not in self.ndata_feat_col_d:
+        if key not in self.ndata_feat_col_d:
             raise ValueError(
-                f"feat_name {feat_name} not found in CuGraphStore" " node features",
+                f"key {key} not found in CuGraphStore node features",
                 f" {list(self.ndata_feat_col_d.keys())}",
             )

-        columns = self.ndata_feat_col_d[feat_name]
+        columns = self.ndata_feat_col_d[key]

         return CuFeatureStorage(
             pg=self.gdata,
             columns=columns,
             storage_type="node",
+            indices_offset=indices_offset,
             backend_lib=self.backend_lib,
         )

-    def get_edge_storage(self, feat_name, etype=None):
+    def get_edge_storage(self, key, etype=None, indices_offset=0):

Review comment: Changed to key to match DGL.

         if etype is None:
             etypes = self.etypes
             if len(self.etypes) > 1:
@@ -206,18 +181,19 @@ def get_edge_storage(self, feat_name, etype=None):
                 )

             etype = etypes[0]
-        if feat_name not in self.edata_feat_col_d:
+        if key not in self.edata_feat_col_d:
             raise ValueError(
-                f"feat_name {feat_name} not found in CuGraphStore" " edge features",
+                f"key {key} not found in CuGraphStore" " edge features",
                 f" {list(self.edata_feat_col_d.keys())}",
             )
-        columns = self.edata_feat_col_d[feat_name]
+        columns = self.edata_feat_col_d[key]

         return CuFeatureStorage(
             pg=self.gdata,
             columns=columns,
             storage_type="edge",
             backend_lib=self.backend_lib,
+            indices_offset=indices_offset,
         )

     def num_nodes(self, ntype=None):
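A short sketch of the renamed key parameter and the new indices_offset used together, assuming store is a populated CuGraphStore and "feat" was registered via add_edge_data with contains_vector_features=True:

    # Look up the columns registered under "feat" and fetch edge features.
    # With indices_offset=10, a request for IDs [0, 1, 2] would read rows
    # 10..12 of the underlying PropertyGraph instead.
    storage = store.get_edge_storage(key="feat", indices_offset=0)
    tensor = storage.fetch([0, 1, 2], device="cuda")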
@@ -500,25 +476,26 @@ def node_subgraph(
         return _subg

     def __clear_cached_properties(self):
-        if hasattr(self, "has_multiple_etypes"):
+        # Check for cached properties using self.__dict__ because calling hasattr() accesses the attribute and forces computation
+        if "has_multiple_etypes" in self.__dict__:
             del self.has_multiple_etypes

-        if hasattr(self, "num_nodes_dict"):
+        if "num_nodes_dict" in self.__dict__:
             del self.num_nodes_dict

-        if hasattr(self, "num_edges_dict"):
+        if "num_edges_dict" in self.__dict__:
             del self.num_edges_dict

-        if hasattr(self, "extracted_subgraph"):
+        if "extracted_subgraph" in self.__dict__:
             del self.extracted_subgraph

-        if hasattr(self, "extracted_reverse_subgraph"):
+        if "extracted_reverse_subgraph" in self.__dict__:
             del self.extracted_reverse_subgraph

-        if hasattr(self, "extracted_subgraphs_per_type"):
+        if "extracted_subgraphs_per_type" in self.__dict__:
             del self.extracted_subgraphs_per_type

-        if hasattr(self, "extracted_reverse_subgraphs_per_type"):
+        if "extracted_reverse_subgraphs_per_type" in self.__dict__:
             del self.extracted_reverse_subgraphs_per_type
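The __dict__ check matters because, as the new comment notes, these attributes are cached properties: hasattr() would access the attribute and force the very computation the method is trying to discard, while a plain __dict__ lookup only sees values that were already cached. A standalone illustration (not from the PR):

    from functools import cached_property

    class Demo:
        @cached_property
        def expensive(self):
            print("computing...")
            return 42

    d = Demo()
    print("expensive" in d.__dict__)  # False: nothing computed yet
    print(hasattr(d, "expensive"))    # prints "computing..." as a side effect
    print("expensive" in d.__dict__)  # True: value cached, del d.expensive now works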
@@ -529,7 +506,9 @@ class CuFeatureStorage:
     is fine. DGL simply uses duck-typing to implement its sampling pipeline.
     """

-    def __init__(self, pg, columns, storage_type, backend_lib="torch"):
+    def __init__(
+        self, pg, columns, storage_type, backend_lib="torch", indices_offset=0
+    ):
         self.pg = pg
         self.columns = columns
         if backend_lib == "torch":
@@ -548,6 +527,7 @@ def __init__(self, pg, columns, storage_type, backend_lib="torch"):
         self.storage_type = storage_type

         self.from_dlpack = from_dlpack
+        self.indices_offset = indices_offset

Review comment: Setting the offset here makes upstream code cleaner.

     def fetch(self, indices, device=None, pin_memory=False, **kwargs):
         """Fetch the features of the given node/edge IDs to the
@@ -575,6 +555,9 @@ def fetch(self, indices, device=None, pin_memory=False, **kwargs):
             indices = indices.get()
         else:
             indices = cudf.Series(indices)

+        indices = indices + self.indices_offset
+
         if self.storage_type == "node":
             subset_df = self.pg.get_vertex_data(
                 vertex_ids=indices, columns=self.columns
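The offset is a plain elementwise shift applied before the PropertyGraph lookup, e.g.:

    import cudf

    indices = cudf.Series([0, 1, 2])
    # With indices_offset=5, rows 5, 6 and 7 are fetched instead.
    indices = indices + 5

Keeping the shift inside the storage object means callers that partition IDs by type do not have to re-apply a per-type offset at every fetch, which is presumably what the review note about cleaner upstream code refers to.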
@@ -586,20 +569,17 @@ def fetch(self, indices, device=None, pin_memory=False, **kwargs):
         if isinstance(subset_df, dask_cudf.DataFrame):
             subset_df = subset_df.compute()

         if len(subset_df) == 0:
             raise ValueError(f"{indices=} not found in FeatureStorage")
         else:
-            tensor = self.from_dlpack(subset_df.to_dlpack())
-
-            if isinstance(tensor, cp.ndarray):
-                # can not transfer to
-                # a different device for cupy
-                return tensor
-            else:
-                if device:
-                    tensor = tensor.to(device)
-                else:
-                    return tensor
+            cap = subset_df.to_dlpack()
+            tensor = self.from_dlpack(cap)
+            del cap
+        if device:
+            if not isinstance(tensor, cp.ndarray):
+                # Can't transfer to a different device for cupy
+                tensor = tensor.to(device)
+        return tensor

Review comment: Fixed an error here when a user provides device; can't test this, as it needs PyTorch installed here.

     def return_dlpack_d(d):
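A standalone sketch of the corrected DLPack flow (assuming PyTorch is installed, which the review note says could not be tested here): the capsule is consumed exactly once, and the device transfer is attempted only for backends that support .to():

    import cudf
    from torch.utils.dlpack import from_dlpack

    df = cudf.DataFrame({"f1": [0.1, 0.2], "f2": [1.0, 2.0]})
    cap = df.to_dlpack()       # a DLPack capsule can be consumed only once
    tensor = from_dlpack(cap)  # zero-copy torch tensor on the GPU
    del cap
    tensor = tensor.to("cpu")  # torch supports .to(); cupy arrays do not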
@@ -721,3 +701,48 @@ def get_subgraph_from_edgelist(edge_list, is_mg, reverse_edges=False):
         )

     return subgraph
+
+
+def _update_feature_map(
+    pg_feature_map, feat_name_obj, contains_vector_features, columns
+):
+    if contains_vector_features:
+        if feat_name_obj is None:
+            raise ValueError(
+                "feature name must be provided when wrapping"
+                + " multiple columns under a single feature name"
+                + " or a feature map"
+            )
+
+        if isinstance(feat_name_obj, str):
+            pg_feature_map[feat_name_obj] = columns
+
+        elif isinstance(feat_name_obj, dict):
+            covered_columns = []
+            for col in feat_name_obj.keys():
+                current_cols = feat_name_obj[col]
+                # Handle strings too
+                if isinstance(current_cols, str):
+                    current_cols = [current_cols]
+                covered_columns = covered_columns + current_cols
+
+            if set(covered_columns) != set(columns):
+                raise ValueError(
+                    f"All the columns {columns} are not covered in {covered_columns}. "
+                    f"Please check the feature_map {feat_name_obj} provided"
+                )
+
+            for key, cols in feat_name_obj.items():
+                if isinstance(cols, str):
+                    cols = [cols]
+                pg_feature_map[key] = cols
+        else:
+            raise ValueError(f"{feat_name_obj} should be str or dict")
+    else:
+        if feat_name_obj:
+            raise ValueError(
+                f"feat_name {feat_name_obj} is only valid when "
+                "wrapping multiple columns under feature names"
+            )
+        for col in columns:
+            pg_feature_map[col] = [col]
Review comment: Changed to this to keep the API cleaner and more intuitive, as you would want to provide feat_name together with contains_vector_features.
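To make the three code paths concrete, a small usage sketch of _update_feature_map (column and feature names hypothetical):

    feature_map = {}

    # dict: each feature name maps to the columns that cover it; string
    # values are promoted to single-element lists
    _update_feature_map(
        feature_map, {"feat_1": ["f1", "f2"], "feat_2": "f3"}, True,
        columns=["f1", "f2", "f3"],
    )
    # feature_map == {"feat_1": ["f1", "f2"], "feat_2": ["f3"]}

    # string: every column is wrapped under the one feature name
    _update_feature_map(feature_map, "feat", True, ["f1", "f2"])
    # feature_map["feat"] == ["f1", "f2"]

    # no vector features: each column becomes its own feature
    _update_feature_map(feature_map, None, False, ["f1", "f2"])
    # adds feature_map["f1"] == ["f1"] and feature_map["f2"] == ["f2"]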