Skip to content

Commit

Permalink
HDFS-17709. [ARR] Add async responder performance metrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
hfutatzhanghb committed Jan 16, 2025
1 parent 273673c commit 7cb0ddf
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hdfs.protocolPB;

import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil;
Expand All @@ -28,6 +29,7 @@
import org.apache.hadoop.ipc.ProtobufRpcEngineCallback2;
import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
import org.apache.hadoop.thirdparty.protobuf.Message;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -87,6 +89,7 @@ public static <T, R> R asyncIpcClient(
// Transfer the thread-local context to the executor's worker threads.
ThreadLocalContext threadLocalContext = new ThreadLocalContext();
asyncCompleteWith(responseFuture.handleAsync((result, e) -> {
FederationRPCMetrics.ASYNC_RESPONDER_START_TIME.set(Time.monotonicNow());
threadLocalContext.transfer();
if (e != null) {
throw warpCompletionException(e);
Expand Down Expand Up @@ -136,6 +139,7 @@ public static <T> void asyncRouterServer(ServerReq<T> req, ServerRes<T> res) {
} else {
callback.error(e.getCause());
}
FederationRPCMetrics.addAsyncResponderThreadTime();
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.util.Time;

/**
* Implementation of the RPC metrics collector.
Expand All @@ -41,9 +42,13 @@ public class FederationRPCMetrics implements FederationRPCMBean {
private final MetricsRegistry registry = new MetricsRegistry("router");

private RouterRpcServer rpcServer;
/**
 * Per-thread monotonic timestamp marking when an async responder thread began
 * handling a response. Defaults to -1 until a value is set on the thread;
 * {@link #getAsyncResponderProcessingTime()} treats non-positive values as
 * "not recorded".
 */
public static final ThreadLocal<Long> ASYNC_RESPONDER_START_TIME = ThreadLocal.withInitial(() -> -1L);
/**
 * Per-thread monotonic timestamp marking when an async responder thread
 * finished handling a response. Defaults to -1 until
 * {@link #addAsyncResponderThreadTime()} stamps it for the calling thread.
 */
public static final ThreadLocal<Long> ASYNC_RESPONDER_END_TIME = ThreadLocal.withInitial(() -> -1L);

@Metric("Time for the router to process an operation internally")
private MutableRate processing;
@Metric("Time for the router async responder to process an operation internally")
private static MutableRate asyncResponderProcessing;
@Metric("Number of operations the Router processed internally")
private MutableCounterLong processingOp;
@Metric("Time for the Router to proxy an operation to the Namenodes")
Expand Down Expand Up @@ -301,6 +306,20 @@ public void addProcessingTime(long time) {
processing.add(time);
processingOp.incr();
}

/**
 * Records how long the calling async responder thread spent on an operation.
 *
 * <p>Stamps {@link #ASYNC_RESPONDER_END_TIME} for the current thread with the
 * current monotonic time, then adds the elapsed time reported by
 * {@link #getAsyncResponderProcessingTime()} to the async responder rate
 * metric. The sample is dropped when no valid start time was recorded on this
 * thread, or when the rate metric is not available, so that metrics
 * bookkeeping can never fail the response path or skew the average with a
 * sentinel value.
 */
public static void addAsyncResponderThreadTime() {
  ASYNC_RESPONDER_END_TIME.set(Time.monotonicNow());
  long duration = getAsyncResponderProcessingTime();
  // A negative duration is the "timestamps not recorded" sentinel (-1);
  // feeding it into the rate would distort the reported average.
  if (duration < 0) {
    return;
  }
  // The static metric field is not assigned in this class's visible code;
  // presumably the metrics system injects it when an instance is registered
  // (TODO confirm). Guard against it being unset to avoid an NPE here.
  MutableRate rate = asyncResponderProcessing;
  if (rate != null) {
    rate.add(duration);
  }
}

/**
 * Computes the time the calling thread spent between its recorded async
 * responder start and end timestamps.
 *
 * @return end minus start for the current thread, or -1 when either
 *         timestamp is missing (null) or not strictly positive
 */
public static long getAsyncResponderProcessingTime() {
  final Long startedAt = ASYNC_RESPONDER_START_TIME.get();
  if (startedAt == null || startedAt <= 0) {
    return -1;
  }
  final Long finishedAt = ASYNC_RESPONDER_END_TIME.get();
  if (finishedAt == null || finishedAt <= 0) {
    return -1;
  }
  return finishedAt - startedAt;
}

@Override
public double getProcessingAvg() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;

import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.util.Time;

/**
* The ThreadLocalContext class is designed to capture and transfer the context of a
Expand Down

0 comments on commit 7cb0ddf

Please sign in to comment.