Skip to content

Commit

Permalink
Add API for obtaining request ID (#1530)
Browse files Browse the repository at this point in the history
Request ID is a combination of the id of the underlying connection
and the serial number of the request received on that connection.

Use the request ID in the logs when wire logger is enabled (transition from
LoggingHandler to ReactorNettyLoggingHandler)
  • Loading branch information
violetagg authored Mar 4, 2021
1 parent c146e48 commit af2d17f
Show file tree
Hide file tree
Showing 21 changed files with 667 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty;

/**
* Provides short/long string representation of the channel.
*
* @author Violeta Georgieva
* @since 1.0.5
*/
public interface ChannelOperationsId {

/**
* The short string is a combination of the id of the underlying connection
* and in case of HTTP, the serial number of the request received on that connection.
* <p>Format of the short string:
* {@code <CONNECTION_ID>-<REQUEST_NUMBER>}
* </p>
* <p>
* Example:
* {@code
* <CONNECTION_ID>: 329c6ffd
* <REQUEST_NUMBER>: 5
*
* Result: 329c6ffd-5
* }
* </p>
*/
String asShortText();

/**
* The long string is a combination of the id of the underlying connection, local and remote addresses,
* and in case of HTTP, the serial number of the request received on that connection.
* <p>Format of the short string:
* {@code [id: 0x<CONNECTION_ID>-<REQUEST_NUMBER>, <LOCAL_ADDRESS> <CONNECTION_OPENED_CLOSED> <REMOTE_ADDRESS>]}
* </p>
* <p>
* Example:
* {@code
* Opened connection
* <CONNECTION_ID>: 329c6ffd
* <REQUEST_NUMBER>: 5
* <LOCAL_ADDRESS>: L:/0:0:0:0:0:0:0:1:64286
* <CONNECTION_OPENED_CLOSED>: - (opened)
* <REMOTE_ADDRESS>: R:/0:0:0:0:0:0:0:1:64284
*
* Result: [id: 0x329c6ffd-5, L:/0:0:0:0:0:0:0:1:64286 - R:/0:0:0:0:0:0:0:1:64284]
*
* Closed connection
* <CONNECTION_ID>: 329c6ffd
* <REQUEST_NUMBER>: 5
* <LOCAL_ADDRESS>: L:/0:0:0:0:0:0:0:1:64286
* <CONNECTION_OPENED_CLOSED>: ! (closed)
* <REMOTE_ADDRESS>: R:/0:0:0:0:0:0:0:1:64284
*
* Result: [id: 0x329c6ffd-5, L:/0:0:0:0:0:0:0:1:64286 ! R:/0:0:0:0:0:0:0:1:64284]
* }
* </p>
*/
String asLongText();
}
14 changes: 11 additions & 3 deletions reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -209,9 +210,16 @@ public static String format(Channel channel, String msg) {
Objects.requireNonNull(channel, "channel");
Objects.requireNonNull(msg, "msg");
if (LOG_CHANNEL_INFO) {
String channelStr = channel.toString();
String channelStr;
Connection connection = Connection.from(channel);
if (connection instanceof ChannelOperationsId) {
channelStr = ((ChannelOperationsId) connection).asLongText();
}
else {
channelStr = channel.toString();
}
return new StringBuilder(channelStr.length() + 1 + msg.length())
.append(channel)
.append(channelStr)
.append(' ')
.append(msg)
.toString();
Expand Down Expand Up @@ -840,7 +848,7 @@ protected void hookFinally(SignalType type) {
}
}

static final class SimpleConnection implements Connection {
static final class SimpleConnection extends AtomicLong implements Connection {

final Channel channel;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.netty.ByteBufFlux;
import reactor.netty.ChannelOperationsId;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.FutureMono;
Expand All @@ -60,7 +61,7 @@
* @since 0.6
*/
public class ChannelOperations<INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound>
implements NettyInbound, NettyOutbound, Connection, CoreSubscriber<Void> {
implements NettyInbound, NettyOutbound, Connection, CoreSubscriber<Void>, ChannelOperationsId {

/**
* Add {@link NettyPipeline#ReactiveBridge} handler at the end of {@link Channel}
Expand Down Expand Up @@ -115,15 +116,22 @@ public static void addMetricsHandler(Channel ch, ChannelMetricsRecorder recorder
final Connection connection;
final FluxReceive inbound;
final ConnectionObserver listener;
final Sinks.Empty<Void> onTerminate;
final Sinks.Empty<Void> onTerminate;
final String shortId;

volatile Subscription outboundSubscription;

boolean localActive;
String longId;

protected ChannelOperations(ChannelOperations<INBOUND, OUTBOUND> replaced) {
this.connection = replaced.connection;
this.listener = replaced.listener;
this.onTerminate = replaced.onTerminate;
this.inbound = new FluxReceive(this);
this.shortId = replaced.shortId;
this.longId = replaced.longId;
this.localActive = replaced.localActive;
}

/**
Expand All @@ -137,6 +145,7 @@ public ChannelOperations(Connection connection, ConnectionObserver listener) {
this.listener = requireNonNull(listener, "listener");
this.onTerminate = Sinks.unsafe().empty();
this.inbound = new FluxReceive(this);
shortId = initShortId();
}

@Nullable
Expand Down Expand Up @@ -495,6 +504,10 @@ protected final String formatName() {
.replace("Operations", "");
}

protected String initShortId() {
return channel().id().asShortText();
}

/**
* Wrap an inbound error
*
Expand Down Expand Up @@ -522,6 +535,58 @@ public Context currentContext() {
return listener.currentContext();
}

@Override
public String asShortText() {
return shortId;
}

@Override
public String asLongText() {
boolean active = channel().isActive();
if (localActive == active && longId != null) {
return longId;
}

SocketAddress remoteAddress = channel().remoteAddress();
SocketAddress localAddress = channel().localAddress();
String shortText = asShortText();
if (remoteAddress != null) {
String localAddressStr = String.valueOf(localAddress);
String remoteAddressStr = String.valueOf(remoteAddress);
StringBuilder buf =
new StringBuilder(7 + shortText.length() + 4 + localAddressStr.length() + 3 + 2 + remoteAddressStr.length() + 1)
.append("[id: 0x")
.append(shortText)
.append(", L:")
.append(localAddressStr)
.append(active ? " - " : " ! ")
.append("R:")
.append(remoteAddressStr)
.append(']');
longId = buf.toString();
}
else if (localAddress != null) {
String localAddressStr = String.valueOf(localAddress);
StringBuilder buf = new StringBuilder(7 + shortText.length() + 4 + localAddressStr.length() + 1)
.append("[id: 0x")
.append(shortText)
.append(", L:")
.append(localAddressStr)
.append(']');
longId = buf.toString();
}
else {
StringBuilder buf = new StringBuilder(7 + shortText.length() + 1)
.append("[id: 0x")
.append(shortText)
.append(']');
longId = buf.toString();
}

localActive = active;
return longId;
}

/**
* A {@link ChannelOperations} factory
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiPredicate;
import java.util.function.Function;

Expand Down Expand Up @@ -381,7 +382,7 @@ static class Pending {
}
}

static final class PooledConnection implements Connection, ConnectionObserver {
static final class PooledConnection extends AtomicLong implements Connection, ConnectionObserver {
final Channel channel;
final Sinks.Empty<Void> onTerminate;
final InstrumentedPool<PooledConnection> pool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -36,6 +37,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.ApplicationProtocolNames;
Expand All @@ -51,6 +53,7 @@
import reactor.netty.ReactorNetty;
import reactor.netty.channel.ChannelMetricsHandler;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.transport.logging.AdvancedByteBufFormat;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
Expand Down Expand Up @@ -766,7 +769,9 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {

static final Logger log = Loggers.getLogger(SslProvider.class);

static final LoggingHandler LOGGING_HANDLER = new LoggingHandler("reactor.netty.tcp.ssl");
static final LoggingHandler LOGGING_HANDLER =
AdvancedByteBufFormat.HEX_DUMP
.toLoggingHandler("reactor.netty.tcp.ssl", LogLevel.DEBUG, Charset.defaultCharset());

/**
* <a href="https://wiki.mozilla.org/Security/Server_Side_TLS#Modern_compatibility">Mozilla Modern Cipher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import reactor.netty.ChannelPipelineConfigurer;
import reactor.netty.ConnectionObserver;
Expand All @@ -27,9 +28,11 @@
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.transport.ClientTransportConfig;
import reactor.netty.transport.logging.AdvancedByteBufFormat;
import reactor.util.annotation.Nullable;

import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
Expand Down Expand Up @@ -116,7 +119,9 @@ protected ChannelPipelineConfigurer defaultOnChannelInit() {

static final ChannelOperations.OnSetup DEFAULT_OPS = (ch, c, msg) -> new ChannelOperations<>(ch, c);

static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(TcpClient.class);
static final LoggingHandler LOGGING_HANDLER =
AdvancedByteBufFormat.HEX_DUMP
.toLoggingHandler(TcpClient.class.getName(), LogLevel.DEBUG, Charset.defaultCharset());

/**
* Default value whether the SSL debugging on the client side will be enabled/disabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import reactor.netty.ChannelPipelineConfigurer;
import reactor.netty.ConnectionObserver;
Expand All @@ -26,9 +27,11 @@
import reactor.netty.channel.MicrometerChannelMetricsRecorder;
import reactor.netty.resources.LoopResources;
import reactor.netty.transport.ServerTransportConfig;
import reactor.netty.transport.logging.AdvancedByteBufFormat;
import reactor.util.annotation.Nullable;

import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.function.Supplier;

Expand Down Expand Up @@ -109,7 +112,9 @@ protected ChannelPipelineConfigurer defaultOnChannelInit() {

static final ChannelOperations.OnSetup DEFAULT_OPS = (ch, c, msg) -> new ChannelOperations<>(ch, c);

static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(TcpServer.class);
static final LoggingHandler LOGGING_HANDLER =
AdvancedByteBufFormat.HEX_DUMP
.toLoggingHandler(TcpServer.class.getName(), LogLevel.DEBUG, Charset.defaultCharset());

/**
* Default value whether the SSL debugging on the server side will be enabled/disabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -29,13 +30,15 @@
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.proxy.HttpProxyHandler;
import io.netty.handler.proxy.ProxyHandler;
import io.netty.handler.proxy.Socks4ProxyHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.util.internal.StringUtil;
import reactor.netty.NettyPipeline;
import reactor.netty.transport.logging.AdvancedByteBufFormat;
import reactor.util.annotation.Nullable;

/**
Expand Down Expand Up @@ -218,7 +221,9 @@ private String getPasswordValue() {
return password.apply(username);
}

static final LoggingHandler LOGGING_HANDLER = new LoggingHandler("reactor.netty.proxy");
static final LoggingHandler LOGGING_HANDLER =
AdvancedByteBufFormat.HEX_DUMP
.toLoggingHandler("reactor.netty.proxy", LogLevel.DEBUG, Charset.defaultCharset());

static final class Build implements TypeSpec, AddressSpec, Builder {

Expand Down
Loading

0 comments on commit af2d17f

Please sign in to comment.