Skip to content

Commit

Permalink
Merge #3282 into 1.2.0-M3
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Jun 6, 2024
2 parents 8b4b046 + d670eec commit a4aede6
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

Expand All @@ -51,6 +53,9 @@
*/
abstract class AbstractHttpServerMetricsHandler extends ChannelDuplexHandler {

static final boolean LAST_FLUSH_WHEN_NO_READ = Boolean.parseBoolean(
System.getProperty("reactor.netty.http.server.lastFlushWhenNoRead", "false"));

private static final Logger log = Loggers.getLogger(AbstractHttpServerMetricsHandler.class);

boolean channelActivated;
Expand All @@ -66,6 +71,8 @@ abstract class AbstractHttpServerMetricsHandler extends ChannelDuplexHandler {

boolean initialized;

boolean isHttp11;

String method;
String path;
SocketAddress remoteSocketAddress;
Expand All @@ -90,6 +97,7 @@ protected AbstractHttpServerMetricsHandler(AbstractHttpServerMetricsHandler copy
this.dataSent = copy.dataSent;
this.dataSentTime = copy.dataSentTime;
this.initialized = copy.initialized;
this.isHttp11 = copy.isHttp11;
this.method = copy.method;
this.path = copy.path;
this.remoteSocketAddress = copy.remoteSocketAddress;
Expand All @@ -103,16 +111,19 @@ public void channelActive(ChannelHandlerContext ctx) {
// For custom user recorders, we don't propagate the channelActive event, because this will be done
// by the ChannelMetricsHandler itself. ChannelMetricsHandler is only present when the recorder is
// not our MicrometerHttpServerMetricsRecorder. See HttpServerConfig class.
if (!(ctx.channel() instanceof Http2StreamChannel) && recorder() instanceof MicrometerHttpServerMetricsRecorder) {
try {
// Always use the real connection local address without any proxy information
channelOpened = true;
recorder().recordServerConnectionOpened(ctx.channel().localAddress());
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
if (!(ctx.channel() instanceof Http2StreamChannel)) {
isHttp11 = true;
if (recorder() instanceof MicrometerHttpServerMetricsRecorder) {
try {
// Always use the real connection local address without any proxy information
channelOpened = true;
recorder().recordServerConnectionOpened(ctx.channel().localAddress());
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
}
}
}
Expand Down Expand Up @@ -180,21 +191,38 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
dataSent += extractProcessedDataFromBuffer(msg);

if (msg instanceof LastHttpContent) {
promise.addListener(future -> {
MetricsArgProvider copy;
if (isHttp11 && LAST_FLUSH_WHEN_NO_READ) {
copy = createMetricsArgProvider();
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations) {
HttpServerOperations ops = (HttpServerOperations) channelOps;
try {
recordInactiveConnectionOrStream(ctx.channel(), (HttpServerOperations) channelOps);
}
}
else {
copy = null;
}
promise.addListener(future -> {
try {
if (copy == null) {
recordWrite(ctx.channel());
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
else {
recordWrite(ctx.channel(), copy);
}
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
if (log.isWarnEnabled()) {
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
}

recordInactiveConnectionOrStream(ctx.channel(), ops);
if (copy == null) {
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations) {
recordInactiveConnectionOrStream(ctx.channel(), (HttpServerOperations) channelOps);
}
}
});
}
Expand All @@ -216,7 +244,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
HttpServerOperations ops = null;
try {
if (msg instanceof HttpRequest) {
reset();
reset(ctx.channel());
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations) {
ops = (HttpServerOperations) channelOps;
Expand Down Expand Up @@ -292,6 +320,10 @@ else if (msg instanceof ByteBuf) {

protected abstract HttpServerMetricsRecorder recorder();

protected MetricsArgProvider createMetricsArgProvider() {
return new MetricsArgProvider(contextView, dataReceivedTime, dataSent, dataSentTime, method, path, remoteSocketAddress, status);
}

protected void contextView(HttpServerOperations ops) {
this.contextView = Context.empty();
}
Expand All @@ -311,6 +343,22 @@ protected void recordRead() {
}

protected void recordWrite(Channel channel) {
recordWrite(dataReceivedTime, dataSent, dataSentTime, method, path, remoteSocketAddress, status);
}

protected void recordWrite(Channel channel, MetricsArgProvider metricsArgProvider) {
recordWrite(metricsArgProvider.dataReceivedTime, metricsArgProvider.dataSent, metricsArgProvider.dataSentTime,
metricsArgProvider.method, metricsArgProvider.path, metricsArgProvider.remoteSocketAddress, metricsArgProvider.status);
}

void recordWrite(
long dataReceivedTime,
long dataSent,
long dataSentTime,
String method,
String path,
SocketAddress remoteSocketAddress,
String status) {
Duration dataSentTimeDuration = Duration.ofNanos(System.nanoTime() - dataSentTime);
recorder().recordDataSentTime(path, method, status, dataSentTimeDuration);

Expand Down Expand Up @@ -377,7 +425,7 @@ else if (channel instanceof SocketChannel) {
}
}

void reset() {
protected void reset(Channel channel) {
// There is no need to reset 'channelActivated' and 'channelOpened'
contextView = null;
dataReceived = 0;
Expand Down Expand Up @@ -407,4 +455,46 @@ void reset() {
}
static final String UNKNOWN_METHOD = "UNKNOWN";
static final Function<String, String> DEFAULT_METHOD_TAG_VALUE = m -> STANDARD_METHODS.contains(m) ? m : UNKNOWN_METHOD;

static class MetricsArgProvider {
final ContextView contextView;
final long dataReceivedTime;
final long dataSent;
final long dataSentTime;
final Map<Object, Object> map = new HashMap<>();
final String method;
final String path;
final SocketAddress remoteSocketAddress;
final String status;

MetricsArgProvider(
ContextView contextView,
long dataReceivedTime,
long dataSent,
long dataSentTime,
String method,
String path,
SocketAddress remoteSocketAddress,
String status) {
this.contextView = contextView;
this.dataReceivedTime = dataReceivedTime;
this.dataSent = dataSent;
this.dataSentTime = dataSentTime;
this.method = method;
this.path = path;
this.remoteSocketAddress = remoteSocketAddress;
this.status = status;
}

<T> MetricsArgProvider put(Object key, T object) {
this.map.put(key, object);
return this;
}

@Nullable
@SuppressWarnings({"unchecked", "TypeParameterUnusedInFormals"})
<T> T get(Object key) {
return (T) this.map.get(key);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import reactor.util.annotation.Nullable;
import reactor.util.context.ContextView;

import java.net.SocketAddress;
import java.time.Duration;
import java.util.function.Function;

Expand Down Expand Up @@ -74,6 +75,24 @@ protected void recordRead() {

@Override
protected void recordWrite(Channel channel) {
recordWrite(contextView, dataReceivedTime, dataSent, dataSentTime, method, path, remoteSocketAddress, status);
}

@Override
protected void recordWrite(Channel channel, MetricsArgProvider metricsArgProvider) {
recordWrite(metricsArgProvider.contextView, metricsArgProvider.dataReceivedTime, metricsArgProvider.dataSent, metricsArgProvider.dataSentTime,
metricsArgProvider.method, metricsArgProvider.path, metricsArgProvider.remoteSocketAddress, metricsArgProvider.status);
}

void recordWrite(
ContextView contextView,
long dataReceivedTime,
long dataSent,
long dataSentTime,
String method,
String path,
SocketAddress remoteSocketAddress,
String status) {
Duration dataSentTimeDuration = Duration.ofNanos(System.nanoTime() - dataSentTime);
recorder().recordDataSentTime(contextView, path, method, status, dataSentTimeDuration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import reactor.util.context.ContextView;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Function;
Expand Down Expand Up @@ -80,13 +81,37 @@ final class MicrometerHttpServerMetricsHandler extends AbstractHttpServerMetrics
this.parentContextView = copy.parentContextView;
}

@Override
protected MetricsArgProvider createMetricsArgProvider() {
return super.createMetricsArgProvider().put(Observation.class, responseTimeObservation);
}

@Override
protected HttpServerMetricsRecorder recorder() {
return recorder;
}

@Override
protected void recordWrite(Channel channel) {
recordWrite(dataSent, dataSentTime, method, path, remoteSocketAddress, responseTimeObservation, status);

setChannelContext(channel, parentContextView);
}

@Override
protected void recordWrite(Channel channel, MetricsArgProvider metricsArgProvider) {
recordWrite(metricsArgProvider.dataSent, metricsArgProvider.dataSentTime, metricsArgProvider.method, metricsArgProvider.path,
metricsArgProvider.remoteSocketAddress, metricsArgProvider.get(Observation.class), metricsArgProvider.status);
}

void recordWrite(
long dataSent,
long dataSentTime,
String method,
String path,
SocketAddress remoteSocketAddress,
Observation responseTimeObservation,
String status) {
Duration dataSentTimeDuration = Duration.ofNanos(System.nanoTime() - dataSentTime);
recorder().recordDataSentTime(path, method, status, dataSentTimeDuration);

Expand All @@ -100,12 +125,6 @@ protected void recordWrite(Channel channel) {
//
// Move the implementation from the recorder here
responseTimeObservation.stop();

setChannelContext(channel, parentContextView);

responseTimeHandlerContext = null;
responseTimeObservation = null;
parentContextView = null;
}

@Override
Expand Down Expand Up @@ -133,6 +152,19 @@ protected void startWrite(HttpServerOperations ops) {
responseTimeHandlerContext.status = status;
}

@Override
protected void reset(Channel channel) {
super.reset(channel);

if (isHttp11 && LAST_FLUSH_WHEN_NO_READ) {
setChannelContext(channel, parentContextView);
}

responseTimeHandlerContext = null;
responseTimeObservation = null;
parentContextView = null;
}

/*
* Requirements for HTTP servers
* <p>https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#span
Expand Down
Loading

0 comments on commit a4aede6

Please sign in to comment.