diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java
index f65742a5df4e0..fa798e2f35837 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java
@@ -18,12 +18,11 @@
package org.apache.hadoop.hdfs.protocolPB;
+import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext;
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
-import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.slf4j.Logger;
@@ -31,18 +30,48 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
-import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
+/**
+ * <p>This utility class encapsulates the logic required to initiate asynchronous RPCs,
+ * handle responses, and propagate exceptions. It works in conjunction with
+ * {@link ProtobufRpcEngine2} and {@link Client} to facilitate the asynchronous
+ * nature of the operations.
+ *
+ * @see ProtobufRpcEngine2
+ * @see Client
+ * @see CompletableFuture
+ */
public final class AsyncRpcProtocolPBUtil {
public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class);
+ /** The executor used for handling responses asynchronously. */
+ private static Executor worker;
private AsyncRpcProtocolPBUtil() {}
+ /**
+ * Asynchronously invokes an RPC call and applies a response transformation function
+ * to the result. This method is generic and can be used to handle any type of
+ * RPC call.
+ *
+ * <p>The method uses the {@link ShadedProtobufHelper.IpcCall} to prepare the RPC call
+ * and the {@link ApplyFunction} to process the response. It also handles exceptions
+ * that may occur during the RPC call and wraps them in a user-friendly manner.
+ *
+ * @param call The IPC call encapsulating the RPC request.
+ * @param response The function to apply to the response of the RPC call.
+ * @param clazz The class object representing the type {@code R} of the response.
+ * @param <T> Type of the call's result.
+ * @param <R> Type of method return.
+ * @return An object of type {@code R} that is the result of applying the response
+ * function to the RPC call result.
+ * @throws IOException If an I/O error occurs during the asynchronous RPC call.
+ */
public static R asyncIpcClient(
ShadedProtobufHelper.IpcCall call, ApplyFunction response,
Class clazz) throws IOException {
@@ -50,20 +79,30 @@ public static R asyncIpcClient(
AsyncGet asyncReqMessage =
(AsyncGet) ProtobufRpcEngine2.getAsyncReturnMessage();
CompletableFuture responseFuture = Client.getResponseFuture();
- // transfer originCall & callerContext to worker threads of executor.
- final Server.Call originCall = Server.getCurCall().get();
- final CallerContext originContext = CallerContext.getCurrent();
- asyncCompleteWith(responseFuture);
- asyncApply(o -> {
+ // transfer thread local context to worker threads of executor.
+ ThreadLocalContext threadLocalContext = new ThreadLocalContext();
+ asyncCompleteWith(responseFuture.handleAsync((result, e) -> {
+ threadLocalContext.transfer();
+ if (e != null) {
+ throw warpCompletionException(e);
+ }
try {
- Server.getCurCall().set(originCall);
- CallerContext.setCurrent(originContext);
T res = asyncReqMessage.get(-1, null);
return response.apply(res);
- } catch (Exception e) {
- throw warpCompletionException(e);
+ } catch (Exception ex) {
+ throw warpCompletionException(ex);
}
- });
+ }, worker));
return asyncReturn(clazz);
}
+
+ /**
+ * Sets the executor used for handling responses asynchronously within
+ * the utility class.
+ *
+ * <p>The executor is kept in a static field, so the most recent call applies to
+ * every later {@code asyncIpcClient} invocation, which hands it to
+ * {@code CompletableFuture.handleAsync} as the executor for the response callback.
+ * NOTE(review): nothing visible here guards against the field still being unset
+ * when {@code asyncIpcClient} runs - confirm it is set during start-up.
+ *
+ * @param worker The executor to be used for handling responses asynchronously.
+ */
+ public static void setWorker(Executor worker) {
+ AsyncRpcProtocolPBUtil.worker = worker;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
index 3a0fa2016d84e..3b1d5e55781df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
@@ -52,9 +52,9 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
/** Time for an operation to be received in the Router. */
- private static final ThreadLocal START_TIME = new ThreadLocal<>();
+ private static final ThreadLocal START_TIME = ThreadLocal.withInitial(() -> -1L);
/** Time for an operation to be sent to the Namenode. */
- private static final ThreadLocal PROXY_TIME = new ThreadLocal<>();
+ private static final ThreadLocal PROXY_TIME = ThreadLocal.withInitial(() -> -1L);
/** Configuration for the performance monitor. */
private Configuration conf;
@@ -141,6 +141,14 @@ public void startOp() {
START_TIME.set(monotonicNow());
}
+ /**
+ * Returns the operation start time stored for the calling thread.
+ *
+ * @return the value held by the thread-local start time; -1 (its initial value)
+ * when the calling thread has not stored one.
+ */
+ public static long getStartOpTime() {
+ return START_TIME.get();
+ }
+
+ /**
+ * Stores an operation start time for the calling thread, overwriting any value
+ * previously recorded by {@code startOp()} on this thread.
+ *
+ * @param startOpTime the start time to store in the thread-local.
+ */
+ public static void setStartOpTime(long startOpTime) {
+ START_TIME.set(startOpTime);
+ }
+
@Override
public long proxyOp() {
PROXY_TIME.set(monotonicNow());
@@ -151,6 +159,14 @@ public long proxyOp() {
return Thread.currentThread().getId();
}
+ /**
+ * Returns the proxy time stored for the calling thread.
+ *
+ * @return the value held by the thread-local proxy time; -1 (its initial value)
+ * when the calling thread has not stored one.
+ */
+ public static long getProxyOpTime() {
+ return PROXY_TIME.get();
+ }
+
+ /**
+ * Stores a proxy time for the calling thread, overwriting any value previously
+ * recorded by {@code proxyOp()} on this thread.
+ *
+ * @param proxyOpTime the proxy time to store in the thread-local.
+ */
+ public static void setProxyOpTime(long proxyOpTime) {
+ PROXY_TIME.set(proxyOpTime);
+ }
+
@Override
public void proxyOpComplete(boolean success, String nsId,
FederationNamenodeServiceState state) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 64f27bd3ba32e..103953ab19151 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -72,6 +72,15 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
public static final String DFS_ROUTER_RPC_ENABLE =
FEDERATION_ROUTER_PREFIX + "rpc.enable";
public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
+ // Async router RPC settings: an enable flag (default false) plus handler and
+ // responder counts (defaults 2 and 10). NOTE(review): what the two counts size is
+ // not visible here - presumably thread pools; confirm against the RPC server.
+ public static final String DFS_ROUTER_RPC_ENABLE_ASYNC =
+ FEDERATION_ROUTER_PREFIX + "rpc.async.enable";
+ public static final boolean DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT = false;
+ public static final String DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT =
+ FEDERATION_ROUTER_PREFIX + "rpc.async.handler.count";
+ public static final int DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT = 2;
+ public static final String DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT =
+ FEDERATION_ROUTER_PREFIX + "rpc.async.responder.count";
+ public static final int DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT = 10;
public static final String DFS_ROUTER_METRICS_ENABLE =
FEDERATION_ROUTER_PREFIX + "metrics.enable";
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncRpcClient.java
new file mode 100644
index 0000000000000..2bdcd7ce28724
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncRpcClient.java
@@ -0,0 +1,621 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.NameNodeProxiesClient;
+import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.AsyncApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
+import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApplyUseExecutor;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncFinally;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncThrowException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.getCompletableFuture;
+
+/**
+ * The {@code RouterAsyncRpcClient} class extends the functionality of the base
+ * {@code RouterRpcClient} class to provide asynchronous remote procedure call (RPC)
+ * capabilities for communication with the Hadoop Distributed File System (HDFS)
+ * NameNodes in a federated environment.
+ *
+ * This class is responsible for managing the asynchronous execution of RPCs to
+ * multiple NameNodes, which can improve performance and scalability in large HDFS
+ * deployments.
+ *
+ * <p>The class also includes methods for handling failover scenarios, where it can
+ * automatically retry operations on alternative NameNodes if the primary NameNode is
+ * unavailable or in standby mode.
+ *
+ * @see RouterRpcClient
+ */
+public class RouterAsyncRpcClient extends RouterRpcClient{
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RouterAsyncRpcClient.class);
+ /** Router using this RPC client. */
+ private final Router router;
+ /** Interface to identify the active NN for a nameservice or blockpool ID. */
+ private final ActiveNamenodeResolver namenodeResolver;
+ /** Optional perf monitor. */
+ private final RouterRpcMonitor rpcMonitor;
+ private final Executor asyncRouterHandler;
+
+ /**
+ * Create a router async RPC client to manage remote procedure calls to NNs.
+ *
+ * <p>All arguments are forwarded to the {@link RouterRpcClient} constructor; the
+ * router, resolver and monitor are additionally kept in fields of this class. The
+ * executor used by {@code invokeMethod} is read from
+ * {@code router.getRpcServer().getAsyncRouterHandler()}, so
+ * {@code router.getRpcServer()} must return a non-null server at construction time.
+ *
+ * @param conf Hdfs Configuration.
+ * @param router A router using this RPC client.
+ * @param resolver A NN resolver to determine the currently active NN in HA.
+ * @param monitor Optional performance monitor.
+ * @param routerStateIdContext the router state context object to hold the state ids for all
+ * namespaces.
+ */
+ public RouterAsyncRpcClient(
+ Configuration conf, Router router, ActiveNamenodeResolver resolver,
+ RouterRpcMonitor monitor, RouterStateIdContext routerStateIdContext) {
+ super(conf, router, resolver, monitor, routerStateIdContext);
+ this.router = router;
+ this.namenodeResolver = resolver;
+ this.rpcMonitor = monitor;
+ this.asyncRouterHandler = router.getRpcServer().getAsyncRouterHandler();
+ }
+
+ /**
+ * Invoke method in all locations and return success if any succeeds.
+ *
+ * <p>The calls are started through {@code invokeConcurrent} (with both boolean
+ * arguments {@code false} and {@code Boolean.class} as the result type); an
+ * {@code asyncApply} step then reduces the per-location results to whether any
+ * value is {@code true}.
+ * NOTE(review): the value returned on the calling thread comes from
+ * {@code asyncReturn(boolean.class)} - presumably a placeholder, with the real
+ * answer delivered through the async chain; confirm against AsyncUtil.
+ *
+ * @param The type of the remote location.
+ * @param locations List of remote locations to call concurrently.
+ * @param method The remote method and parameters to invoke.
+ * @return If the call succeeds in any location.
+ * @throws IOException If any of the calls return an exception.
+ */
+ @Override
+ public boolean invokeAll(
+ final Collection locations, final RemoteMethod method)
+ throws IOException {
+ invokeConcurrent(locations, method, false, false,
+ Boolean.class);
+ asyncApply((ApplyFunction, Object>)
+ results -> results.containsValue(true));
+ return asyncReturn(boolean.class);
+ }
+
+ /**
+ * Invokes a method against the ClientProtocol proxy server. If a standby
+ * exception is generated by the call to the client, retries using the
+ * alternate server.
+ *
+ * Re-throws exceptions generated by the remote RPC call as either
+ * RemoteException or IOException.
+ *
+ * <p>Only the argument check runs on the calling thread. The invocation itself is
+ * scheduled on the async router handler executor through
+ * {@code asyncApplyUseExecutor}; a {@code ThreadLocalContext} is created on the
+ * calling thread beforehand and its {@code transfer()} is called inside the
+ * scheduled step before {@code invokeMethodAsync} runs.
+ *
+ * @param ugi User group information.
+ * @param namenodes A prioritized list of namenodes within the same
+ * nameservice.
+ * @param useObserver Whether to use observer namenodes.
+ * @param protocol the protocol of the connection.
+ * @param method Remote ClientProtocol method to invoke.
+ * @param params Variable list of parameters matching the method.
+ * @return Always {@code null} on the calling thread; the result of the invocation
+ * is produced later by the scheduled async step.
+ * @throws ConnectException If it cannot connect to any Namenode.
+ * @throws StandbyException If all Namenodes are in Standby.
+ * @throws IOException If it cannot invoke the method; thrown synchronously when
+ * {@code namenodes} is null or empty.
+ */
+ @Override
+ public Object invokeMethod(
+ UserGroupInformation ugi,
+ List extends FederationNamenodeContext> namenodes,
+ boolean useObserver, Class> protocol,
+ Method method, Object... params) throws IOException {
+ if (namenodes == null || namenodes.isEmpty()) {
+ throw new IOException("No namenodes to invoke " + method.getName() +
+ " with params " + Arrays.deepToString(params) + " from "
+ + router.getRouterId());
+ }
+ // transfer threadLocalContext to worker threads of executor.
+ ThreadLocalContext threadLocalContext = new ThreadLocalContext();
+ asyncComplete(null);
+ asyncApplyUseExecutor((AsyncApplyFunction) o -> {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Async invoke method : {}, {}, {}, {}", method.getName(), useObserver,
+ namenodes.toString(), params);
+ }
+ threadLocalContext.transfer();
+ invokeMethodAsync(ugi, (List) namenodes,
+ useObserver, protocol, method, params);
+ }, asyncRouterHandler);
+ return null;
+ }
+
+  /**
+   * Asynchronously invokes a method on the specified NameNodes for a given user and operation.
+   * The NameNodes are tried one after another, in list order, until one call completes;
+   * the work is expressed as an async chain so the calling thread is not blocked.
+   *
+   * <p>For every attempt a connection is obtained, the call is issued through
+   * {@code invoke}, an {@code IOException} is recorded and passed to
+   * {@code handleInvokeMethodIOException}, and the connection is released in the
+   * {@code asyncFinally} step. If no attempt completes, the final step delegates to
+   * {@code handlerAllNamenodeFail} with the collected exceptions.
+   *
+   * <p>The connection holder is created per attempt (as the synchronous
+   * {@code invokeMethod} does with its per-iteration local). With a holder shared
+   * across attempts, a failing {@code getConnection} on a later NameNode would leave
+   * the previous attempt's connection in the holder and release it a second time.
+   *
+   * @param ugi The user information under which the method is to be invoked.
+   * @param namenodes The list of NameNode contexts on which the method will be invoked.
+   * @param useObserver Whether to use an observer node for the invocation if available.
+   * @param protocol The protocol class defining the method to be invoked.
+   * @param method The method to be invoked on the NameNodes.
+   * @param params The parameters for the method invocation.
+   */
+  private void invokeMethodAsync(
+      final UserGroupInformation ugi,
+      final List<FederationNamenodeContext> namenodes,
+      boolean useObserver,
+      final Class<?> protocol, final Method method, final Object... params) {
+
+    addClientInfoToCallerContext(ugi);
+    if (rpcMonitor != null) {
+      rpcMonitor.proxyOp();
+    }
+    final ExecutionStatus status = new ExecutionStatus(false, useObserver);
+    Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>();
+    asyncForEach(namenodes.iterator(),
+        (foreach, namenode) -> {
+          // Skip observers once observer reads are no longer wanted.
+          if (!status.isShouldUseObserver()
+              && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) {
+            asyncComplete(null);
+            return;
+          }
+          String nsId = namenode.getNameserviceId();
+          String rpcAddress = namenode.getRpcAddress();
+          // Fresh holder for this attempt only, so the finally step can never see
+          // (and re-release) a connection that belongs to an earlier attempt.
+          final ConnectionContext[] connection = new ConnectionContext[1];
+          asyncTry(() -> {
+            connection[0] = getConnection(ugi, nsId, rpcAddress, protocol);
+            NameNodeProxiesClient.ProxyAndInfo<?> client = connection[0].getClient();
+            invoke(namenode, status.isShouldUseObserver(), 0, method,
+                client.getProxy(), params);
+            asyncApply(res -> {
+              status.setComplete(true);
+              postProcessResult(method, status, namenode, nsId, client);
+              // A completed call ends the iteration over the remaining NameNodes.
+              foreach.breakNow();
+              return res;
+            });
+          });
+          asyncCatch((res, ioe) -> {
+            ioes.put(namenode, ioe);
+            handleInvokeMethodIOException(namenode, ioe, status, useObserver);
+            return res;
+          }, IOException.class);
+          asyncFinally(res -> {
+            if (connection[0] != null) {
+              connection[0].release();
+            }
+            return res;
+          });
+        });
+
+    asyncApply(res -> {
+      if (status.isComplete()) {
+        return res;
+      }
+      return handlerAllNamenodeFail(namenodes, method, ioes, params);
+    });
+  }
+
+  /**
+   * Asynchronously invokes a method on a specified NameNode in the context of the given
+   * namespace and NameNode information. The RPC client is switched to asynchronous mode
+   * for the duration of the reflective call, and a catch step is then attached to the
+   * async chain that forwards any failure to {@code handlerInvokeException}.
+   *
+   * <p>Asynchronous mode is switched off in a {@code finally} block so that a reflective
+   * call that throws does not leave the client in asynchronous mode.
+   *
+   * @param namenode The context information for the NameNode.
+   * @param listObserverFirst Whether to list the observer node first in the invocation list.
+   * @param retryCount The current retry count for the operation.
+   * @param method The method to be invoked on the NameNode.
+   * @param obj The proxy object through which the method will be invoked.
+   * @param params The parameters for the method invocation.
+   * @return Always {@code null}; the outcome of the call travels through the async chain.
+   * @throws IOException Declared by the signature; failures of the reflective call are
+   *         reported through {@code asyncThrowException} rather than thrown here.
+   */
+  protected Object invoke(
+      FederationNamenodeContext namenode, Boolean listObserverFirst,
+      int retryCount, final Method method,
+      final Object obj, final Object... params) throws IOException {
+    try {
+      Client.setAsynchronousMode(true);
+      try {
+        method.invoke(obj, params);
+      } finally {
+        // Restore synchronous mode whether or not the reflective call succeeded.
+        Client.setAsynchronousMode(false);
+      }
+      asyncCatch((AsyncCatchFunction<Object, Throwable>) (o, e) -> {
+        handlerInvokeException(namenode, listObserverFirst,
+            retryCount, method, obj, e, params);
+      }, Throwable.class);
+    } catch (InvocationTargetException e) {
+      // Surface the exception raised by the target method, not the reflection wrapper.
+      asyncThrowException(e.getCause());
+    } catch (IllegalAccessException | IllegalArgumentException e) {
+      LOG.error("Unexpected exception while proxying API", e);
+      asyncThrowException(e);
+    }
+    return null;
+  }
+
+ /**
+ * Invokes sequential proxy calls to different locations. Continues to invoke
+ * calls until the success condition is met, or until all locations have been
+ * attempted.
+ *
+ * The success condition may be specified by:
+ * <ul>
+ * <li>An expected result class
+ * <li>An expected result value
+ * </ul>
+ *
+ * If no expected result class/values are specified, the success condition is
+ * a call that does not throw a remote exception.
+ *
+ * <p>This overload delegates to the {@code RemoteResult}-returning variant and adds
+ * an {@code asyncApply} step that extracts {@code RemoteResult::getResult}.
+ *
+ * @param <T> The type of the remote method return.
+ * @param locations List of locations/nameservices to try in order.
+ * @param remoteMethod The remote method and parameters to invoke.
+ * @param expectedResultClass In order to be considered a positive result, the
+ * return type must be of this class.
+ * @param expectedResultValue In order to be considered a positive result, the
+ * return value must equal the value of this object.
+ * @return The result of the first successful call, or if no calls are
+ * successful, the result of the first RPC call executed.
+ * @throws IOException if the success condition is not met, return the first
+ * remote exception generated.
+ */
+ @Override
+ public T invokeSequential(
+ final List extends RemoteLocationContext> locations,
+ final RemoteMethod remoteMethod, Class expectedResultClass,
+ Object expectedResultValue) throws IOException {
+ invokeSequential(remoteMethod, locations, expectedResultClass, expectedResultValue);
+ asyncApply((ApplyFunction) RemoteResult::getResult);
+ return asyncReturn(expectedResultClass);
+ }
+
+ /**
+ * Invokes sequential proxy calls to different locations. Continues to invoke
+ * calls until the success condition is met, or until all locations have been
+ * attempted.
+ *
+ * The success condition may be specified by:
+ * <ul>
+ * <li>An expected result class
+ * <li>An expected result value
+ * </ul>
+ *
+ * If no expected result class/values are specified, the success condition is
+ * a call that does not throw a remote exception.
+ *
+ * This returns RemoteResult, which contains the invoked location as well
+ * as the result.
+ *
+ * <p>For each location a fairness permit is acquired before the attempt and
+ * released in the {@code asyncFinally} step. Exceptions are collected instead of
+ * being thrown immediately. After the loop, if no attempt met the success
+ * condition, an exception for which {@code isUnavailableException} is true is
+ * preferred, otherwise the first collected exception is thrown; when nothing was
+ * thrown, the first non-null result seen is returned paired with
+ * {@code locations.get(0)}.
+ *
+ * @param <R> The type of the remote location.
+ * @param <T> The type of the remote method return.
+ * @param remoteMethod The remote method and parameters to invoke.
+ * @param locations List of locations/nameservices to try in order.
+ * @param expectedResultClass In order to be considered a positive result, the
+ * return type must be of this class.
+ * @param expectedResultValue In order to be considered a positive result, the
+ * return value must equal the value of this object.
+ * @return The result of the first successful call, or if no calls are
+ * successful, the result of the first RPC call executed, along with
+ * the invoked location in form of RemoteResult.
+ * @throws IOException if the success condition is not met, return the first
+ * remote exception generated.
+ */
+ @Override
+ public RemoteResult invokeSequential(
+ final RemoteMethod remoteMethod, final List locations,
+ Class expectedResultClass, Object expectedResultValue)
+ throws IOException {
+
+ RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
+ final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+ final Method m = remoteMethod.getMethod();
+ List thrownExceptions = new ArrayList<>();
+ final Object[] firstResult = {null};
+ final ExecutionStatus status = new ExecutionStatus();
+ Iterator locationIterator =
+ (Iterator) locations.iterator();
+ // Invoke in priority order
+ asyncForEach(locationIterator,
+ (foreach, loc) -> {
+ String ns = loc.getNameserviceId();
+ boolean isObserverRead = isObserverReadEligible(ns, m);
+ List extends FederationNamenodeContext> namenodes =
+ getOrderedNamenodes(ns, isObserverRead);
+ acquirePermit(ns, ugi, remoteMethod, controller);
+ asyncTry(() -> {
+ Class> proto = remoteMethod.getProtocol();
+ Object[] params = remoteMethod.getParams(loc);
+ invokeMethod(ugi, namenodes, isObserverRead, proto, m, params);
+ asyncApply(result -> {
+ // Check if the result is what we expected
+ if (isExpectedClass(expectedResultClass, result) &&
+ isExpectedValue(expectedResultValue, result)) {
+ // Valid result, stop here
+ @SuppressWarnings("unchecked") R location = (R) loc;
+ @SuppressWarnings("unchecked") T ret = (T) result;
+ foreach.breakNow();
+ status.setComplete(true);
+ return new RemoteResult<>(location, ret);
+ }
+ if (firstResult[0] == null) {
+ firstResult[0] = result;
+ }
+ return null;
+ });
+ });
+ asyncCatch((ret, e) -> {
+ if (e instanceof IOException) {
+ IOException ioe = (IOException) e;
+ // Localize the exception
+ ioe = processException(ioe, loc);
+ // Record it and move on
+ thrownExceptions.add(ioe);
+ } else {
+ // Unusual error, ClientProtocol calls always use IOException (or
+ // RemoteException). Re-wrap in IOException for compatibility with
+ // ClientProtocol.
+ LOG.error("Unexpected exception {} proxying {} to {}",
+ e.getClass(), m.getName(), ns, e);
+ IOException ioe = new IOException(
+ "Unexpected exception proxying API " + e.getMessage(), e);
+ thrownExceptions.add(ioe);
+ }
+ return ret;
+ }, Exception.class);
+ asyncFinally(ret -> {
+ releasePermit(ns, ugi, remoteMethod, controller);
+ return ret;
+ });
+ });
+ asyncApply(result -> {
+ if (status.isComplete()) {
+ return result;
+ }
+ if (!thrownExceptions.isEmpty()) {
+ // An unavailable subcluster may be the actual cause
+ // We cannot surface other exceptions (e.g., FileNotFoundException)
+ for (int i = 0; i < thrownExceptions.size(); i++) {
+ IOException ioe = thrownExceptions.get(i);
+ if (isUnavailableException(ioe)) {
+ throw ioe;
+ }
+ }
+ // re-throw the first exception thrown for compatibility
+ throw thrownExceptions.get(0);
+ }
+ // Return the first result, whether it is the value or not
+ @SuppressWarnings("unchecked") T ret = (T) firstResult[0];
+ return new RemoteResult<>(locations.get(0), ret);
+ });
+ return asyncReturn(RemoteResult.class);
+ }
+
+ /**
+ * Invokes multiple concurrent proxy calls to different clients. Returns an
+ * array of results.
+ *
+ * Re-throws exceptions generated by the remote RPC call as either
+ * RemoteException or IOException.
+ *
+ * <p>The calls are started through the five-argument {@code invokeConcurrent}
+ * overload; an {@code asyncApply} step then passes the collected results, together
+ * with {@code requireResponse}, to {@code postProcessResult}.
+ * NOTE(review): the value returned on the calling thread comes from
+ * {@code asyncReturn(Map.class)} - presumably a placeholder, with the real map
+ * delivered through the async chain; confirm against AsyncUtil.
+ *
+ * @param The type of the remote location.
+ * @param The type of the remote method return.
+ * @param locations List of remote locations to call concurrently.
+ * @param method The remote method and parameters to invoke.
+ * @param requireResponse If true an exception will be thrown if all calls do
+ * not complete. If false exceptions are ignored and all data results
+ * successfully received are returned.
+ * @param standby If the requests should go to the standby namenodes too.
+ * @param timeOutMs Timeout for each individual call.
+ * @param clazz Type of the remote return type.
+ * @return Result of invoking the method per subcluster: nsId to result.
+ * @throws IOException If requiredResponse=true and any of the calls throw an
+ * exception.
+ */
+ @Override
+ public Map invokeConcurrent(
+ final Collection locations, final RemoteMethod method,
+ boolean requireResponse, boolean standby, long timeOutMs, Class clazz)
+ throws IOException {
+ invokeConcurrent(locations, method, standby, timeOutMs, clazz);
+ asyncApply((ApplyFunction>, Object>)
+ results -> postProcessResult(requireResponse, results));
+ return asyncReturn(Map.class);
+ }
+
+ /**
+ * Invokes multiple concurrent proxy calls to different clients. Returns an
+ * array of results.
+ *
+ * <p>Each callable is called on the calling thread and the future it leaves behind
+ * is picked up with {@code getCompletableFuture()}; a callable that throws is
+ * represented by an exceptionally completed future instead. Once all futures are
+ * done, {@code processFutures} builds the result list and the
+ * {@code CONCURRENT_NS} permit is released in a {@code finally} block, whether or
+ * not processing succeeded.
+ *
+ * @param The type of the remote location.
+ * @param The type of the remote method return.
+ * @param method The remote method and parameters to invoke.
+ * @param timeOutMs Timeout for each individual call. Not referenced by this
+ * implementation.
+ * @param controller Fairness manager to control handlers assigned per NS.
+ * @param orderedLocations List of remote locations to call concurrently.
+ * @param callables Invoke method for each NameNode.
+ * @return Result of invoking the method per subcluster (list of results),
+ * This includes the exception for each remote location.
+ * @throws IOException If there are errors invoking the method.
+ */
+ @Override
+ protected List> getRemoteResults(
+ RemoteMethod method, long timeOutMs, RouterRpcFairnessPolicyController controller,
+ List orderedLocations, List> callables) throws IOException {
+ final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+ final Method m = method.getMethod();
+ final CompletableFuture[] futures =
+ new CompletableFuture[callables.size()];
+ int i = 0;
+ for (Callable callable : callables) {
+ CompletableFuture future = null;
+ try {
+ callable.call();
+ future = getCompletableFuture();
+ } catch (Exception e) {
+ future = new CompletableFuture<>();
+ future.completeExceptionally(warpCompletionException(e));
+ }
+ futures[i++] = future;
+ }
+
+ asyncCompleteWith(CompletableFuture.allOf(futures)
+ .handle((unused, throwable) -> {
+ try {
+ return processFutures(method, m, orderedLocations, Arrays.asList(futures));
+ } catch (InterruptedException e) {
+ LOG.error("Unexpected error while invoking API: {}", e.getMessage());
+ throw warpCompletionException(new IOException(
+ "Unexpected error while invoking API " + e.getMessage(), e));
+ } finally {
+ releasePermit(CONCURRENT_NS, ugi, method, controller);
+ }
+ }));
+ return asyncReturn(List.class);
+ }
+
+ /**
+ * Invokes a ClientProtocol method against the specified namespace.
+ *
+ * Re-throws exceptions generated by the remote RPC call as either
+ * RemoteException or IOException.
+ *
+ * <p>A fairness permit for the location's nameservice is acquired before the call
+ * and released in the {@code asyncFinally} step. The result of
+ * {@code invokeMethod} is wrapped into a single-element list of
+ * {@code RemoteResult}; an {@code IOException} is passed through
+ * {@code processException} before being rethrown.
+ *
+ * @param <T> The type of the remote location.
+ * @param The type of the remote method return.
+ * @param location RemoteLocation to invoke.
+ * @param method The remote method and parameters to invoke.
+ * @return Result of invoking the method per subcluster (list of results),
+ * This includes the exception for each remote location.
+ * @throws IOException If there are errors invoking the method.
+ */
+ @Override
+ public List> invokeSingle(
+ T location, RemoteMethod method) throws IOException {
+ final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+ final Method m = method.getMethod();
+ String ns = location.getNameserviceId();
+ boolean isObserverRead = isObserverReadEligible(ns, m);
+ final List extends FederationNamenodeContext> namenodes =
+ getOrderedNamenodes(ns, isObserverRead);
+ RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
+ acquirePermit(ns, ugi, method, controller);
+ asyncTry(() -> {
+ Class> proto = method.getProtocol();
+ Object[] paramList = method.getParams(location);
+ invokeMethod(ugi, namenodes, isObserverRead, proto, m, paramList);
+ asyncApply((ApplyFunction) result -> {
+ RemoteResult remoteResult = new RemoteResult<>(location, result);
+ return Collections.singletonList(remoteResult);
+ });
+ });
+ asyncCatch((o, ioe) -> {
+ throw processException(ioe, location);
+ }, IOException.class);
+ asyncFinally(o -> {
+ releasePermit(ns, ugi, method, controller);
+ return o;
+ });
+ return asyncReturn(List.class);
+ }
+
+ /**
+ * Invokes a ClientProtocol method against the specified namespace.
+ *
+ * Re-throws exceptions generated by the remote RPC call as either
+ * RemoteException or IOException.
+ *
+ * <p>A fairness permit for {@code nsId} is acquired before the call and released
+ * in the {@code asyncFinally} step. The call is made against a
+ * {@code RemoteLocation} built from {@code nsId} with "/" as both paths.
+ *
+ * @param nsId Target namespace for the method.
+ * @param method The remote method and parameters to invoke.
+ * @return Always {@code null} on the calling thread; the result of the invocation
+ * is produced by the async chain started through {@code invokeMethod}.
+ * @throws IOException If the invoke generated an error.
+ */
+ @Override
+ public Object invokeSingle(final String nsId, RemoteMethod method)
+ throws IOException {
+ UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+ RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
+ acquirePermit(nsId, ugi, method, controller);
+ asyncTry(() -> {
+ boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod());
+ List extends FederationNamenodeContext> nns = getOrderedNamenodes(nsId, isObserverRead);
+ RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/");
+ Class> proto = method.getProtocol();
+ Method m = method.getMethod();
+ Object[] params = method.getParams(loc);
+ invokeMethod(ugi, nns, isObserverRead, proto, m, params);
+ });
+ asyncFinally(o -> {
+ releasePermit(nsId, ugi, method, controller);
+ return o;
+ });
+ return null;
+ }
+
+ /**
+ * Invokes a single proxy call for a single location.
+ *
+ * Re-throws exceptions generated by the remote RPC call as either
+ * RemoteException or IOException.
+ *
+ * <p>The location is wrapped in a single-element list and handed to the
+ * two-argument {@code invokeSequential(locations, remoteMethod)}, which is not
+ * defined in this class (presumably inherited from {@link RouterRpcClient}).
+ * NOTE(review): the value returned on the calling thread comes from
+ * {@code asyncReturn(clazz)} - presumably a placeholder, with the real result
+ * delivered through the async chain; confirm against AsyncUtil.
+ *
+ * @param location RemoteLocation to invoke.
+ * @param remoteMethod The remote method and parameters to invoke.
+ * @param clazz Class for the return type.
+ * @param <T> The type of the remote method return.
+ * @return The result of invoking the method if successful.
+ * @throws IOException If the invoke generated an error.
+ */
+ public T invokeSingle(
+ final RemoteLocationContext location,
+ RemoteMethod remoteMethod, Class clazz) throws IOException {
+ List locations = Collections.singletonList(location);
+ invokeSequential(locations, remoteMethod);
+ return asyncReturn(clazz);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index d25e5ae4d3012..70b5034272d08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -400,7 +400,7 @@ public String getAcceptedPermitsPerNsJSON() {
* NN + current user.
* @throws IOException If we cannot get a connection to the NameNode.
*/
- private ConnectionContext getConnection(UserGroupInformation ugi, String nsId,
+ protected ConnectionContext getConnection(UserGroupInformation ugi, String nsId,
String rpcAddress, Class> proto) throws IOException {
ConnectionContext connection = null;
try {
@@ -462,7 +462,7 @@ private static IOException toIOException(Exception e) {
* @return Retry decision.
* @throws IOException An IO Error occurred.
*/
- private RetryDecision shouldRetry(
+ protected RetryDecision shouldRetry(
final IOException ioe, final int retryCount, final String nsId,
final FederationNamenodeContext namenode,
final boolean listObserverFirst) throws IOException {
@@ -526,11 +526,12 @@ public Object invokeMethod(
if (rpcMonitor != null) {
rpcMonitor.proxyOp();
}
- boolean failover = false;
- boolean shouldUseObserver = useObserver;
+
+ ExecutionStatus status = new ExecutionStatus(false, useObserver);
Map ioes = new LinkedHashMap<>();
for (FederationNamenodeContext namenode : namenodes) {
- if (!shouldUseObserver && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) {
+ if (!status.isShouldUseObserver()
+ && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) {
continue;
}
ConnectionContext connection = null;
@@ -541,83 +542,12 @@ public Object invokeMethod(
ProxyAndInfo> client = connection.getClient();
final Object proxy = client.getProxy();
- ret = invoke(nsId, namenode, useObserver, 0, method, proxy, params);
- if (failover &&
- FederationNamenodeServiceState.OBSERVER != namenode.getState()) {
- // Success on alternate server, update
- InetSocketAddress address = client.getAddress();
- namenodeResolver.updateActiveNamenode(nsId, address);
- }
- if (this.rpcMonitor != null) {
- this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState());
- }
- if (this.router.getRouterClientMetrics() != null) {
- this.router.getRouterClientMetrics().incInvokedMethod(method);
- }
+ ret = invoke(namenode, useObserver, 0, method, proxy, params);
+ postProcessResult(method, status, namenode, nsId, client);
return ret;
} catch (IOException ioe) {
ioes.put(namenode, ioe);
- if (ioe instanceof ObserverRetryOnActiveException) {
- LOG.info("Encountered ObserverRetryOnActiveException from {}."
- + " Retry active namenode directly.", namenode);
- shouldUseObserver = false;
- } else if (ioe instanceof StandbyException) {
- // Fail over indicated by retry policy and/or NN
- if (this.rpcMonitor != null) {
- this.rpcMonitor.proxyOpFailureStandby(nsId);
- }
- failover = true;
- } else if (isUnavailableException(ioe)) {
- if (this.rpcMonitor != null) {
- this.rpcMonitor.proxyOpFailureCommunicate(nsId);
- }
- if (FederationNamenodeServiceState.OBSERVER == namenode.getState()) {
- namenodeResolver.updateUnavailableNamenode(nsId,
- NetUtils.createSocketAddr(namenode.getRpcAddress()));
- } else {
- failover = true;
- }
- } else if (ioe instanceof RemoteException) {
- if (this.rpcMonitor != null) {
- this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState());
- }
- RemoteException re = (RemoteException) ioe;
- ioe = re.unwrapRemoteException();
- ioe = getCleanException(ioe);
- // RemoteException returned by NN
- throw ioe;
- } else if (ioe instanceof ConnectionNullException) {
- if (this.rpcMonitor != null) {
- this.rpcMonitor.proxyOpFailureCommunicate(nsId);
- }
- LOG.error("Get connection for {} {} error: {}", nsId, rpcAddress,
- ioe.getMessage());
- // Throw StandbyException so that client can retry
- StandbyException se = new StandbyException(ioe.getMessage());
- se.initCause(ioe);
- throw se;
- } else if (ioe instanceof NoNamenodesAvailableException) {
- IOException cause = (IOException) ioe.getCause();
- if (this.rpcMonitor != null) {
- this.rpcMonitor.proxyOpNoNamenodes(nsId);
- }
- LOG.error("Cannot get available namenode for {} {} error: {}",
- nsId, rpcAddress, ioe.getMessage());
- // Rotate cache so that client can retry the next namenode in the cache
- if (shouldRotateCache(cause)) {
- this.namenodeResolver.rotateCache(nsId, namenode, useObserver);
- }
- // Throw RetriableException so that client can retry
- throw new RetriableException(ioe);
- } else {
- // Other communication error, this is a failure
- // Communication retries are handled by the retry policy
- if (this.rpcMonitor != null) {
- this.rpcMonitor.proxyOpFailureCommunicate(nsId);
- this.rpcMonitor.proxyOpComplete(false, nsId, namenode.getState());
- }
- throw ioe;
- }
+ handleInvokeMethodIOException(namenode, ioe, status, useObserver);
} finally {
if (connection != null) {
connection.release();
@@ -628,6 +558,24 @@ public Object invokeMethod(
this.rpcMonitor.proxyOpComplete(false, null, null);
}
+ return handlerAllNamenodeFail(namenodes, method, ioes, params);
+ }
+
+ /**
+ * Called when none of the namenodes could successfully process the RPC request;
+ * throws the corresponding exception according to the exception type of each namenode.
+ *
+ * @param namenodes A prioritized list of namenodes within the same nameservice.
+ * @param method Remote ClientProtocol method to invoke.
+ * @param ioes The exception thrown by each namenode.
+ * @param params Variable list of parameters matching the method.
+ * @return null
+ * @throws IOException Corresponding IOException according to the
+ * exception type of each namenode.
+ */
+ protected Object handlerAllNamenodeFail(
+ List extends FederationNamenodeContext> namenodes, Method method,
+ Map ioes, Object[] params) throws IOException {
// All namenodes were unavailable or in standby
String msg = "No namenode available to invoke " + method.getName() + " " +
Arrays.deepToString(params) + " in " + namenodes + " from " +
@@ -658,14 +606,120 @@ public Object invokeMethod(
}
}
+ /**
+ * Called once the NameNode has successfully processed the RPC request: updates
+ * the NameNode status in the router cache according to the ExecutionStatus.
+ *
+ * @param method Remote method to invoke.
+ * @param status Current execution status.
+ * @param namenode The namenode that successfully processed this RPC request.
+ * @param nsId Nameservice ID.
+ * @param client Connection client.
+ * @throws IOException If the state store cannot be accessed.
+ */
+ protected void postProcessResult(Method method, ExecutionStatus status,
+ FederationNamenodeContext namenode, String nsId, ProxyAndInfo> client) throws IOException {
+ if (status.isFailOver() &&
+ FederationNamenodeServiceState.OBSERVER != namenode.getState()) {
+ // Success on alternate server, update
+ InetSocketAddress address = client.getAddress();
+ namenodeResolver.updateActiveNamenode(nsId, address);
+ }
+ if (this.rpcMonitor != null) {
+ this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState());
+ }
+ if (this.router.getRouterClientMetrics() != null) {
+ this.router.getRouterClientMetrics().incInvokedMethod(method);
+ }
+ }
+
+ /**
+ * Handles an exception thrown by an RPC request to the NameNode,
+ * according to the type of the exception.
+ *
+ * @param namenode The namenode that processed this RPC request.
+ * @param ioe The exception thrown by this RPC request.
+ * @param status The current execution status.
+ * @param useObserver Whether to use observer namenodes.
+ * @throws IOException If it cannot invoke the method.
+ */
+ protected void handleInvokeMethodIOException(final FederationNamenodeContext namenode,
+ IOException ioe, final ExecutionStatus status, boolean useObserver) throws IOException {
+ String nsId = namenode.getNameserviceId();
+ String rpcAddress = namenode.getRpcAddress();
+ if (ioe instanceof ObserverRetryOnActiveException) {
+ LOG.info("Encountered ObserverRetryOnActiveException from {}."
+ + " Retry active namenode directly.", namenode);
+ status.setShouldUseObserver(false);
+ } else if (ioe instanceof StandbyException) {
+ // Fail over indicated by retry policy and/or NN
+ if (this.rpcMonitor != null) {
+ this.rpcMonitor.proxyOpFailureStandby(nsId);
+ }
+ status.setFailOver(true);
+ } else if (isUnavailableException(ioe)) {
+ if (this.rpcMonitor != null) {
+ this.rpcMonitor.proxyOpFailureCommunicate(nsId);
+ }
+ if (FederationNamenodeServiceState.OBSERVER == namenode.getState()) {
+ namenodeResolver.updateUnavailableNamenode(nsId,
+ NetUtils.createSocketAddr(namenode.getRpcAddress()));
+ } else {
+ status.setFailOver(true);
+ }
+ } else if (ioe instanceof RemoteException) {
+ if (this.rpcMonitor != null) {
+ this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState());
+ }
+ RemoteException re = (RemoteException) ioe;
+ ioe = re.unwrapRemoteException();
+ ioe = getCleanException(ioe);
+ // RemoteException returned by NN
+ throw ioe;
+ } else if (ioe instanceof ConnectionNullException) {
+ if (this.rpcMonitor != null) {
+ this.rpcMonitor.proxyOpFailureCommunicate(nsId);
+ }
+ LOG.error("Get connection for {} {} error: {}", nsId, rpcAddress,
+ ioe.getMessage());
+ // Throw StandbyException so that client can retry
+ StandbyException se = new StandbyException(ioe.getMessage());
+ se.initCause(ioe);
+ throw se;
+ } else if (ioe instanceof NoNamenodesAvailableException) {
+ IOException cause = (IOException) ioe.getCause();
+ if (this.rpcMonitor != null) {
+ this.rpcMonitor.proxyOpNoNamenodes(nsId);
+ }
+ LOG.error("Cannot get available namenode for {} {} error: {}",
+ nsId, rpcAddress, ioe.getMessage());
+ // Rotate cache so that client can retry the next namenode in the cache
+ if (shouldRotateCache(cause)) {
+ this.namenodeResolver.rotateCache(nsId, namenode, useObserver);
+ }
+ // Throw RetriableException so that client can retry
+ throw new RetriableException(ioe);
+ } else {
+ // Other communication error, this is a failure
+ // Communication retries are handled by the retry policy
+ if (this.rpcMonitor != null) {
+ this.rpcMonitor.proxyOpFailureCommunicate(nsId);
+ this.rpcMonitor.proxyOpComplete(false, nsId, namenode.getState());
+ }
+ throw ioe;
+ }
+ }
+
/**
* For tracking some information about the actual client.
* It adds trace info "clientIp:ip", "clientPort:port",
* "clientId:id", "clientCallId:callId" and "realUser:userName"
* in the caller context, removing the old values if they were
* already present.
+ *
+ * @param ugi User group information.
*/
- private void addClientInfoToCallerContext(UserGroupInformation ugi) {
+ protected void addClientInfoToCallerContext(UserGroupInformation ugi) {
CallerContext ctx = CallerContext.getCurrent();
String origContext = ctx == null ? null : ctx.getContext();
byte[] origSignature = ctx == null ? null : ctx.getSignature();
@@ -706,7 +760,8 @@ private void addClientInfoToCallerContext(UserGroupInformation ugi) {
* Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException.
*
- * @param nsId Identifier for the namespace
+ * @param namenode namenode context.
+ * @param listObserverFirst Observer read case, observer NN will be ranked first.
* @param retryCount Current retry times
* @param method Method to invoke
* @param obj Target object for the method
@@ -714,8 +769,8 @@ private void addClientInfoToCallerContext(UserGroupInformation ugi) {
* @return Response from the remote server
* @throws IOException If error occurs.
*/
- private Object invoke(
- String nsId, FederationNamenodeContext namenode, Boolean listObserverFirst,
+ protected Object invoke(
+ FederationNamenodeContext namenode, Boolean listObserverFirst,
int retryCount, final Method method,
final Object obj, final Object... params) throws IOException {
try {
@@ -725,36 +780,58 @@ private Object invoke(
return null;
} catch (InvocationTargetException e) {
Throwable cause = e.getCause();
- if (cause instanceof IOException) {
- IOException ioe = (IOException) cause;
-
- // Check if we should retry.
- RetryDecision decision = shouldRetry(ioe, retryCount, nsId, namenode, listObserverFirst);
- if (decision == RetryDecision.RETRY) {
- if (this.rpcMonitor != null) {
- this.rpcMonitor.proxyOpRetries();
- }
+ return handlerInvokeException(namenode, listObserverFirst,
+ retryCount, method, obj, cause, params);
+ }
+ }
- // retry
- return invoke(nsId, namenode, listObserverFirst, ++retryCount, method, obj, params);
- } else if (decision == RetryDecision.FAILOVER_AND_RETRY) {
- // failover, invoker looks for standby exceptions for failover.
- if (ioe instanceof StandbyException) {
- throw ioe;
- } else if (isUnavailableException(ioe)) {
- throw ioe;
- } else {
- throw new StandbyException(ioe.getMessage());
- }
- } else {
+ /**
+ * Handle the exception thrown by an RPC request to the NameNode.
+ *
+ * @param namenode namenode context.
+ * @param listObserverFirst Observer read case, observer NN will be ranked first.
+ * @param retryCount Current retry times
+ * @param method Method to invoke
+ * @param obj Target object for the method
+ * @param e The exception thrown by the current invocation.
+ * @param params Variable parameters
+ * @return Response from the remote server
+ * @throws IOException If error occurs.
+ */
+ protected Object handlerInvokeException(FederationNamenodeContext namenode,
+ Boolean listObserverFirst, int retryCount, Method method, Object obj,
+ Throwable e, Object[] params) throws IOException {
+ String nsId = namenode.getNameserviceId();
+ if (e instanceof IOException) {
+ IOException ioe = (IOException) e;
+
+ // Check if we should retry.
+ RetryDecision decision = shouldRetry(ioe, retryCount, nsId, namenode, listObserverFirst);
+ if (decision == RetryDecision.RETRY) {
+ if (this.rpcMonitor != null) {
+ this.rpcMonitor.proxyOpRetries();
+ }
+
+ // retry
+ return invoke(namenode, listObserverFirst, ++retryCount, method, obj, params);
+ } else if (decision == RetryDecision.FAILOVER_AND_RETRY) {
+ // failover, invoker looks for standby exceptions for failover.
+ if (ioe instanceof StandbyException) {
throw ioe;
+ } else if (isUnavailableException(ioe)) {
+ throw ioe;
+ } else {
+ throw new StandbyException(ioe.getMessage());
}
} else {
- throw new IOException(e);
+ throw ioe;
}
+ } else {
+ throw new IOException(e);
}
}
+
/**
* Check if the exception comes from an unavailable subcluster.
* @param ioe IOException to check.
@@ -817,7 +894,7 @@ private boolean isClusterUnAvailable(
* @param ioe Exception to clean up.
* @return Copy of the original exception with a clean message.
*/
- private static IOException getCleanException(IOException ioe) {
+ protected static IOException getCleanException(IOException ioe) {
IOException ret = null;
String msg = ioe.getMessage();
@@ -1185,7 +1262,7 @@ public RemoteResult invokeSequential(
* @param loc Location we are processing.
* @return Exception processed for federation.
*/
- private IOException processException(
+ protected IOException processException(
IOException ioe, RemoteLocationContext loc) {
if (ioe instanceof RemoteException) {
@@ -1251,7 +1328,7 @@ static String processExceptionMsg(
* @return True if the result is an instance of the required class or if the
* expected class is null.
*/
- private static boolean isExpectedClass(Class> expectedClass, Object clazz) {
+ protected static boolean isExpectedClass(Class> expectedClass, Object clazz) {
if (expectedClass == null) {
return true;
} else if (clazz == null) {
@@ -1269,7 +1346,7 @@ private static boolean isExpectedClass(Class> expectedClass, Object clazz) {
* @return True if the result is equals to the expected value or if the
* expected value is null.
*/
- private static boolean isExpectedValue(Object expectedValue, Object value) {
+ protected static boolean isExpectedValue(Object expectedValue, Object value) {
if (expectedValue == null) {
return true;
} else if (value == null) {
@@ -1414,7 +1491,26 @@ public Map invokeConcurrent(
throws IOException {
final List> results = invokeConcurrent(
locations, method, standby, timeOutMs, clazz);
+ return postProcessResult(requireResponse, results);
+ }
+ /**
+ * Post-process the results returned by
+ * {@link RouterRpcClient#invokeConcurrent(Collection, RemoteMethod, boolean, long, Class)}.
+ *
+ * @param requireResponse If true an exception will be thrown if all calls do
+ * not complete. If false exceptions are ignored and all data results
+ * successfully received are returned.
+ * @param results Result of invoking the method per subcluster (list of results),
+ * This includes the exception for each remote location.
+ * @return Result of invoking the method per subcluster: nsId to result.
+ * @param <T> The type of the remote location.
+ * @param <R> The type of the remote method return.
+ * @throws IOException If requireResponse=true and any of the calls throw an
+ * exception.
+ */
+ protected static Map postProcessResult(
+ boolean requireResponse, List> results) throws IOException {
// Go over the results and exceptions
final Map ret = new TreeMap<>();
final List thrownExceptions = new ArrayList<>();
@@ -1480,27 +1576,10 @@ public Map invokeConcurrent(
throw new IOException("No remote locations available");
} else if (locations.size() == 1 && timeOutMs <= 0) {
// Shortcut, just one call
- T location = locations.iterator().next();
- String ns = location.getNameserviceId();
- boolean isObserverRead = isObserverReadEligible(ns, m);
- final List extends FederationNamenodeContext> namenodes =
- getOrderedNamenodes(ns, isObserverRead);
- RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
- acquirePermit(ns, ugi, method, controller);
- try {
- Class> proto = method.getProtocol();
- Object[] paramList = method.getParams(location);
- R result = (R) invokeMethod(
- ugi, namenodes, isObserverRead, proto, m, paramList);
- RemoteResult remoteResult = new RemoteResult<>(location, result);
- return Collections.singletonList(remoteResult);
- } catch (IOException ioe) {
- // Localize the exception
- throw processException(ioe, location);
- } finally {
- releasePermit(ns, ugi, method, controller);
- }
+ return invokeSingle(locations.iterator().next(), method);
}
+ RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
+ acquirePermit(CONCURRENT_NS, ugi, method, controller);
List orderedLocations = new ArrayList<>();
List> callables = new ArrayList<>();
@@ -1551,8 +1630,29 @@ public Map invokeConcurrent(
this.router.getRouterClientMetrics().incInvokedConcurrent(m);
}
- RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
- acquirePermit(CONCURRENT_NS, ugi, method, controller);
+ return getRemoteResults(method, timeOutMs, controller, orderedLocations, callables);
+ }
+
+ /**
+ * Invokes multiple concurrent proxy calls to different clients. Returns a
+ * list of results.
+ *
+ * @param <T> The type of the remote location.
+ * @param <R> The type of the remote method return.
+ * @param method The remote method and parameters to invoke.
+ * @param timeOutMs Timeout for each individual call.
+ * @param controller Fairness manager to control handlers assigned per NS.
+ * @param orderedLocations List of remote locations to call concurrently.
+ * @param callables Invoke method for each NameNode.
+ * @return Result of invoking the method per subcluster (list of results),
+ * This includes the exception for each remote location.
+ * @throws IOException If there are errors invoking the method.
+ */
+ protected List> getRemoteResults(
+ RemoteMethod method, long timeOutMs, RouterRpcFairnessPolicyController controller,
+ List orderedLocations, List> callables) throws IOException {
+ final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+ final Method m = method.getMethod();
try {
List> futures = null;
if (timeOutMs > 0) {
@@ -1561,42 +1661,7 @@ public Map invokeConcurrent(
} else {
futures = executorService.invokeAll(callables);
}
- List> results = new ArrayList<>();
- for (int i=0; i future = futures.get(i);
- R result = (R) future.get();
- results.add(new RemoteResult<>(location, result));
- } catch (CancellationException ce) {
- T loc = orderedLocations.get(i);
- String msg = "Invocation to \"" + loc + "\" for \""
- + method.getMethodName() + "\" timed out";
- LOG.error(msg);
- IOException ioe = new SubClusterTimeoutException(msg);
- results.add(new RemoteResult<>(location, ioe));
- } catch (ExecutionException ex) {
- Throwable cause = ex.getCause();
- LOG.debug("Cannot execute {} in {}: {}",
- m.getName(), location, cause.getMessage());
-
- // Convert into IOException if needed
- IOException ioe = null;
- if (cause instanceof IOException) {
- ioe = (IOException) cause;
- } else {
- ioe = new IOException("Unhandled exception while proxying API " +
- m.getName() + ": " + cause.getMessage(), cause);
- }
-
- // Store the exceptions
- results.add(new RemoteResult<>(location, ioe));
- }
- }
- if (rpcMonitor != null) {
- rpcMonitor.proxyOpComplete(true, CONCURRENT, null);
- }
- return results;
+ return processFutures(method, m, orderedLocations, futures);
} catch (RejectedExecutionException e) {
if (rpcMonitor != null) {
rpcMonitor.proxyOpFailureClientOverloaded();
@@ -1616,6 +1681,99 @@ public Map invokeConcurrent(
}
}
+ /**
+ * Handle all futures during the invokeConcurrent call process.
+ *
+ * @param <T> The type of the remote location.
+ * @param <R> The type of the remote method return.
+ * @param method The remote method and parameters to invoke.
+ * @param m The method to invoke.
+ * @param orderedLocations List of remote locations to call concurrently.
+ * @param futures all futures during the invokeConcurrent call process.
+ * @return Result of invoking the method per subcluster (list of results),
+ * This includes the exception for each remote location.
+ * @throws InterruptedException if the current thread was interrupted while waiting.
+ */
+ protected List> processFutures(
+ RemoteMethod method, Method m, final List orderedLocations,
+ final List> futures) throws InterruptedException{
+ List> results = new ArrayList<>();
+ for (int i = 0; i< futures.size(); i++) {
+ T location = orderedLocations.get(i);
+ try {
+ Future future = futures.get(i);
+ R result = (R) future.get();
+ results.add(new RemoteResult<>(location, result));
+ } catch (CancellationException ce) {
+ T loc = orderedLocations.get(i);
+ String msg = "Invocation to \"" + loc + "\" for \""
+ + method.getMethodName() + "\" timed out";
+ LOG.error(msg);
+ IOException ioe = new SubClusterTimeoutException(msg);
+ results.add(new RemoteResult<>(location, ioe));
+ } catch (ExecutionException ex) {
+ Throwable cause = ex.getCause();
+ LOG.debug("Cannot execute {} in {}: {}",
+ m.getName(), location, cause.getMessage());
+
+ // Convert into IOException if needed
+ IOException ioe = null;
+ if (cause instanceof IOException) {
+ ioe = (IOException) cause;
+ } else {
+ ioe = new IOException("Unhandled exception while proxying API " +
+ m.getName() + ": " + cause.getMessage(), cause);
+ }
+
+ // Store the exceptions
+ results.add(new RemoteResult<>(location, ioe));
+ }
+ }
+ if (rpcMonitor != null) {
+ rpcMonitor.proxyOpComplete(true, CONCURRENT, null);
+ }
+ return results;
+ }
+
+ /**
+ * Invokes a ClientProtocol method against the specified namespace.
+ *
+ * Re-throws exceptions generated by the remote RPC call as either
+ * RemoteException or IOException.
+ *
+ * @param <T> The type of the remote location.
+ * @param <R> The type of the remote method return.
+ * @param location RemoteLocation to invoke.
+ * @param method The remote method and parameters to invoke.
+ * @return Singleton list holding the result of invoking the method on the
+ * given location; failures are thrown rather than stored in the result.
+ * @throws IOException If there are errors invoking the method.
+ */
+ public List> invokeSingle(
+ T location, RemoteMethod method) throws IOException {
+ final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+ final Method m = method.getMethod();
+ String ns = location.getNameserviceId();
+ boolean isObserverRead = isObserverReadEligible(ns, m);
+ final List extends FederationNamenodeContext> namenodes =
+ getOrderedNamenodes(ns, isObserverRead);
+ RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
+ acquirePermit(ns, ugi, method, controller);
+ try {
+ Class> proto = method.getProtocol();
+ Object[] paramList = method.getParams(location);
+ R result = (R) invokeMethod(
+ ugi, namenodes, isObserverRead, proto, m, paramList);
+ RemoteResult remoteResult = new RemoteResult<>(location, result);
+ return Collections.singletonList(remoteResult);
+ } catch (IOException ioe) {
+ // Localize the exception
+ throw processException(ioe, location);
+ } finally {
+ releasePermit(ns, ugi, method, controller);
+ }
+ }
+
/**
* Transfer origin thread local context which is necessary to current
* worker thread when invoking method concurrently by executor service.
@@ -1624,7 +1782,7 @@ public Map invokeConcurrent(
* @param originContext origin CallerContext which should be transferred
* to server side.
*/
- private void transferThreadLocalContext(
+ protected void transferThreadLocalContext(
final Call originCall, final CallerContext originContext) {
Server.getCurCall().set(originCall);
CallerContext.setCurrent(originContext);
@@ -1675,7 +1833,7 @@ private String getNameserviceForBlockPoolId(final String bpId)
* @param controller fairness policy controller to acquire permit from
* @throws IOException If permit could not be acquired for the nsId.
*/
- private void acquirePermit(final String nsId, final UserGroupInformation ugi,
+ protected void acquirePermit(final String nsId, final UserGroupInformation ugi,
final RemoteMethod m, RouterRpcFairnessPolicyController controller)
throws IOException {
if (controller != null) {
@@ -1708,7 +1866,7 @@ private void acquirePermit(final String nsId, final UserGroupInformation ugi,
* @param m Remote method that needs to be invoked.
* @param controller fairness policy controller to release permit from
*/
- private void releasePermit(final String nsId, final UserGroupInformation ugi,
+ protected void releasePermit(final String nsId, final UserGroupInformation ugi,
final RemoteMethod m, RouterRpcFairnessPolicyController controller) {
if (controller != null) {
controller.releasePermit(nsId);
@@ -1782,7 +1940,7 @@ private String getCurrentFairnessPolicyControllerClassName() {
* @return A prioritized list of NNs to use for communication.
* @throws IOException If a NN cannot be located for the nameservice ID.
*/
- private List extends FederationNamenodeContext> getOrderedNamenodes(String nsId,
+ protected List extends FederationNamenodeContext> getOrderedNamenodes(String nsId,
boolean isObserverRead) throws IOException {
final List extends FederationNamenodeContext> namenodes;
@@ -1802,7 +1960,7 @@ && isNamespaceStateIdFresh(nsId)
return namenodes;
}
- private boolean isObserverReadEligible(String nsId, Method method) {
+ protected boolean isObserverReadEligible(String nsId, Method method) {
return isReadCall(method) && isNamespaceObserverReadEligible(nsId);
}
@@ -1857,7 +2015,7 @@ private LongAccumulator getTimeOfLastCallToActive(String namespaceId) {
* {@link RouterRpcClient#isUnavailableException(IOException) unavailable exception},
* otherwise false.
*/
- private boolean shouldRotateCache(IOException ioe) {
+ protected boolean shouldRotateCache(IOException ioe) {
if (isUnavailableException(ioe)) {
return true;
}
@@ -1868,4 +2026,61 @@ private boolean shouldRotateCache(IOException ioe) {
}
return isUnavailableException(ioe);
}
+
+
+ /**
+ * The {@link ExecutionStatus} class is a utility class used to track the status of
+ * execution operations performed by the {@link RouterRpcClient}.
+ * It encapsulates the state of an operation, including whether it has completed,
+ * if a failover to a different NameNode should be attempted, and if an observer
+ * NameNode should be used for the operation.
+ *
+ * The status is stored as bit flags packed into a single byte. Each flag can
+ * be checked individually to determine how to proceed with the operation or
+ * to handle its results.
+ */
+ protected static class ExecutionStatus {
+
+ /** A byte field used to store the state flags. */
+ private byte flag;
+ private static final byte FAIL_OVER_BIT = 1;
+ private static final byte SHOULD_USE_OBSERVER_BIT = 2;
+ private static final byte COMPLETE_BIT = 4;
+
+ ExecutionStatus() {
+ this(false, false);
+ }
+
+ ExecutionStatus(boolean failOver, boolean shouldUseObserver) {
+ this.flag = 0;
+ setFailOver(failOver);
+ setShouldUseObserver(shouldUseObserver);
+ setComplete(false);
+ }
+
+ private void setFailOver(boolean failOver) {
+ flag = (byte) (failOver ? (flag | FAIL_OVER_BIT) : (flag & ~FAIL_OVER_BIT));
+ }
+
+ private void setShouldUseObserver(boolean shouldUseObserver) {
+ flag = (byte) (shouldUseObserver ?
+ (flag | SHOULD_USE_OBSERVER_BIT) : (flag & ~SHOULD_USE_OBSERVER_BIT));
+ }
+
+ void setComplete(boolean complete) {
+ flag = (byte) (complete ? (flag | COMPLETE_BIT) : (flag & ~COMPLETE_BIT));
+ }
+
+ boolean isFailOver() {
+ return (flag & FAIL_OVER_BIT) != 0;
+ }
+
+ boolean isShouldUseObserver() {
+ return (flag & SHOULD_USE_OBSERVER_BIT) != 0;
+ }
+
+ boolean isComplete() {
+ return (flag & COMPLETE_BIT) != 0;
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 217c62ff28762..c23c21c6dfb67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -26,6 +26,12 @@
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
@@ -50,14 +56,18 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
@@ -198,7 +208,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
private static final Logger LOG =
LoggerFactory.getLogger(RouterRpcServer.class);
-
+ private ExecutorService asyncRouterHandler;
+ private ExecutorService asyncRouterResponder;
/** Configuration for the RPC server. */
private Configuration conf;
@@ -255,6 +266,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
private RouterRenameOption routerRenameOption;
/** Schedule the router federation rename jobs. */
private BalanceProcedureScheduler fedRenameScheduler;
+ private boolean enableAsync;
+
/**
* Construct a router RPC server.
*
@@ -284,6 +297,12 @@ public RouterRpcServer(Configuration conf, Router router,
int handlerQueueSize = this.conf.getInt(DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY,
DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT);
+ this.enableAsync = conf.getBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC,
+ DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT);
+ LOG.info("Router enable async {}", this.enableAsync);
+ if (this.enableAsync) {
+ initAsyncThreadPool();
+ }
// Override Hadoop Common IPC setting
int readerQueueSize = this.conf.getInt(DFS_ROUTER_READER_QUEUE_SIZE_KEY,
DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT);
@@ -391,15 +410,17 @@ public RouterRpcServer(Configuration conf, Router router,
}
// Create the client
- this.rpcClient = new RouterRpcClient(this.conf, this.router,
- this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
-
- // Initialize modules
- this.quotaCall = new Quota(this.router, this);
+ if (this.enableAsync) {
+ this.rpcClient = new RouterAsyncRpcClient(this.conf, this.router,
+ this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
+ } else {
+ this.rpcClient = new RouterRpcClient(this.conf, this.router,
+ this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
+ }
this.nnProto = new RouterNamenodeProtocol(this);
+ this.quotaCall = new Quota(this.router, this);
this.clientProto = new RouterClientProtocol(conf, this);
this.routerProto = new RouterUserProtocol(this);
-
long dnCacheExpire = conf.getTimeDuration(
DN_REPORT_CACHE_EXPIRE,
DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS);
@@ -428,6 +449,27 @@ public RouterRpcServer(Configuration conf, Router router,
initRouterFedRename();
}
+ /**
+ * Init router async handlers and router async responders.
+ */
+ protected void initAsyncThreadPool() {
+ int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
+ DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
+ int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
+ DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT);
+ if (asyncRouterHandler == null) {
+ LOG.info("init router async handler count: {}", asyncHandlerCount);
+ asyncRouterHandler = Executors.newFixedThreadPool(
+ asyncHandlerCount, new AsyncThreadFactory("router async handler "));
+ }
+ if (asyncRouterResponder == null) {
+ LOG.info("init router async responder count: {}", asyncResponderCount);
+ asyncRouterResponder = Executors.newFixedThreadPool(
+ asyncResponderCount, new AsyncThreadFactory("router async responder "));
+ }
+ AsyncRpcProtocolPBUtil.setWorker(asyncRouterResponder);
+ }
+
/**
* Clear expired namespace in the shared RouterStateIdContext.
*/
@@ -2066,4 +2108,26 @@ public ListenableFuture reload(
return executorService.submit(() -> load(type));
}
}
+
+  /**
+   * Report whether this RPC server was configured for asynchronous handling.
+   *
+   * @return the value of the {@code enableAsync} field.
+   */
+  public boolean isAsync() {
+    return enableAsync;
+  }
+
+  /**
+   * Get the executor backing the router's async handlers.
+   *
+   * @return the {@code asyncRouterHandler} pool, or {@code null} if
+   *         {@code initAsyncThreadPool()} has not created it.
+   */
+  public Executor getAsyncRouterHandler() {
+    return this.asyncRouterHandler;
+  }
+
+ private static class AsyncThreadFactory implements ThreadFactory {
+ private final String namePrefix;
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+ AsyncThreadFactory(String namePrefix) {
+ this.namePrefix = namePrefix;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, namePrefix + threadNumber.getAndIncrement());
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ThreadLocalContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ThreadLocalContext.java
new file mode 100644
index 0000000000000..6faef079bd19a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ThreadLocalContext.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor;
+import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.ipc.Server;
+
+/**
+ * The ThreadLocalContext class is designed to capture and transfer the context of a
+ * thread-local environment within the Hadoop Distributed File System (HDFS) federation
+ * router operations. This is particularly useful for preserving the state across
+ * asynchronous operations where the context needs to be maintained consistently.
+ *
+ * The context includes details such as the current call being processed by the server, the
+ * caller's context, and performance monitoring timestamps. By transferring this context,
+ * the class ensures that the operations performed on worker threads correctly reflect
+ * the state of the original calling thread.
+ *
+ * Here is a high-level overview of the main components captured by this context:
+ *
+ * {@link Server.Call} - Represents the current server call.
+ * {@link CallerContext} - Stores information about the caller.
+ * startOpTime - Time for an operation to be received in the Router.
+ * proxyOpTime - Time for an operation to be sent to the Namenode.
+ *
+ *
+ * This class is typically used in scenarios where asynchronous processing is involved, to
+ * ensure that the thread executing the asynchronous task has the correct context applied.
+ *
+ * @see Server
+ * @see CallerContext
+ * @see FederationRPCPerformanceMonitor
+ */
+public class ThreadLocalContext {
+
+ /** The current server call being processed. */
+ private final Server.Call call;
+ /** The caller context containing information about the caller. */
+ private final CallerContext context;
+ /** Time for an operation to be received in the Router. */
+ private final long startOpTime;
+ /** Time for an operation to be sent to the Namenode. */
+ private final long proxyOpTime;
+
+ /**
+ * Constructs a new {@link ThreadLocalContext} instance, capturing the current
+ * thread-local context at the point of creation.
+ */
+ public ThreadLocalContext() {
+ this.call = Server.getCurCall().get();
+ this.context = CallerContext.getCurrent();
+ this.startOpTime = FederationRPCPerformanceMonitor.getStartOpTime();
+ this.proxyOpTime = FederationRPCPerformanceMonitor.getProxyOpTime();
+ }
+
+ /**
+ * Transfers the captured context to the current thread. This method is used to apply
+ * the context to worker threads that are processing asynchronous tasks, ensuring
+ * that the task execution reflects the state of the original calling thread.
+ */
+ public void transfer() {
+ if (call != null) {
+ Server.getCurCall().set(call);
+ }
+ if (context != null) {
+ CallerContext.setCurrent(context);
+ }
+ if (startOpTime != -1L) {
+ FederationRPCPerformanceMonitor.setStartOpTime(startOpTime);
+ }
+ if (proxyOpTime != -1L) {
+ FederationRPCPerformanceMonitor.setProxyOpTime(proxyOpTime);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "ThreadLocalContext{" +
+ "call=" + call +
+ ", context=" + context +
+ ", startOpTime=" + startOpTime +
+ ", proxyOpTime=" + proxyOpTime +
+ '}';
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java
index d7ed04b08e368..834741b52db32 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java
@@ -73,6 +73,7 @@ private AsyncUtil(){}
*
* {@code false} if {@code clazz} is {@link Boolean},
* -1 if {@code clazz} is {@link Long},
+ * -1 if {@code clazz} is {@link Integer},
* {@code null} for any other type.
*
*/
@@ -80,11 +81,14 @@ public static R asyncReturn(Class clazz) {
if (clazz == null) {
return null;
}
- if (clazz.equals(Boolean.class)) {
+ if (clazz.equals(Boolean.class)
+ || clazz.equals(boolean.class)) {
return (R) BOOLEAN_RESULT;
- } else if (clazz.equals(Long.class)) {
+ } else if (clazz.equals(Long.class)
+ || clazz.equals(long.class)) {
return (R) LONG_RESULT;
- } else if (clazz.equals(Integer.class)) {
+ } else if (clazz.equals(Integer.class)
+ || clazz.equals(int.class)) {
return (R) INT_RESULT;
}
return (R) NULL_RESULT;
@@ -140,6 +144,12 @@ public static void asyncComplete(R value) {
CompletableFuture.completedFuture(value));
}
+  /**
+   * Completes the current asynchronous operation with the specified completableFuture.
+   *
+   * The given future is stored in {@code CUR_COMPLETABLE_FUTURE} for the
+   * calling thread, replacing the value held there before.
+   *
+   * @param completableFuture The completableFuture to complete the future with.
+   * @param <R> The type of the value to be completed.
+   */
   public static <R> void asyncCompleteWith(CompletableFuture<R> completableFuture) {
     CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) completableFuture);
   }
@@ -384,4 +394,13 @@ public static void asyncCurrent(
.handle((unused, throwable) -> then.apply(completableFutures));
CUR_COMPLETABLE_FUTURE.set((CompletableFuture) result);
}
+
+ /**
+ * Get the CompletableFuture object stored in the current thread's local variable.
+ *
+ * This reads {@code CUR_COMPLETABLE_FUTURE} for the calling thread and does not
+ * modify it; the value is whatever was last stored there, for example by
+ * {@code asyncCompleteWith}.
+ * NOTE(review): the result when nothing has been stored on this thread
+ * depends on how {@code CUR_COMPLETABLE_FUTURE} is declared - confirm.
+ *
+ * @return The completableFuture object.
+ */
+ public static CompletableFuture getCompletableFuture() {
+ return CUR_COMPLETABLE_FUTURE.get();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index 26b89ce0313fd..dda90b254b281 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -48,6 +48,14 @@
+  <property>
+    <name>dfs.federation.router.rpc.async.enable</name>
+    <value>false</value>
+    <description>
+      If true, router will process the RPC request asynchronously.
+    </description>
+  </property>
+
dfs.federation.router.rpc-address
0.0.0.0:8888
@@ -101,6 +109,22 @@
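+  <property>
+    <name>dfs.federation.router.rpc.async.handler.count</name>
+    <value>2</value>
+    <description>
+      The number of async handlers for the router to handle RPC client requests.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.rpc.async.responder.count</name>
+    <value>10</value>
+    <description>
+      The number of async responders for the router to handle responses.
+    </description>
+  </property>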
+
dfs.federation.router.connection.creator.queue-size
100
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java
index c0cd4c898e828..15da20fdd113e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java
@@ -40,6 +40,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.concurrent.ForkJoinPool;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
import static org.junit.Assert.assertEquals;
@@ -53,6 +54,7 @@ public class TestAsyncRpcProtocolPBUtil {
@Before
public void setUp() throws IOException {
+ AsyncRpcProtocolPBUtil.setWorker(ForkJoinPool.commonPool());
Configuration conf = new Configuration();
RPC.setProtocolEngine(conf, TestRpcBase.TestRpcService.class,
ProtobufRpcEngine2.class);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java
index d107bbe3bf7c3..3519a968c5b48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java
@@ -52,6 +52,7 @@
import java.net.InetSocketAddress;
import java.util.EnumSet;
import java.util.List;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.crypto.CryptoProtocolVersion.ENCRYPTION_ZONES;
@@ -81,6 +82,7 @@ public class TestRouterClientSideTranslatorPB {
@BeforeClass
public static void setUp() throws Exception {
+ AsyncRpcProtocolPBUtil.setWorker(ForkJoinPool.commonPool());
conf = new HdfsConfiguration();
cluster = (new MiniDFSCluster.Builder(conf))
.numDataNodes(1).build();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcClient.java
new file mode 100644
index 0000000000000..e3429d493dc66
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcClient.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.fs.permission.FsAction.ALL;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Used to test the functionality of {@link RouterAsyncRpcClient}.
+ *
+ * <p>Each test issues a call through the async client and then collects the
+ * outcome with {@code syncReturn}, checking the result or the expected
+ * exception together with the router's RPC metrics.
+ */
+public class TestRouterAsyncRpcClient {
+  private static Configuration routerConf;
+  /** Federated HDFS cluster. */
+  private static MiniRouterDFSCluster cluster;
+  private static String ns0;
+  private static String ns1;
+
+  /** Random Router for this federated cluster. */
+  private MiniRouterDFSCluster.RouterContext router;
+  private FileSystem routerFs;
+  private RouterRpcServer routerRpcServer;
+  private RouterAsyncRpcClient asyncRpcClient;
+  private FederationRPCMetrics rpcMetrics;
+  private final String testFile = "/test.file";
+
+  /**
+   * Start a cluster using a router service that includes 2 namespaces,
+   * 6 namenodes and 6 datanodes.
+   */
+  @BeforeClass
+  public static void setUpCluster() throws Exception {
+    cluster = new MiniRouterDFSCluster(true, 2, 3,
+        DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
+    cluster.setNumDatanodesPerNameservice(3);
+
+    cluster.startCluster();
+
+    // Making one Namenode active per nameservice
+    if (cluster.isHighAvailability()) {
+      for (String ns : cluster.getNameservices()) {
+        cluster.switchToActive(ns, NAMENODES[0]);
+        cluster.switchToStandby(ns, NAMENODES[1]);
+        cluster.switchToObserver(ns, NAMENODES[2]);
+      }
+    }
+    // Start routers with only an RPC service
+    routerConf = new RouterConfigBuilder()
+        .metrics()
+        .rpc()
+        .build();
+
+    // Reduce the number of RPC clients threads to overload the Router easily
+    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
+    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
+    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
+    // We decrease the DN cache times to make the test faster
+    routerConf.setTimeDuration(
+        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+    cluster.addRouterOverrides(routerConf);
+    // Start routers with only an RPC service
+    cluster.startRouters();
+
+    // Register and verify all NNs with all routers
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
+    cluster.waitActiveNamespaces();
+    ns0 = cluster.getNameservices().get(0);
+    ns1 = cluster.getNameservices().get(1);
+  }
+
+  @AfterClass
+  public static void shutdownCluster() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Initialize the mount table, create a RouterAsyncRpcClient object, and create test file.
+   */
+  @Before
+  public void setup() throws Exception {
+    // Create mock locations
+    installMockLocations();
+
+    router = cluster.getRandomRouter();
+    rpcMetrics = router.getRouter().getRpcServer().getRPCMetrics();
+    routerFs = router.getFileSystem();
+    routerRpcServer = router.getRouterRpcServer();
+    routerRpcServer.initAsyncThreadPool();
+
+    // Create a RouterAsyncRpcClient object
+    asyncRpcClient = new RouterAsyncRpcClient(
+        routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
+        routerRpcServer.getRPCMonitor(),
+        routerRpcServer.getRouterStateIdContext());
+
+    // Create a test file; the stream is closed even if the write fails.
+    try (FSDataOutputStream fsDataOutputStream = routerFs.create(
+        new Path(testFile), true)) {
+      fsDataOutputStream.write(new byte[1024]);
+    }
+  }
+
+  /**
+   * Reset the caller context, make the first namenode of ns0 active again,
+   * and remove the test file.
+   */
+  @After
+  public void down() throws IOException {
+    // clear client context
+    CallerContext.setCurrent(null);
+    cluster.switchToActive(ns0, NAMENODES[0]);
+    asyncRpcClient.getNamenodeResolver().updateActiveNamenode(
+        ns0, NetUtils.createSocketAddr(cluster
+            .getNamenode(ns0, NAMENODES[0]).getRpcAddress()));
+    // The null check has to come before the first use of routerFs.
+    if (routerFs != null) {
+      // Delete the test file
+      boolean delete = routerFs.delete(new Path(testFile));
+      assertTrue(delete);
+      routerFs.close();
+    }
+  }
+
+  /**
+   * Test the functionality of the asynchronous invokeSingle method.
+   */
+  @Test
+  public void testInvokeSingle() throws Exception {
+    long proxyOps = rpcMetrics.getProxyOps();
+    long activeProxyOps = rpcMetrics.getActiveProxyOps();
+    RemoteMethod method =
+        new RemoteMethod(NamenodeProtocol.class, "getTransactionID");
+    asyncRpcClient.invokeSingle(ns0, method);
+    long id = syncReturn(Long.class);
+    assertTrue(id > 0);
+    assertEquals(proxyOps + 1, rpcMetrics.getProxyOps());
+    assertEquals(activeProxyOps + 1, rpcMetrics.getActiveProxyOps());
+    assertTrue(rpcMetrics.getProcessingAvg() > 0);
+    assertTrue(rpcMetrics.getProxyAvg() > 0);
+  }
+
+  /**
+   * Test the functionality of the asynchronous invokeAll and invokeConcurrent methods.
+   */
+  @Test
+  public void testInvokeAll() throws Exception {
+    long proxyOps = rpcMetrics.getProxyOps();
+    long activeProxyOps = rpcMetrics.getActiveProxyOps();
+    final List<RemoteLocation> locations =
+        routerRpcServer.getLocationsForPath("/multDes/dir", false);
+    RemoteMethod method = new RemoteMethod("mkdirs",
+        new Class<?>[] {String.class, FsPermission.class, boolean.class},
+        new RemoteParam(), new FsPermission(ALL, ALL, ALL), false);
+    asyncRpcClient.invokeAll(locations, method);
+    LambdaTestUtils.intercept(FileNotFoundException.class,
+        "Parent directory doesn't exist: /multDes",
+        () -> syncReturn(boolean.class));
+    assertEquals(proxyOps + 2, rpcMetrics.getProxyOps());
+    assertEquals(activeProxyOps + 2, rpcMetrics.getActiveProxyOps());
+
+    proxyOps = rpcMetrics.getProxyOps();
+    activeProxyOps = rpcMetrics.getActiveProxyOps();
+    method = new RemoteMethod("mkdirs",
+        new Class<?>[] {String.class, FsPermission.class, boolean.class},
+        new RemoteParam(), new FsPermission(ALL, ALL, ALL), true);
+    asyncRpcClient.invokeAll(locations, method);
+    Boolean success = syncReturn(Boolean.class);
+    assertTrue(success);
+    assertEquals(proxyOps + 2, rpcMetrics.getProxyOps());
+    assertEquals(activeProxyOps + 2, rpcMetrics.getActiveProxyOps());
+
+    FileStatus[] fileStatuses = routerFs.listStatus(new Path("/multDes"));
+    assertNotNull(fileStatuses);
+    assertTrue(rpcMetrics.getProcessingAvg() > 0);
+    assertTrue(rpcMetrics.getProxyAvg() > 0);
+  }
+
+  /**
+   * Test the functionality of the asynchronous invokeMethod method.
+   */
+  @Test
+  public void testInvokeMethod() throws Exception {
+    long proxyOps = rpcMetrics.getProxyOps();
+    long activeProxyOps = rpcMetrics.getActiveProxyOps();
+    RemoteMethod method = new RemoteMethod("getFileInfo",
+        new Class<?>[] {String.class}, new RemoteParam());
+    UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+    Class<?> protocol = method.getProtocol();
+    Object[] params = new String[]{testFile};
+    List<? extends FederationNamenodeContext> namenodes =
+        asyncRpcClient.getOrderedNamenodes(ns0, false);
+    asyncRpcClient.invokeMethod(ugi, namenodes, false,
+        protocol, method.getMethod(), params);
+    FileStatus fileStatus = syncReturn(FileStatus.class);
+    assertEquals(1024, fileStatus.getLen());
+    assertEquals(proxyOps + 1, rpcMetrics.getProxyOps());
+    assertEquals(activeProxyOps + 1, rpcMetrics.getActiveProxyOps());
+
+    // An empty namenode list is rejected by invokeMethod itself.
+    LambdaTestUtils.intercept(IOException.class,
+        "No namenodes to invoke",
+        () -> asyncRpcClient.invokeMethod(ugi, new ArrayList<>(), false,
+            protocol, method.getMethod(), params));
+
+    // Invoke with only the second and third namenodes of the ordered list.
+    proxyOps = rpcMetrics.getProxyOps();
+    activeProxyOps = rpcMetrics.getActiveProxyOps();
+    asyncRpcClient.invokeMethod(ugi, namenodes.subList(1, 3), false,
+        protocol, method.getMethod(), params);
+    LambdaTestUtils.intercept(StandbyException.class,
+        "No namenode available to invoke getFileInfo",
+        () -> syncReturn(FileStatus.class));
+    assertEquals(proxyOps, rpcMetrics.getProxyOps());
+    assertEquals(activeProxyOps, rpcMetrics.getActiveProxyOps());
+
+    // Switch the first ns0 namenode to standby and mark it unavailable.
+    cluster.switchToStandby(ns0, NAMENODES[0]);
+    asyncRpcClient.getNamenodeResolver().updateUnavailableNamenode(
+        ns0, NetUtils.createSocketAddr(namenodes.get(0).getRpcAddress()));
+    asyncRpcClient.invokeMethod(ugi, namenodes, false,
+        protocol, method.getMethod(), params);
+    LambdaTestUtils.intercept(RetriableException.class,
+        "No namenodes available under nameservice ns0",
+        () -> syncReturn(FileStatus.class));
+    assertEquals(1, rpcMetrics.getProxyOpNoNamenodes());
+
+    // Invoke with a null protocol.
+    asyncRpcClient.invokeMethod(ugi, namenodes, false,
+        null, method.getMethod(), params);
+    LambdaTestUtils.intercept(StandbyException.class,
+        "Cannot get a connection",
+        () -> syncReturn(FileStatus.class));
+    assertEquals(1, rpcMetrics.getProxyOpFailureCommunicate());
+  }
+
+  /**
+   * Test the functionality of the asynchronous invokeSequential method.
+   */
+  @Test
+  public void testInvokeSequential() throws Exception {
+    List<RemoteLocation> locations =
+        routerRpcServer.getLocationsForPath(testFile, false, false);
+    RemoteMethod remoteMethod = new RemoteMethod("getBlockLocations",
+        new Class<?>[] {String.class, long.class, long.class},
+        new RemoteParam(), 0, 1024);
+    asyncRpcClient.invokeSequential(locations, remoteMethod,
+        LocatedBlocks.class, null);
+    LocatedBlocks locatedBlocks = syncReturn(LocatedBlocks.class);
+    assertEquals(1024, locatedBlocks.getFileLength());
+    assertEquals(1, locatedBlocks.getLocatedBlocks().size());
+  }
+
+  /**
+   * Initialize the mount information.
+   */
+  private void installMockLocations() {
+    List<MiniRouterDFSCluster.RouterContext> routers = cluster.getRouters();
+
+    for (MiniRouterDFSCluster.RouterContext rc : routers) {
+      Router r = rc.getRouter();
+      MockResolver resolver = (MockResolver) r.getSubclusterResolver();
+      resolver.addLocation("/", ns0, "/");
+      resolver.addLocation("/multDes", ns0, "/multDes");
+      resolver.addLocation("/multDes", ns1, "/multDes");
+    }
+  }
+}