
Commit

Merge branch 'features/1411-Implement_sketched_percentile' of github.com:helmholtz-analytics/heat into features/1411-Implement_sketched_percentile
Hoppe committed Jul 1, 2024
2 parents f63b7b5 + 330d451 commit 896f573
Showing 13 changed files with 336 additions and 97 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/changelog-updater.yml
@@ -38,7 +38,7 @@ jobs:
RELEASE_TITLE: ${{ format('# {0} - {1}', github.event.release.tag_name, github.event.release.name) }}
RELEASE_BODY: ${{ github.event.release.body }}
- name: Create PR
- uses: peter-evans/create-pull-request@6d6857d36972b65feb161a90e484f2984215f83e # v6.0.5
+ uses: peter-evans/create-pull-request@c5a7806660adbe173f04e3e038b0ccdcd758773c # v6.1.0
with:
base: main
branch: post-release-changelog-update
6 changes: 3 additions & 3 deletions .github/workflows/codeql.yml
@@ -50,7 +50,7 @@ jobs:

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
- uses: github/codeql-action/init@23acc5c183826b7a8a97bce3cecc52db901f8251 # v3.25.10
+ uses: github/codeql-action/init@b611370bb5703a7efb587f9d136a52ea24c5c38c # v3.25.11
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
@@ -60,7 +60,7 @@
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
- uses: github/codeql-action/autobuild@23acc5c183826b7a8a97bce3cecc52db901f8251 # v3.25.10
+ uses: github/codeql-action/autobuild@b611370bb5703a7efb587f9d136a52ea24c5c38c # v3.25.11

# ℹ️ Command-line programs to run using the OS shell.
# 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun
@@ -73,6 +73,6 @@
# ./location_of_script_within_repo/buildscript.sh

- name: Perform CodeQL Analysis
- uses: github/codeql-action/analyze@23acc5c183826b7a8a97bce3cecc52db901f8251 # v3.25.10
+ uses: github/codeql-action/analyze@b611370bb5703a7efb587f9d136a52ea24c5c38c # v3.25.11
with:
category: "/language:${{matrix.language}}"
2 changes: 1 addition & 1 deletion .github/workflows/create-branch-on-assignment.yml
@@ -16,6 +16,6 @@ jobs:
egress-policy: audit

- name: Create Issue Branch
- uses: robvanderleek/create-issue-branch@b31e23e51967afc3ae85e13e708c6170474298aa # main
+ uses: robvanderleek/create-issue-branch@09a5e623b703e02465fb8ff3fe46653b6aa59994 # main
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
4 changes: 2 additions & 2 deletions .github/workflows/docker.yml
@@ -49,7 +49,7 @@ jobs:
password: ${{ secrets.GITHUB_TOKEN }}
-
name: Build
- uses: docker/build-push-action@ca052bb54ab0790a636c9b5f226502c73d547a25 # v5.4.0
+ uses: docker/build-push-action@15560696de535e4014efeff63c48f16952e52dd1 # v6.2.0
with:
file: docker/Dockerfile.release
build-args: |
@@ -65,7 +65,7 @@
docker run -v `pwd`:`pwd` -w `pwd` --rm test_${{ inputs.name }} pytest
-
name: Build and push
- uses: docker/build-push-action@ca052bb54ab0790a636c9b5f226502c73d547a25 # v5.4.0
+ uses: docker/build-push-action@15560696de535e4014efeff63c48f16952e52dd1 # v6.2.0
with:
file: docker/Dockerfile.release
build-args: |
2 changes: 1 addition & 1 deletion .github/workflows/latest-pytorch-support.yml
@@ -50,7 +50,7 @@ jobs:
run: |
sed -i '/torch>=/ s/'"${{ env.previous_pytorch }}"'/'"${{ env.setup_pytorch }}"'/g' setup.py
- name: Create PR from branch
- uses: peter-evans/create-pull-request@6d6857d36972b65feb161a90e484f2984215f83e # v6.0.5
+ uses: peter-evans/create-pull-request@c5a7806660adbe173f04e3e038b0ccdcd758773c # v6.1.0
with:
base: ${{ inputs.base_branch }}
branch: ${{ inputs.working_branch }}
2 changes: 1 addition & 1 deletion .github/workflows/scorecard.yml
@@ -72,6 +72,6 @@ jobs:

# Upload the results to GitHub's code scanning dashboard.
- name: "Upload to code-scanning"
- uses: github/codeql-action/upload-sarif@23acc5c183826b7a8a97bce3cecc52db901f8251 # v3.25.10
+ uses: github/codeql-action/upload-sarif@b611370bb5703a7efb587f9d136a52ea24c5c38c # v3.25.11
with:
sarif_file: results.sarif
2 changes: 1 addition & 1 deletion .perun.ini
@@ -4,4 +4,4 @@ data_out = ./bench_data

[benchmarking]
rounds = 10
- warmup_rounds = 0
+ warmup_rounds = 1
2 changes: 1 addition & 1 deletion doc/requirements.txt
@@ -1,5 +1,5 @@
Sphinx==7.3.7
- sphinx-autoapi===3.1.1
+ sphinx-autoapi===3.1.2
sphinx_rtd_theme==2.0.0
sphinxcontrib-napoleon==0.7
sphinx-copybutton==0.5.2
186 changes: 186 additions & 0 deletions heat/core/communication.py
@@ -243,6 +243,17 @@ def counts_displs_shape(

return tuple(counts.tolist()), tuple(displs.tolist()), tuple(output_shape)

@classmethod
def mpi_type_of(cls, dtype: torch.dtype) -> MPI.Datatype:
"""Determines the MPI Datatype from the torch dtype.
Parameters
----------
dtype : torch.dtype
PyTorch data type
"""
return cls.__mpi_type_mappings[dtype]
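For orientation, a minimal usage sketch of this new helper. It assumes Heat's default world communicator is exposed as ht.MPI_WORLD and that torch.float32 maps to MPI.FLOAT, which is the conventional mapping rather than something shown in this diff:

import torch
from mpi4py import MPI
import heat as ht

# mpi_type_of is a classmethod, so it can be called on the class or on an
# instance such as the default world communicator.
mpi_dtype = ht.MPI_WORLD.mpi_type_of(torch.float32)
print(mpi_dtype == MPI.FLOAT)  # expected: True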

@classmethod
def mpi_type_and_elements_of(
cls,
@@ -1443,6 +1454,181 @@ def Alltoallv(

Alltoallv.__doc__ = MPI.Comm.Alltoallv.__doc__

def Alltoallw(
self,
sendbuf: Union[DNDarray, torch.Tensor, Any],
recvbuf: Union[DNDarray, torch.Tensor, Any],
):
"""
Generalized All-to-All communication allowing different counts, displacements and datatypes for each partner. See MPI standard for more information.
Parameters
----------
sendbuf: Union[DNDarray, torch.Tensor, Any]
Buffer address of the send message. The buffer is expected to be a tuple of the form (buffer, (counts, displacements), subarray_params_list), where subarray_params_list is a list of tuples of the form (lshape, subsizes, substarts).
recvbuf: Union[DNDarray, torch.Tensor, Any]
Buffer address where to store the result. The buffer is expected to be a tuple of the form (buffer, (counts, displacements), subarray_params_list), where subarray_params_list is a list of tuples of the form (lshape, subsizes, substarts).
"""
# Unpack sendbuffer information
sendbuf_tensor, (send_counts, send_displs), subarray_params_list = sendbuf
sendbuf = sendbuf_tensor if CUDA_AWARE_MPI else sendbuf_tensor.cpu()

is_contiguous = sendbuf.is_contiguous()
stride = sendbuf.stride()

send_datatype = self.mpi_type_of(sendbuf.dtype)
sendbuf_ptr = self.as_mpi_memory(sendbuf)

source_subarray_types = []

for idx, subarray_params in enumerate(subarray_params_list):
lshape, subsizes, substarts = subarray_params

if np.all(np.array(subsizes) > 0):

if is_contiguous:
# Commit the source subarray datatypes
# Subarray parameters are calculated based on the work by Dalcin et al. (https://arxiv.org/abs/1804.09536)
subarray_type = send_datatype.Create_subarray(
lshape, subsizes, substarts, order=MPI.ORDER_C
).Commit()
source_subarray_types.append(subarray_type)
else:
# Create recursive vector datatype
source_subarray_types.append(
self._create_recursive_vectortype(
send_datatype, stride, subsizes, substarts
)
)
send_counts[idx] = subsizes[0]
else:
send_counts[idx] = 0
source_subarray_types.append(MPI.INT)

# Unpack recvbuf information
recvbuf_tensor, (recv_counts, recv_displs), subarray_params_list = recvbuf
recvbuf = recvbuf_tensor if CUDA_AWARE_MPI else recvbuf_tensor.cpu()
recvbuf_ptr, _, recv_datatype = self.as_buffer(recvbuf)

# Commit the receive subarray datatypes
target_subarray_types = []
for idx, subarray_params in enumerate(subarray_params_list):
lshape, subsizes, substarts = subarray_params

if np.all(np.array(subsizes) > 0):
target_subarray_types.append(
recv_datatype.Create_subarray(
lshape, subsizes, substarts, order=MPI.ORDER_C
).Commit()
)
else:
recv_counts[idx] = 0
target_subarray_types.append(MPI.INT)

# Perform the Alltoallw operation
self.handle.Alltoallw(
[sendbuf_ptr, (send_counts, send_displs), source_subarray_types],
[recvbuf_ptr, (recv_counts, recv_displs), target_subarray_types],
)

# If MPI is not CUDA-aware, copy the result back to the original GPU buffer
if (
isinstance(recvbuf_tensor, torch.Tensor)
and recvbuf_tensor.is_cuda
and not CUDA_AWARE_MPI
):
recvbuf_tensor.copy_(recvbuf)
else:
if sendbuf_tensor.is_conj():
recvbuf_tensor.conj_physical_()

# Free the subarray datatypes
for p in range(len(source_subarray_types)):
if source_subarray_types[p] != MPI.INT:
source_subarray_types[p].Free()
if target_subarray_types[p] != MPI.INT:
target_subarray_types[p].Free()

Alltoallw.__doc__ = MPI.Comm.Alltoallw.__doc__
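Heat's wrapper above packs counts, displacements and per-partner subarray parameters into nested tuples; the collective it ultimately drives is plain MPI_Alltoallw. For reference, a minimal self-contained mpi4py sketch of that underlying call with Create_subarray datatypes. It illustrates the MPI mechanism, not the wrapper's tuple API, and every name in it is local to the example:

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size, rank = comm.Get_size(), comm.Get_rank()

lshape = [size, 4]                            # local block: one row per partner
sendbuf = np.full(lshape, rank, dtype="i")
recvbuf = np.empty(lshape, dtype="i")

# One subarray datatype per partner: row p of the local block. The row offset
# lives inside the datatype, so the byte displacements below can stay zero.
types = [MPI.INT.Create_subarray(lshape, [1, 4], [p, 0]).Commit() for p in range(size)]
counts, displs = [1] * size, [0] * size

comm.Alltoallw([sendbuf, counts, displs, types], [recvbuf, counts, displs, types])
# row q of recvbuf now holds rank q's values on every process

for t in types:
    t.Free()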

def _create_recursive_vectortype(
self,
datatype: MPI.Datatype,
tensor_stride: Tuple[int],
subarray_sizes: List[int],
start: List[int],
):
"""
Create a recursively defined vector datatype to handle non-contiguous tensor data. The resulting datatype gathers non-contiguous tensor data according to the specified subarray sizes.
Parameters
----------
datatype : MPI.Datatype
The base datatype to create the recursive vector datatype from.
tensor_stride : Tuple[int]
A list of tensor strides for each dimension.
subarray_sizes : List[int]
A list of subarray sizes for each dimension.
start: List[int]
Index of the first element of the subarray in the original array.
Notes
-----
This function creates a recursive vector datatype by defining vectors out of the previous datatype with specified strides and sizes. The extent (size of the data type in bytes) of the new datatype is set to the extent of the basic datatype to allow interweaving of data.
Examples
--------
>>> datatype = MPI.INT
>>> tensor_stride = [1, 2, 3]
>>> subarray_sizes = [4, 5, 6]
>>> start = [0, 0, 0]
>>> recursive_vectortype = comm._create_recursive_vectortype(datatype, tensor_stride, subarray_sizes, start)
"""
datatype_history = []
current_datatype = datatype

i = len(tensor_stride) - 1
while i > 0:
current_stride = tensor_stride[i]
current_size = subarray_sizes[i]
# Define a vector from the previous datatype with stride equal to the current stride
if i == len(tensor_stride) - 1 and current_stride == 1:
i -= 1
current_stride = tensor_stride[i]
next_size = subarray_sizes[i]
new_vector_datatype = current_datatype.Create_vector(
next_size, current_size, current_stride
).Commit()

else:
if i == len(tensor_stride) - 1:
new_vector_datatype = current_datatype.Create_vector(
current_size, 1, current_stride
).Commit()
else:
new_vector_datatype = current_datatype.Create_vector(
current_size, 1, 1
).Commit()

datatype_history.append(new_vector_datatype)
# Set extent of the new datatype to the extent of the basic datatype to allow interweaving of data
next_stride = tensor_stride[i - 1]
new_resized_vector_datatype = new_vector_datatype.Create_resized(
0, datatype.Get_extent()[1] * next_stride
).Commit()
datatype_history.append(new_resized_vector_datatype)
current_datatype = new_resized_vector_datatype

i -= 1

displacement = sum([x * y for x, y in zip(tensor_stride, start)]) * datatype.Get_extent()[1]
current_datatype = current_datatype.Create_hindexed_block(1, [displacement]).Commit()

for dt in datatype_history[:-1]:
dt.Free()
return current_datatype
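The recursion above stacks one vector datatype per tensor dimension and then resizes its extent so instances can interleave. A minimal sketch of the single-level building block it relies on: extracting a strided 2x3 sub-block from a C-ordered 4x5 buffer with plain mpi4py (the send-to-self is just the shortest way to exercise the datatype):

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
full = np.arange(20, dtype="i").reshape(4, 5)   # row stride: 5 elements
block = np.empty((2, 3), dtype="i")

# 2 blocks of 3 contiguous ints whose starts are 5 ints apart: rows 0-1, cols 0-2
subblock_t = MPI.INT.Create_vector(2, 3, 5).Commit()

comm.Sendrecv([full, 1, subblock_t], dest=comm.rank,
              recvbuf=[block, 6, MPI.INT], source=comm.rank)
subblock_t.Free()
print(block)  # [[0 1 2] [5 6 7]]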

def Ialltoall(
self,
sendbuf: Union[DNDarray, torch.Tensor, Any],
75 changes: 16 additions & 59 deletions heat/core/dndarray.py
@@ -1471,68 +1471,25 @@ def resplit_(self, axis: int = None):
self.__lshape_map = None
return self

-        tiles = tiling.SplitTiles(self)
-        new_tile_locs = tiles.set_tile_locations(
-            split=axis, tile_dims=tiles.tile_dimensions, arr=self
+        arr_tiles = tiling.SplitTiles(self)
+        new_tiles = tiling.SplitTiles(self)
+
+        gshape = self.shape
+        new_lshape = list(gshape)
+        new_lshape[axis] = int(arr_tiles.tile_dimensions[axis][self.comm.rank].item())
+
+        recv_buffer = torch.empty(
+            tuple(new_lshape), dtype=self.dtype.torch_type(), device=self.device.torch_device
         )
-        rank = self.comm.rank
-        # receive the data with non-blocking, save which process it's from
-        rcv = {}
-        for rpr in range(self.comm.size):
-            # need to get where the tiles are on the new one first
-            # rpr is the destination
-            new_locs = torch.where(new_tile_locs == rpr)
-            new_locs = torch.stack([new_locs[i] for i in range(self.ndim)], dim=1)
-            for i in range(new_locs.shape[0]):
-                key = tuple(new_locs[i].tolist())
-                spr = tiles.tile_locations[key].item()
-                to_send = tiles[key]
-                if spr == rank and spr != rpr:
-                    self.comm.Send(to_send.clone(), dest=rpr, tag=rank)
-                    del to_send
-                elif spr == rpr == rank:
-                    rcv[key] = [None, to_send]
-                elif rank == rpr:
-                    sz = tiles.get_tile_size(key)
-                    buf = torch.zeros(
-                        sz, dtype=self.dtype.torch_type(), device=self.device.torch_device
-                    )
-                    w = self.comm.Irecv(buf=buf, source=spr, tag=spr)
-                    rcv[key] = [w, buf]
-        dims = list(range(self.ndim))
-        del dims[axis]
-        sorted_keys = sorted(rcv.keys())
-        # todo: reduce the problem to 1D cats for each dimension, then work up
-        sz = self.comm.size
-        arrays = []
-        for prs in range(int(len(sorted_keys) / sz)):
-            lp_keys = sorted_keys[prs * sz : (prs + 1) * sz]
-            lp_arr = None
-            for k in lp_keys:
-                if rcv[k][0] is not None:
-                    rcv[k][0].Wait()
-                if lp_arr is None:
-                    lp_arr = rcv[k][1]
-                else:
-                    lp_arr = torch.cat((lp_arr, rcv[k][1]), dim=dims[-1])
-                del rcv[k]
-            if lp_arr is not None:
-                arrays.append(lp_arr)
-        del dims[-1]
-        # for 4 prs and 4 dims, arrays is now 16 elements long,
-        # next need to group each 4 (sz) and cat in the next dim
-        for d in reversed(dims):
-            new_arrays = []
-            for prs in range(int(len(arrays) / sz)):
-                new_arrays.append(torch.cat(arrays[prs * sz : (prs + 1) * sz], dim=d))
-            arrays = new_arrays
-            del d
-        if len(arrays) == 1:
-            arrays = arrays[0]
-
-        self.__array = arrays
+
+        self._axis2axisResplit(
+            self.larray, self.split, arr_tiles, recv_buffer, axis, new_tiles, self.comm
+        )
+
+        self.__array = recv_buffer
self.__split = axis
self.__lshape_map = None

return self
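A short usage sketch of the rewritten method (run under MPI with at least two processes; the _axis2axisResplit helper it delegates to is added elsewhere in this commit and does not appear on this page):

import heat as ht

x = ht.zeros((4, 6), split=0)   # rows distributed across the processes
x.resplit_(axis=1)              # in-place redistribution along the columns
print(x.split, x.lshape)        # split is now 1; lshape shrank along axis 1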

def __setitem__(
(Diffs for the remaining 3 changed files are not rendered here.)