diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java index f65742a5df4e0..fa798e2f35837 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java @@ -18,12 +18,11 @@ package org.apache.hadoop.hdfs.protocolPB; +import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext; import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufRpcEngine2; -import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.internal.ShadedProtobufHelper; import org.apache.hadoop.util.concurrent.AsyncGet; import org.slf4j.Logger; @@ -31,18 +30,48 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith; import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc; +/** + *
<p>
This utility class encapsulates the logic required to initiate asynchronous RPCs, + * handle responses, and propagate exceptions. It works in conjunction with + * {@link ProtobufRpcEngine2} and {@link Client} to facilitate the asynchronous + * nature of the operations. + * + * @see ProtobufRpcEngine2 + * @see Client + * @see CompletableFuture + */ public final class AsyncRpcProtocolPBUtil { public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class); + /** The executor used for handling responses asynchronously. */ + private static Executor worker; private AsyncRpcProtocolPBUtil() {} + /** + * Asynchronously invokes an RPC call and applies a response transformation function + * to the result. This method is generic and can be used to handle any type of + * RPC call. + * + *
<p>
The method uses the {@link ShadedProtobufHelper.IpcCall} to prepare the RPC call + * and the {@link ApplyFunction} to process the response. It also handles exceptions + * that may occur during the RPC call and wraps them in a user-friendly manner. + * + * @param call The IPC call encapsulating the RPC request. + * @param response The function to apply to the response of the RPC call. + * @param clazz The class object representing the type {@code R} of the response. + * @param Type of the call's result. + * @param Type of method return. + * @return An object of type {@code R} that is the result of applying the response + * function to the RPC call result. + * @throws IOException If an I/O error occurs during the asynchronous RPC call. + */ public static R asyncIpcClient( ShadedProtobufHelper.IpcCall call, ApplyFunction response, Class clazz) throws IOException { @@ -50,20 +79,30 @@ public static R asyncIpcClient( AsyncGet asyncReqMessage = (AsyncGet) ProtobufRpcEngine2.getAsyncReturnMessage(); CompletableFuture responseFuture = Client.getResponseFuture(); - // transfer originCall & callerContext to worker threads of executor. - final Server.Call originCall = Server.getCurCall().get(); - final CallerContext originContext = CallerContext.getCurrent(); - asyncCompleteWith(responseFuture); - asyncApply(o -> { + // transfer thread local context to worker threads of executor. + ThreadLocalContext threadLocalContext = new ThreadLocalContext(); + asyncCompleteWith(responseFuture.handleAsync((result, e) -> { + threadLocalContext.transfer(); + if (e != null) { + throw warpCompletionException(e); + } try { - Server.getCurCall().set(originCall); - CallerContext.setCurrent(originContext); T res = asyncReqMessage.get(-1, null); return response.apply(res); - } catch (Exception e) { - throw warpCompletionException(e); + } catch (Exception ex) { + throw warpCompletionException(ex); } - }); + }, worker)); return asyncReturn(clazz); } + + /** + * Sets the executor used for handling responses asynchronously within + * the utility class. + * + * @param worker The executor to be used for handling responses asynchronously. + */ + public static void setWorker(Executor worker) { + AsyncRpcProtocolPBUtil.worker = worker; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java index 3a0fa2016d84e..3b1d5e55781df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java @@ -52,9 +52,9 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor { /** Time for an operation to be received in the Router. */ - private static final ThreadLocal START_TIME = new ThreadLocal<>(); + private static final ThreadLocal START_TIME = ThreadLocal.withInitial(() -> -1L); /** Time for an operation to be sent to the Namenode. */ - private static final ThreadLocal PROXY_TIME = new ThreadLocal<>(); + private static final ThreadLocal PROXY_TIME = ThreadLocal.withInitial(() -> -1L); /** Configuration for the performance monitor. 
*/ private Configuration conf; @@ -141,6 +141,14 @@ public void startOp() { START_TIME.set(monotonicNow()); } + public static long getStartOpTime() { + return START_TIME.get(); + } + + public static void setStartOpTime(long startOpTime) { + START_TIME.set(startOpTime); + } + @Override public long proxyOp() { PROXY_TIME.set(monotonicNow()); @@ -151,6 +159,14 @@ public long proxyOp() { return Thread.currentThread().getId(); } + public static long getProxyOpTime() { + return PROXY_TIME.get(); + } + + public static void setProxyOpTime(long proxyOpTime) { + PROXY_TIME.set(proxyOpTime); + } + @Override public void proxyOpComplete(boolean success, String nsId, FederationNamenodeServiceState state) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 64f27bd3ba32e..103953ab19151 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -72,6 +72,15 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { public static final String DFS_ROUTER_RPC_ENABLE = FEDERATION_ROUTER_PREFIX + "rpc.enable"; public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true; + public static final String DFS_ROUTER_RPC_ENABLE_ASYNC = + FEDERATION_ROUTER_PREFIX + "rpc.async.enable"; + public static final boolean DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT = false; + public static final String DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT = + FEDERATION_ROUTER_PREFIX + "rpc.async.handler.count"; + public static final int DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT = 2; + public static final String DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT = + FEDERATION_ROUTER_PREFIX + "rpc.async.responder.count"; + public static final int DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT = 10; public static final String DFS_ROUTER_METRICS_ENABLE = FEDERATION_ROUTER_PREFIX + "metrics.enable"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncRpcClient.java new file mode 100644 index 0000000000000..2bdcd7ce28724 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncRpcClient.java @@ -0,0 +1,621 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.NameNodeProxiesClient; +import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; +import org.apache.hadoop.hdfs.server.federation.router.async.AsyncApplyFunction; +import org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.ConnectException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; +import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApplyUseExecutor; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncFinally; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncThrowException; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.getCompletableFuture; + +/** + * The {@code RouterAsyncRpcClient} class extends the functionality of the base + * {@code RouterRpcClient} class to provide asynchronous remote procedure call (RPC) + * capabilities for communication with the Hadoop Distributed File System (HDFS) + * NameNodes in a federated environment. + * + *
<p>
This class is responsible for managing the asynchronous execution of RPCs to + * multiple NameNodes, which can improve performance and scalability in large HDFS + * deployments. + * + *
<p>
The class also includes methods for handling failover scenarios, where it can + * automatically retry operations on alternative NameNodes if the primary NameNode is + * unavailable or in standby mode. + * + * @see RouterRpcClient + */ +public class RouterAsyncRpcClient extends RouterRpcClient{ + private static final Logger LOG = + LoggerFactory.getLogger(RouterAsyncRpcClient.class); + /** Router using this RPC client. */ + private final Router router; + /** Interface to identify the active NN for a nameservice or blockpool ID. */ + private final ActiveNamenodeResolver namenodeResolver; + /** Optional perf monitor. */ + private final RouterRpcMonitor rpcMonitor; + private final Executor asyncRouterHandler; + + /** + * Create a router async RPC client to manage remote procedure calls to NNs. + * + * @param conf Hdfs Configuration. + * @param router A router using this RPC client. + * @param resolver A NN resolver to determine the currently active NN in HA. + * @param monitor Optional performance monitor. + * @param routerStateIdContext the router state context object to hold the state ids for all + * namespaces. + */ + public RouterAsyncRpcClient( + Configuration conf, Router router, ActiveNamenodeResolver resolver, + RouterRpcMonitor monitor, RouterStateIdContext routerStateIdContext) { + super(conf, router, resolver, monitor, routerStateIdContext); + this.router = router; + this.namenodeResolver = resolver; + this.rpcMonitor = monitor; + this.asyncRouterHandler = router.getRpcServer().getAsyncRouterHandler(); + } + + /** + * Invoke method in all locations and return success if any succeeds. + * + * @param The type of the remote location. + * @param locations List of remote locations to call concurrently. + * @param method The remote method and parameters to invoke. + * @return If the call succeeds in any location. + * @throws IOException If any of the calls return an exception. + */ + @Override + public boolean invokeAll( + final Collection locations, final RemoteMethod method) + throws IOException { + invokeConcurrent(locations, method, false, false, + Boolean.class); + asyncApply((ApplyFunction, Object>) + results -> results.containsValue(true)); + return asyncReturn(boolean.class); + } + + /** + * Invokes a method against the ClientProtocol proxy server. If a standby + * exception is generated by the call to the client, retries using the + * alternate server. + *
* <p>
+ * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param ugi User group information. + * @param namenodes A prioritized list of namenodes within the same + * nameservice. + * @param useObserver Whether to use observer namenodes. + * @param protocol the protocol of the connection. + * @param method Remote ClientProtocol method to invoke. + * @param params Variable list of parameters matching the method. + * @return The result of invoking the method. + * @throws ConnectException If it cannot connect to any Namenode. + * @throws StandbyException If all Namenodes are in Standby. + * @throws IOException If it cannot invoke the method. + */ + @Override + public Object invokeMethod( + UserGroupInformation ugi, + List namenodes, + boolean useObserver, Class protocol, + Method method, Object... params) throws IOException { + if (namenodes == null || namenodes.isEmpty()) { + throw new IOException("No namenodes to invoke " + method.getName() + + " with params " + Arrays.deepToString(params) + " from " + + router.getRouterId()); + } + // transfer threadLocalContext to worker threads of executor. + ThreadLocalContext threadLocalContext = new ThreadLocalContext(); + asyncComplete(null); + asyncApplyUseExecutor((AsyncApplyFunction) o -> { + if (LOG.isDebugEnabled()) { + LOG.debug("Async invoke method : {}, {}, {}, {}", method.getName(), useObserver, + namenodes.toString(), params); + } + threadLocalContext.transfer(); + invokeMethodAsync(ugi, (List) namenodes, + useObserver, protocol, method, params); + }, asyncRouterHandler); + return null; + } + + /** + * Asynchronously invokes a method on the specified NameNodes for a given user and operation. + * This method is responsible for the actual execution of the remote method call on the + * NameNodes in a non-blocking manner, allowing for concurrent processing. + * + *
<p>
In case of exceptions, the method includes logic to handle retries, failover to standby + * NameNodes, and proper exception handling to ensure that the calling code can respond + * appropriately to different error conditions. + * + * @param ugi The user information under which the method is to be invoked. + * @param namenodes The list of NameNode contexts on which the method will be invoked. + * @param useObserver Whether to use an observer node for the invocation if available. + * @param protocol The protocol class defining the method to be invoked. + * @param method The method to be invoked on the NameNodes. + * @param params The parameters for the method invocation. + */ + private void invokeMethodAsync( + final UserGroupInformation ugi, + final List namenodes, + boolean useObserver, + final Class protocol, final Method method, final Object... params) { + + addClientInfoToCallerContext(ugi); + if (rpcMonitor != null) { + rpcMonitor.proxyOp(); + } + final ExecutionStatus status = new ExecutionStatus(false, useObserver); + Map ioes = new LinkedHashMap<>(); + final ConnectionContext[] connection = new ConnectionContext[1]; + asyncForEach(namenodes.iterator(), + (foreach, namenode) -> { + if (!status.isShouldUseObserver() + && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) { + asyncComplete(null); + return; + } + String nsId = namenode.getNameserviceId(); + String rpcAddress = namenode.getRpcAddress(); + asyncTry(() -> { + connection[0] = getConnection(ugi, nsId, rpcAddress, protocol); + NameNodeProxiesClient.ProxyAndInfo client = connection[0].getClient(); + invoke(namenode, status.isShouldUseObserver(), 0, method, + client.getProxy(), params); + asyncApply(res -> { + status.setComplete(true); + postProcessResult(method, status, namenode, nsId, client); + foreach.breakNow(); + return res; + }); + }); + asyncCatch((res, ioe) -> { + ioes.put(namenode, ioe); + handleInvokeMethodIOException(namenode, ioe, status, useObserver); + return res; + }, IOException.class); + asyncFinally(res -> { + if (connection[0] != null) { + connection[0].release(); + } + return res; + }); + }); + + asyncApply(res -> { + if (status.isComplete()) { + return res; + } + return handlerAllNamenodeFail(namenodes, method, ioes, params); + }); + } + + /** + * Asynchronously invokes a method on a specified NameNode in the context of the given + * namespace and NameNode information. This method is designed to handle the invocation + * in a non-blocking manner, allowing for improved performance and scalability when + * interacting with the NameNode. + * + * @param namenode The context information for the NameNode. + * @param listObserverFirst Whether to list the observer node first in the invocation list. + * @param retryCount The current retry count for the operation. + * @param method The method to be invoked on the NameNode. + * @param obj The proxy object through which the method will be invoked. + * @param params The parameters for the method invocation. + */ + protected Object invoke( + FederationNamenodeContext namenode, Boolean listObserverFirst, + int retryCount, final Method method, + final Object obj, final Object... 
params) throws IOException { + try { + Client.setAsynchronousMode(true); + method.invoke(obj, params); + Client.setAsynchronousMode(false); + asyncCatch((AsyncCatchFunction) (o, e) -> { + handlerInvokeException(namenode, listObserverFirst, + retryCount, method, obj, e, params); + }, Throwable.class); + } catch (InvocationTargetException e) { + asyncThrowException(e.getCause()); + } catch (IllegalAccessException | IllegalArgumentException e) { + LOG.error("Unexpected exception while proxying API", e); + asyncThrowException(e); + } + return null; + } + + /** + * Invokes sequential proxy calls to different locations. Continues to invoke + * calls until the success condition is met, or until all locations have been + * attempted. + * + * The success condition may be specified by: + *
<ul> <li>An expected result class</li> <li>An expected result value</li> </ul>
+ * + * If no expected result class/values are specified, the success condition is + * a call that does not throw a remote exception. + * + * @param The type of the remote method return. + * @param locations List of locations/nameservices to call concurrently. + * @param remoteMethod The remote method and parameters to invoke. + * @param expectedResultClass In order to be considered a positive result, the + * return type must be of this class. + * @param expectedResultValue In order to be considered a positive result, the + * return value must equal the value of this object. + * @return The result of the first successful call, or if no calls are + * successful, the result of the first RPC call executed. + * @throws IOException if the success condition is not met, return the first + * remote exception generated. + */ + @Override + public T invokeSequential( + final List locations, + final RemoteMethod remoteMethod, Class expectedResultClass, + Object expectedResultValue) throws IOException { + invokeSequential(remoteMethod, locations, expectedResultClass, expectedResultValue); + asyncApply((ApplyFunction) RemoteResult::getResult); + return asyncReturn(expectedResultClass); + } + + /** + * Invokes sequential proxy calls to different locations. Continues to invoke + * calls until the success condition is met, or until all locations have been + * attempted. + * + * The success condition may be specified by: + *
    + *
<ul>
<li>An expected result class</li>
<li>An expected result value</li>
</ul>
+ * + * If no expected result class/values are specified, the success condition is + * a call that does not throw a remote exception. + * + * This returns RemoteResult, which contains the invoked location as well + * as the result. + * + * @param The type of the remote location. + * @param The type of the remote method return. + * @param remoteMethod The remote method and parameters to invoke. + * @param locations List of locations/nameservices to call concurrently. + * @param expectedResultClass In order to be considered a positive result, the + * return type must be of this class. + * @param expectedResultValue In order to be considered a positive result, the + * return value must equal the value of this object. + * @return The result of the first successful call, or if no calls are + * successful, the result of the first RPC call executed, along with + * the invoked location in form of RemoteResult. + * @throws IOException if the success condition is not met, return the first + * remote exception generated. + */ + @Override + public RemoteResult invokeSequential( + final RemoteMethod remoteMethod, final List locations, + Class expectedResultClass, Object expectedResultValue) + throws IOException { + + RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); + final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); + final Method m = remoteMethod.getMethod(); + List thrownExceptions = new ArrayList<>(); + final Object[] firstResult = {null}; + final ExecutionStatus status = new ExecutionStatus(); + Iterator locationIterator = + (Iterator) locations.iterator(); + // Invoke in priority order + asyncForEach(locationIterator, + (foreach, loc) -> { + String ns = loc.getNameserviceId(); + boolean isObserverRead = isObserverReadEligible(ns, m); + List namenodes = + getOrderedNamenodes(ns, isObserverRead); + acquirePermit(ns, ugi, remoteMethod, controller); + asyncTry(() -> { + Class proto = remoteMethod.getProtocol(); + Object[] params = remoteMethod.getParams(loc); + invokeMethod(ugi, namenodes, isObserverRead, proto, m, params); + asyncApply(result -> { + // Check if the result is what we expected + if (isExpectedClass(expectedResultClass, result) && + isExpectedValue(expectedResultValue, result)) { + // Valid result, stop here + @SuppressWarnings("unchecked") R location = (R) loc; + @SuppressWarnings("unchecked") T ret = (T) result; + foreach.breakNow(); + status.setComplete(true); + return new RemoteResult<>(location, ret); + } + if (firstResult[0] == null) { + firstResult[0] = result; + } + return null; + }); + }); + asyncCatch((ret, e) -> { + if (e instanceof IOException) { + IOException ioe = (IOException) e; + // Localize the exception + ioe = processException(ioe, loc); + // Record it and move on + thrownExceptions.add(ioe); + } else { + // Unusual error, ClientProtocol calls always use IOException (or + // RemoteException). Re-wrap in IOException for compatibility with + // ClientProtocol. 
+ LOG.error("Unexpected exception {} proxying {} to {}", + e.getClass(), m.getName(), ns, e); + IOException ioe = new IOException( + "Unexpected exception proxying API " + e.getMessage(), e); + thrownExceptions.add(ioe); + } + return ret; + }, Exception.class); + asyncFinally(ret -> { + releasePermit(ns, ugi, remoteMethod, controller); + return ret; + }); + }); + asyncApply(result -> { + if (status.isComplete()) { + return result; + } + if (!thrownExceptions.isEmpty()) { + // An unavailable subcluster may be the actual cause + // We cannot surface other exceptions (e.g., FileNotFoundException) + for (int i = 0; i < thrownExceptions.size(); i++) { + IOException ioe = thrownExceptions.get(i); + if (isUnavailableException(ioe)) { + throw ioe; + } + } + // re-throw the first exception thrown for compatibility + throw thrownExceptions.get(0); + } + // Return the first result, whether it is the value or not + @SuppressWarnings("unchecked") T ret = (T) firstResult[0]; + return new RemoteResult<>(locations.get(0), ret); + }); + return asyncReturn(RemoteResult.class); + } + + /** + * Invokes multiple concurrent proxy calls to different clients. Returns an + * array of results. + *
<p>
+ * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param The type of the remote location. + * @param The type of the remote method return. + * @param locations List of remote locations to call concurrently. + * @param method The remote method and parameters to invoke. + * @param requireResponse If true an exception will be thrown if all calls do + * not complete. If false exceptions are ignored and all data results + * successfully received are returned. + * @param standby If the requests should go to the standby namenodes too. + * @param timeOutMs Timeout for each individual call. + * @param clazz Type of the remote return type. + * @return Result of invoking the method per subcluster: nsId to result. + * @throws IOException If requiredResponse=true and any of the calls throw an + * exception. + */ + @Override + public Map invokeConcurrent( + final Collection locations, final RemoteMethod method, + boolean requireResponse, boolean standby, long timeOutMs, Class clazz) + throws IOException { + invokeConcurrent(locations, method, standby, timeOutMs, clazz); + asyncApply((ApplyFunction>, Object>) + results -> postProcessResult(requireResponse, results)); + return asyncReturn(Map.class); + } + + /** + * Invokes multiple concurrent proxy calls to different clients. Returns an + * array of results. + * + * @param The type of the remote location. + * @param The type of the remote method return. + * @param method The remote method and parameters to invoke. + * @param timeOutMs Timeout for each individual call. + * @param controller Fairness manager to control handlers assigned per NS. + * @param orderedLocations List of remote locations to call concurrently. + * @param callables Invoke method for each NameNode. + * @return Result of invoking the method per subcluster (list of results), + * This includes the exception for each remote location. + * @throws IOException If there are errors invoking the method. + */ + @Override + protected List> getRemoteResults( + RemoteMethod method, long timeOutMs, RouterRpcFairnessPolicyController controller, + List orderedLocations, List> callables) throws IOException { + final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); + final Method m = method.getMethod(); + final CompletableFuture[] futures = + new CompletableFuture[callables.size()]; + int i = 0; + for (Callable callable : callables) { + CompletableFuture future = null; + try { + callable.call(); + future = getCompletableFuture(); + } catch (Exception e) { + future = new CompletableFuture<>(); + future.completeExceptionally(warpCompletionException(e)); + } + futures[i++] = future; + } + + asyncCompleteWith(CompletableFuture.allOf(futures) + .handle((unused, throwable) -> { + try { + return processFutures(method, m, orderedLocations, Arrays.asList(futures)); + } catch (InterruptedException e) { + LOG.error("Unexpected error while invoking API: {}", e.getMessage()); + throw warpCompletionException(new IOException( + "Unexpected error while invoking API " + e.getMessage(), e)); + } finally { + releasePermit(CONCURRENT_NS, ugi, method, controller); + } + })); + return asyncReturn(List.class); + } + + /** + * Invokes a ClientProtocol method against the specified namespace. + *
<p>
+ * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param The type of the remote location. + * @param The type of the remote method return. + * @param location RemoteLocation to invoke. + * @param method The remote method and parameters to invoke. + * @return Result of invoking the method per subcluster (list of results), + * This includes the exception for each remote location. + * @throws IOException If there are errors invoking the method. + */ + @Override + public List> invokeSingle( + T location, RemoteMethod method) throws IOException { + final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); + final Method m = method.getMethod(); + String ns = location.getNameserviceId(); + boolean isObserverRead = isObserverReadEligible(ns, m); + final List namenodes = + getOrderedNamenodes(ns, isObserverRead); + RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); + acquirePermit(ns, ugi, method, controller); + asyncTry(() -> { + Class proto = method.getProtocol(); + Object[] paramList = method.getParams(location); + invokeMethod(ugi, namenodes, isObserverRead, proto, m, paramList); + asyncApply((ApplyFunction) result -> { + RemoteResult remoteResult = new RemoteResult<>(location, result); + return Collections.singletonList(remoteResult); + }); + }); + asyncCatch((o, ioe) -> { + throw processException(ioe, location); + }, IOException.class); + asyncFinally(o -> { + releasePermit(ns, ugi, method, controller); + return o; + }); + return asyncReturn(List.class); + } + + /** + * Invokes a ClientProtocol method against the specified namespace. + *
<p>
+ * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param nsId Target namespace for the method. + * @param method The remote method and parameters to invoke. + * @return The result of invoking the method. + * @throws IOException If the invoke generated an error. + */ + @Override + public Object invokeSingle(final String nsId, RemoteMethod method) + throws IOException { + UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); + RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); + acquirePermit(nsId, ugi, method, controller); + asyncTry(() -> { + boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod()); + List nns = getOrderedNamenodes(nsId, isObserverRead); + RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/"); + Class proto = method.getProtocol(); + Method m = method.getMethod(); + Object[] params = method.getParams(loc); + invokeMethod(ugi, nns, isObserverRead, proto, m, params); + }); + asyncFinally(o -> { + releasePermit(nsId, ugi, method, controller); + return o; + }); + return null; + } + + /** + * Invokes a single proxy call for a single location. + *
<p>
+ * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param location RemoteLocation to invoke. + * @param remoteMethod The remote method and parameters to invoke. + * @param clazz Class for the return type. + * @param The type of the remote method return. + * @return The result of invoking the method if successful. + * @throws IOException If the invoke generated an error. + */ + public T invokeSingle( + final RemoteLocationContext location, + RemoteMethod remoteMethod, Class clazz) throws IOException { + List locations = Collections.singletonList(location); + invokeSequential(locations, remoteMethod); + return asyncReturn(clazz); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index d25e5ae4d3012..70b5034272d08 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -400,7 +400,7 @@ public String getAcceptedPermitsPerNsJSON() { * NN + current user. * @throws IOException If we cannot get a connection to the NameNode. */ - private ConnectionContext getConnection(UserGroupInformation ugi, String nsId, + protected ConnectionContext getConnection(UserGroupInformation ugi, String nsId, String rpcAddress, Class proto) throws IOException { ConnectionContext connection = null; try { @@ -462,7 +462,7 @@ private static IOException toIOException(Exception e) { * @return Retry decision. * @throws IOException An IO Error occurred. 
*/ - private RetryDecision shouldRetry( + protected RetryDecision shouldRetry( final IOException ioe, final int retryCount, final String nsId, final FederationNamenodeContext namenode, final boolean listObserverFirst) throws IOException { @@ -526,11 +526,12 @@ public Object invokeMethod( if (rpcMonitor != null) { rpcMonitor.proxyOp(); } - boolean failover = false; - boolean shouldUseObserver = useObserver; + + ExecutionStatus status = new ExecutionStatus(false, useObserver); Map ioes = new LinkedHashMap<>(); for (FederationNamenodeContext namenode : namenodes) { - if (!shouldUseObserver && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) { + if (!status.isShouldUseObserver() + && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) { continue; } ConnectionContext connection = null; @@ -541,83 +542,12 @@ public Object invokeMethod( ProxyAndInfo client = connection.getClient(); final Object proxy = client.getProxy(); - ret = invoke(nsId, namenode, useObserver, 0, method, proxy, params); - if (failover && - FederationNamenodeServiceState.OBSERVER != namenode.getState()) { - // Success on alternate server, update - InetSocketAddress address = client.getAddress(); - namenodeResolver.updateActiveNamenode(nsId, address); - } - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState()); - } - if (this.router.getRouterClientMetrics() != null) { - this.router.getRouterClientMetrics().incInvokedMethod(method); - } + ret = invoke(namenode, useObserver, 0, method, proxy, params); + postProcessResult(method, status, namenode, nsId, client); return ret; } catch (IOException ioe) { ioes.put(namenode, ioe); - if (ioe instanceof ObserverRetryOnActiveException) { - LOG.info("Encountered ObserverRetryOnActiveException from {}." 
- + " Retry active namenode directly.", namenode); - shouldUseObserver = false; - } else if (ioe instanceof StandbyException) { - // Fail over indicated by retry policy and/or NN - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpFailureStandby(nsId); - } - failover = true; - } else if (isUnavailableException(ioe)) { - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpFailureCommunicate(nsId); - } - if (FederationNamenodeServiceState.OBSERVER == namenode.getState()) { - namenodeResolver.updateUnavailableNamenode(nsId, - NetUtils.createSocketAddr(namenode.getRpcAddress())); - } else { - failover = true; - } - } else if (ioe instanceof RemoteException) { - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState()); - } - RemoteException re = (RemoteException) ioe; - ioe = re.unwrapRemoteException(); - ioe = getCleanException(ioe); - // RemoteException returned by NN - throw ioe; - } else if (ioe instanceof ConnectionNullException) { - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpFailureCommunicate(nsId); - } - LOG.error("Get connection for {} {} error: {}", nsId, rpcAddress, - ioe.getMessage()); - // Throw StandbyException so that client can retry - StandbyException se = new StandbyException(ioe.getMessage()); - se.initCause(ioe); - throw se; - } else if (ioe instanceof NoNamenodesAvailableException) { - IOException cause = (IOException) ioe.getCause(); - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpNoNamenodes(nsId); - } - LOG.error("Cannot get available namenode for {} {} error: {}", - nsId, rpcAddress, ioe.getMessage()); - // Rotate cache so that client can retry the next namenode in the cache - if (shouldRotateCache(cause)) { - this.namenodeResolver.rotateCache(nsId, namenode, useObserver); - } - // Throw RetriableException so that client can retry - throw new RetriableException(ioe); - } else { - // Other communication error, this is a failure - // Communication retries are handled by the retry policy - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpFailureCommunicate(nsId); - this.rpcMonitor.proxyOpComplete(false, nsId, namenode.getState()); - } - throw ioe; - } + handleInvokeMethodIOException(namenode, ioe, status, useObserver); } finally { if (connection != null) { connection.release(); @@ -628,6 +558,24 @@ public Object invokeMethod( this.rpcMonitor.proxyOpComplete(false, null, null); } + return handlerAllNamenodeFail(namenodes, method, ioes, params); + } + + /** + * All namenodes cannot successfully process the RPC request, + * throw corresponding exceptions according to the exception type of each namenode. + * + * @param namenodes A prioritized list of namenodes within the same nameservice. + * @param method Remote ClientProtocol method to invoke. + * @param ioes The exception type of each namenode. + * @param params Variable list of parameters matching the method. + * @return null + * @throws IOException Corresponding IOException according to the + * exception type of each namenode. 
+ */ + protected Object handlerAllNamenodeFail( + List namenodes, Method method, + Map ioes, Object[] params) throws IOException { // All namenodes were unavailable or in standby String msg = "No namenode available to invoke " + method.getName() + " " + Arrays.deepToString(params) + " in " + namenodes + " from " + @@ -658,14 +606,120 @@ public Object invokeMethod( } } + /** + * The RPC request is successfully processed by the NameNode, the NameNode status + * in the router cache is updated according to the ExecutionStatus. + * + * @param method Remote method to invoke. + * @param status Current execution status. + * @param namenode The namenode that successfully processed this RPC request. + * @param nsId Nameservice ID. + * @param client Connection client. + * @throws IOException If the state store cannot be accessed. + */ + protected void postProcessResult(Method method, ExecutionStatus status, + FederationNamenodeContext namenode, String nsId, ProxyAndInfo client) throws IOException { + if (status.isFailOver() && + FederationNamenodeServiceState.OBSERVER != namenode.getState()) { + // Success on alternate server, update + InetSocketAddress address = client.getAddress(); + namenodeResolver.updateActiveNamenode(nsId, address); + } + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState()); + } + if (this.router.getRouterClientMetrics() != null) { + this.router.getRouterClientMetrics().incInvokedMethod(method); + } + } + + /** + * The RPC request to the NameNode throws an exception, + * handle it according to the type of exception. + * + * @param namenode The namenode that processed this RPC request. + * @param ioe The exception thrown by this RPC request. + * @param status The current execution status. + * @param useObserver Whether to use observer namenodes. + * @throws IOException If it cannot invoke the method. + */ + protected void handleInvokeMethodIOException(final FederationNamenodeContext namenode, + IOException ioe, final ExecutionStatus status, boolean useObserver) throws IOException { + String nsId = namenode.getNameserviceId(); + String rpcAddress = namenode.getRpcAddress(); + if (ioe instanceof ObserverRetryOnActiveException) { + LOG.info("Encountered ObserverRetryOnActiveException from {}." 
+ + " Retry active namenode directly.", namenode); + status.setShouldUseObserver(false); + } else if (ioe instanceof StandbyException) { + // Fail over indicated by retry policy and/or NN + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpFailureStandby(nsId); + } + status.setFailOver(true); + } else if (isUnavailableException(ioe)) { + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpFailureCommunicate(nsId); + } + if (FederationNamenodeServiceState.OBSERVER == namenode.getState()) { + namenodeResolver.updateUnavailableNamenode(nsId, + NetUtils.createSocketAddr(namenode.getRpcAddress())); + } else { + status.setFailOver(true); + } + } else if (ioe instanceof RemoteException) { + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState()); + } + RemoteException re = (RemoteException) ioe; + ioe = re.unwrapRemoteException(); + ioe = getCleanException(ioe); + // RemoteException returned by NN + throw ioe; + } else if (ioe instanceof ConnectionNullException) { + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpFailureCommunicate(nsId); + } + LOG.error("Get connection for {} {} error: {}", nsId, rpcAddress, + ioe.getMessage()); + // Throw StandbyException so that client can retry + StandbyException se = new StandbyException(ioe.getMessage()); + se.initCause(ioe); + throw se; + } else if (ioe instanceof NoNamenodesAvailableException) { + IOException cause = (IOException) ioe.getCause(); + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpNoNamenodes(nsId); + } + LOG.error("Cannot get available namenode for {} {} error: {}", + nsId, rpcAddress, ioe.getMessage()); + // Rotate cache so that client can retry the next namenode in the cache + if (shouldRotateCache(cause)) { + this.namenodeResolver.rotateCache(nsId, namenode, useObserver); + } + // Throw RetriableException so that client can retry + throw new RetriableException(ioe); + } else { + // Other communication error, this is a failure + // Communication retries are handled by the retry policy + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpFailureCommunicate(nsId); + this.rpcMonitor.proxyOpComplete(false, nsId, namenode.getState()); + } + throw ioe; + } + } + /** * For tracking some information about the actual client. * It adds trace info "clientIp:ip", "clientPort:port", * "clientId:id", "clientCallId:callId" and "realUser:userName" * in the caller context, removing the old values if they were * already present. + * + * @param ugi User group information. */ - private void addClientInfoToCallerContext(UserGroupInformation ugi) { + protected void addClientInfoToCallerContext(UserGroupInformation ugi) { CallerContext ctx = CallerContext.getCurrent(); String origContext = ctx == null ? null : ctx.getContext(); byte[] origSignature = ctx == null ? null : ctx.getSignature(); @@ -706,7 +760,8 @@ private void addClientInfoToCallerContext(UserGroupInformation ugi) { * Re-throws exceptions generated by the remote RPC call as either * RemoteException or IOException. * - * @param nsId Identifier for the namespace + * @param namenode namenode context. + * @param listObserverFirst Observer read case, observer NN will be ranked first. * @param retryCount Current retry times * @param method Method to invoke * @param obj Target object for the method @@ -714,8 +769,8 @@ private void addClientInfoToCallerContext(UserGroupInformation ugi) { * @return Response from the remote server * @throws IOException If error occurs. 
*/ - private Object invoke( - String nsId, FederationNamenodeContext namenode, Boolean listObserverFirst, + protected Object invoke( + FederationNamenodeContext namenode, Boolean listObserverFirst, int retryCount, final Method method, final Object obj, final Object... params) throws IOException { try { @@ -725,36 +780,58 @@ private Object invoke( return null; } catch (InvocationTargetException e) { Throwable cause = e.getCause(); - if (cause instanceof IOException) { - IOException ioe = (IOException) cause; - - // Check if we should retry. - RetryDecision decision = shouldRetry(ioe, retryCount, nsId, namenode, listObserverFirst); - if (decision == RetryDecision.RETRY) { - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpRetries(); - } + return handlerInvokeException(namenode, listObserverFirst, + retryCount, method, obj, cause, params); + } + } - // retry - return invoke(nsId, namenode, listObserverFirst, ++retryCount, method, obj, params); - } else if (decision == RetryDecision.FAILOVER_AND_RETRY) { - // failover, invoker looks for standby exceptions for failover. - if (ioe instanceof StandbyException) { - throw ioe; - } else if (isUnavailableException(ioe)) { - throw ioe; - } else { - throw new StandbyException(ioe.getMessage()); - } - } else { + /** + * Handle the exception when an RPC request to the NameNode throws an exception. + * + * @param namenode namenode context. + * @param listObserverFirst Observer read case, observer NN will be ranked first. + * @param retryCount Current retry times + * @param method Method to invoke + * @param obj Target object for the method + * @param e The exception thrown by the current invocation. + * @param params Variable parameters + * @return Response from the remote server + * @throws IOException If error occurs. + */ + protected Object handlerInvokeException(FederationNamenodeContext namenode, + Boolean listObserverFirst, int retryCount, Method method, Object obj, + Throwable e, Object[] params) throws IOException { + String nsId = namenode.getNameserviceId(); + if (e instanceof IOException) { + IOException ioe = (IOException) e; + + // Check if we should retry. + RetryDecision decision = shouldRetry(ioe, retryCount, nsId, namenode, listObserverFirst); + if (decision == RetryDecision.RETRY) { + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpRetries(); + } + + // retry + return invoke(namenode, listObserverFirst, ++retryCount, method, obj, params); + } else if (decision == RetryDecision.FAILOVER_AND_RETRY) { + // failover, invoker looks for standby exceptions for failover. + if (ioe instanceof StandbyException) { throw ioe; + } else if (isUnavailableException(ioe)) { + throw ioe; + } else { + throw new StandbyException(ioe.getMessage()); } } else { - throw new IOException(e); + throw ioe; } + } else { + throw new IOException(e); } } + /** * Check if the exception comes from an unavailable subcluster. * @param ioe IOException to check. @@ -817,7 +894,7 @@ private boolean isClusterUnAvailable( * @param ioe Exception to clean up. * @return Copy of the original exception with a clean message. */ - private static IOException getCleanException(IOException ioe) { + protected static IOException getCleanException(IOException ioe) { IOException ret = null; String msg = ioe.getMessage(); @@ -1185,7 +1262,7 @@ public RemoteResult invokeSequential( * @param loc Location we are processing. * @return Exception processed for federation. 
*/ - private IOException processException( + protected IOException processException( IOException ioe, RemoteLocationContext loc) { if (ioe instanceof RemoteException) { @@ -1251,7 +1328,7 @@ static String processExceptionMsg( * @return True if the result is an instance of the required class or if the * expected class is null. */ - private static boolean isExpectedClass(Class expectedClass, Object clazz) { + protected static boolean isExpectedClass(Class expectedClass, Object clazz) { if (expectedClass == null) { return true; } else if (clazz == null) { @@ -1269,7 +1346,7 @@ private static boolean isExpectedClass(Class expectedClass, Object clazz) { * @return True if the result is equals to the expected value or if the * expected value is null. */ - private static boolean isExpectedValue(Object expectedValue, Object value) { + protected static boolean isExpectedValue(Object expectedValue, Object value) { if (expectedValue == null) { return true; } else if (value == null) { @@ -1414,7 +1491,26 @@ public Map invokeConcurrent( throws IOException { final List> results = invokeConcurrent( locations, method, standby, timeOutMs, clazz); + return postProcessResult(requireResponse, results); + } + /** + * Post-process the results returned by + * {@link RouterRpcClient#invokeConcurrent(Collection, RemoteMethod, boolean, long, Class)}. + * + * @param requireResponse If true an exception will be thrown if all calls do + * not complete. If false exceptions are ignored and all data results + * successfully received are returned. + * @param results Result of invoking the method per subcluster (list of results), + * This includes the exception for each remote location. + * @return Result of invoking the method per subcluster: nsId to result. + * @param The type of the remote location. + * @param The type of the remote method return. + * @throws IOException If requiredResponse=true and any of the calls throw an + * exception. 
+ */ + protected static Map postProcessResult( + boolean requireResponse, List> results) throws IOException { // Go over the results and exceptions final Map ret = new TreeMap<>(); final List thrownExceptions = new ArrayList<>(); @@ -1480,27 +1576,10 @@ public Map invokeConcurrent( throw new IOException("No remote locations available"); } else if (locations.size() == 1 && timeOutMs <= 0) { // Shortcut, just one call - T location = locations.iterator().next(); - String ns = location.getNameserviceId(); - boolean isObserverRead = isObserverReadEligible(ns, m); - final List namenodes = - getOrderedNamenodes(ns, isObserverRead); - RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(ns, ugi, method, controller); - try { - Class proto = method.getProtocol(); - Object[] paramList = method.getParams(location); - R result = (R) invokeMethod( - ugi, namenodes, isObserverRead, proto, m, paramList); - RemoteResult remoteResult = new RemoteResult<>(location, result); - return Collections.singletonList(remoteResult); - } catch (IOException ioe) { - // Localize the exception - throw processException(ioe, location); - } finally { - releasePermit(ns, ugi, method, controller); - } + return invokeSingle(locations.iterator().next(), method); } + RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); + acquirePermit(CONCURRENT_NS, ugi, method, controller); List orderedLocations = new ArrayList<>(); List> callables = new ArrayList<>(); @@ -1551,8 +1630,29 @@ public Map invokeConcurrent( this.router.getRouterClientMetrics().incInvokedConcurrent(m); } - RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(CONCURRENT_NS, ugi, method, controller); + return getRemoteResults(method, timeOutMs, controller, orderedLocations, callables); + } + + /** + * Invokes multiple concurrent proxy calls to different clients. Returns an + * array of results. + * + * @param The type of the remote location. + * @param The type of the remote method return. + * @param method The remote method and parameters to invoke. + * @param timeOutMs Timeout for each individual call. + * @param controller Fairness manager to control handlers assigned per NS. + * @param orderedLocations List of remote locations to call concurrently. + * @param callables Invoke method for each NameNode. + * @return Result of invoking the method per subcluster (list of results), + * This includes the exception for each remote location. + * @throws IOException If there are errors invoking the method. 
+   */
+  protected <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> getRemoteResults(
+      RemoteMethod method, long timeOutMs, RouterRpcFairnessPolicyController controller,
+      List<T> orderedLocations, List<Callable<Object>> callables) throws IOException {
+    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+    final Method m = method.getMethod();
     try {
       List<Future<Object>> futures = null;
       if (timeOutMs > 0) {
@@ -1561,42 +1661,7 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
       } else {
         futures = executorService.invokeAll(callables);
       }
-      List<RemoteResult<T, R>> results = new ArrayList<>();
-      for (int i=0; i<futures.size(); i++) {
-        T location = orderedLocations.get(i);
-        try {
-          Future<Object> future = futures.get(i);
-          R result = (R) future.get();
-          results.add(new RemoteResult<>(location, result));
-        } catch (CancellationException ce) {
-          T loc = orderedLocations.get(i);
-          String msg = "Invocation to \"" + loc + "\" for \""
-              + method.getMethodName() + "\" timed out";
-          LOG.error(msg);
-          IOException ioe = new SubClusterTimeoutException(msg);
-          results.add(new RemoteResult<>(location, ioe));
-        } catch (ExecutionException ex) {
-          Throwable cause = ex.getCause();
-          LOG.debug("Cannot execute {} in {}: {}",
-              m.getName(), location, cause.getMessage());
-
-          // Convert into IOException if needed
-          IOException ioe = null;
-          if (cause instanceof IOException) {
-            ioe = (IOException) cause;
-          } else {
-            ioe = new IOException("Unhandled exception while proxying API " +
-                m.getName() + ": " + cause.getMessage(), cause);
-          }
-
-          // Store the exceptions
-          results.add(new RemoteResult<>(location, ioe));
-        }
-      }
-      if (rpcMonitor != null) {
-        rpcMonitor.proxyOpComplete(true, CONCURRENT, null);
-      }
-      return results;
+      return processFutures(method, m, orderedLocations, futures);
     } catch (RejectedExecutionException e) {
       if (rpcMonitor != null) {
         rpcMonitor.proxyOpFailureClientOverloaded();
@@ -1616,6 +1681,99 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
     }
   }
+  /**
+   * Handle all futures during the invokeConcurrent call process.
+   *
+   * @param <T> The type of the remote location.
+   * @param <R> The type of the remote method return.
+   * @param method The remote method and parameters to invoke.
+   * @param m The method to invoke.
+   * @param orderedLocations List of remote locations to call concurrently.
+   * @param futures All futures produced during the invokeConcurrent call process.
+   * @return Result of invoking the method per subcluster (list of results),
+   *         including the exception for each remote location.
+   * @throws InterruptedException if the current thread was interrupted while waiting.
+   */
+  protected <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> processFutures(
+      RemoteMethod method, Method m, final List<T> orderedLocations,
+      final List<Future<Object>> futures) throws InterruptedException {
+    List<RemoteResult<T, R>> results = new ArrayList<>();
+    for (int i = 0; i < futures.size(); i++) {
+      T location = orderedLocations.get(i);
+      try {
+        Future<Object> future = futures.get(i);
+        R result = (R) future.get();
+        results.add(new RemoteResult<>(location, result));
+      } catch (CancellationException ce) {
+        T loc = orderedLocations.get(i);
+        String msg = "Invocation to \"" + loc + "\" for \""
+            + method.getMethodName() + "\" timed out";
+        LOG.error(msg);
+        IOException ioe = new SubClusterTimeoutException(msg);
+        results.add(new RemoteResult<>(location, ioe));
+      } catch (ExecutionException ex) {
+        Throwable cause = ex.getCause();
+        LOG.debug("Cannot execute {} in {}: {}",
+            m.getName(), location, cause.getMessage());
+
+        // Convert into IOException if needed
+        IOException ioe = null;
+        if (cause instanceof IOException) {
+          ioe = (IOException) cause;
+        } else {
+          ioe = new IOException("Unhandled exception while proxying API " +
+              m.getName() + ": " + cause.getMessage(), cause);
+        }
+
+        // Store the exceptions
+        results.add(new RemoteResult<>(location, ioe));
+      }
+    }
+    if (rpcMonitor != null) {
+      rpcMonitor.proxyOpComplete(true, CONCURRENT, null);
+    }
+    return results;
+  }
+
+  /**
+   * Invokes a ClientProtocol method against the specified namespace.
+   * <p>
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param <T> The type of the remote location.
+   * @param <R> The type of the remote method return.
+   * @param location RemoteLocation to invoke.
+   * @param method The remote method and parameters to invoke.
+   * @return The result of invoking the method for the given location,
+   *         wrapped in a single-element list.
+   * @throws IOException If there are errors invoking the method.
+   */
+  public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> invokeSingle(
+      T location, RemoteMethod method) throws IOException {
+    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+    final Method m = method.getMethod();
+    String ns = location.getNameserviceId();
+    boolean isObserverRead = isObserverReadEligible(ns, m);
+    final List<? extends FederationNamenodeContext> namenodes =
+        getOrderedNamenodes(ns, isObserverRead);
+    RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
+    acquirePermit(ns, ugi, method, controller);
+    try {
+      Class<?> proto = method.getProtocol();
+      Object[] paramList = method.getParams(location);
+      R result = (R) invokeMethod(
+          ugi, namenodes, isObserverRead, proto, m, paramList);
+      RemoteResult<T, R> remoteResult = new RemoteResult<>(location, result);
+      return Collections.singletonList(remoteResult);
+    } catch (IOException ioe) {
+      // Localize the exception
+      throw processException(ioe, location);
+    } finally {
+      releasePermit(ns, ugi, method, controller);
+    }
+  }
+
   /**
    * Transfer origin thread local context which is necessary to current
    * worker thread when invoking method concurrently by executor service.
@@ -1624,7 +1782,7 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
    * @param originContext origin CallerContext which should be transferred
    * to server side.
    */
-  private void transferThreadLocalContext(
+  protected void transferThreadLocalContext(
       final Call originCall, final CallerContext originContext) {
     Server.getCurCall().set(originCall);
     CallerContext.setCurrent(originContext);
@@ -1675,7 +1833,7 @@ private String getNameserviceForBlockPoolId(final String bpId)
    * @param controller fairness policy controller to acquire permit from
    * @throws IOException If permit could not be acquired for the nsId.
    */
-  private void acquirePermit(final String nsId, final UserGroupInformation ugi,
+  protected void acquirePermit(final String nsId, final UserGroupInformation ugi,
       final RemoteMethod m, RouterRpcFairnessPolicyController controller)
       throws IOException {
     if (controller != null) {
@@ -1708,7 +1866,7 @@ private void acquirePermit(final String nsId, final UserGroupInformation ugi,
    * @param m Remote method that needs to be invoked.
    * @param controller fairness policy controller to release permit from
    */
-  private void releasePermit(final String nsId, final UserGroupInformation ugi,
+  protected void releasePermit(final String nsId, final UserGroupInformation ugi,
       final RemoteMethod m, RouterRpcFairnessPolicyController controller) {
     if (controller != null) {
       controller.releasePermit(nsId);
@@ -1782,7 +1940,7 @@ private String getCurrentFairnessPolicyControllerClassName() {
    * @return A prioritized list of NNs to use for communication.
    * @throws IOException If a NN cannot be located for the nameservice ID.
    */
-  private List<? extends FederationNamenodeContext> getOrderedNamenodes(String nsId,
+  protected List<? extends FederationNamenodeContext> getOrderedNamenodes(String nsId,
       boolean isObserverRead) throws IOException {
     final List<? extends FederationNamenodeContext> namenodes;
@@ -1802,7 +1960,7 @@ && isNamespaceStateIdFresh(nsId)
     return namenodes;
   }
 
-  private boolean isObserverReadEligible(String nsId, Method method) {
+  protected boolean isObserverReadEligible(String nsId, Method method) {
     return isReadCall(method) && isNamespaceObserverReadEligible(nsId);
   }
@@ -1857,7 +2015,7 @@ private LongAccumulator getTimeOfLastCallToActive(String namespaceId) {
    * {@link RouterRpcClient#isUnavailableException(IOException) unavailable exception},
    * otherwise false.
    */
-  private boolean shouldRotateCache(IOException ioe) {
+  protected boolean shouldRotateCache(IOException ioe) {
     if (isUnavailableException(ioe)) {
       return true;
     }
@@ -1868,4 +2026,61 @@ private boolean shouldRotateCache(IOException ioe) {
     }
     return isUnavailableException(ioe);
   }
+
+
+  /**
+   * The {@link ExecutionStatus} class tracks the status of execution operations
+   * performed by the {@link RouterRpcClient}. It encapsulates the state of an
+   * operation, including whether it has completed, if a failover to a different
+   * NameNode should be attempted, and if an observer NameNode should be used
+   * for the operation.
+   *
+   * <p>
The status is represented by a flag that indicate the current state of + * the execution. The flag can be checked individually to determine how to + * proceed with the operation or to handle its results. + */ + protected static class ExecutionStatus { + + /** A byte field used to store the state flags. */ + private byte flag; + private static final byte FAIL_OVER_BIT = 1; + private static final byte SHOULD_USE_OBSERVER_BIT = 2; + private static final byte COMPLETE_BIT = 4; + + ExecutionStatus() { + this(false, false); + } + + ExecutionStatus(boolean failOver, boolean shouldUseObserver) { + this.flag = 0; + setFailOver(failOver); + setShouldUseObserver(shouldUseObserver); + setComplete(false); + } + + private void setFailOver(boolean failOver) { + flag = (byte) (failOver ? (flag | FAIL_OVER_BIT) : (flag & ~FAIL_OVER_BIT)); + } + + private void setShouldUseObserver(boolean shouldUseObserver) { + flag = (byte) (shouldUseObserver ? + (flag | SHOULD_USE_OBSERVER_BIT) : (flag & ~SHOULD_USE_OBSERVER_BIT)); + } + + void setComplete(boolean complete) { + flag = (byte) (complete ? (flag | COMPLETE_BIT) : (flag & ~COMPLETE_BIT)); + } + + boolean isFailOver() { + return (flag & FAIL_OVER_BIT) != 0; + } + + boolean isShouldUseObserver() { + return (flag & SHOULD_USE_OBSERVER_BIT) != 0; + } + + boolean isComplete() { + return (flag & COMPLETE_BIT) != 0; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 217c62ff28762..c23c21c6dfb67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -26,6 +26,12 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION; @@ -50,14 +56,18 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import 
java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; +import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil; import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder; import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader; import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache; @@ -198,7 +208,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, private static final Logger LOG = LoggerFactory.getLogger(RouterRpcServer.class); - + private ExecutorService asyncRouterHandler; + private ExecutorService asyncRouterResponder; /** Configuration for the RPC server. */ private Configuration conf; @@ -255,6 +266,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, private RouterRenameOption routerRenameOption; /** Schedule the router federation rename jobs. */ private BalanceProcedureScheduler fedRenameScheduler; + private boolean enableAsync; + /** * Construct a router RPC server. * @@ -284,6 +297,12 @@ public RouterRpcServer(Configuration conf, Router router, int handlerQueueSize = this.conf.getInt(DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY, DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT); + this.enableAsync = conf.getBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC, + DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT); + LOG.info("Router enable async {}", this.enableAsync); + if (this.enableAsync) { + initAsyncThreadPool(); + } // Override Hadoop Common IPC setting int readerQueueSize = this.conf.getInt(DFS_ROUTER_READER_QUEUE_SIZE_KEY, DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT); @@ -391,15 +410,17 @@ public RouterRpcServer(Configuration conf, Router router, } // Create the client - this.rpcClient = new RouterRpcClient(this.conf, this.router, - this.namenodeResolver, this.rpcMonitor, routerStateIdContext); - - // Initialize modules - this.quotaCall = new Quota(this.router, this); + if (this.enableAsync) { + this.rpcClient = new RouterAsyncRpcClient(this.conf, this.router, + this.namenodeResolver, this.rpcMonitor, routerStateIdContext); + } else { + this.rpcClient = new RouterRpcClient(this.conf, this.router, + this.namenodeResolver, this.rpcMonitor, routerStateIdContext); + } this.nnProto = new RouterNamenodeProtocol(this); + this.quotaCall = new Quota(this.router, this); this.clientProto = new RouterClientProtocol(conf, this); this.routerProto = new RouterUserProtocol(this); - long dnCacheExpire = conf.getTimeDuration( DN_REPORT_CACHE_EXPIRE, DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS); @@ -428,6 +449,27 @@ public RouterRpcServer(Configuration conf, Router router, initRouterFedRename(); } + /** + * Init router async handlers and router async responders. 
+ */ + protected void initAsyncThreadPool() { + int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, + DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT); + int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, + DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT); + if (asyncRouterHandler == null) { + LOG.info("init router async handler count: {}", asyncHandlerCount); + asyncRouterHandler = Executors.newFixedThreadPool( + asyncHandlerCount, new AsyncThreadFactory("router async handler ")); + } + if (asyncRouterResponder == null) { + LOG.info("init router async responder count: {}", asyncResponderCount); + asyncRouterResponder = Executors.newFixedThreadPool( + asyncResponderCount, new AsyncThreadFactory("router async responder ")); + } + AsyncRpcProtocolPBUtil.setWorker(asyncRouterResponder); + } + /** * Clear expired namespace in the shared RouterStateIdContext. */ @@ -2066,4 +2108,26 @@ public ListenableFuture reload( return executorService.submit(() -> load(type)); } } + + public boolean isAsync() { + return this.enableAsync; + } + + public Executor getAsyncRouterHandler() { + return asyncRouterHandler; + } + + private static class AsyncThreadFactory implements ThreadFactory { + private final String namePrefix; + private final AtomicInteger threadNumber = new AtomicInteger(1); + + AsyncThreadFactory(String namePrefix) { + this.namePrefix = namePrefix; + } + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, namePrefix + threadNumber.getAndIncrement()); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ThreadLocalContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ThreadLocalContext.java new file mode 100644 index 0000000000000..6faef079bd19a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ThreadLocalContext.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor; +import org.apache.hadoop.ipc.CallerContext; +import org.apache.hadoop.ipc.Server; + +/** + * The ThreadLocalContext class is designed to capture and transfer the context of a + * thread-local environment within the Hadoop Distributed File System (HDFS) federation + * router operations. This is particularly useful for preserving the state across + * asynchronous operations where the context needs to be maintained consistently. 
+ *
+ * The context includes details such as the current call being processed by the server, the
+ * caller's context, and performance monitoring timestamps. By transferring this context,
+ * the class ensures that the operations performed on worker threads correctly reflect
+ * the state of the original calling thread.
+ *
+ * Here is a high-level overview of the main components captured by this context:
+ * <ul>
+ *   <li>{@link Server.Call} - Represents the current server call.</li>
+ *   <li>{@link CallerContext} - Stores information about the caller.</li>
+ *   <li>startOpTime - Time for an operation to be received in the Router.</li>
+ *   <li>proxyOpTime - Time for an operation to be sent to the Namenode.</li>
+ * </ul>
+ * + * This class is typically used in scenarios where asynchronous processing is involved, to + * ensure that the thread executing the asynchronous task has the correct context applied. + * + * @see Server + * @see CallerContext + * @see FederationRPCPerformanceMonitor + */ +public class ThreadLocalContext { + + /** The current server call being processed. */ + private final Server.Call call; + /** The caller context containing information about the caller. */ + private final CallerContext context; + /** Time for an operation to be received in the Router. */ + private final long startOpTime; + /** Time for an operation to be sent to the Namenode. */ + private final long proxyOpTime; + + /** + * Constructs a new {@link ThreadLocalContext} instance, capturing the current + * thread-local context at the point of creation. + */ + public ThreadLocalContext() { + this.call = Server.getCurCall().get(); + this.context = CallerContext.getCurrent(); + this.startOpTime = FederationRPCPerformanceMonitor.getStartOpTime(); + this.proxyOpTime = FederationRPCPerformanceMonitor.getProxyOpTime(); + } + + /** + * Transfers the captured context to the current thread. This method is used to apply + * the context to worker threads that are processing asynchronous tasks, ensuring + * that the task execution reflects the state of the original calling thread. + */ + public void transfer() { + if (call != null) { + Server.getCurCall().set(call); + } + if (context != null) { + CallerContext.setCurrent(context); + } + if (startOpTime != -1L) { + FederationRPCPerformanceMonitor.setStartOpTime(startOpTime); + } + if (proxyOpTime != -1L) { + FederationRPCPerformanceMonitor.setProxyOpTime(proxyOpTime); + } + } + + @Override + public String toString() { + return "ThreadLocalContext{" + + "call=" + call + + ", context=" + context + + ", startOpTime=" + startOpTime + + ", proxyOpTime=" + proxyOpTime + + '}'; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java index d7ed04b08e368..834741b52db32 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java @@ -73,6 +73,7 @@ private AsyncUtil(){} *
    * <ul>
    *   <li>{@code false} if {@code clazz} is {@link Boolean},
    *   <li>-1 if {@code clazz} is {@link Long},
+   *   <li>-1 if {@code clazz} is {@link Integer},
    *   <li>{@code null} for any other type.
    * </ul>
*/ @@ -80,11 +81,14 @@ public static R asyncReturn(Class clazz) { if (clazz == null) { return null; } - if (clazz.equals(Boolean.class)) { + if (clazz.equals(Boolean.class) + || clazz.equals(boolean.class)) { return (R) BOOLEAN_RESULT; - } else if (clazz.equals(Long.class)) { + } else if (clazz.equals(Long.class) + || clazz.equals(long.class)) { return (R) LONG_RESULT; - } else if (clazz.equals(Integer.class)) { + } else if (clazz.equals(Integer.class) + || clazz.equals(int.class)) { return (R) INT_RESULT; } return (R) NULL_RESULT; @@ -140,6 +144,12 @@ public static void asyncComplete(R value) { CompletableFuture.completedFuture(value)); } + /** + * Completes the current asynchronous operation with the specified completableFuture. + * + * @param completableFuture The completableFuture to complete the future with. + * @param The type of the value to be completed. + */ public static void asyncCompleteWith(CompletableFuture completableFuture) { CUR_COMPLETABLE_FUTURE.set((CompletableFuture) completableFuture); } @@ -384,4 +394,13 @@ public static void asyncCurrent( .handle((unused, throwable) -> then.apply(completableFutures)); CUR_COMPLETABLE_FUTURE.set((CompletableFuture) result); } + + /** + * Get the CompletableFuture object stored in the current thread's local variable. + * + * @return The completableFuture object. + */ + public static CompletableFuture getCompletableFuture() { + return CUR_COMPLETABLE_FUTURE.get(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 26b89ce0313fd..dda90b254b281 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -48,6 +48,14 @@ + + dfs.federation.router.rpc.async.enable + false + + If true, router will process the RPC request asynchronously. + + + dfs.federation.router.rpc-address 0.0.0.0:8888 @@ -101,6 +109,22 @@ + + dfs.federation.router.rpc.async.handler.count + 2 + + The number of async handler for the router to handle RPC client requests. + + + + + dfs.federation.router.rpc.async.responder.count + 10 + + The number of async responder for the router to handle responses. 
+ + + dfs.federation.router.connection.creator.queue-size 100 diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java index c0cd4c898e828..15da20fdd113e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java @@ -40,6 +40,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.concurrent.ForkJoinPool; import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; import static org.junit.Assert.assertEquals; @@ -53,6 +54,7 @@ public class TestAsyncRpcProtocolPBUtil { @Before public void setUp() throws IOException { + AsyncRpcProtocolPBUtil.setWorker(ForkJoinPool.commonPool()); Configuration conf = new Configuration(); RPC.setProtocolEngine(conf, TestRpcBase.TestRpcService.class, ProtobufRpcEngine2.class); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java index d107bbe3bf7c3..3519a968c5b48 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java @@ -52,6 +52,7 @@ import java.net.InetSocketAddress; import java.util.EnumSet; import java.util.List; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.crypto.CryptoProtocolVersion.ENCRYPTION_ZONES; @@ -81,6 +82,7 @@ public class TestRouterClientSideTranslatorPB { @BeforeClass public static void setUp() throws Exception { + AsyncRpcProtocolPBUtil.setWorker(ForkJoinPool.commonPool()); conf = new HdfsConfiguration(); cluster = (new MiniDFSCluster.Builder(conf)) .numDataNodes(1).build(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcClient.java new file mode 100644 index 0000000000000..e3429d493dc66 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcClient.java @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.MockResolver; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.ipc.CallerContext; +import org.apache.hadoop.ipc.RetriableException; +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.fs.permission.FsAction.ALL; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Used to test the functionality of {@link RouterAsyncRpcClient}. + */ +public class TestRouterAsyncRpcClient { + private static Configuration routerConf; + /** Federated HDFS cluster. */ + private static MiniRouterDFSCluster cluster; + private static String ns0; + private static String ns1; + + /** Random Router for this federated cluster. */ + private MiniRouterDFSCluster.RouterContext router; + private FileSystem routerFs; + private RouterRpcServer routerRpcServer; + private RouterAsyncRpcClient asyncRpcClient; + private FederationRPCMetrics rpcMetrics; + private final String testFile = "/test.file"; + + /** + * Start a cluster using a router service that includes 2 namespaces, + * 6 namenodes and 6 datanodes. 
+ */ + @BeforeClass + public static void setUpCluster() throws Exception { + cluster = new MiniRouterDFSCluster(true, 2, 3, + DEFAULT_HEARTBEAT_INTERVAL_MS, 1000); + cluster.setNumDatanodesPerNameservice(3); + + cluster.startCluster(); + + // Making one Namenode active per nameservice + if (cluster.isHighAvailability()) { + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, NAMENODES[0]); + cluster.switchToStandby(ns, NAMENODES[1]); + cluster.switchToObserver(ns, NAMENODES[2]); + } + } + // Start routers with only an RPC service + routerConf = new RouterConfigBuilder() + .metrics() + .rpc() + .build(); + + // Reduce the number of RPC clients threads to overload the Router easy + routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1); + routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1); + routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1); + // We decrease the DN cache times to make the test faster + routerConf.setTimeDuration( + RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); + cluster.addRouterOverrides(routerConf); + // Start routers with only an RPC service + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + cluster.waitActiveNamespaces(); + ns0 = cluster.getNameservices().get(0); + ns1 = cluster.getNameservices().get(1); + } + + @AfterClass + public static void shutdownCluster() { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Initialize the mount table, create a RouterAsyncRpcClient object, and create test file. + */ + @Before + public void setup() throws Exception { + // Create mock locations + installMockLocations(); + + router = cluster.getRandomRouter(); + rpcMetrics = router.getRouter().getRpcServer().getRPCMetrics(); + routerFs = router.getFileSystem(); + routerRpcServer = router.getRouterRpcServer(); + routerRpcServer.initAsyncThreadPool(); + + // Create a RouterAsyncRpcClient object + asyncRpcClient = new RouterAsyncRpcClient( + routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(), + routerRpcServer.getRPCMonitor(), + routerRpcServer.getRouterStateIdContext()); + + // Create a test file + FSDataOutputStream fsDataOutputStream = routerFs.create( + new Path(testFile), true); + fsDataOutputStream.write(new byte[1024]); + fsDataOutputStream.close(); + } + + @After + public void down() throws IOException { + // clear client context + CallerContext.setCurrent(null); + cluster.switchToActive(ns0, NAMENODES[0]); + asyncRpcClient.getNamenodeResolver().updateActiveNamenode( + ns0, NetUtils.createSocketAddr(cluster + .getNamenode(ns0, NAMENODES[0]).getRpcAddress())); + // Delete the test file + boolean delete = routerFs.delete(new Path(testFile)); + assertTrue(delete); + if (routerFs != null) { + routerFs.close(); + } + } + + /** + * Test the functionality of the asynchronous invokeSingle method. 
+ */ + @Test + public void testInvokeSingle() throws Exception { + long proxyOps = rpcMetrics.getProxyOps(); + long activeProxyOps = rpcMetrics.getActiveProxyOps(); + RemoteMethod method = + new RemoteMethod(NamenodeProtocol.class, "getTransactionID"); + asyncRpcClient.invokeSingle(ns0, method); + long id = syncReturn(Long.class); + assertTrue(id > 0); + assertEquals(proxyOps + 1, rpcMetrics.getProxyOps()); + assertEquals(activeProxyOps + 1, rpcMetrics.getActiveProxyOps()); + assertTrue(rpcMetrics.getProcessingAvg() > 0); + assertTrue(rpcMetrics.getProxyAvg() > 0); + } + + /** + * Test the functionality of the asynchronous invokeAll and invokeConcurrent methods. + */ + @Test + public void testInvokeAll() throws Exception { + long proxyOps = rpcMetrics.getProxyOps(); + long activeProxyOps = rpcMetrics.getActiveProxyOps(); + final List locations = + routerRpcServer.getLocationsForPath("/multDes/dir", false); + RemoteMethod method = new RemoteMethod("mkdirs", + new Class[] {String.class, FsPermission.class, boolean.class}, + new RemoteParam(), new FsPermission(ALL, ALL, ALL), false); + asyncRpcClient.invokeAll(locations, method); + LambdaTestUtils.intercept(FileNotFoundException.class, + "Parent directory doesn't exist: /multDes", + () -> syncReturn(boolean.class)); + assertEquals(proxyOps + 2, rpcMetrics.getProxyOps()); + assertEquals(activeProxyOps + 2, rpcMetrics.getActiveProxyOps()); + + proxyOps = rpcMetrics.getProxyOps(); + activeProxyOps = rpcMetrics.getActiveProxyOps(); + method = new RemoteMethod("mkdirs", + new Class[] {String.class, FsPermission.class, boolean.class}, + new RemoteParam(), new FsPermission(ALL, ALL, ALL), true); + asyncRpcClient.invokeAll(locations, method); + Boolean success = syncReturn(Boolean.class); + assertTrue(success); + assertEquals(proxyOps + 2, rpcMetrics.getProxyOps()); + assertEquals(activeProxyOps + 2, rpcMetrics.getActiveProxyOps()); + + FileStatus[] fileStatuses = routerFs.listStatus(new Path("/multDes")); + assertNotNull(fileStatuses); + assertTrue(rpcMetrics.getProcessingAvg() > 0); + assertTrue(rpcMetrics.getProxyAvg() > 0); + } + + /** + * Test the functionality of the asynchronous invokeMethod method. 
+ */ + @Test + public void testInvokeMethod() throws Exception { + long proxyOps = rpcMetrics.getProxyOps(); + long activeProxyOps = rpcMetrics.getActiveProxyOps(); + RemoteMethod method = new RemoteMethod("getFileInfo", + new Class[] {String.class}, new RemoteParam()); + UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); + Class protocol = method.getProtocol(); + Object[] params = new String[]{testFile}; + List namenodes = + asyncRpcClient.getOrderedNamenodes(ns0, false); + asyncRpcClient.invokeMethod(ugi, namenodes, false, + protocol, method.getMethod(), params); + FileStatus fileStatus = syncReturn(FileStatus.class); + assertEquals(1024, fileStatus.getLen()); + assertEquals(proxyOps + 1, rpcMetrics.getProxyOps()); + assertEquals(activeProxyOps + 1, rpcMetrics.getActiveProxyOps()); + + LambdaTestUtils.intercept(IOException.class, + "No namenodes to invoke", + () -> asyncRpcClient.invokeMethod(ugi, new ArrayList<>(), false, + protocol, method.getMethod(), params)); + + proxyOps = rpcMetrics.getProxyOps(); + activeProxyOps = rpcMetrics.getActiveProxyOps(); + asyncRpcClient.invokeMethod(ugi, namenodes.subList(1, 3), false, + protocol, method.getMethod(), params); + LambdaTestUtils.intercept(StandbyException.class, + "No namenode available to invoke getFileInfo", + () -> syncReturn(FileStatus.class)); + assertEquals(proxyOps, rpcMetrics.getProxyOps()); + assertEquals(activeProxyOps, rpcMetrics.getActiveProxyOps()); + + cluster.switchToStandby(ns0, NAMENODES[0]); + asyncRpcClient.getNamenodeResolver().updateUnavailableNamenode( + ns0, NetUtils.createSocketAddr(namenodes.get(0).getRpcAddress())); + asyncRpcClient.invokeMethod(ugi, namenodes, false, + protocol, method.getMethod(), params); + LambdaTestUtils.intercept(RetriableException.class, + "No namenodes available under nameservice ns0", + () -> syncReturn(FileStatus.class)); + assertEquals(1, rpcMetrics.getProxyOpNoNamenodes()); + + asyncRpcClient.invokeMethod(ugi, namenodes, false, + null, method.getMethod(), params); + LambdaTestUtils.intercept(StandbyException.class, + "Cannot get a connection", + () -> syncReturn(FileStatus.class)); + assertEquals(1, rpcMetrics.getProxyOpFailureCommunicate()); + } + + /** + * Test the functionality of the asynchronous invokeSequential method. + */ + @Test + public void testInvokeSequential() throws Exception { + List locations = + routerRpcServer.getLocationsForPath(testFile, false, false); + RemoteMethod remoteMethod = new RemoteMethod("getBlockLocations", + new Class[] {String.class, long.class, long.class}, + new RemoteParam(), 0, 1024); + asyncRpcClient.invokeSequential(locations, remoteMethod, + LocatedBlocks.class, null); + LocatedBlocks locatedBlocks = syncReturn(LocatedBlocks.class); + assertEquals(1024, locatedBlocks.getFileLength()); + assertEquals(1, locatedBlocks.getLocatedBlocks().size()); + } + + /** + * Initialize the mount information. + */ + private void installMockLocations() { + List routers = cluster.getRouters(); + + for (MiniRouterDFSCluster.RouterContext rc : routers) { + Router r = rc.getRouter(); + MockResolver resolver = (MockResolver) r.getSubclusterResolver(); + resolver.addLocation("/", ns0, "/"); + resolver.addLocation("/multDes", ns0, "/multDes"); + resolver.addLocation("/multDes", ns1, "/multDes"); + } + } +}
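For reference, the following is a minimal sketch of how the pieces introduced by this change fit together, assuming a test-style class placed in the org.apache.hadoop.hdfs.server.federation.router package. The class and method names used here (AsyncRouterRpcSketch, newAsyncRouterConf, getTransactionId) are illustrative only; the configuration keys, RouterAsyncRpcClient, RemoteMethod, and AsyncUtil.syncReturn are the ones added or exercised above.

    package org.apache.hadoop.hdfs.server.federation.router;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;

    import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;

    public class AsyncRouterRpcSketch {

      // Build a router configuration with the asynchronous RPC path enabled.
      // RouterRpcServer reads these keys in its constructor; when async is enabled it
      // creates the handler/responder pools, installs RouterAsyncRpcClient instead of
      // RouterRpcClient, and registers the responder pool via AsyncRpcProtocolPBUtil.setWorker().
      static Configuration newAsyncRouterConf() {
        Configuration conf = new HdfsConfiguration();
        conf.setBoolean("dfs.federation.router.rpc.async.enable", true);
        conf.setInt("dfs.federation.router.rpc.async.handler.count", 2);
        conf.setInt("dfs.federation.router.rpc.async.responder.count", 10);
        return conf;
      }

      // Caller-side pattern, as exercised by TestRouterAsyncRpcClient: issue the call,
      // then materialize the result parked in the thread-local CompletableFuture
      // with AsyncUtil.syncReturn().
      static long getTransactionId(RouterAsyncRpcClient asyncRpcClient, String nsId)
          throws Exception {
        RemoteMethod method = new RemoteMethod(NamenodeProtocol.class, "getTransactionID");
        asyncRpcClient.invokeSingle(nsId, method);
        return syncReturn(Long.class);
      }
    }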