diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java new file mode 100644 index 0000000000000..33e6172a34581 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.fairness; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.FederationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; + +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys. + DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys. + DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT; + +public class RouterAsyncRpcFairnessPolicyController extends + AbstractRouterRpcFairnessPolicyController { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterAsyncRpcFairnessPolicyController.class); + + public RouterAsyncRpcFairnessPolicyController(Configuration conf) { + init(conf); + } + + public void init(Configuration conf) throws IllegalArgumentException { + super.init(conf); + + int maxAsyncCallPermit = conf.getInt(DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY, + DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT); + if (maxAsyncCallPermit <= 0) { + maxAsyncCallPermit = DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT; + } + LOG.info("Max async call permits per nameservice: {}", maxAsyncCallPermit); + + // Get all name services configured + Set allConfiguredNS = FederationUtil.getAllConfiguredNS(conf); + + for (String nsId : allConfiguredNS) { + LOG.info("Dedicated permits {} for ns {} ", maxAsyncCallPermit, nsId); + insertNameServiceWithPermits(nsId, maxAsyncCallPermit); + logAssignment(nsId, maxAsyncCallPermit); + } + } + + private static void logAssignment(String nsId, int count) { + LOG.info("Assigned {} permits to nsId {} ", count, nsId); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 88321b92a35a8..70a116cca27d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -88,6 +88,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY = FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count"; public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10; + public static final String DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY = + FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "max.asynccall.permit"; + public static final int DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT = 20000; public static final String DFS_ROUTER_METRICS_ENABLE = FEDERATION_ROUTER_PREFIX + "metrics.enable"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index c7c3699f33ec7..c20e425aff732 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1880,7 +1880,7 @@ protected void releasePermit(final String nsId, final UserGroupInformation ugi, return routerRpcFairnessPolicyController; } - private void incrRejectedPermitForNs(String ns) { + protected void incrRejectedPermitForNs(String ns) { rejectedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment(); } @@ -1889,7 +1889,7 @@ public Long getRejectedPermitForNs(String ns) { rejectedPermitsPerNs.get(ns).longValue() : 0L; } - private void incrAcceptedPermitForNs(String ns) { + protected void incrAcceptedPermitForNs(String ns) { acceptedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java index c214adf1f2abb..16a94ee83a391 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java @@ -37,7 +37,9 @@ import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction; import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncApplyFunction; import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction; +import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; @@ -58,7 +60,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApplyUseExecutor; @@ -178,7 +179,7 @@ public Object invokeMethod( namenodes.toString(), params); } threadLocalContext.transfer(); - invokeMethodAsync(ugi, (List) namenodes, + invokeMethodAsync(nsid, ugi, (List) namenodes, useObserver, protocol, method, params); }, router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid, router.getRpcServer().getRouterAsyncHandlerDefaultExecutor())); @@ -202,11 +203,13 @@ public Object invokeMethod( * @param params The parameters for the method invocation. */ private void invokeMethodAsync( + String nsid, final UserGroupInformation ugi, final List namenodes, boolean useObserver, final Class protocol, final Method method, final Object... params) { + RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); addClientInfoToCallerContext(ugi); if (rpcMonitor != null) { rpcMonitor.proxyOp(); @@ -214,46 +217,55 @@ private void invokeMethodAsync( final ExecutionStatus status = new ExecutionStatus(false, useObserver); Map ioes = new LinkedHashMap<>(); final ConnectionContext[] connection = new ConnectionContext[1]; - asyncForEach(namenodes.iterator(), - (foreach, namenode) -> { - if (!status.isShouldUseObserver() - && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) { - asyncComplete(null); - return; - } - String nsId = namenode.getNameserviceId(); - String rpcAddress = namenode.getRpcAddress(); - asyncTry(() -> { - connection[0] = getConnection(ugi, nsId, rpcAddress, protocol); - NameNodeProxiesClient.ProxyAndInfo client = connection[0].getClient(); - invoke(namenode, status.isShouldUseObserver(), 0, method, + asyncTry(() -> { + acquirePermit(nsid, ugi, method, controller); + asyncForEach(namenodes.iterator(), + (foreach, namenode) -> { + if (!status.isShouldUseObserver() + && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) { + asyncComplete(null); + return; + } + String nsId = namenode.getNameserviceId(); + String rpcAddress = namenode.getRpcAddress(); + asyncTry(() -> { + connection[0] = getConnection(ugi, nsId, rpcAddress, protocol); + NameNodeProxiesClient.ProxyAndInfo client = connection[0].getClient(); + invoke(namenode, status.isShouldUseObserver(), 0, method, client.getProxy(), params); - asyncApply(res -> { - status.setComplete(true); - postProcessResult(method, status, namenode, nsId, client); - foreach.breakNow(); + asyncApply(res -> { + status.setComplete(true); + postProcessResult(method, status, namenode, nsId, client); + foreach.breakNow(); + return res; + }); + }); + asyncCatch((res, ioe) -> { + ioes.put(namenode, ioe); + handleInvokeMethodIOException(namenode, ioe, status, useObserver); + return res; + }, IOException.class); + asyncFinally(res -> { + if (connection[0] != null) { + connection[0].release(); + } return res; }); }); - asyncCatch((res, ioe) -> { - ioes.put(namenode, ioe); - handleInvokeMethodIOException(namenode, ioe, status, useObserver); - return res; - }, IOException.class); - asyncFinally(res -> { - if (connection[0] != null) { - connection[0].release(); - } - return res; - }); - }); - asyncApply(res -> { - if (status.isComplete()) { - return res; - } - return handlerAllNamenodeFail(namenodes, method, ioes, params); + asyncApply(res -> { + if (status.isComplete()) { + return res; + } + return handlerAllNamenodeFail(namenodes, method, ioes, params); + }); + }); + + asyncFinally(res -> { + releasePermit(nsid, ugi, method, controller); + return res; }); + } /** @@ -363,7 +375,6 @@ public RemoteResult invokeSequential( Class expectedResultClass, Object expectedResultValue) throws IOException { - RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); final Method m = remoteMethod.getMethod(); List thrownExceptions = new ArrayList<>(); @@ -378,7 +389,6 @@ public RemoteResult invokeSequential( boolean isObserverRead = isObserverReadEligible(ns, m); List namenodes = getOrderedNamenodes(ns, isObserverRead); - acquirePermit(ns, ugi, remoteMethod, controller); asyncTry(() -> { Class proto = remoteMethod.getProtocol(); Object[] params = remoteMethod.getParams(loc); @@ -420,7 +430,6 @@ public RemoteResult invokeSequential( return ret; }, Exception.class); asyncFinally(ret -> { - releasePermit(ns, ugi, remoteMethod, controller); return ret; }); }); @@ -479,6 +488,75 @@ public Map invokeConcurrent( return asyncReturn(Map.class); } + @SuppressWarnings("unchecked") + public List> invokeConcurrent( + final Collection locations, final RemoteMethod method, + boolean standby, long timeOutMs, + Class clazz) throws IOException { + + final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); + final Method m = method.getMethod(); + + if (locations.isEmpty()) { + throw new IOException("No remote locations available"); + } else if (locations.size() == 1 && timeOutMs <= 0) { + // Shortcut, just one call + return invokeSingle(locations.iterator().next(), method); + } + RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); + + List orderedLocations = new ArrayList<>(); + List> callables = new ArrayList<>(); + // transfer originCall & callerContext to worker threads of executor. + final Server.Call originCall = Server.getCurCall().get(); + final CallerContext originContext = CallerContext.getCurrent(); + for (final T location : locations) { + String nsId = location.getNameserviceId(); + boolean isObserverRead = isObserverReadEligible(nsId, m); + final List namenodes = + getOrderedNamenodes(nsId, isObserverRead); + final Class proto = method.getProtocol(); + final Object[] paramList = method.getParams(location); + if (standby) { + // Call the objectGetter to all NNs (including standby) + for (final FederationNamenodeContext nn : namenodes) { + String nnId = nn.getNamenodeId(); + final List nnList = + Collections.singletonList(nn); + T nnLocation = location; + if (location instanceof RemoteLocation) { + nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest()); + } + orderedLocations.add(nnLocation); + callables.add( + () -> { + transferThreadLocalContext(originCall, originContext); + return invokeMethod( + ugi, nnList, isObserverRead, proto, m, paramList); + }); + } + } else { + // Call the objectGetter in order of nameservices in the NS list + orderedLocations.add(location); + callables.add( + () -> { + transferThreadLocalContext(originCall, originContext); + return invokeMethod( + ugi, namenodes, isObserverRead, proto, m, paramList); + }); + } + } + + if (rpcMonitor != null) { + rpcMonitor.proxyOp(); + } + if (this.router.getRouterClientMetrics() != null) { + this.router.getRouterClientMetrics().incInvokedConcurrent(m); + } + + return getRemoteResults(method, timeOutMs, controller, orderedLocations, callables); + } + /** * Invokes multiple concurrent proxy calls to different clients. Returns an * array of results. @@ -523,8 +601,6 @@ protected List> getRemot LOG.error("Unexpected error while invoking API: {}", e.getMessage()); throw warpCompletionException(new IOException( "Unexpected error while invoking API " + e.getMessage(), e)); - } finally { - releasePermit(CONCURRENT_NS, ugi, method, controller); } })); return asyncReturn(List.class); @@ -553,8 +629,6 @@ public List> invokeSingl boolean isObserverRead = isObserverReadEligible(ns, m); final List namenodes = getOrderedNamenodes(ns, isObserverRead); - RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(ns, ugi, method, controller); asyncTry(() -> { Class proto = method.getProtocol(); Object[] paramList = method.getParams(location); @@ -567,8 +641,7 @@ public List> invokeSingl asyncCatch((o, ioe) -> { throw processException(ioe, location); }, IOException.class); - asyncFinally(o -> { - releasePermit(ns, ugi, method, controller); + asyncFinally(o -> { return o; }); return asyncReturn(List.class); @@ -589,8 +662,6 @@ public List> invokeSingl public Object invokeSingle(final String nsId, RemoteMethod method) throws IOException { UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); - RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(nsId, ugi, method, controller); asyncTry(() -> { boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod()); List nns = getOrderedNamenodes(nsId, isObserverRead); @@ -601,7 +672,6 @@ public Object invokeSingle(final String nsId, RemoteMethod method) invokeMethod(ugi, nns, isObserverRead, proto, m, params); }); asyncFinally(o -> { - releasePermit(nsId, ugi, method, controller); return o; }); return null; @@ -627,4 +697,46 @@ public T invokeSingle( invokeSequential(locations, remoteMethod); return asyncReturn(clazz); } + + protected void acquirePermit(final String nsId, final UserGroupInformation ugi, + final Method m, RouterRpcFairnessPolicyController controller) + throws IOException { + if (controller != null) { + if (!controller.acquirePermit(nsId)) { + // Throw StandByException, + // Clients could fail over and try another router. + if (rpcMonitor != null) { + rpcMonitor.proxyOpPermitRejected(nsId); + } + incrRejectedPermitForNs(nsId); + LOG.debug("Permit denied for ugi: {} for method: {}", + ugi, m.getName()); + String msg = + "Router " + router.getRouterId() + + " is overloaded for NS: " + nsId; + throw new StandbyException(msg); + } + if (rpcMonitor != null) { + rpcMonitor.proxyOpPermitAccepted(nsId); + } + incrAcceptedPermitForNs(nsId); + } + } + + /** + * Release permit for specific nsId after processing against downstream + * nsId is completed. + * @param nsId Identifier of the block pool. + * @param ugi UserGroupIdentifier associated with the user. + * @param m Remote method that needs to be invoked. + * @param controller fairness policy controller to release permit from + */ + protected void releasePermit(final String nsId, final UserGroupInformation ugi, + final Method m, RouterRpcFairnessPolicyController controller) { + if (controller != null) { + controller.releasePermit(nsId); + LOG.trace("Permit released for ugi: {} for method: {}", ugi, + m.getName()); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java index 7290c0a0aee81..0ecdb4396fd3f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java @@ -20,6 +20,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.fairness.RouterAsyncRpcFairnessPolicyController; +import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController; import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.hdfs.server.federation.router.TestRouterRpc; import org.apache.hadoop.security.UserGroupInformation; @@ -30,6 +32,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.Assert.assertArrayEquals; @@ -52,6 +55,9 @@ public static void globalSetUp() throws Exception { RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); // use async router. routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true); + routerConf.setClass(DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, + RouterAsyncRpcFairnessPolicyController.class, + RouterRpcFairnessPolicyController.class); setUp(routerConf); }