Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-17545. [ARR] router async rpc client. #6871

Merged
merged 8 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,91 @@

package org.apache.hadoop.hdfs.protocolPB;

import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext;
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

/**
* <p>This utility class encapsulates the logic required to initiate asynchronous RPCs,
* handle responses, and propagate exceptions. It works in conjunction with
* {@link ProtobufRpcEngine2} and {@link Client} to facilitate the asynchronous
* nature of the operations.
*
* @see ProtobufRpcEngine2
* @see Client
* @see CompletableFuture
*/
public final class AsyncRpcProtocolPBUtil {
public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class);
/** The executor used for handling responses asynchronously. */
private static Executor worker;

private AsyncRpcProtocolPBUtil() {}

/**
* Asynchronously invokes an RPC call and applies a response transformation function
* to the result. This method is generic and can be used to handle any type of
* RPC call.
*
* <p>The method uses the {@link ShadedProtobufHelper.IpcCall} to prepare the RPC call
* and the {@link ApplyFunction} to process the response. It also handles exceptions
* that may occur during the RPC call and wraps them in a user-friendly manner.
*
* @param call The IPC call encapsulating the RPC request.
* @param response The function to apply to the response of the RPC call.
* @param clazz The class object representing the type {@code R} of the response.
* @param <T> Type of the call's result.
* @param <R> Type of method return.
* @return An object of type {@code R} that is the result of applying the response
* function to the RPC call result.
* @throws IOException If an I/O error occurs during the asynchronous RPC call.
*/
public static <T, R> R asyncIpcClient(
ShadedProtobufHelper.IpcCall<T> call, ApplyFunction<T, R> response,
Class<R> clazz) throws IOException {
ipc(call);
AsyncGet<T, Exception> asyncReqMessage =
(AsyncGet<T, Exception>) ProtobufRpcEngine2.getAsyncReturnMessage();
CompletableFuture<Writable> responseFuture = Client.getResponseFuture();
// transfer originCall & callerContext to worker threads of executor.
final Server.Call originCall = Server.getCurCall().get();
final CallerContext originContext = CallerContext.getCurrent();
asyncCompleteWith(responseFuture);
asyncApply(o -> {
// transfer thread local context to worker threads of executor.
ThreadLocalContext threadLocalContext = new ThreadLocalContext();
asyncCompleteWith(responseFuture.handleAsync((result, e) -> {
threadLocalContext.transfer();
if (e != null) {
throw warpCompletionException(e);
}
try {
Server.getCurCall().set(originCall);
CallerContext.setCurrent(originContext);
T res = asyncReqMessage.get(-1, null);
return response.apply(res);
} catch (Exception e) {
throw warpCompletionException(e);
} catch (Exception ex) {
throw warpCompletionException(ex);
}
});
}, worker));
return asyncReturn(clazz);
}

/**
* Sets the executor used for handling responses asynchronously within
* the utility class.
*
* @param worker The executor to be used for handling responses asynchronously.
*/
public static void setWorker(Executor worker) {
AsyncRpcProtocolPBUtil.worker = worker;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {


/** Time for an operation to be received in the Router. */
private static final ThreadLocal<Long> START_TIME = new ThreadLocal<>();
private static final ThreadLocal<Long> START_TIME = ThreadLocal.withInitial(() -> -1L);
/** Time for an operation to be sent to the Namenode. */
private static final ThreadLocal<Long> PROXY_TIME = new ThreadLocal<>();
private static final ThreadLocal<Long> PROXY_TIME = ThreadLocal.withInitial(() -> -1L);

/** Configuration for the performance monitor. */
private Configuration conf;
Expand Down Expand Up @@ -141,6 +141,14 @@ public void startOp() {
START_TIME.set(monotonicNow());
}

public static long getStartOpTime() {
return START_TIME.get();
}

public static void setStartOpTime(long startOpTime) {
START_TIME.set(startOpTime);
}

@Override
public long proxyOp() {
PROXY_TIME.set(monotonicNow());
Expand All @@ -151,6 +159,14 @@ public long proxyOp() {
return Thread.currentThread().getId();
}

public static long getProxyOpTime() {
return PROXY_TIME.get();
}

public static void setProxyOpTime(long proxyOpTime) {
PROXY_TIME.set(proxyOpTime);
}

@Override
public void proxyOpComplete(boolean success, String nsId,
FederationNamenodeServiceState state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
public static final String DFS_ROUTER_RPC_ENABLE =
FEDERATION_ROUTER_PREFIX + "rpc.enable";
public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
public static final String DFS_ROUTER_RPC_ENABLE_ASYNC =
FEDERATION_ROUTER_PREFIX + "rpc.async.enable";
public static final boolean DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT = false;
public static final String DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT =
FEDERATION_ROUTER_PREFIX + "rpc.async.handler.count";
public static final int DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT = 2;
public static final String DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT =
FEDERATION_ROUTER_PREFIX + "rpc.async.responder.count";
public static final int DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT = 10;

public static final String DFS_ROUTER_METRICS_ENABLE =
FEDERATION_ROUTER_PREFIX + "metrics.enable";
Expand Down
Loading
Loading