Skip to content

Commit

Permalink
[Java] Add log event for when a NAK message is received.
Browse files Browse the repository at this point in the history
  • Loading branch information
vyazelenko committed Jan 7, 2025
1 parent 80a93bb commit 7cc8bf0
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.aeron.driver.media.ImageConnection;
import io.aeron.driver.media.ReceiveChannelEndpoint;
import io.aeron.driver.media.SendChannelEndpoint;
import io.aeron.protocol.NakFlyweight;
import net.bytebuddy.asm.Advice;
import org.agrona.concurrent.UnsafeBuffer;

Expand Down Expand Up @@ -112,7 +113,7 @@ static void resendHook(

static class ReceiveChannelEndpointInterceptor
{
static class SendNakMessage
static class NakSent
{
@Advice.OnMethodEnter
static void sendNakMessage(
Expand All @@ -135,11 +136,49 @@ static void sendNakMessage(
{
if (null != connection)
{
LOGGER.logSendNakMessage(
connection.controlAddress, sessionId, streamId, termId, termOffset, length, channel);
LOGGER.logNakMessage(
SEND_NAK_MESSAGE,
connection.controlAddress,
sessionId,
streamId,
termId,
termOffset,
length,
channel);
}
}
}
}
}

static class SendChannelEndpointInterceptor
{
static class NakReceived
{
@Advice.OnMethodEnter
static void onNakMessage(
final NakFlyweight msg,
final UnsafeBuffer buffer,
final int length,
final InetSocketAddress srcAddress,
@Advice.This final Object thisObject)
{
if (null == thisObject)
{
return;
}

final String channel = ((SendChannelEndpoint)thisObject).originalUriString();
LOGGER.logNakMessage(
NAK_RECEIVED,
srcAddress,
msg.sessionId(),
msg.streamId(),
msg.termId(),
msg.termOffset(),
length,
channel);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,16 @@ private AgentBuilder addChannelEndpointInstrumentation(final AgentBuilder agentB
tempBuilder,
SEND_NAK_MESSAGE,
"ReceiveChannelEndpoint",
ChannelEndpointInterceptor.ReceiveChannelEndpointInterceptor.SendNakMessage.class,
ChannelEndpointInterceptor.ReceiveChannelEndpointInterceptor.NakSent.class,
"sendNakMessage");

tempBuilder = addEventInstrumentation(
tempBuilder,
SEND_NAK_MESSAGE,
"SendChannelEndpoint",
ChannelEndpointInterceptor.SendChannelEndpointInterceptor.NakReceived.class,
"onNakMessage");

return tempBuilder;
}

Expand Down
14 changes: 10 additions & 4 deletions aeron-agent/src/main/java/io/aeron/agent/DriverEventCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,12 @@ public enum DriverEventCode implements EventCode
*/
NAME_RESOLUTION_HOST_NAME(53,
(code, buffer, offset, builder) -> DriverEventDissector.dissectHostName(buffer, offset, builder)),

/**
* Nak received.
* Nak sent.
*/
SEND_NAK_MESSAGE(54,
(code, buffer, offset, builder) -> DriverEventDissector.dissectSendNak(buffer, offset, builder)),
SEND_NAK_MESSAGE(54, DriverEventDissector::dissectNak),

/**
* Resend data upon Nak.
*/
Expand All @@ -220,7 +221,12 @@ public enum DriverEventCode implements EventCode
/**
* Reject image command received by the driver.
*/
CMD_IN_REJECT_IMAGE(57, DriverEventDissector::dissectCommand);
CMD_IN_REJECT_IMAGE(57, DriverEventDissector::dissectCommand),

/**
* Nak received.
*/
NAK_RECEIVED(58, DriverEventDissector::dissectNak);

static final int EVENT_CODE_TYPE = EventCodeType.DRIVER.getTypeCode();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,10 +411,14 @@ static void dissectHostName(
buffer.getStringAscii(absoluteOffset, builder);
}

public static void dissectSendNak(final MutableDirectBuffer buffer, final int offset, final StringBuilder builder)
static void dissectNak(
final DriverEventCode eventCode,
final MutableDirectBuffer buffer,
final int offset,
final StringBuilder builder)
{
int absoluteOffset = offset;
absoluteOffset += dissectLogHeader(CONTEXT, SEND_NAK_MESSAGE, buffer, absoluteOffset, builder);
absoluteOffset += dissectLogHeader(CONTEXT, eventCode, buffer, absoluteOffset, builder);
builder.append(": address=");
final int encodedSocketLength = dissectSocketAddress(buffer, absoluteOffset, builder);
absoluteOffset += encodedSocketLength;
Expand All @@ -432,7 +436,7 @@ public static void dissectSendNak(final MutableDirectBuffer buffer, final int of
buffer.getStringAscii(absoluteOffset, builder);
}

public static void dissectResend(final MutableDirectBuffer buffer, final int offset, final StringBuilder builder)
static void dissectResend(final MutableDirectBuffer buffer, final int offset, final StringBuilder builder)
{
int absoluteOffset = offset;
absoluteOffset += dissectLogHeader(CONTEXT, RESEND, buffer, absoluteOffset, builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,12 @@ static void encodeHostName(
encodingBuffer, offset + encodedLength, SIZE_OF_INT + MAX_HOST_NAME_LENGTH, hostName);
}

static void encodeSendNakMessage(
static void encodeNakMessage(
final UnsafeBuffer encodingBuffer,
final int offset,
final int length,
final int captureLength,
final InetSocketAddress controlAddress,
final InetSocketAddress address,
final int sessionId,
final int streamId,
final int termId,
Expand All @@ -326,7 +326,7 @@ static void encodeSendNakMessage(
final int headerLength = encodeLogHeader(encodingBuffer, offset, captureLength, length);
final int bodyOffset = offset + headerLength;
int bodyLength = 0;
final int socketEncodedLength = encodeSocketAddress(encodingBuffer, bodyOffset + bodyLength, controlAddress);
final int socketEncodedLength = encodeSocketAddress(encodingBuffer, bodyOffset + bodyLength, address);
bodyLength += socketEncodedLength;

encodingBuffer.putInt(bodyOffset + bodyLength, sessionId, LITTLE_ENDIAN);
Expand Down
30 changes: 16 additions & 14 deletions aeron-agent/src/main/java/io/aeron/agent/DriverEventLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -511,41 +511,43 @@ public void logFlowControlReceiver(
}

/**
* Logs a NAK message sent by the receiver for a single control address.
* Logs a NAK message sent by the receiver for a single control address or received by the sender.
*
* @param controlAddress Nak UDP destination
* @param sessionId of the Nak.
* @param streamId of the Nak.
* @param termId of the Nak.
* @param termOffset of the Nak.
* @param nakLength of the Nak.
* @param channel of the Nak.
* @param eventCode to log Nak by.
* @param address Nak UDP destination/source.
* @param sessionId of the Nak.
* @param streamId of the Nak.
* @param termId of the Nak.
* @param termOffset of the Nak.
* @param nakLength of the Nak.
* @param channel of the Nak.
*/
public void logSendNakMessage(
final InetSocketAddress controlAddress,
public void logNakMessage(
final DriverEventCode eventCode,
final InetSocketAddress address,
final int sessionId,
final int streamId,
final int termId,
final int termOffset,
final int nakLength,
final String channel)
{
final int length = socketAddressLength(controlAddress) + (SIZE_OF_INT * 6) + channel.length();
final int length = socketAddressLength(address) + (SIZE_OF_INT * 6) + channel.length();
final int captureLength = captureLength(length);
final int encodedLength = encodedLength(captureLength);

final ManyToOneRingBuffer ringBuffer = this.ringBuffer;
final int index = ringBuffer.tryClaim(toEventCodeId(SEND_NAK_MESSAGE), encodedLength);
final int index = ringBuffer.tryClaim(toEventCodeId(eventCode), encodedLength);
if (index > 0)
{
try
{
encodeSendNakMessage(
encodeNakMessage(
(UnsafeBuffer)ringBuffer.buffer(),
index,
captureLength,
length,
controlAddress,
address,
sessionId,
streamId,
termId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,9 @@ void logHost()
assertEquals(expectedHostName, logBuffer.getStringAscii(index, LITTLE_ENDIAN));
}

@Test
void logSendNakMessage()
@ParameterizedTest
@EnumSource(value = DriverEventCode.class, names = { "SEND_NAK_MESSAGE", "NAK_RECEIVED" })
void logNakMessage(final DriverEventCode eventCode)
{
final InetSocketAddress inetSocketAddress = new InetSocketAddress("192.168.1.1", 10001);

Expand All @@ -404,15 +405,15 @@ void logSendNakMessage()
final int captureLength = socketAddressLength(inetSocketAddress) + (6 * SIZE_OF_INT) + channel.length();

logBuffer.putLong(CAPACITY + TAIL_POSITION_OFFSET, recordOffset);
logger.logSendNakMessage(inetSocketAddress, sessionId, streamId, termId, termOffset, length, channel);
logger.logNakMessage(eventCode, inetSocketAddress, sessionId, streamId, termId, termOffset, length, channel);
verifyLogHeader(
logBuffer, recordOffset, toEventCodeId(SEND_NAK_MESSAGE), captureLength, captureLength);
logBuffer, recordOffset, toEventCodeId(eventCode), captureLength, captureLength);

final StringBuilder sb = new StringBuilder();
DriverEventDissector.dissectSendNak(logBuffer, encodedMsgOffset(recordOffset), sb);
DriverEventDissector.dissectNak(eventCode, logBuffer, encodedMsgOffset(recordOffset), sb);

final String expectedMessagePattern =
"\\[[0-9]+\\.[0-9]+] DRIVER: SEND_NAK_MESSAGE \\[124/124]: address=192.168.1.1:10001 " +
"\\[[0-9]+\\.[0-9]+] DRIVER: " + eventCode + " \\[124/124]: address=192.168.1.1:10001 " +
"sessionId=9821374 streamId=988234 termId=89324 termOffset=9862314 length=1239 channel=" +
"aeron:udp\\?endpoint=localhost:10000\\|term-length=1m\\|init-term-id=0\\|term-id=0\\|term-offset=0";

Expand Down

0 comments on commit 7cc8bf0

Please sign in to comment.