[IMP] Sample with Offsets in the Bulk Sampler #3524

Merged: 64 commits, May 10, 2023
Commits
f462ec9
select edge props
alexbarghi-nv Apr 17, 2023
5bab68b
remove unwanted files
alexbarghi-nv Apr 17, 2023
ed462b7
fix style
alexbarghi-nv Apr 17, 2023
8a3bd1e
Merge branch 'branch-23.06' of https://github.com/rapidsai/cugraph in…
alexbarghi-nv Apr 19, 2023
f13f2b6
updates to store
alexbarghi-nv Apr 24, 2023
695f493
Merge branch 'branch-23.06' of https://github.com/rapidsai/cugraph in…
alexbarghi-nv Apr 24, 2023
8fe9acb
throw exception for unweighted sssp
alexbarghi-nv Apr 24, 2023
4012f4f
Merge branch 'branch-23.06' into select-edge-props
alexbarghi-nv Apr 24, 2023
7a29d57
add tests for unweighted graphs in C API, update C API implementation…
ChuckHastings Apr 25, 2023
b1f9fa7
Merge branch 'branch-23.06' into capi_handle_default_weights
ChuckHastings Apr 25, 2023
1e8323a
Merge branch 'capi_handle_default_weights' of https://github.com/chuc…
alexbarghi-nv Apr 26, 2023
a44483e
refactor to use new fill_edge_property
ChuckHastings Apr 26, 2023
442e1ce
Rename graph_helper.cu
ChuckHastings Apr 26, 2023
7e347de
need to sort after shuffling
ChuckHastings Apr 26, 2023
8232c13
Merge branch 'branch-23.06' into alex_uns_bug
ChuckHastings Apr 26, 2023
b27b544
Merge branch 'branch-23.06' into alex_uns_bug
ChuckHastings Apr 27, 2023
d2ba15f
Merge branch 'branch-23.06' into capi_handle_default_weights
ChuckHastings Apr 27, 2023
9e451ca
Merge branch 'capi_handle_default_weights' of https://github.com/chuc…
alexbarghi-nv Apr 27, 2023
d34af46
pull in chuck's changes, update sssp tests
alexbarghi-nv Apr 27, 2023
e7bfba4
style
alexbarghi-nv Apr 27, 2023
6b4bf78
Merge branch 'alex_uns_bug' of https://github.com/chuckhastings/cugra…
alexbarghi-nv Apr 27, 2023
90eb5cc
remove unused header. Modify MG egonet test to work properly
ChuckHastings Apr 28, 2023
01d46f7
update cugraph-pyg and cugraph-dgl
alexbarghi-nv Apr 28, 2023
9a30a7e
fix style and copyright
alexbarghi-nv May 1, 2023
c5eb9aa
Merge branch 'branch-23.06' into sampling-with-offsets
alexbarghi-nv May 1, 2023
5ef7b1c
Merge branch 'branch-23.06' into sampling-with-offsets
alexbarghi-nv May 2, 2023
7ed1861
Merge branch 'branch-23.06' into select-edge-props
alexbarghi-nv May 2, 2023
5a0124a
Merge branch 'capi_handle_default_weights' of https://github.com/chuc…
alexbarghi-nv May 2, 2023
cc305dd
Merge branch 'select-edge-props' into sampling-with-offsets
alexbarghi-nv May 2, 2023
4f732a8
fix pylibcugraph empty weights issue, update tests
alexbarghi-nv May 2, 2023
6fc0e31
Merge branch 'branch-23.06' into select-edge-props
alexbarghi-nv May 5, 2023
918bdbe
style,copyright
alexbarghi-nv May 5, 2023
b131cc3
Merge branch 'select-edge-props' into sampling-with-offsets
alexbarghi-nv May 5, 2023
4c30882
style
alexbarghi-nv May 5, 2023
aa8cca5
style
alexbarghi-nv May 5, 2023
e8b05a5
iterator, print statement fix
alexbarghi-nv May 8, 2023
99e2491
remove egonet prints
alexbarghi-nv May 8, 2023
11a1a62
fix
alexbarghi-nv May 8, 2023
3f23d4d
style fix
alexbarghi-nv May 8, 2023
911517c
remove print
alexbarghi-nv May 8, 2023
c1ddad8
reformat
alexbarghi-nv May 8, 2023
b2d5af2
Merge branch 'select-edge-props' into sampling-with-offsets
alexbarghi-nv May 8, 2023
d54cd6a
update docstrings
alexbarghi-nv May 8, 2023
bb35c6c
Merge branch 'select-edge-props' into sampling-with-offsets
alexbarghi-nv May 8, 2023
09bd0fa
Merge branch 'branch-23.06' into sampling-with-offsets
alexbarghi-nv May 8, 2023
5f52c08
correct error message
alexbarghi-nv May 8, 2023
4cdb783
correct error message
alexbarghi-nv May 8, 2023
df7f0ab
update graph creation with warning when edge id type doesn't match
alexbarghi-nv May 8, 2023
66cebf4
Merge branch 'select-edge-props' of https://github.com/alexbarghi-nv/…
alexbarghi-nv May 8, 2023
fef1184
Merge branch 'select-edge-props' into sampling-with-offsets
alexbarghi-nv May 8, 2023
155536f
fix style
alexbarghi-nv May 9, 2023
1bb9271
fix style
alexbarghi-nv May 9, 2023
67b57e8
Merge branch 'branch-23.06' into select-edge-props
alexbarghi-nv May 9, 2023
ad051c1
remove rank option
alexbarghi-nv May 9, 2023
d34190d
remove explicit gtest, gmock dependencies
alexbarghi-nv May 9, 2023
70632dc
Merge branch 'select-edge-props' of https://github.com/alexbarghi-nv/…
alexbarghi-nv May 9, 2023
bc4b2a1
generate
alexbarghi-nv May 9, 2023
a2bd389
update gtest to 1.13
alexbarghi-nv May 9, 2023
b9202dd
generate
alexbarghi-nv May 9, 2023
abbe83e
fix dependencies
alexbarghi-nv May 9, 2023
4b89269
generate
alexbarghi-nv May 9, 2023
3a0958f
Merge branch 'select-edge-props' into sampling-with-offsets
alexbarghi-nv May 9, 2023
56b43d1
Merge branch 'branch-23.06' into sampling-with-offsets
BradReesWork May 9, 2023
438cfeb
Merge branch 'branch-23.06' into sampling-with-offsets
alexbarghi-nv May 9, 2023
3 changes: 0 additions & 3 deletions python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py
@@ -209,14 +209,12 @@ def __iter__(self):
output_dir = os.path.join(
self._sampling_output_dir, "epoch_" + str(self.epoch_number)
)
rank = self._rank
bs = BulkSampler(
output_path=output_dir,
batch_size=self._batch_size,
graph=self._cugraph_graph,
batches_per_partition=self._batches_per_partition,
seeds_per_call=self._seeds_per_call,
rank=rank,
fanout_vals=self.graph_sampler._reversed_fanout_vals,
with_replacement=self.graph_sampler.replace,
)
@@ -226,7 +224,6 @@ def __iter__(self):
batch_df = create_batch_df(self.tensorized_indices_ds)
bs.add_batches(batch_df, start_col_name="start", batch_col_name="batch_id")
bs.flush()
output_dir = output_dir + f"/rank={rank}/"
self.cugraph_dgl_dataset.set_input_files(input_directory=output_dir)
self.epoch_number = self.epoch_number + 1
return super().__iter__()
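
With the rank option gone, each cugraph-dgl worker now writes its sample partitions directly into the epoch directory rather than a rank=<n> subdirectory. Below is a minimal sketch of the resulting BulkSampler setup, assuming a tiny single-GPU graph and illustrative batch contents; the import path and all parameter values here are assumptions for illustration, not taken from the PR.

import os
import cudf
import cugraph
from cugraph.gnn import BulkSampler  # experimental; import path is an assumption

# A tiny single-GPU graph so the sketch is self-contained.
edges = cudf.DataFrame({"src": [0, 1, 2, 3], "dst": [1, 2, 3, 0]})
G = cugraph.Graph(directed=True)
G.from_cudf_edgelist(edges, source="src", destination="dst")

output_dir = os.path.join("/tmp/samples", "epoch_0")

bs = BulkSampler(
    output_path=output_dir,       # note: no rank= argument after this PR
    batch_size=2,
    graph=G,
    batches_per_partition=100,
    seeds_per_call=200_000,
    fanout_vals=[2, 2],
    with_replacement=False,
)

batch_df = cudf.DataFrame(
    {
        "start": cudf.Series([0, 1, 2, 3]),                  # seed vertices
        "batch_id": cudf.Series([0, 0, 1, 1], dtype="int32"),
    }
)
bs.add_batches(batch_df, start_col_name="start", batch_col_name="batch_id")
bs.flush()
# Partitions now land directly under output_dir as batch=<first>-<last>.parquet.
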
6 changes: 2 additions & 4 deletions python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py
@@ -420,8 +420,6 @@ def __construct_graph(
{
"src": pandas.Series(na_src),
"dst": pandas.Series(na_dst),
"w": pandas.Series(np.zeros(len(na_src))),
"eid": pandas.Series(np.arange(len(na_src))),
"etp": pandas.Series(na_etp),
}
)
@@ -441,15 +439,15 @@ def __construct_graph(
df,
source="src",
destination="dst",
edge_attr=["w", "eid", "etp"],
edge_type="etp",
)
distributed.get_client().publish_dataset(cugraph_graph=graph)
else:
graph.from_cudf_edgelist(
df,
source="src",
destination="dst",
edge_attr=["w", "eid", "etp"],
edge_type="etp",
)

return graph
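
Because edge ids and types are now carried as first-class edge properties, the store no longer needs the placeholder "w" and "eid" columns. A sketch of the simplified single-GPU construction path follows, with illustrative edge data; the from_cudf_edgelist arguments mirror the diff above, while the MultiGraph type and column values are assumptions.

import cudf
import cugraph

df = cudf.DataFrame(
    {
        "src": cudf.Series([0, 1, 2]),
        "dst": cudf.Series([1, 2, 0]),
        "etp": cudf.Series([0, 0, 1], dtype="int32"),  # edge type codes
    }
)

graph = cugraph.MultiGraph(directed=True)
graph.from_cudf_edgelist(
    df,
    source="src",
    destination="dst",
    edge_type="etp",  # replaces the old edge_attr=["w", "eid", "etp"]
)
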
46 changes: 24 additions & 22 deletions python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py
@@ -14,6 +14,7 @@
import tempfile

import os
import re

import cupy
import cudf
@@ -31,6 +32,9 @@


class EXPERIMENTAL__BulkSampleLoader:

__ex_parquet_file = re.compile(r"batch=([0-9]+)\-([0-9]+)\.parquet")

def __init__(
self,
feature_store: CuGraphStore,
@@ -40,7 +44,6 @@ def __init__(
shuffle=False,
edge_types: Sequence[Tuple[str]] = None,
directory=None,
rank=0,
starting_batch_id=0,
batches_per_partition=100,
# Sampler args
@@ -84,10 +87,6 @@ def __init__(
The path of the directory to write samples to.
Defaults to a new generated temporary directory.

rank: int (optional, default=0)
The rank of the current worker. Should be provided
when there are multiple workers.

starting_batch_id: int (optional, default=0)
The starting id for each batch. Defaults to 0.
Generally used when loading previously-sampled
@@ -102,16 +101,16 @@

self.__feature_store = feature_store
self.__graph_store = graph_store
self.__rank = rank
self.__next_batch = starting_batch_id
self.__end_exclusive = starting_batch_id
self.__next_batch = -1
self.__end_exclusive = -1
self.__batches_per_partition = batches_per_partition
self.__starting_batch_id = starting_batch_id

if isinstance(all_indices, int):
# Will be loading from disk
self.__num_batches = all_indices
self.__directory = directory
iter(os.listdir(self.__directory))
return

if batch_size is None or batch_size < 1:
@@ -123,7 +122,6 @@ def __init__(
batch_size,
self.__directory.name,
self.__graph_store._subgraph(edge_types),
rank=rank,
fanout_vals=num_neighbors,
with_replacement=replace,
batches_per_partition=self.__batches_per_partition,
@@ -161,33 +159,36 @@ def __init__(
)

bulk_sampler.flush()
self.__input_files = iter(os.listdir(self.__directory.name))

def __next__(self):
# Quit iterating if there are no batches left
if self.__next_batch >= self.__num_batches + self.__starting_batch_id:
raise StopIteration

# Load the next set of sampling results if necessary
if self.__next_batch >= self.__end_exclusive:
if self.__directory is None:
raise StopIteration

# Read the next parquet file into memory
dir_path = (
self.__directory
if isinstance(self.__directory, str)
else self.__directory.name
)
rank_path = os.path.join(dir_path, f"rank={self.__rank}")

file_end_batch_incl = min(
self.__end_exclusive + self.__batches_per_partition - 1,
self.__starting_batch_id + self.__num_batches - 1,
)
# Will raise StopIteration if there are no files left
fname = next(self.__input_files)

m = self.__ex_parquet_file.match(fname)
if m is None:
raise ValueError(f"Invalid parquet filename {fname}")

self.__next_batch, end_inclusive = [int(g) for g in m.groups()]
self.__end_exclusive = end_inclusive + 1

parquet_path = os.path.join(
rank_path,
f"batch={self.__end_exclusive}" f"-{file_end_batch_incl}.parquet",
dir_path,
fname,
)

self.__end_exclusive += self.__batches_per_partition

columns = {
"sources": "int64",
"destinations": "int64",
@@ -212,6 +213,7 @@ def __next__(self):
if self.__next_batch >= self.__num_batches + self.__starting_batch_id:
# Won't delete a non-temp dir (since it would just be deleting a string)
del self.__directory
self.__directory = None

# Get and return the sampled subgraph
if isinstance(torch_geometric, MissingModule):
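
With the rank directory gone, the loader discovers partitions by listing the output directory and recovering each file's batch range from its name. Here is a standalone sketch of that convention, using the same regex as the loader above; the filenames are illustrative.

import re

ex_parquet_file = re.compile(r"batch=([0-9]+)\-([0-9]+)\.parquet")

for fname in ["batch=0-99.parquet", "batch=100-199.parquet"]:
    m = ex_parquet_file.match(fname)
    if m is None:
        raise ValueError(f"Invalid parquet filename {fname}")
    next_batch, end_inclusive = (int(g) for g in m.groups())
    end_exclusive = end_inclusive + 1  # loader iterates [next_batch, end_exclusive)
    print(f"{fname}: batches {next_batch} through {end_inclusive}")
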
python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py
@@ -119,6 +119,13 @@ def convert_to_cudf(cp_arrays, weight_t, with_edge_properties, return_offsets=False
df[edge_type_n] = edge_types
df[hop_id_n] = hop_ids

print(
f"sources: {sources}\n"
f"destinations: {destinations}\n"
f"batch: {batch_ids}\n"
f"offset: {offsets}\n"
)

if return_offsets:
offsets_df = cudf.DataFrame(
{
@@ -297,6 +304,7 @@ def uniform_neighbor_sample(
List of output GPUs (by rank) corresponding to batch
id labels in the label list. Used to assign each batch
id to a GPU.
Must be in ascending order (e.g., [0, 0, 1, 2]).

random_state: int, optional
Random seed to use when making sampling calls.
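
The ascending-order requirement in the docstring above matches how the bulk sampler assigns contiguous batch ranges to workers. A small sketch of building such a mapping, mirroring the __get_label_to_output_comm_rank helper in the bulk_sampler.py diff below; the batch and worker counts are illustrative.

import cupy
import cudf

num_batches, num_workers = 7, 3
z = cupy.zeros(num_batches, dtype="int32")
for rank, chunk in enumerate(
    cupy.array_split(cupy.arange(num_batches), num_workers)
):
    z[chunk] = rank  # contiguous run of batches assigned to this rank

print(cudf.Series(z))  # 0 0 0 1 1 2 2 -- ascending, as required
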
76 changes: 33 additions & 43 deletions python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py
@@ -15,11 +15,16 @@

from typing import Union

import cupy
import cudf
import dask_cudf
import cugraph.dask as dask_cugraph

import cugraph
import pylibcugraph

from cugraph.gnn.data_loading.bulk_sampler_io import write_samples


class EXPERIMENTAL__BulkSampler:
start_col_name = "_START_"
@@ -32,7 +37,6 @@ def __init__(
graph,
seeds_per_call: int = 200_000,
batches_per_partition=100,
rank: int = 0,
**kwargs,
):
"""
@@ -51,9 +55,6 @@
a single sampling call.
batches_per_partition: int (optional, default=100)
The number of batches outputted to a single parquet partition.
rank: int (optional, default=0)
The rank of this sampler. Used to isolate this sampler from
others that may be running on other nodes.
kwargs: kwargs
Keyword arguments to be passed to the sampler (i.e. fanout).
"""
@@ -75,14 +76,9 @@ def __init__(
self.__graph = graph
self.__seeds_per_call = seeds_per_call
self.__batches_per_partition = batches_per_partition
self.__rank = rank
self.__batches = None
self.__sample_call_args = kwargs

@property
def rank(self) -> int:
return self.__rank

@property
def seeds_per_call(self) -> int:
return self.__seeds_per_call
@@ -195,55 +191,49 @@ def flush(self) -> None:
sample_fn = cugraph.uniform_neighbor_sample
else:
sample_fn = cugraph.dask.uniform_neighbor_sample
self.__sample_call_args["_multiple_clients"] = True
self.__sample_call_args.update(
{
"_multiple_clients": True,
"label_to_output_comm_rank": self.__get_label_to_output_comm_rank(
min_batch_id, max_batch_id
),
"label_list": cupy.arange(
min_batch_id, max_batch_id + 1, dtype="int32"
),
}
)

samples = sample_fn(
samples, offsets = sample_fn(
self.__graph,
**self.__sample_call_args,
start_list=self.__batches[self.start_col_name][batch_id_filter],
batch_id_list=self.__batches[self.batch_col_name][batch_id_filter],
with_edge_properties=True,
return_offsets=True,
)

self.__batches = self.__batches[~batch_id_filter]
self.__write(samples, min_batch_id, npartitions)
self.__write(samples, offsets)

if self.size > 0:
self.flush()

def __write(
self,
samples: Union[cudf.DataFrame, dask_cudf.DataFrame],
min_batch_id: int,
npartitions: int,
offsets: Union[cudf.DataFrame, dask_cudf.DataFrame],
) -> None:
# Ensure each rank writes to its own partition so there is no conflict
outer_partition = f"rank={self.__rank}"
outer_partition_path = os.path.join(self.__output_path, outer_partition)
os.makedirs(outer_partition_path, exist_ok=True)

for partition_k in range(npartitions):
ix_partition_start_inclusive = (
min_batch_id + partition_k * self.batches_per_partition
)
ix_partition_end_inclusive = (
min_batch_id + (partition_k + 1) * self.batches_per_partition - 1
)
f = (samples.batch_id >= ix_partition_start_inclusive) & (
samples.batch_id <= ix_partition_end_inclusive
)
if len(samples[f]) == 0:
break

ix_partition_end_inclusive = samples[f].batch_id.max()
if hasattr(ix_partition_end_inclusive, "compute"):
ix_partition_end_inclusive = ix_partition_end_inclusive.compute()
ix_partition_end_inclusive = int(ix_partition_end_inclusive)

inner_path = os.path.join(
outer_partition_path,
f"batch={ix_partition_start_inclusive}-{ix_partition_end_inclusive}"
".parquet",
)
os.makedirs(self.__output_path, exist_ok=True)
write_samples(
samples, offsets, self.__batches_per_partition, self.__output_path
)

def __get_label_to_output_comm_rank(self, min_batch_id, max_batch_id):
num_workers = dask_cugraph.get_n_workers()
num_batches = max_batch_id - min_batch_id + 1
z = cupy.zeros(num_batches, dtype="int32")
s = cupy.array_split(cupy.arange(num_batches), num_workers)
for i, t in enumerate(s):
z[t] = i

samples[f].to_parquet(inner_path, index=False)
return cudf.Series(z)
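
End to end, flush() now asks the sampler for both the samples and a per-batch offsets frame, and write_samples slices partitions from the offsets instead of filtering on samples.batch_id. A single-GPU sketch of that call follows, with an illustrative graph and batch assignment; the exact offsets column layout follows the convert_to_cudf changes earlier in this PR.

import cudf
import cugraph

edges = cudf.DataFrame({"src": [0, 1, 2, 3], "dst": [1, 2, 3, 0]})
G = cugraph.Graph(directed=True)
G.from_cudf_edgelist(edges, source="src", destination="dst")

samples, offsets = cugraph.uniform_neighbor_sample(
    G,
    start_list=cudf.Series([0, 1, 2, 3]),
    batch_id_list=cudf.Series([0, 0, 1, 1], dtype="int32"),
    fanout_vals=[2, 2],
    with_edge_properties=True,
    return_offsets=True,  # new in this PR: batch boundaries come back separately
)
# `offsets` records where each batch's rows begin in `samples`, so
# write_samples() can emit batch=<first>-<last>.parquet partitions
# without scanning samples.batch_id.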