
Add distributed ndarray #7881

Closed

Conversation

@shino16 (Contributor) commented Sep 26, 2023:

Adds array.DistributedArray to cupyx.distributed.

It provides initial support for

  • conversion from/to ndarray
  • element-wise operations (ufunc, ElementwiseKernel)
  • reduction (max/min/sum/prod)
  • matrix multiplication (matmul)

in a multi-GPU setting.
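
For illustration, here is a minimal usage sketch. It assumes two GPUs; the module path and the distributed_array signature follow the diff quoted later in this conversation, while the shapes and slices are made up for the example.

import cupy
from cupyx.distributed.array import distributed_array

# Shard a 1000x1000 array row-wise across two devices (illustrative split).
a = cupy.arange(1000 * 1000, dtype=cupy.float32).reshape(1000, 1000)
index_map = {
    0: slice(500),        # device 0 holds rows [0, 500)
    1: slice(500, None),  # device 1 holds rows [500, 1000)
}
d_a = distributed_array(a, index_map, mode='replica')

# Element-wise operations and reductions dispatch to the per-device chunks.
d_b = d_a * 2
total = d_b.sum()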

@@ -581,6 +581,10 @@ cdef class _SimpleReductionKernel(_AbstractReductionKernel):
    def __call__(self, object a, axis=None, dtype=None, _ndarray_base out=None,
                 bint keepdims=False):

        if hasattr(a, '__cupy_override_reduction_kernel__'):

Member:
This currently only works for SimpleReductionKernel.
There are other, more generic reduction kernels; I wonder if we can easily support them?

@shino16 (Contributor Author):
__cupy_override_reduction_kernel__ has to be called here, before the type checks that rule out DistributedArray.
I believe we can support ReductionKernel in the same way, by adding this hook in its __call__ method.

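For context, the hook being discussed follows this pattern, shown here as a simplified Python sketch (the real code is Cython, and the exact signature of the override method is an assumption rather than quoted from the PR):

class _SimpleReductionKernel:
    def __call__(self, a, axis=None, dtype=None, out=None, keepdims=False):
        # Let array-like subclasses such as DistributedArray take over the
        # reduction before the ndarray-only type checks reject them.
        if hasattr(a, '__cupy_override_reduction_kernel__'):
            return a.__cupy_override_reduction_kernel__(
                self, axis, dtype, out, keepdims)
        ...  # regular single-GPU reduction path
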
@@ -995,6 +995,9 @@ cdef class _ndarray_base:
            :meth:`numpy.ndarray.max`

        """
        if hasattr(self, '__cupy_override_reduction_kernel__'):

Member:
I would prefer to have this check only in the ReductionKernel machinery. Do you think it would be possible to do that? Thanks!

@shino16 (Contributor Author) commented Sep 28, 2023:
If we remove this check, cupy._core._routines_statistics._ndarray_max tries CUB and cuTENSOR before falling back to cupy.max.
So this check should happen before reaching cupy.max.__call__; I wonder where it should be placed.
https://github.com/shino16/cupy/blob/main/cupy/_core/_routines_statistics.pyx#L25-L43

Member:
That's a good call. Maybe we should rewrite those checks to make it feasible,
e.g. by having an attribute on the ndarray that derived classes override:
array.support_cub_for_routines
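
A rough sketch of that idea (hypothetical names; as the next reply notes, the PR ultimately took a different route):

class ndarray:
    # Plain ndarrays may use the CUB/cuTENSOR fast paths for routines.
    support_cub_for_routines = True

class DistributedArray(ndarray):
    # Derived classes that cannot use those backends opt out here.
    support_cub_for_routines = False

def _ndarray_max(a, axis, out, keepdims):
    if a.support_cub_for_routines:
        ...  # try the CUB / cuTENSOR accelerated paths first
    ...  # otherwise fall back to the generic reduction kernel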

@shino16 (Contributor Author):
Now DistributedArray overrides all the methods of ndarray, including max/min/sum/prod, so those hooks in cupy._core.core.ndarray are removed. Still, your idea sounds reasonable.

@@ -43,6 +43,18 @@
_nccl_ops = {}


def _get_nccl_dtype_and_count(array, count=None):

Member:
nice!

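The excerpt above only shows the signature of this helper. A plausible sketch of what such a helper does is given below; the key point, which is an assumption here rather than quoted code, is that NCCL has no complex dtypes, so complex arrays are typically sent as the matching real dtype with the element count doubled. The _nccl_dtypes table is hypothetical.

from cupy.cuda import nccl

# Hypothetical lookup table, presumably kept next to _nccl_ops above.
_nccl_dtypes = {
    'float32': nccl.NCCL_FLOAT32,
    'float64': nccl.NCCL_FLOAT64,
    'int32': nccl.NCCL_INT32,
    'int64': nccl.NCCL_INT64,
}

def _get_nccl_dtype_and_count(array, count=None):
    if count is None:
        count = array.size
    name = array.dtype.name
    if name == 'complex64':    # NCCL has no complex types: send as 2x float32
        return nccl.NCCL_FLOAT32, 2 * count
    if name == 'complex128':   # ... and complex128 as 2x float64
        return nccl.NCCL_FLOAT64, 2 * count
    return _nccl_dtypes[name], count
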
shape (tuple of ints): Length of axes.
dtype: Data type. It must be an argument of :class:`numpy.dtype`.
mode (str or mode object): Mode that determines how overlaps of
chunks are interpreted.

Member:
I would like a description of the available modes in the docstring under a .. note:: section :)
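
Something along these lines could satisfy that request. The 'replica' name comes from the distributed_array signature quoted later in this conversation; treating 'sum' as an example of a reduction mode is an assumption:

def distributed_array(array, index_map, mode='replica'):
    """...

    .. note::
        In ``'replica'`` mode, regions where chunks overlap hold identical,
        replicated values. In a reduction mode such as ``'sum'``, the logical
        value of an element is the reduction of the values stored by all
        chunks that cover it.
    """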

@mergify (bot) commented Sep 29, 2023:

This pull request is now in conflict. Could you fix it @shino16? 🙏

@emcastillo (Member):

/test mini

@emcastillo (Member):

/test mncore

@shino16 marked this pull request as ready for review on September 30, 2023 at 15:59.

@leofang (Member) left a comment:
Quick drive-by comments 🙂

Comment on lines 11 to 19
INDEX_MAP_A = {
    0: (slice(200)),       # arr[:200]
    1: (slice(200, None)), # arr[200:]
}

INDEX_MAP_B = {
    0: (slice(None), slice(None, None, 2)),  # arr[:, ::2]
    1: (slice(None), slice(1, None, 2)),     # arr[:, 1::2]
}

Member:
It'd be nice to comment on why we need these index maps; currently this sample assumes that users already have the concept of sharding, which may or may not be the case.

Comment on lines 28 to 34
with Device(0):
    s0 = Stream()
    s0.use()

with Device(1):
    s1 = Stream()
    s1.use()

Member:
Quality-of-life change: this sample assumes there are 2 GPUs; we need a device count check at the top and an early exit.
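
A minimal version of that check, assuming the sample is a standalone script:

import sys
import cupy

# Skip the sample early when fewer than two GPUs are available.
if cupy.cuda.runtime.getDeviceCount() < 2:
    print('This sample requires at least 2 GPUs; exiting.')
    sys.exit(0)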

Comment on lines 14 to 19
#      0        M/3         M
#   0  +--------+-----------+
#      |  0 1   |     2     |
# N/2  +--------+-----------+
#      |  0 3   |    1 2    |
#   N  +--------+-----------+

Member:
Q: Does this sample require 4 GPUs?

Member:
Yeah, we need to add a runtime check here! Great catch :)

@asi1024 added this to the v13.0.0rc1 milestone on Oct 11, 2023.
corresponding to slices of the original array. Note that one device can
hold multiple chunks.

This direct constructor is designed for internal calls. Users should create

Member:
Suggested change:
- This direct constructor is designed for internal calls. Users should create
+ This direct constructor is intended as a private API. Users should create


obj._streams = {}
obj._comms = comms if comms is not None else {}

Member:
Several of the assignments here could be moved to __init__.

def distributed_array(
    array: ArrayLike,
    index_map: dict[int, Any],
    mode: str = 'replica',

Member:
maybe better to use enum?
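
A sketch of what that could look like (hypothetical enum; the PR kept string-based modes, and 'sum' is assumed as a second mode purely for illustration):

import enum

class Mode(enum.Enum):
    REPLICA = 'replica'
    SUM = 'sum'

def distributed_array(array, index_map, mode=Mode.REPLICA):
    # Accept both the enum and the original strings for compatibility.
    mode = mode if isinstance(mode, Mode) else Mode(mode)
    ...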


    if isinstance(array, (numpy.ndarray, ndarray)):
        if mode != 'replica':
            array = array.copy()

Member:
TODO: If the array is already on the device, making a copy can cause an OOM; in some cases we may be able to copy only the chunk needed on each device if the array is contiguous.
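
Roughly what that optimization could look like, as a sketch only: it assumes array is a contiguous cupy.ndarray, that index_map maps a device id to the index of its chunk, and that moving a sliced view with cupy.asarray under the target device is acceptable here. This is not part of the PR.

for dev, idx in index_map.items():
    chunk_view = array[idx]               # a view; nothing is copied yet
    with cupy.cuda.Device(dev):
        # Materialize only this chunk on device `dev` instead of copying
        # the whole source array.
        chunk = cupy.asarray(chunk_view)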



class _ArrayPlaceholder:
# Mocks ndarray

Member:
Self note: this is used while we are waiting for chunks to arrive at the current device.

@emcastillo (Member):

/test full

@emcastillo (Member):

@shino16 can you re-push your changes and reopen the PR? Thanks!

@kmaehashi (Member):

This pull request has been merged as #7942. Thanks again @shino16 for working on this!

@asi1024 modified the milestones (v13.0.0rc1 → Closed PRs) on Dec 6, 2023.