Skip to content

Commit

Permalink
[Java] Add OS max/default values for SO_SNDBUF and SO_RCVBUF paramete…
Browse files Browse the repository at this point in the history
…rs to the log buffer metadata section.
  • Loading branch information
vyazelenko committed Jan 9, 2025
1 parent b24a85d commit 57318d7
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 64 deletions.
204 changes: 147 additions & 57 deletions aeron-client/src/main/java/io/aeron/logbuffer/LogBufferDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
*/
public class LogBufferDescriptor
{
private static final int PADDING_SIZE = 64;

/**
* The number of partitions the log is divided into.
*/
Expand Down Expand Up @@ -138,7 +140,7 @@ public class LogBufferDescriptor
/**
* Maximum length of a frame header.
*/
public static final int LOG_DEFAULT_FRAME_HEADER_MAX_LENGTH = CACHE_LINE_LENGTH * 2;
public static final int LOG_DEFAULT_FRAME_HEADER_MAX_LENGTH = PADDING_SIZE * 2;

/**
* Offset within the log metadata where the sparse property is stored.
Expand All @@ -165,11 +167,31 @@ public class LogBufferDescriptor
*/
public static final int LOG_SOCKET_RCVBUF_LENGTH_OFFSET;

/**
* Offset within the log metadata where the OS default length for the socket receive buffer is stored.
*/
public static final int LOG_OS_DEFAULT_SOCKET_RCVBUF_LENGTH_OFFSET;

/**
* Offset within the log metadata where the OS maximum length for the socket receive buffer is stored.
*/
public static final int LOG_OS_MAX_SOCKET_RCVBUF_LENGTH_OFFSET;

/**
* Offset within the log metadata where the socket send buffer length is stored.
*/
public static final int LOG_SOCKET_SNDBUF_LENGTH_OFFSET;

/**
* Offset within the log metadata where the OS default length for the socket send buffer is stored.
*/
public static final int LOG_OS_DEFAULT_SOCKET_SNDBUF_LENGTH_OFFSET;

/**
* Offset within the log metadata where the OS maximum length for the socket send buffer is stored.
*/
public static final int LOG_OS_MAX_SOCKET_SNDBUF_LENGTH_OFFSET;

/**
* Offset within the log metadata where the receiver window length is stored.
*/
Expand Down Expand Up @@ -280,8 +302,16 @@ public class LogBufferDescriptor
* +---------------------------------------------------------------+
* | Socket Send Buffer Length |
* +---------------------------------------------------------------+
* | OS Default Socket Send Buffer Length |
* +---------------------------------------------------------------+
* | OS Max Socket Send Buffer Length |
* +---------------------------------------------------------------+
* | Socket Receive Buffer Length |
* +---------------------------------------------------------------+
* | OS Default Socket Receive Buffer Length |
* +---------------------------------------------------------------+
* | OS Max Socket Receive Buffer Length |
* +---------------------------------------------------------------+
* | Maximum Resend |
* +---------------------------------------------------------------+
* | Entity tag |
Expand Down Expand Up @@ -322,23 +352,16 @@ public class LogBufferDescriptor
*/
public static final int LOG_META_DATA_LENGTH;


static
{
int offset = 0;
TERM_TAIL_COUNTERS_OFFSET = offset;
TERM_TAIL_COUNTERS_OFFSET = 0;
LOG_ACTIVE_TERM_COUNT_OFFSET = TERM_TAIL_COUNTERS_OFFSET + (SIZE_OF_LONG * PARTITION_COUNT);

offset += (SIZE_OF_LONG * PARTITION_COUNT);
LOG_ACTIVE_TERM_COUNT_OFFSET = offset;

offset = (CACHE_LINE_LENGTH * 2);
LOG_END_OF_STREAM_POSITION_OFFSET = offset;
LOG_END_OF_STREAM_POSITION_OFFSET = PADDING_SIZE * 2;
LOG_IS_CONNECTED_OFFSET = LOG_END_OF_STREAM_POSITION_OFFSET + SIZE_OF_LONG;
LOG_ACTIVE_TRANSPORT_COUNT = LOG_IS_CONNECTED_OFFSET + SIZE_OF_INT;

offset += (CACHE_LINE_LENGTH * 2);

LOG_CORRELATION_ID_OFFSET = offset;
LOG_CORRELATION_ID_OFFSET = PADDING_SIZE * 4;
LOG_INITIAL_TERM_ID_OFFSET = LOG_CORRELATION_ID_OFFSET + SIZE_OF_LONG;
LOG_DEFAULT_FRAME_HEADER_LENGTH_OFFSET = LOG_INITIAL_TERM_ID_OFFSET + SIZE_OF_INT;
LOG_MTU_LENGTH_OFFSET = LOG_DEFAULT_FRAME_HEADER_LENGTH_OFFSET + SIZE_OF_INT;
Expand All @@ -348,50 +371,29 @@ public class LogBufferDescriptor
LOG_PUBLICATION_WINDOW_LENGTH_OFFSET = LOG_PAGE_SIZE_OFFSET + SIZE_OF_INT;
LOG_RECEIVER_WINDOW_LENGTH_OFFSET = LOG_PUBLICATION_WINDOW_LENGTH_OFFSET + SIZE_OF_INT;
LOG_SOCKET_SNDBUF_LENGTH_OFFSET = LOG_RECEIVER_WINDOW_LENGTH_OFFSET + SIZE_OF_INT;
LOG_SOCKET_RCVBUF_LENGTH_OFFSET = LOG_SOCKET_SNDBUF_LENGTH_OFFSET + SIZE_OF_INT;
LOG_MAX_RESEND_OFFSET = LOG_SOCKET_RCVBUF_LENGTH_OFFSET + SIZE_OF_INT;
LOG_ENTITY_TAG_OFFSET = LOG_MAX_RESEND_OFFSET + SIZE_OF_INT;
LOG_OS_DEFAULT_SOCKET_SNDBUF_LENGTH_OFFSET = LOG_SOCKET_SNDBUF_LENGTH_OFFSET + SIZE_OF_INT;
LOG_OS_MAX_SOCKET_SNDBUF_LENGTH_OFFSET = LOG_OS_DEFAULT_SOCKET_SNDBUF_LENGTH_OFFSET + SIZE_OF_INT;
LOG_SOCKET_RCVBUF_LENGTH_OFFSET = LOG_OS_MAX_SOCKET_SNDBUF_LENGTH_OFFSET + SIZE_OF_INT;
LOG_OS_DEFAULT_SOCKET_RCVBUF_LENGTH_OFFSET = LOG_SOCKET_RCVBUF_LENGTH_OFFSET + SIZE_OF_INT;
LOG_OS_MAX_SOCKET_RCVBUF_LENGTH_OFFSET = LOG_OS_DEFAULT_SOCKET_RCVBUF_LENGTH_OFFSET + SIZE_OF_INT;
LOG_MAX_RESEND_OFFSET = LOG_OS_MAX_SOCKET_RCVBUF_LENGTH_OFFSET + SIZE_OF_INT;

LOG_DEFAULT_FRAME_HEADER_OFFSET = PADDING_SIZE * 5;
LOG_ENTITY_TAG_OFFSET = LOG_DEFAULT_FRAME_HEADER_OFFSET + LOG_DEFAULT_FRAME_HEADER_MAX_LENGTH;
LOG_RESPONSE_CORRELATION_ID_OFFSET = LOG_ENTITY_TAG_OFFSET + SIZE_OF_LONG;
LOG_LINGER_TIMEOUT_NS_OFFSET = LOG_RESPONSE_CORRELATION_ID_OFFSET + SIZE_OF_LONG;
LOG_UNTETHERED_WINDOW_LIMIT_TIMEOUT_NS_OFFSET = LOG_LINGER_TIMEOUT_NS_OFFSET + SIZE_OF_LONG;
LOG_UNTETHERED_RESTING_TIMEOUT_NS_OFFSET = LOG_UNTETHERED_WINDOW_LIMIT_TIMEOUT_NS_OFFSET + SIZE_OF_LONG;
LOG_GROUP_OFFSET = LOG_UNTETHERED_RESTING_TIMEOUT_NS_OFFSET + SIZE_OF_LONG;
LOG_IS_RESPONSE_OFFSET = LOG_GROUP_OFFSET + SIZE_OF_BYTE;
LOG_REJOIN_OFFSET = LOG_IS_RESPONSE_OFFSET + SIZE_OF_BYTE;
LOG_RELIABLE_OFFSET = LOG_REJOIN_OFFSET + SIZE_OF_BYTE;
LOG_SPARSE_OFFSET = LOG_RELIABLE_OFFSET + SIZE_OF_BYTE;
LOG_SIGNAL_EOS_OFFSET = LOG_SPARSE_OFFSET + SIZE_OF_BYTE;
LOG_SPIES_SIMULATE_CONNECTION_OFFSET = LOG_SIGNAL_EOS_OFFSET + SIZE_OF_BYTE;
LOG_TETHER_OFFSET = LOG_SPIES_SIMULATE_CONNECTION_OFFSET + SIZE_OF_BYTE;

offset += CACHE_LINE_LENGTH;

LOG_DEFAULT_FRAME_HEADER_OFFSET = offset;
offset += LOG_DEFAULT_FRAME_HEADER_MAX_LENGTH;

LOG_LINGER_TIMEOUT_NS_OFFSET = offset;
offset += SIZE_OF_LONG;

LOG_UNTETHERED_WINDOW_LIMIT_TIMEOUT_NS_OFFSET = offset;
offset += SIZE_OF_LONG;

LOG_UNTETHERED_RESTING_TIMEOUT_NS_OFFSET = offset;
offset += SIZE_OF_LONG;

LOG_GROUP_OFFSET = offset;
offset += SIZE_OF_BYTE;

LOG_IS_RESPONSE_OFFSET = offset;
offset += SIZE_OF_BYTE;

LOG_REJOIN_OFFSET = offset;
offset += SIZE_OF_BYTE;

LOG_RELIABLE_OFFSET = offset;
offset += SIZE_OF_BYTE;

LOG_SPARSE_OFFSET = offset;
offset += SIZE_OF_BYTE;

LOG_SIGNAL_EOS_OFFSET = offset;
offset += SIZE_OF_BYTE;

LOG_SPIES_SIMULATE_CONNECTION_OFFSET = offset;
offset += SIZE_OF_BYTE;

LOG_TETHER_OFFSET = offset;
offset += SIZE_OF_BYTE;

LOG_META_DATA_LENGTH = align(offset, PAGE_MIN_SIZE);
LOG_META_DATA_LENGTH = PAGE_MIN_SIZE;
}

/**
Expand Down Expand Up @@ -593,7 +595,7 @@ public static int activeTransportCount(final UnsafeBuffer metadataBuffer)
/**
* Set the number of active transports for the Image.
*
* @param metadataBuffer containing the meta data.
* @param metadataBuffer containing the meta data.
* @param numberOfActiveTransports value to be set.
*/
public static void activeTransportCount(final UnsafeBuffer metadataBuffer, final int numberOfActiveTransports)
Expand Down Expand Up @@ -1024,7 +1026,7 @@ public static int positionBitsToShift(final int termBufferLength)
/**
* Compute frame length for a message that is fragmented into chunks of {@code maxPayloadSize}.
*
* @param length of the message.
* @param length of the message.
* @param maxPayloadSize fragment size without the header.
* @return message length after fragmentation.
*/
Expand All @@ -1041,7 +1043,7 @@ public static int computeFragmentedFrameLength(final int length, final int maxPa
/**
* Compute frame length for a message that has been reassembled from chunks of {@code maxPayloadSize}.
*
* @param length of the message.
* @param length of the message.
* @param maxPayloadSize fragment size without the header.
* @return message length after fragmentation.
*/
Expand Down Expand Up @@ -1163,6 +1165,50 @@ public static void socketRcvbufLength(final UnsafeBuffer metadataBuffer, final i
metadataBuffer.putInt(LOG_SOCKET_RCVBUF_LENGTH_OFFSET, value);
}

/**
* Get the default length in bytes for the socket receive buffer as per OS configuration from the metadata.
*
* @param metadataBuffer containing the meta data.
* @return the default length in bytes for the socket receive buffer.
*/
public static int osDefaultSocketRcvbufLength(final UnsafeBuffer metadataBuffer)
{
return metadataBuffer.getInt(LOG_OS_DEFAULT_SOCKET_RCVBUF_LENGTH_OFFSET);
}

/**
* Set the default length for the socket receive buffer as per OS configuration in the metadata.
*
* @param metadataBuffer containing the meta data.
* @param value the default length in bytes for the socket receive buffer.
*/
public static void osDefaultSocketRcvbufLength(final UnsafeBuffer metadataBuffer, final int value)
{
metadataBuffer.putInt(LOG_OS_DEFAULT_SOCKET_RCVBUF_LENGTH_OFFSET, value);
}

/**
* Get the maximum length in bytes for the socket receive buffer as per OS configuration from the metadata.
*
* @param metadataBuffer containing the meta data.
* @return the maximum allowed length in bytes for the socket receive buffer.
*/
public static int osMaxSocketRcvbufLength(final UnsafeBuffer metadataBuffer)
{
return metadataBuffer.getInt(LOG_OS_MAX_SOCKET_RCVBUF_LENGTH_OFFSET);
}

/**
* Set the maximum allowed length in bytes for the socket receive buffer as per OS configuration in the metadata.
*
* @param metadataBuffer containing the meta data.
* @param value the maximum allowed length in bytes for the socket receive buffer.
*/
public static void osMaxSocketRcvbufLength(final UnsafeBuffer metadataBuffer, final int value)
{
metadataBuffer.putInt(LOG_OS_MAX_SOCKET_RCVBUF_LENGTH_OFFSET, value);
}

/**
* Get the socket send buffer length from the metadata.
*
Expand All @@ -1185,6 +1231,50 @@ public static void socketSndbufLength(final UnsafeBuffer metadataBuffer, final i
metadataBuffer.putInt(LOG_SOCKET_SNDBUF_LENGTH_OFFSET, value);
}

/**
* Get the default length in bytes for the socket send buffer as per OS configuration from the metadata.
*
* @param metadataBuffer containing the meta data.
* @return the default length in bytes for the socket send buffer.
*/
public static int osDefaultSocketSndbufLength(final UnsafeBuffer metadataBuffer)
{
return metadataBuffer.getInt(LOG_OS_DEFAULT_SOCKET_SNDBUF_LENGTH_OFFSET);
}

/**
* Set the default length for the socket send buffer as per OS configuration in the metadata.
*
* @param metadataBuffer containing the meta data.
* @param value the default length in bytes for the socket send buffer.
*/
public static void osDefaultSocketSndbufLength(final UnsafeBuffer metadataBuffer, final int value)
{
metadataBuffer.putInt(LOG_OS_DEFAULT_SOCKET_SNDBUF_LENGTH_OFFSET, value);
}

/**
* Get the maximum length in bytes for the socket send buffer as per OS configuration from the metadata.
*
* @param metadataBuffer containing the meta data.
* @return the maximum allowed length in bytes for the socket send buffer.
*/
public static int osMaxSocketSndbufLength(final UnsafeBuffer metadataBuffer)
{
return metadataBuffer.getInt(LOG_OS_MAX_SOCKET_SNDBUF_LENGTH_OFFSET);
}

/**
* Set the maximum allowed length in bytes for the socket send buffer as per OS configuration in the metadata.
*
* @param metadataBuffer containing the meta data.
* @param value the maximum allowed length in bytes for the socket send buffer.
*/
public static void osMaxSocketSndbufLength(final UnsafeBuffer metadataBuffer, final int value)
{
metadataBuffer.putInt(LOG_OS_MAX_SOCKET_SNDBUF_LENGTH_OFFSET, value);
}

/**
* Get the receiver window length from the metadata.
*
Expand Down
19 changes: 12 additions & 7 deletions aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1916,30 +1916,35 @@ private void initLogMetadata(
{
final UnsafeBuffer logMetaData = rawLog.metaData();

defaultDataHeader.sessionId(sessionId)
defaultDataHeader
.sessionId(sessionId)
.streamId(streamId)
.termId(initialTermId)
.termOffset(termOffset);
storeDefaultFrameHeader(logMetaData, defaultDataHeader);

correlationId(logMetaData, registrationId);
initialTermId(logMetaData, initialTermId);
mtuLength(logMetaData, mtuLength);
termLength(logMetaData, rawLog.termLength());
pageSize(logMetaData, ctx.filePageSize());
correlationId(logMetaData, registrationId);

socketRcvbufLength(logMetaData, socketRcvBufLength);
socketSndbufLength(logMetaData, socketSndbufLength);
receiverWindowLength(logMetaData, receiverWindowLength);
publicationWindowLength(logMetaData, publicationWindowLength);
receiverWindowLength(logMetaData, receiverWindowLength);
socketSndbufLength(logMetaData, socketSndbufLength);
osDefaultSocketSndbufLength(logMetaData, ctx.osDefaultSocketSndbufLength());
osMaxSocketSndbufLength(logMetaData, ctx.osMaxSocketSndbufLength());
socketRcvbufLength(logMetaData, socketRcvBufLength);
osDefaultSocketRcvbufLength(logMetaData, ctx.osDefaultSocketRcvbufLength());
osMaxSocketRcvbufLength(logMetaData, ctx.osMaxSocketRcvbufLength());
maxResend(logMetaData, maxResend);
spiesSimulateConnection(logMetaData, spiesSimulateConnection);

tether(logMetaData, tether);
rejoin(logMetaData, rejoin);
reliable(logMetaData, reliable);
sparse(logMetaData, sparse);
signalEos(logMetaData, signalEos);
spiesSimulateConnection(logMetaData, spiesSimulateConnection);
tether(logMetaData, tether);

untetheredWindowLimitTimeoutNs(logMetaData, untetheredWindowLimitTimeoutNs);
untetheredRestingTimeoutNs(logMetaData, untetheredRestingTimeoutNs);
Expand Down

0 comments on commit 57318d7

Please sign in to comment.