diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java index b22c5298cfdea..fa882ca37f903 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java @@ -192,7 +192,7 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) { logger.trace("Sending RPC to {}", getRemoteAddress(channel)); } - long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits()); + long requestId = requestId(); handler.addRpcRequest(requestId, callback); channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))) @@ -221,7 +221,7 @@ public long uploadStream( logger.trace("Sending RPC to {}", getRemoteAddress(channel)); } - long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits()); + long requestId = requestId(); handler.addRpcRequest(requestId, callback); channel.writeAndFlush(new UploadStream(requestId, meta, data)) @@ -264,6 +264,10 @@ public void operationComplete(Future future) throws Exception { void handleFailure(String errorMsg, Throwable cause) throws Exception {} } + private static long requestId() { + return Math.abs(UUID.randomUUID().getLeastSignificantBits()); + } + private class RpcChannelListener extends StdChannelListener { final long rpcRequestId; final RpcResponseCallback callback;