Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track if proxy protocol messages need buffering #1669

Merged
merged 3 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,26 +48,40 @@ public void addProxyProtocol(ChannelPipeline pipeline) {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (withProxyProtocol && isHAPMDetected(msg)) {
if (!withProxyProtocol) {
ctx.pipeline().remove(this);
super.channelRead(ctx, msg);
return;
}

ProtocolDetectionState haProxyState = getDetectionState(msg);
if (haProxyState == ProtocolDetectionState.DETECTED) {
ctx.pipeline()
.addAfter(NAME, null, new HAProxyMessageChannelHandler())
.replace(this, null, new HAProxyMessageDecoder());
} else {
if (withProxyProtocol) {
final int port = ctx.channel()
.attr(SourceAddressChannelHandler.ATTR_SERVER_LOCAL_PORT)
.get();
// This likely means initialization was requested with proxy protocol, but we encountered a non-ppv2
// message
registry.counter("zuul.hapm.decode", "success", "false", "port", String.valueOf(port))
.increment();
}
final int port = ctx.channel()
.attr(SourceAddressChannelHandler.ATTR_SERVER_LOCAL_PORT)
.get();

String needsMoreData = String.valueOf(haProxyState == ProtocolDetectionState.NEEDS_MORE_DATA);
// This likely means initialization was requested with proxy protocol, but we encountered a non-ppv2
// message
registry.counter(
"zuul.hapm.decode",
"success",
"false",
"port",
String.valueOf(port),
"needs_more_data",
needsMoreData)
.increment();
ctx.pipeline().remove(this);
}
super.channelRead(ctx, msg);
}

private boolean isHAPMDetected(Object msg) {
return HAProxyMessageDecoder.detectProtocol((ByteBuf) msg).state() == ProtocolDetectionState.DETECTED;
private ProtocolDetectionState getDetectionState(Object msg) {
return HAProxyMessageDecoder.detectProtocol((ByteBuf) msg).state();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ void incrementCounterWhenPPEnabledButNonHAPMMessage() {
assertEquals(dropped, buf);
buf.release();

final Counter counter = registry.counter("zuul.hapm.decode", "success", "false", "port", String.valueOf(port));
final Counter counter = registry.counter(
"zuul.hapm.decode", "success", "false", "port", String.valueOf(port), "needs_more_data", "false");
assertEquals(1, counter.count());
}

Expand Down Expand Up @@ -181,6 +182,31 @@ void detectsSplitPpv1Message() {
assertNull(channel.pipeline().context(ElbProxyProtocolChannelHandler.class));
}

@Test
void tracksSplitMessage() {
EmbeddedChannel channel = new EmbeddedChannel();
// This is normally done by Server.
channel.attr(Server.CONN_DIMENSIONS).set(Attrs.newInstance());
int port = 7007;
channel.attr(SourceAddressChannelHandler.ATTR_SERVER_LOCAL_PORT).set(port);

channel.pipeline()
.addLast(ElbProxyProtocolChannelHandler.NAME, new ElbProxyProtocolChannelHandler(registry, true));
ByteBuf buf1 = Unpooled.wrappedBuffer("PROXY TCP4".getBytes(StandardCharsets.US_ASCII));
channel.writeInbound(buf1);

Object msg = channel.readInbound();
assertEquals(buf1, msg);
buf1.release();

// The handler should remove itself.
assertNull(channel.pipeline().context(ElbProxyProtocolChannelHandler.class));

Counter counter = registry.counter(
"zuul.hapm.decode", "success", "false", "port", String.valueOf(port), "needs_more_data", "true");
assertEquals(1, counter.count());
}

@Test
void negotiateProxy_ppv1_ipv4() {
EmbeddedChannel channel = new EmbeddedChannel();
Expand Down