Skip to content

Commit

Permalink
Optimize Sampling for graph_store (#2283)
Browse files Browse the repository at this point in the history
This PR optimizes the sampling function for `graph_store`  by 3x+  by getting rid of the host side code and doing the sampling end to end on GPUs. 

More **importantly** this code makes sure that the actual sampling in `batched_ego_graphs`  is the bottleneck , previously we only spent `32%`  in the core sampling code while now we spend `98.5%`  of the time there. 

See Below Benchmarks : 

### Before PR
```python
Timer unit: 1e-06 s

Total time: 17.3772 s
File: /home/nfs/vjawa/dgl/cugraph/python/cugraph/cugraph/gnn/graph_store.py
Function: sample_neighbors at line 73

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    73                                               def sample_neighbors(self,
    74                                                                    nodes,
    75                                                                    fanout=-1,
    76                                                                    edge_dir='in',
    77                                                                    prob=None,
    78                                                                    replace=False):
    79                                                   """
    80                                                   Sample neighboring edges of the given nodes and return the subgraph.
    81                                           
    82                                                   Parameters
    83                                                   ----------
    84                                                   nodes : array (single dimension)
    85                                                       Node IDs to sample neighbors from.
    86                                                   fanout : int
    87                                                       The number of edges to be sampled for each node on each edge type.
    88                                                   edge_dir : str {"in" or "out"}
    89                                                       Determines whether to sample inbound or outbound edges.
    90                                                       Can take either in for inbound edges or out for outbound edges.
    91                                                   prob : str
    92                                                       Feature name used as the (unnormalized) probabilities associated
    93                                                       with each neighboring edge of a node. Each feature must be a
    94                                                       scalar. The features must be non-negative floats, and the sum of
    95                                                       the features of inbound/outbound edges for every node must be
    96                                                       positive (though they don't have to sum up to one). Otherwise,
    97                                                       the result will be undefined. If not specified, sample uniformly.
    98                                                   replace : bool
    99                                                       If True, sample with replacement.
   100                                           
   101                                                   Returns
   102                                                   -------
   103                                                   CuPy array
   104                                                       The sampled arrays for bipartite graph.
   105                                                   """
   106         1         18.0     18.0      0.0          num_nodes = len(nodes)
   107         1       7833.0   7833.0      0.0          current_seeds = nodes.reindex(index=np.arange(0, num_nodes))
   108         2     129790.0  64895.0      0.7          _g = self.__G.extract_subgraph(create_using=cugraph.Graph,
   109         1          1.0      1.0      0.0                                         allow_multi_edges=True)
   110         2    5467307.0 2733653.5     31.5          ego_edge_list, seeds_offsets = batched_ego_graphs(_g,
   111         1          1.0      1.0      0.0                                                            current_seeds,
   112         1          0.0      0.0      0.0                                                            radius=1)
   113         1        123.0    123.0      0.0          all_parents = cupy.ndarray(0)
   114         1         12.0     12.0      0.0          all_children = cupy.ndarray(0)
   115                                                   # filter and get a certain size neighborhood
   116      1001       1143.0      1.1      0.0          for i in range(1, len(seeds_offsets)):
   117      1000     262330.0    262.3      1.5              pos0 = seeds_offsets.values_host[i-1]
   118      1000     211487.0    211.5      1.2              pos1 = seeds_offsets.values_host[i]
   119      1000     335515.0    335.5      1.9              edge_list = ego_edge_list[pos0:pos1]
   120                                                       # get randomness fanout
   121      1000    6202089.0   6202.1     35.7              filtered_list = edge_list[edge_list['dst'] == current_seeds[i-1]]
   122                                           
   123                                                       # get sampled_list
   124      1000      14097.0     14.1      0.1              if len(filtered_list) > fanout:
   125      1654      19502.0     11.8      0.1                  sampled_indices = random.sample(
   126       827     192781.0    233.1      1.1                          filtered_list.index.to_arrow().to_pylist(), fanout)
   127       827    4080293.0   4933.8     23.5                  filtered_list = filtered_list.reindex(index=sampled_indices)
   128                                           
   129      1000     146293.0    146.3      0.8              children = cupy.asarray(filtered_list['src'])
   130      1000     126122.0    126.1      0.7              parents = cupy.asarray(filtered_list['dst'])
   131      1000     105440.0    105.4      0.6              all_parents = cupy.append(all_parents, parents)
   132      1000      74987.0     75.0      0.4              all_children = cupy.append(all_children, children)
   133         1          1.0      1.0      0.0          return all_parents, all_children
   ````
   
   
   ### After PR: 
   
   
   ```python
   Timer unit: 1e-06 s

Total time: 5.73069 s
File: /datasets/vjawa/miniconda3/envs/cugraph_dev/lib/python3.8/site-packages/cugraph-22.6.0a0+86.gd9ec8f718.dirty-py3.8-linux-x86_64.egg/cugraph/gnn/graph_store.py
Function: sample_neighbors at line 73

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    73                                               def sample_neighbors(self,
    74                                                                    nodes,
    75                                                                    fanout=-1,
    76                                                                    edge_dir='in',
    77                                                                    prob=None,
    78                                                                    replace=False):
    79                                                   """
    80                                                   Sample neighboring edges of the given nodes and return the subgraph.
    81                                           
    82                                                   Parameters
    83                                                   ----------
    84                                                   nodes : array (single dimension)
    85                                                       Node IDs to sample neighbors from.
    86                                                   fanout : int
    87                                                       The number of edges to be sampled for each node on each edge type.
    88                                                   edge_dir : str {"in" or "out"}
    89                                                       Determines whether to sample inbound or outbound edges.
    90                                                       Can take either in for inbound edges or out for outbound edges.
    91                                                   prob : str
    92                                                       Feature name used as the (unnormalized) probabilities associated
    93                                                       with each neighboring edge of a node. Each feature must be a
    94                                                       scalar. The features must be non-negative floats, and the sum of
    95                                                       the features of inbound/outbound edges for every node must be
    96                                                       positive (though they don't have to sum up to one). Otherwise,
    97                                                       the result will be undefined. If not specified, sample uniformly.
    98                                                   replace : bool
    99                                                       If True, sample with replacement.
   100                                           
   101                                                   Returns
   102                                                   -------
   103                                                   CuPy array
   104                                                       The sampled arrays for bipartite graph.
   105                                                   """
   106         1         20.0     20.0      0.0          num_nodes = len(nodes)
   107         1       7681.0   7681.0      0.1          current_seeds = nodes.reindex(index=np.arange(0, num_nodes))
   108         2     143943.0  71971.5      2.5          _g = self.__G.extract_subgraph(create_using=cugraph.Graph,
   109         1          0.0      0.0      0.0                                         allow_multi_edges=True)
   110         2    5500286.0 2750143.0     96.0          ego_edge_list, seeds_offsets = batched_ego_graphs(_g,
   111         1          1.0      1.0      0.0                                                            current_seeds,
   112         1          0.0      0.0      0.0                                                            radius=1)
   113                                                   # filter and get a certain size neighborhood
   114                                           
   115                                                   # Step 1
   116                                                   # Get Filtered List of ego_edge_list corresposing to current_seeds
   117                                                   # We filter by creating a series of destination nodes
   118                                                   # corresponding to the offsets and filtering non matching vallues
   119                                           
   120         1        719.0    719.0      0.0          seeds_offsets_s = cudf.Series(seeds_offsets).values
   121         1        174.0    174.0      0.0          offset_lens = seeds_offsets_s[1:] - seeds_offsets_s[0:-1]
   122         1       4042.0   4042.0      0.1          dst_seeds = current_seeds.repeat(offset_lens)
   123         1        637.0    637.0      0.0          dst_seeds.index = ego_edge_list.index
   124         1       5196.0   5196.0      0.1          filtered_list = ego_edge_list[ego_edge_list["dst"] == dst_seeds]
   125                                           
   126                                                   # Step 2
   127                                                   # Sample Fan Out
   128                                                   # for each dst take maximum of fanout samples
   129         2      67247.0  33623.5      1.2          filtered_list = sample_groups(filtered_list,
   130         1          1.0      1.0      0.0                                        by="dst",
   131         1          1.0      1.0      0.0                                        n_samples=fanout)
   132                                           
   133         1        744.0    744.0      0.0          return filtered_list['src'].values,  filtered_list['dst'].values
```

## Todo:
- [ ] Add Unit Tests

Authors:
  - Vibhu Jawa (https://github.com/VibhuJawa)

Approvers:
  - Brad Rees (https://github.com/BradReesWork)
  - Xiaoyun Wang (https://github.com/wangxiaoyunNV)

URL: #2283
  • Loading branch information
VibhuJawa authored May 19, 2022
1 parent 77c776a commit 3dcc4b8
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 22 deletions.
44 changes: 22 additions & 22 deletions python/cugraph/cugraph/gnn/graph_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import cugraph
from cugraph.experimental import PropertyGraph
from cugraph.community.egonet import batched_ego_graphs
import cupy
import random
from cugraph.utilities.utils import sample_groups

import numpy as np


Expand Down Expand Up @@ -110,27 +110,27 @@ def sample_neighbors(self,
ego_edge_list, seeds_offsets = batched_ego_graphs(_g,
current_seeds,
radius=1)
all_parents = cupy.ndarray(0)
all_children = cupy.ndarray(0)
# filter and get a certain size neighborhood
for i in range(1, len(seeds_offsets)):
pos0 = seeds_offsets.values_host[i-1]
pos1 = seeds_offsets.values_host[i]
edge_list = ego_edge_list[pos0:pos1]
# get randomness fanout
filtered_list = edge_list[edge_list['dst'] == current_seeds[i-1]]

# get sampled_list
if len(filtered_list) > fanout:
sampled_indices = random.sample(
filtered_list.index.to_arrow().to_pylist(), fanout)
filtered_list = filtered_list.reindex(index=sampled_indices)

children = cupy.asarray(filtered_list['src'])
parents = cupy.asarray(filtered_list['dst'])
all_parents = cupy.append(all_parents, parents)
all_children = cupy.append(all_children, children)
return all_parents, all_children

# Step 1
# Get Filtered List of ego_edge_list corresposing to current_seeds
# We filter by creating a series of destination nodes
# corresponding to the offsets and filtering non matching vallues

seeds_offsets_s = cudf.Series(seeds_offsets).values
offset_lens = seeds_offsets_s[1:] - seeds_offsets_s[0:-1]
dst_seeds = current_seeds.repeat(offset_lens)
dst_seeds.index = ego_edge_list.index
filtered_list = ego_edge_list[ego_edge_list["dst"] == dst_seeds]

# Step 2
# Sample Fan Out
# for each dst take maximum of fanout samples
filtered_list = sample_groups(filtered_list,
by="dst",
n_samples=fanout)

return filtered_list['dst'].values, filtered_list['src'].values

def node_subgraph(self,
nodes=None,
Expand Down
23 changes: 23 additions & 0 deletions python/cugraph/cugraph/utilities/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,3 +476,26 @@ def create_random_bipartite(v1, v2, size, dtype):
renumber=False)

return df1['src'], g, a


def sample_groups(df, by, n_samples):
# Sample n_samples in the df frm by column

# Step 1
# first, shuffle the dataframe and reset its index,
# so that the ordering of values within each group
# is made random:
df = df.sample(frac=1).reset_index(drop=True)

# Step 2
# add an integer-encoded version of the "by" column,
# since the rank aggregation seems not to work for
# non-numeric data
df["_"] = df[by].astype("category").cat.codes

# Step 3
# now do a "rank" aggregation and filter out only
# the first N_SAMPLES ranks.
result = df.loc[df.groupby(by)["_"].rank("first") <= n_samples, :]
del result["_"]
return result

0 comments on commit 3dcc4b8

Please sign in to comment.