diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index 9bec238f0ed..1c1a8fbb2ae 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -191,11 +191,11 @@ async def read(self, deserializers=None): lengths = struct.unpack("Q" * n_frames, lengths) frames = [bytearray(each_length) for each_length in lengths] - for each_frame in frames: + recv_frames = [each_frame for each_frame in frames if len(each_frame) > 0] + for each_frame in recv_frames: each_length = len(each_frame) - if each_length: - n = await stream.read_into(each_frame) - assert n == each_length, (n, each_length) + n = await stream.read_into(each_frame) + assert n == each_length, (n, each_length) except StreamClosedError as e: self.stream = None if not shutting_down(): diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index ac039205f3a..0bf785fb521 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -147,6 +147,9 @@ async def write( frames = await to_frames( msg, serializers=serializers, on_error=on_error ) + send_frames = [ + each_frame for each_frame in frames if len(each_frame) > 0 + ] # Send meta data await self.ep.send(np.array([len(frames)], dtype=np.uint64)) @@ -159,10 +162,10 @@ async def write( await self.ep.send( np.array([nbytes(f) for f in frames], dtype=np.uint64) ) + # Send frames - for frame in frames: - if nbytes(frame) > 0: - await self.ep.send(frame) + for frame in send_frames: + await self.ep.send(frame) return sum(map(nbytes, frames)) except (ucp.exceptions.UCXBaseException): self.abort() @@ -195,9 +198,11 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): else np.empty(each_size, dtype=np.uint8) for is_cuda, each_size in zip(is_cudas.tolist(), sizes.tolist()) ] - for each_frame in frames: - if len(each_frame) > 0: - await self.ep.recv(each_frame) + recv_frames = [ + each_frame for each_frame in frames if len(each_frame) > 0 + ] + for each_frame in recv_frames: + await self.ep.recv(each_frame) msg = await from_frames( frames, deserialize=self.deserialize, deserializers=deserializers )