Skip to content

Commit

Permalink
Merge dask/master into jakirkham/simp_write_recv_ops
Browse files Browse the repository at this point in the history
  • Loading branch information
jakirkham committed Mar 20, 2020
2 parents 7b156b3 + 0d64f3a commit 3ff1ce2
Showing 1 changed file with 22 additions and 0 deletions.
22 changes: 22 additions & 0 deletions distributed/comm/ucx.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@
cuda_array = None


def synchronize_stream(stream=0):
import numba.cuda

ctx = numba.cuda.current_context()
cu_stream = numba.cuda.driver.drvapi.cu_stream(stream)
stream = numba.cuda.driver.Stream(ctx, cu_stream, None)
stream.synchronize()


def init_once():
global ucp, cuda_array
if ucp is not None:
Expand Down Expand Up @@ -164,6 +173,14 @@ async def write(
)

# Send frames

# It is necessary to first synchronize the default stream before start sending
# We synchronize the default stream because UCX is not stream-ordered and
# syncing the default stream will wait for other non-blocking CUDA streams.
# Note this is only sufficient if the memory being sent is not currently in use on
# non-blocking CUDA streams.
synchronize_stream(0)

for each_frame in send_frames:
await self.ep.send(each_frame)
return sum(map(nbytes, send_frames))
Expand Down Expand Up @@ -201,6 +218,11 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")):
recv_frames = [
each_frame for each_frame in frames if len(each_frame) > 0
]

# It is necessary to first populate `frames` with CUDA arrays and synchronize
# the default stream before starting receiving to ensure buffers have been allocated
synchronize_stream(0)

for each_frame in recv_frames:
await self.ep.recv(each_frame)
msg = await from_frames(
Expand Down

0 comments on commit 3ff1ce2

Please sign in to comment.