Skip to content

Commit

Permalink
Add options to extract_subgraph() to bypass renumbering and adding …
Browse files Browse the repository at this point in the history
…edge_data, exclude internal `_WEIGHT_` column from `edge_property_names`, added `num_vertices_with_properties` attr (#2419)

Add options to `extract_subgraph()` to bypass renumbering and to skip adding edge_data to the returned graph, and exclude the internal `_WEIGHT_` column from `edge_property_names`.

Also added a new attribute, `num_vertices_with_properties`, which returns the number of vertices that have properties. This can differ from `num_vertices`, since vertices can also be added via `add_edge_data()`, which does not add any vertex properties. This is needed for GNN use cases, which need to know how many vertices have properties that can be accessed (this corresponds to the number of rows in the internal vertex property data table).

Added unit tests, for both the SG and MG versions, to verify that the new `extract_subgraph()` options work, that the new `num_vertices_with_properties` attribute works, and that `_WEIGHT_` column names aren't included in `edge_property_names`.

closes #2418 
closes #2410

Authors:
  - Rick Ratzel (https://github.com/rlratzel)

Approvers:
  - Alex Barghi (https://github.com/alexbarghi-nv)
  - Erik Welch (https://github.com/eriknw)
  - Brad Rees (https://github.com/BradReesWork)

URL: #2419
  • Loading branch information
rlratzel authored Jul 25, 2022
1 parent 57a1839 commit efc05b3
Show file tree
Hide file tree
Showing 5 changed files with 363 additions and 160 deletions.
110 changes: 75 additions & 35 deletions python/cugraph/cugraph/dask/structure/mg_property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def __init__(self, num_workers=None):

# Cached property values
self.__num_vertices = None
self.__num_vertices_with_properties = None

# number of gpu's to use
if num_workers is None:
Expand All @@ -145,6 +146,18 @@ def num_vertices(self):
self.__num_vertices = vert_count.compute()
return self.__num_vertices

@property
def num_vertices_with_properties(self):
if self.__num_vertices_with_properties is not None:
return self.__num_vertices_with_properties

if self.__vertex_prop_dataframe is not None:
self.__num_vertices_with_properties = \
len(self.__vertex_prop_dataframe)
return self.__num_vertices_with_properties

return 0

@property
def num_edges(self):
if self.__edge_prop_dataframe is not None:
Expand All @@ -156,7 +169,8 @@ def num_edges(self):
def edges(self):
if self.__edge_prop_dataframe is not None:
return self.__edge_prop_dataframe[[self.src_col_name,
self.dst_col_name]]
self.dst_col_name,
self.edge_id_col_name]]
return None

@property
Expand All @@ -176,6 +190,8 @@ def edge_property_names(self):
props.remove(self.dst_col_name)
props.remove(self.edge_id_col_name)
props.remove(self.type_col_name) # should "type" be removed?
if self.weight_col_name in props:
props.remove(self.weight_col_name)
return props
return []

Expand Down Expand Up @@ -260,9 +276,10 @@ def add_vertex_data(self,
"found in dataframe: "
f"{list(invalid_columns)}")

# Clear the cached value for num_vertices since more could be added in
# this method.
# Clear the cached values related to the number of vertices since more
# could be added in this method.
self.__num_vertices = None
self.__num_vertices_with_properties = None

# Initialize the __vertex_prop_dataframe if necessary using the same
# type as the incoming dataframe.
Expand Down Expand Up @@ -373,7 +390,7 @@ def add_edge_data(self,
f"{list(invalid_columns)}")

# Clear the cached value for num_vertices since more could be added in
# this method.
# this method. This method cannot affect num_vertices_with_properties
self.__num_vertices = None

default_edge_columns = [self.src_col_name,
Expand Down Expand Up @@ -467,7 +484,9 @@ def extract_subgraph(self,
selection=None,
edge_weight_property=None,
default_edge_weight=None,
allow_multi_edges=False
allow_multi_edges=False,
renumber_graph=True,
add_edge_data=True
):
"""
Return a subgraph of the overall PropertyGraph containing vertices
Expand Down Expand Up @@ -495,7 +514,13 @@ def extract_subgraph(self,
allow_multi_edges : bool
If True, multiple edges should be used to create the return Graph,
otherwise multiple edges will be detected and an exception raised.
renumber_graph : bool (default is True)
If True, return a Graph that has been renumbered for use by graph
algorithms. If False, the returned graph will need to be manually
renumbered prior to calling graph algos.
add_edge_data : bool (default is True)
If True, add meta data about the edges contained in the extracted
graph which are required for future calls to annotate_dataframe().
Returns
-------
A Graph instance of the same type as create_using containing only the
Expand Down Expand Up @@ -556,7 +581,9 @@ def extract_subgraph(self,
create_using=create_using,
edge_weight_property=edge_weight_property,
default_edge_weight=default_edge_weight,
allow_multi_edges=allow_multi_edges)
allow_multi_edges=allow_multi_edges,
renumber_graph=renumber_graph,
add_edge_data=add_edge_data)

def annotate_dataframe(self, df, G, edge_vertex_col_names):
raise NotImplementedError()
Expand All @@ -566,7 +593,9 @@ def edge_props_to_graph(self,
create_using,
edge_weight_property=None,
default_edge_weight=None,
allow_multi_edges=False):
allow_multi_edges=False,
renumber_graph=True,
add_edge_data=True):
"""
Create and return a Graph from the edges in edge_prop_df.
"""
Expand Down Expand Up @@ -594,10 +623,8 @@ def edge_props_to_graph(self,
# If a default_edge_weight was specified but an edge_weight_property
# was not, a new edge weight column must be added.
elif default_edge_weight:
edge_attr = self.__gen_unique_name(edge_prop_df.columns,
prefix=self.weight_col_name)
edge_attr = self.weight_col_name
edge_prop_df[edge_attr] = default_edge_weight

else:
edge_attr = None

Expand Down Expand Up @@ -630,18 +657,43 @@ def edge_props_to_graph(self,
msg = "default Graph graph type"
raise RuntimeError("query resulted in duplicate edges which "
f"cannot be represented with the {msg}")
G.from_dask_cudf_edgelist(
edge_prop_df,
source=self.src_col_name,
destination=self.dst_col_name,
edge_attr=edge_attr, renumber=True)
# Set the edge_data on the resulting Graph to a DataFrame containing
# the edges and the edge ID for each. Edge IDs are needed for future
# calls to annotate_dataframe() in order to associate edges with their
# properties, since the PG can contain multiple edges between vertrices
# with different properties.
G.edge_data = self.__create_property_lookup_table(edge_prop_df)
# FIXME: also add vertex_data

# FIXME: MNMG Graphs required renumber to be True due to requirements
# on legacy code that needed segment offsets, partition offsets,
# etc. which were previously computed during the "legacy" C
# renumbering. The workaround is to pass renumber=True, then manually
# call G.compute_renumber_edge_list(legacy_renum_only=True) to compute
# the required meta-data without changing vertex IDs.
if renumber_graph is False:
renumber = True
else:
renumber = renumber_graph

col_names = [self.src_col_name, self.dst_col_name]
if edge_attr is not None:
col_names.append(edge_attr)

G.from_dask_cudf_edgelist(edge_prop_df[col_names],
source=self.src_col_name,
destination=self.dst_col_name,
edge_attr=edge_attr,
renumber=renumber)
# FIXME: see FIXME above - to generate the edgelist,
# compute_renumber_edge_list() must be called, but legacy mode needs to
# be used based on if renumbering was to be done or not.
if renumber_graph is False:
G.compute_renumber_edge_list(legacy_renum_only=True)
else:
G.compute_renumber_edge_list(legacy_renum_only=False)

if add_edge_data:
# Set the edge_data on the resulting Graph to a DataFrame
# containing the edges and the edge ID for each. Edge IDs are
# needed for future calls to annotate_dataframe() in order to
# associate edges with their properties, since the PG can contain
# multiple edges between vertrices with different properties.
# FIXME: also add vertex_data
G.edge_data = self.__create_property_lookup_table(edge_prop_df)

return G

Expand Down Expand Up @@ -684,18 +736,6 @@ def __get_all_vertices_series(self):
vert_sers.append(epd[self.dst_col_name])
return vert_sers

@staticmethod
def __gen_unique_name(current_names, prefix="col"):
"""
Helper function to generate a currently unused name.
"""
name = prefix
counter = 2
while name in current_names:
name = f"{prefix}{counter}"
counter += 1
return name

@staticmethod
def __get_new_column_dtypes(from_df, to_df):
"""
Expand Down
74 changes: 45 additions & 29 deletions python/cugraph/cugraph/structure/property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def __init__(self):

# Cached property values
self.__num_vertices = None
self.__num_vertices_with_properties = None

# PropertyGraph read-only attributes
@property
Expand All @@ -152,6 +153,18 @@ def num_vertices(self):

return self.__num_vertices

@property
def num_vertices_with_properties(self):
if self.__num_vertices_with_properties is not None:
return self.__num_vertices_with_properties

if self.__vertex_prop_dataframe is not None:
self.__num_vertices_with_properties = \
len(self.__vertex_prop_dataframe)
return self.__num_vertices_with_properties

return 0

@property
def num_edges(self):
if self.__edge_prop_dataframe is not None:
Expand Down Expand Up @@ -183,6 +196,8 @@ def edge_property_names(self):
props.remove(self.dst_col_name)
props.remove(self.edge_id_col_name)
props.remove(self.type_col_name) # should "type" be removed?
if self.weight_col_name in props:
props.remove(self.weight_col_name)
return props
return []

Expand Down Expand Up @@ -278,9 +293,10 @@ def add_vertex_data(self,
"the PropertyGraph was already initialized "
f"using type {self.__dataframe_type}")

# Clear the cached value for num_vertices since more could be added in
# this method.
# Clear the cached values related to the number of vertices since more
# could be added in this method.
self.__num_vertices = None
self.__num_vertices_with_properties = None

# Initialize the __vertex_prop_dataframe if necessary using the same
# type as the incoming dataframe.
Expand Down Expand Up @@ -400,7 +416,7 @@ def add_edge_data(self,
f"using type {self.__dataframe_type}")

# Clear the cached value for num_vertices since more could be added in
# this method.
# this method. This method cannot affect num_vertices_with_properties
self.__num_vertices = None

default_edge_columns = [self.src_col_name,
Expand Down Expand Up @@ -551,7 +567,9 @@ def extract_subgraph(self,
selection=None,
edge_weight_property=None,
default_edge_weight=None,
allow_multi_edges=False
allow_multi_edges=False,
renumber_graph=True,
add_edge_data=True
):
"""
Return a subgraph of the overall PropertyGraph containing vertices
Expand Down Expand Up @@ -579,6 +597,13 @@ def extract_subgraph(self,
allow_multi_edges : bool
If True, multiple edges should be used to create the return Graph,
otherwise multiple edges will be detected and an exception raised.
renumber_graph : bool (default is True)
If True, return a Graph that has been renumbered for use by graph
algorithms. If False, the returned graph will need to be manually
renumbered prior to calling graph algos.
add_edge_data : bool (default is True)
If True, add meta data about the edges contained in the extracted
graph which are required for future calls to annotate_dataframe().
Returns
-------
Expand Down Expand Up @@ -641,7 +666,9 @@ def extract_subgraph(self,
create_using=create_using,
edge_weight_property=edge_weight_property,
default_edge_weight=default_edge_weight,
allow_multi_edges=allow_multi_edges)
allow_multi_edges=allow_multi_edges,
renumber_graph=renumber_graph,
add_edge_data=add_edge_data)

def annotate_dataframe(self, df, G, edge_vertex_col_names):
"""
Expand Down Expand Up @@ -713,7 +740,9 @@ def edge_props_to_graph(self,
create_using,
edge_weight_property=None,
default_edge_weight=None,
allow_multi_edges=False):
allow_multi_edges=False,
renumber_graph=True,
add_edge_data=True):
"""
Create and return a Graph from the edges in edge_prop_df.
"""
Expand Down Expand Up @@ -742,10 +771,8 @@ def edge_props_to_graph(self,
# If a default_edge_weight was specified but an edge_weight_property
# was not, a new edge weight column must be added.
elif default_edge_weight:
edge_attr = self.__gen_unique_name(edge_prop_df.columns,
prefix=self.weight_col_name)
edge_attr = self.weight_col_name
edge_prop_df[edge_attr] = default_edge_weight

else:
edge_attr = None

Expand Down Expand Up @@ -782,20 +809,21 @@ def edge_props_to_graph(self,
create_args = {"source": self.src_col_name,
"destination": self.dst_col_name,
"edge_attr": edge_attr,
"renumber": True,
"renumber": renumber_graph,
}
if type(edge_prop_df) is cudf.DataFrame:
G.from_cudf_edgelist(edge_prop_df, **create_args)
else:
G.from_pandas_edgelist(edge_prop_df, **create_args)

# Set the edge_data on the resulting Graph to a DataFrame containing
# the edges and the edge ID for each. Edge IDs are needed for future
# calls to annotate_dataframe() in order to associate edges with their
# properties, since the PG can contain multiple edges between vertrices
# with different properties.
G.edge_data = self.__create_property_lookup_table(edge_prop_df)
# FIXME: also add vertex_data
if add_edge_data:
# Set the edge_data on the resulting Graph to a DataFrame
# containing the edges and the edge ID for each. Edge IDs are
# needed for future calls to annotate_dataframe() in order to
# associate edges with their properties, since the PG can contain
# multiple edges between vertrices with different properties.
# FIXME: also add vertex_data
G.edge_data = self.__create_property_lookup_table(edge_prop_df)

return G

Expand Down Expand Up @@ -862,18 +890,6 @@ def __get_all_vertices_series(self):
vert_sers.append(epd[self.dst_col_name])
return vert_sers

@staticmethod
def __gen_unique_name(current_names, prefix="col"):
"""
Helper function to generate a currently unused name.
"""
name = prefix
counter = 2
while name in current_names:
name = f"{prefix}{counter}"
counter += 1
return name

@staticmethod
def __get_new_column_dtypes(from_df, to_df):
"""
Expand Down
5 changes: 1 addition & 4 deletions python/cugraph/cugraph/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,7 @@ def dask_client():
yield client

Comms.destroy()
# Shut down the connected scheduler and workers
# therefore we will no longer rely on killing the dask cluster ID
# for MNMG runs
client.shutdown()
client.close()
if cluster:
cluster.close()
print("\ndask_client fixture: client.close() called")
Loading

0 comments on commit efc05b3

Please sign in to comment.