Skip to content

Commit

Permalink
Ensure the real connection local address without any proxy informatio…
Browse files Browse the repository at this point in the history
…n is used for HttpServer connections.active metrics (#2954)

Related to #2945
  • Loading branch information
violetagg authored Nov 2, 2023
1 parent c860208 commit ea67bb5
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
Expand All @@ -30,6 +31,7 @@
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

import java.net.SocketAddress;
import java.time.Duration;
import java.util.function.Function;

Expand Down Expand Up @@ -75,6 +77,7 @@ public void channelActive(ChannelHandlerContext ctx) {
// not our MicrometerHttpServerMetricsRecorder. See HttpServerConfig class.
if (!(ctx.channel() instanceof Http2StreamChannel) && recorder() instanceof MicrometerHttpServerMetricsRecorder) {
try {
// Always use the real connection local address without any proxy information
recorder().recordServerConnectionOpened(ctx.channel().localAddress());
}
catch (RuntimeException e) {
Expand All @@ -91,6 +94,7 @@ public void channelActive(ChannelHandlerContext ctx) {
public void channelInactive(ChannelHandlerContext ctx) {
if (!(ctx.channel() instanceof Http2StreamChannel) && recorder() instanceof MicrometerHttpServerMetricsRecorder) {
try {
// Always use the real connection local address without any proxy information
recorder().recordServerConnectionClosed(ctx.channel().localAddress());
}
catch (RuntimeException e) {
Expand All @@ -100,10 +104,9 @@ public void channelInactive(ChannelHandlerContext ctx) {
// Allow request-response exchange to continue, unaffected by metrics problem
}
}
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations) {
recordInactiveConnectionOrStream((HttpServerOperations) channelOps);
}

recordInactiveConnectionOrStream(ctx.channel());

ctx.fireChannelInactive();
}

Expand Down Expand Up @@ -136,9 +139,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
// Allow request-response exchange to continue, unaffected by metrics problem
}
recordInactiveConnectionOrStream(ops);
}

recordInactiveConnectionOrStream(ctx.channel());

dataSent = 0;
});
}
Expand All @@ -160,16 +164,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
if (msg instanceof HttpRequest) {
dataReceivedTime = System.nanoTime();
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations) {
channelActivated = true;
HttpServerOperations ops = (HttpServerOperations) channelOps;
if (ops.isHttp2()) {
recordOpenStream(ops);
}
else {
recordActiveConnection(ops);
}

channelActivated = true;
if (ctx.channel() instanceof Http2StreamChannel) {
// Always use the real connection local address without any proxy information
recordOpenStream(ctx.channel().localAddress());
}
else {
// Always use the real connection local address without any proxy information
recordActiveConnection(ctx.channel().localAddress());
}
}

Expand Down Expand Up @@ -257,36 +260,38 @@ protected void recordWrite(HttpServerOperations ops, String path, String method,
recorder().recordDataSent(ops.remoteSocketAddress(), path, dataSent);
}

protected void recordActiveConnection(HttpServerOperations ops) {
recorder().recordServerConnectionActive(ops.hostSocketAddress());
protected void recordActiveConnection(SocketAddress localAddress) {
recorder().recordServerConnectionActive(localAddress);
}

protected void recordInactiveConnection(HttpServerOperations ops) {
recorder().recordServerConnectionInactive(ops.hostSocketAddress());
protected void recordInactiveConnection(SocketAddress localAddress) {
recorder().recordServerConnectionInactive(localAddress);
}

protected void recordOpenStream(HttpServerOperations ops) {
recorder().recordStreamOpened(ops.hostSocketAddress());
protected void recordOpenStream(SocketAddress localAddress) {
recorder().recordStreamOpened(localAddress);
}

protected void recordClosedStream(HttpServerOperations ops) {
recorder().recordStreamClosed(ops.hostSocketAddress());
protected void recordClosedStream(SocketAddress localAddress) {
recorder().recordStreamClosed(localAddress);
}

void recordInactiveConnectionOrStream(HttpServerOperations ops) {
void recordInactiveConnectionOrStream(Channel channel) {
if (channelActivated) {
channelActivated = false;
try {
if (ops.isHttp2()) {
recordClosedStream(ops);
if (channel instanceof Http2StreamChannel) {
// Always use the real connection local address without any proxy information
recordClosedStream(channel.localAddress());
}
else {
recordInactiveConnection(ops);
// Always use the real connection local address without any proxy information
recordInactiveConnection(channel.localAddress());
}
}
catch (RuntimeException e) {
if (log.isWarnEnabled()) {
log.warn(format(ops.channel(), "Exception caught while recording metrics."), e);
log.warn(format(channel, "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco
ServerCloseHandler.INSTANCE.register(cnx.channel());
});

disposableServer = server.bindNow();
disposableServer = server.forwarded(true).bindNow();

AtomicReference<SocketAddress> clientAddress = new AtomicReference<>();
httpClient = httpClient
Expand All @@ -661,6 +661,7 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco

customizeClientOptions(httpClient, clientCtx, clientProtocols)
.metrics(true, Function.identity())
.headers(h -> h.add("X-Forwarded-Host", "192.168.0.1"))
.post()
.uri(uri)
.send(body)
Expand Down Expand Up @@ -994,7 +995,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}

private void checkServerConnectionsMicrometer(HttpServerRequest request) {
String address = formatSocketAddress(request.hostAddress());
String address = formatSocketAddress(request.connectionHostAddress());
boolean isHttp2 = request.requestHeaders().contains(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text());
checkGauge(SERVER_CONNECTIONS_TOTAL, true, 1, URI, HTTP, LOCAL_ADDRESS, address);
if (isHttp2) {
Expand Down

0 comments on commit ea67bb5

Please sign in to comment.