Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correctly report TLS handshake when TCP Fast Open is enabled #1970

Merged
merged 3 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ public void onFlush() {
// AsyncContext is unknown at this point because protocols can flush multiple requests concurrently
}

@Override
public void onTransportHandshakeComplete() {
// AsyncContext is unknown at this point because this event is triggered by network
}

@Override
public SecurityHandshakeObserver onSecurityHandshake() {
// AsyncContext is unknown at this point because this event is triggered by network
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ void connectionEstablished(HttpProtocol httpProtocol) throws Exception {

verify(clientTransportObserver).onNewConnection();
verify(serverTransportObserver, await()).onNewConnection();
verify(clientConnectionObserver).onTransportHandshakeComplete();
verify(serverConnectionObserver, await()).onTransportHandshakeComplete();
if (protocol == HTTP_1) {
verify(clientConnectionObserver).connectionEstablished(any(ConnectionInfo.class));
verify(serverConnectionObserver, await()).connectionEstablished(any(ConnectionInfo.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ class SecurityHandshakeObserverTest {
static final ExecutionContextExtension CLIENT_CTX =
ExecutionContextExtension.cached("client-io", "client-executor");



private final TransportObserver clientTransportObserver;
private final ConnectionObserver clientConnectionObserver;
private final SecurityHandshakeObserver clientSecurityHandshakeObserver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public TcpClientChannelInitializer(final ReadOnlyTcpClientConfig config,
final SslContext sslContext = config.sslContext();
if (observer != NoopConnectionObserver.INSTANCE) {
delegate = delegate.andThen(new ConnectionObserverInitializer(observer,
sslContext != null && !deferSslHandler));
sslContext != null && !deferSslHandler, true));
}

if (config.idleTimeoutMs() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public TcpServerChannelInitializer(final ReadOnlyTcpServerConfig config,
ChannelInitializer delegate = ChannelInitializer.defaultInitializer();

if (observer != NoopConnectionObserver.INSTANCE) {
delegate = delegate.andThen(new ConnectionObserverInitializer(observer, config.sslContext() != null));
delegate = delegate.andThen(
new ConnectionObserverInitializer(observer, config.sslContext() != null, false));
}

if (config.idleTimeoutMs() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ static ServerSslConfig defaultServerSslConfig(SslProvider provider) {
static void verifyWriteObserver(DataObserver dataObserver, WriteObserver writeObserver,
boolean completeExpected) {
verify(dataObserver).onNewWrite();
verify(writeObserver).requestedToWrite(anyLong());
verify(writeObserver, atLeastOnce()).requestedToWrite(anyLong());
verify(writeObserver).itemReceived();
verify(writeObserver).onFlushRequest();
verify(writeObserver).itemWritten();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ void testConnectionObserverEvents(SslProvider clientProvider, SslProvider server
verify(clientTransportObserver).onNewConnection();
verify(serverTransportObserver, await()).onNewConnection();

verify(clientConnectionObserver).onTransportHandshakeComplete();
verify(clientConnectionObserver).connectionEstablished(any(ConnectionInfo.class));
verify(serverConnectionObserver, await()).onTransportHandshakeComplete();
verify(serverConnectionObserver, await()).connectionEstablished(any(ConnectionInfo.class));

// handshake starts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ void testConnectionClosed(ErrorSource errorSource) throws Exception {
NettyConnection<Buffer, Buffer> connection = client.connectBlocking(CLIENT_CTX, serverAddress);
verify(clientTransportObserver).onNewConnection();
verify(serverTransportObserver, await()).onNewConnection();
verify(clientConnectionObserver).onTransportHandshakeComplete();
verify(clientConnectionObserver).connectionEstablished(any(ConnectionInfo.class));
verify(serverConnectionObserver, await()).onTransportHandshakeComplete();
verify(serverConnectionObserver, await()).connectionEstablished(any(ConnectionInfo.class));
switch (errorSource) {
case CONNECTION_ACCEPTOR:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ void testConnectionObserverEvents() throws Exception {
NettyConnection<Buffer, Buffer> connection = client.connectBlocking(CLIENT_CTX, serverAddress);
verify(clientTransportObserver).onNewConnection();
verify(serverTransportObserver, await()).onNewConnection();
verify(clientConnectionObserver).onTransportHandshakeComplete();
verify(clientConnectionObserver).connectionEstablished(any(ConnectionInfo.class));
verify(serverConnectionObserver, await()).onTransportHandshakeComplete();
verify(serverConnectionObserver, await()).connectionEstablished(any(ConnectionInfo.class));

Buffer content = connection.executionContext().bufferAllocator().fromAscii("Hello");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ public void onFlush() {
second.onFlush();
}

@Override
public void onTransportHandshakeComplete() {
first.onTransportHandshakeComplete();
second.onTransportHandshakeComplete();
}

@Override
public SecurityHandshakeObserver onSecurityHandshake() {
return new BiSecurityHandshakeObserver(first.onSecurityHandshake(), second.onSecurityHandshake());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public void onFlush() {
safeReport(observer::onFlush, observer, "flush");
}

@Override
public void onTransportHandshakeComplete() {
safeReport(observer::onTransportHandshakeComplete, observer, "flush");
}

@Override
public SecurityHandshakeObserver onSecurityHandshake() {
return safeReport(observer::onSecurityHandshake, observer, "security handshake",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,33 @@ public interface ConnectionObserver {
*/
void onFlush();

/**
* Callback when a transport handshake completes.
* <p>
* Transport protocols that require a handshake in order to connect. Example:
* <a href="https://datatracker.ietf.org/doc/html/rfc793.html#section-3.4">TCP "three-way handshake"</a>.
*/
default void onTransportHandshakeComplete() {
// FIXME: 0.42 - remove default impl
}

/**
* Callback when a security handshake is initiated.
* <p>
* For a typical connection, this callback is invoked after {@link #onTransportHandshakeComplete()}. There are may
* be exceptions:
* <ol>
* <li>For a TCP connection, when {@link ServiceTalkSocketOptions#TCP_FASTOPEN_CONNECT} option is configured and
* the Fast Open feature is supported by the OS, this callback may be invoked earlier. Note, even if the Fast
* Open is available and configured, it may not actually happen if the
* <a href="https://datatracker.ietf.org/doc/html/rfc7413#section-4.1">Fast Open Cookie</a> is {@code null} or
* rejected by the server.</li>
* <li>For a proxy connections, the handshake may happen after the
* {@link #connectionEstablished(ConnectionInfo)}.</li>
* </ol>
*
* @return a new {@link SecurityHandshakeObserver} that provides visibility into security handshake events
* @see <a href="https://datatracker.ietf.org/doc/html/rfc7413">RFC7413: TCP Fast Open</a>
*/
SecurityHandshakeObserver onSecurityHandshake();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public void onDataWrite(final int size) {
public void onFlush() {
}

@Override
public void onTransportHandshakeComplete() {
}

@Override
public SecurityHandshakeObserver onSecurityHandshake() {
return NoopSecurityHandshakeObserver.INSTANCE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.kqueue.KQueue;

import javax.annotation.Nullable;

import static io.netty.channel.ChannelOption.TCP_FASTOPEN_CONNECT;
import static io.servicetalk.transport.netty.internal.ChannelCloseUtils.channelError;
import static java.util.Objects.requireNonNull;

Expand All @@ -38,16 +41,20 @@ public final class ConnectionObserverInitializer implements ChannelInitializer {

private final ConnectionObserver observer;
private final boolean secure;
private final boolean client;

/**
* Creates a new instance.
*
* @param observer {@link ConnectionObserver} to report network events.
* @param secure {@code true} if the observed connection is secure
* @param client {@code true} if this initializer is used on the client-side
*/
public ConnectionObserverInitializer(final ConnectionObserver observer, final boolean secure) {
public ConnectionObserverInitializer(final ConnectionObserver observer, final boolean secure,
final boolean client) {
this.observer = requireNonNull(observer);
this.secure = secure;
this.client = client;
}

@Override
Expand All @@ -60,36 +67,56 @@ public void init(final Channel channel) {
observer.connectionClosed(t);
}
});
channel.pipeline().addLast(new ConnectionObserverHandler(observer, secure));
channel.pipeline().addLast(new ConnectionObserverHandler(observer, secure, isFastOpen(channel)));
}

private boolean isFastOpen(final Channel channel) {
return client && secure && Boolean.TRUE.equals(channel.config().getOption(TCP_FASTOPEN_CONNECT)) &&
(Epoll.isTcpFastOpenClientSideAvailable() || KQueue.isTcpFastOpenClientSideAvailable());
}

static final class ConnectionObserverHandler extends ChannelDuplexHandler {

private final ConnectionObserver observer;
private final boolean secure;
private boolean tcpHandshakeComplete;
@Nullable
private SecurityHandshakeObserver handshakeObserver;

ConnectionObserverHandler(final ConnectionObserver observer, final boolean secure) {
ConnectionObserverHandler(final ConnectionObserver observer, final boolean secure, final boolean fastOpen) {
this.observer = observer;
this.secure = secure;
if (fastOpen) {
reportSecurityHandshakeStarting();
}
}

@Override
public void handlerAdded(final ChannelHandlerContext ctx) {
if (secure && ctx.channel().isActive()) {
reportSecurityHandshakeStarting();
if (ctx.channel().isActive()) {
reportTcpHandshakeComplete();
if (secure) {
reportSecurityHandshakeStarting();
}
}
}

@Override
public void channelActive(final ChannelHandlerContext ctx) {
reportTcpHandshakeComplete();
if (secure) {
reportSecurityHandshakeStarting();
}
ctx.fireChannelActive();
}

void reportTcpHandshakeComplete() {
if (!tcpHandshakeComplete) {
tcpHandshakeComplete = true;
observer.onTransportHandshakeComplete();
}
}

void reportSecurityHandshakeStarting() {
if (handshakeObserver == null) {
handshakeObserver = observer.onSecurityHandshake();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ public void onDataWrite(final int size) {
public void onFlush() {
}

@Override
public void onTransportHandshakeComplete() {
}

@Override
public SecurityHandshakeObserver onSecurityHandshake() {
return NoopSecurityHandshakeObserver.INSTANCE;
Expand Down