diff --git a/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferDescriptor.java b/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferDescriptor.java index e1a86bd9a9..a0c9daa717 100644 --- a/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferDescriptor.java +++ b/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferDescriptor.java @@ -40,6 +40,8 @@ */ public class LogBufferDescriptor { + private static final int PADDING_SIZE = 64; + /** * The number of partitions the log is divided into. */ @@ -138,7 +140,7 @@ public class LogBufferDescriptor /** * Maximum length of a frame header. */ - public static final int LOG_DEFAULT_FRAME_HEADER_MAX_LENGTH = CACHE_LINE_LENGTH * 2; + public static final int LOG_DEFAULT_FRAME_HEADER_MAX_LENGTH = PADDING_SIZE * 2; /** * Offset within the log metadata where the sparse property is stored. @@ -165,11 +167,31 @@ public class LogBufferDescriptor */ public static final int LOG_SOCKET_RCVBUF_LENGTH_OFFSET; + /** + * Offset within the log metadata where the OS default length for the socket receive buffer is stored. + */ + public static final int LOG_OS_DEFAULT_SOCKET_RCVBUF_LENGTH_OFFSET; + + /** + * Offset within the log metadata where the OS maximum length for the socket receive buffer is stored. + */ + public static final int LOG_OS_MAX_SOCKET_RCVBUF_LENGTH_OFFSET; + /** * Offset within the log metadata where the socket send buffer length is stored. */ public static final int LOG_SOCKET_SNDBUF_LENGTH_OFFSET; + /** + * Offset within the log metadata where the OS default length for the socket send buffer is stored. + */ + public static final int LOG_OS_DEFAULT_SOCKET_SNDBUF_LENGTH_OFFSET; + + /** + * Offset within the log metadata where the OS maximum length for the socket send buffer is stored. + */ + public static final int LOG_OS_MAX_SOCKET_SNDBUF_LENGTH_OFFSET; + /** * Offset within the log metadata where the receiver window length is stored. */ @@ -280,8 +302,16 @@ public class LogBufferDescriptor * +---------------------------------------------------------------+ * | Socket Send Buffer Length | * +---------------------------------------------------------------+ + * | OS Default Socket Send Buffer Length | + * +---------------------------------------------------------------+ + * | OS Max Socket Send Buffer Length | + * +---------------------------------------------------------------+ * | Socket Receive Buffer Length | * +---------------------------------------------------------------+ + * | OS Default Socket Receive Buffer Length | + * +---------------------------------------------------------------+ + * | OS Max Socket Receive Buffer Length | + * +---------------------------------------------------------------+ * | Maximum Resend | * +---------------------------------------------------------------+ * | Entity tag | @@ -322,23 +352,16 @@ public class LogBufferDescriptor */ public static final int LOG_META_DATA_LENGTH; - static { - int offset = 0; - TERM_TAIL_COUNTERS_OFFSET = offset; + TERM_TAIL_COUNTERS_OFFSET = 0; + LOG_ACTIVE_TERM_COUNT_OFFSET = TERM_TAIL_COUNTERS_OFFSET + (SIZE_OF_LONG * PARTITION_COUNT); - offset += (SIZE_OF_LONG * PARTITION_COUNT); - LOG_ACTIVE_TERM_COUNT_OFFSET = offset; - - offset = (CACHE_LINE_LENGTH * 2); - LOG_END_OF_STREAM_POSITION_OFFSET = offset; + LOG_END_OF_STREAM_POSITION_OFFSET = PADDING_SIZE * 2; LOG_IS_CONNECTED_OFFSET = LOG_END_OF_STREAM_POSITION_OFFSET + SIZE_OF_LONG; LOG_ACTIVE_TRANSPORT_COUNT = LOG_IS_CONNECTED_OFFSET + SIZE_OF_INT; - offset += (CACHE_LINE_LENGTH * 2); - - LOG_CORRELATION_ID_OFFSET = offset; + LOG_CORRELATION_ID_OFFSET = PADDING_SIZE * 4; LOG_INITIAL_TERM_ID_OFFSET = LOG_CORRELATION_ID_OFFSET + SIZE_OF_LONG; LOG_DEFAULT_FRAME_HEADER_LENGTH_OFFSET = LOG_INITIAL_TERM_ID_OFFSET + SIZE_OF_INT; LOG_MTU_LENGTH_OFFSET = LOG_DEFAULT_FRAME_HEADER_LENGTH_OFFSET + SIZE_OF_INT; @@ -348,50 +371,29 @@ public class LogBufferDescriptor LOG_PUBLICATION_WINDOW_LENGTH_OFFSET = LOG_PAGE_SIZE_OFFSET + SIZE_OF_INT; LOG_RECEIVER_WINDOW_LENGTH_OFFSET = LOG_PUBLICATION_WINDOW_LENGTH_OFFSET + SIZE_OF_INT; LOG_SOCKET_SNDBUF_LENGTH_OFFSET = LOG_RECEIVER_WINDOW_LENGTH_OFFSET + SIZE_OF_INT; - LOG_SOCKET_RCVBUF_LENGTH_OFFSET = LOG_SOCKET_SNDBUF_LENGTH_OFFSET + SIZE_OF_INT; - LOG_MAX_RESEND_OFFSET = LOG_SOCKET_RCVBUF_LENGTH_OFFSET + SIZE_OF_INT; - LOG_ENTITY_TAG_OFFSET = LOG_MAX_RESEND_OFFSET + SIZE_OF_INT; + LOG_OS_DEFAULT_SOCKET_SNDBUF_LENGTH_OFFSET = LOG_SOCKET_SNDBUF_LENGTH_OFFSET + SIZE_OF_INT; + LOG_OS_MAX_SOCKET_SNDBUF_LENGTH_OFFSET = LOG_OS_DEFAULT_SOCKET_SNDBUF_LENGTH_OFFSET + SIZE_OF_INT; + LOG_SOCKET_RCVBUF_LENGTH_OFFSET = LOG_OS_MAX_SOCKET_SNDBUF_LENGTH_OFFSET + SIZE_OF_INT; + LOG_OS_DEFAULT_SOCKET_RCVBUF_LENGTH_OFFSET = LOG_SOCKET_RCVBUF_LENGTH_OFFSET + SIZE_OF_INT; + LOG_OS_MAX_SOCKET_RCVBUF_LENGTH_OFFSET = LOG_OS_DEFAULT_SOCKET_RCVBUF_LENGTH_OFFSET + SIZE_OF_INT; + LOG_MAX_RESEND_OFFSET = LOG_OS_MAX_SOCKET_RCVBUF_LENGTH_OFFSET + SIZE_OF_INT; + + LOG_DEFAULT_FRAME_HEADER_OFFSET = PADDING_SIZE * 5; + LOG_ENTITY_TAG_OFFSET = LOG_DEFAULT_FRAME_HEADER_OFFSET + LOG_DEFAULT_FRAME_HEADER_MAX_LENGTH; LOG_RESPONSE_CORRELATION_ID_OFFSET = LOG_ENTITY_TAG_OFFSET + SIZE_OF_LONG; + LOG_LINGER_TIMEOUT_NS_OFFSET = LOG_RESPONSE_CORRELATION_ID_OFFSET + SIZE_OF_LONG; + LOG_UNTETHERED_WINDOW_LIMIT_TIMEOUT_NS_OFFSET = LOG_LINGER_TIMEOUT_NS_OFFSET + SIZE_OF_LONG; + LOG_UNTETHERED_RESTING_TIMEOUT_NS_OFFSET = LOG_UNTETHERED_WINDOW_LIMIT_TIMEOUT_NS_OFFSET + SIZE_OF_LONG; + LOG_GROUP_OFFSET = LOG_UNTETHERED_RESTING_TIMEOUT_NS_OFFSET + SIZE_OF_LONG; + LOG_IS_RESPONSE_OFFSET = LOG_GROUP_OFFSET + SIZE_OF_BYTE; + LOG_REJOIN_OFFSET = LOG_IS_RESPONSE_OFFSET + SIZE_OF_BYTE; + LOG_RELIABLE_OFFSET = LOG_REJOIN_OFFSET + SIZE_OF_BYTE; + LOG_SPARSE_OFFSET = LOG_RELIABLE_OFFSET + SIZE_OF_BYTE; + LOG_SIGNAL_EOS_OFFSET = LOG_SPARSE_OFFSET + SIZE_OF_BYTE; + LOG_SPIES_SIMULATE_CONNECTION_OFFSET = LOG_SIGNAL_EOS_OFFSET + SIZE_OF_BYTE; + LOG_TETHER_OFFSET = LOG_SPIES_SIMULATE_CONNECTION_OFFSET + SIZE_OF_BYTE; - offset += CACHE_LINE_LENGTH; - - LOG_DEFAULT_FRAME_HEADER_OFFSET = offset; - offset += LOG_DEFAULT_FRAME_HEADER_MAX_LENGTH; - - LOG_LINGER_TIMEOUT_NS_OFFSET = offset; - offset += SIZE_OF_LONG; - - LOG_UNTETHERED_WINDOW_LIMIT_TIMEOUT_NS_OFFSET = offset; - offset += SIZE_OF_LONG; - - LOG_UNTETHERED_RESTING_TIMEOUT_NS_OFFSET = offset; - offset += SIZE_OF_LONG; - - LOG_GROUP_OFFSET = offset; - offset += SIZE_OF_BYTE; - - LOG_IS_RESPONSE_OFFSET = offset; - offset += SIZE_OF_BYTE; - - LOG_REJOIN_OFFSET = offset; - offset += SIZE_OF_BYTE; - - LOG_RELIABLE_OFFSET = offset; - offset += SIZE_OF_BYTE; - - LOG_SPARSE_OFFSET = offset; - offset += SIZE_OF_BYTE; - - LOG_SIGNAL_EOS_OFFSET = offset; - offset += SIZE_OF_BYTE; - - LOG_SPIES_SIMULATE_CONNECTION_OFFSET = offset; - offset += SIZE_OF_BYTE; - - LOG_TETHER_OFFSET = offset; - offset += SIZE_OF_BYTE; - - LOG_META_DATA_LENGTH = align(offset, PAGE_MIN_SIZE); + LOG_META_DATA_LENGTH = PAGE_MIN_SIZE; } /** @@ -593,7 +595,7 @@ public static int activeTransportCount(final UnsafeBuffer metadataBuffer) /** * Set the number of active transports for the Image. * - * @param metadataBuffer containing the meta data. + * @param metadataBuffer containing the meta data. * @param numberOfActiveTransports value to be set. */ public static void activeTransportCount(final UnsafeBuffer metadataBuffer, final int numberOfActiveTransports) @@ -1024,7 +1026,7 @@ public static int positionBitsToShift(final int termBufferLength) /** * Compute frame length for a message that is fragmented into chunks of {@code maxPayloadSize}. * - * @param length of the message. + * @param length of the message. * @param maxPayloadSize fragment size without the header. * @return message length after fragmentation. */ @@ -1041,7 +1043,7 @@ public static int computeFragmentedFrameLength(final int length, final int maxPa /** * Compute frame length for a message that has been reassembled from chunks of {@code maxPayloadSize}. * - * @param length of the message. + * @param length of the message. * @param maxPayloadSize fragment size without the header. * @return message length after fragmentation. */ @@ -1163,6 +1165,50 @@ public static void socketRcvbufLength(final UnsafeBuffer metadataBuffer, final i metadataBuffer.putInt(LOG_SOCKET_RCVBUF_LENGTH_OFFSET, value); } + /** + * Get the default length in bytes for the socket receive buffer as per OS configuration from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return the default length in bytes for the socket receive buffer. + */ + public static int osDefaultSocketRcvbufLength(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getInt(LOG_OS_DEFAULT_SOCKET_RCVBUF_LENGTH_OFFSET); + } + + /** + * Set the default length for the socket receive buffer as per OS configuration in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value the default length in bytes for the socket receive buffer. + */ + public static void osDefaultSocketRcvbufLength(final UnsafeBuffer metadataBuffer, final int value) + { + metadataBuffer.putInt(LOG_OS_DEFAULT_SOCKET_RCVBUF_LENGTH_OFFSET, value); + } + + /** + * Get the maximum length in bytes for the socket receive buffer as per OS configuration from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return the maximum allowed length in bytes for the socket receive buffer. + */ + public static int osMaxSocketRcvbufLength(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getInt(LOG_OS_MAX_SOCKET_RCVBUF_LENGTH_OFFSET); + } + + /** + * Set the maximum allowed length in bytes for the socket receive buffer as per OS configuration in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value the maximum allowed length in bytes for the socket receive buffer. + */ + public static void osMaxSocketRcvbufLength(final UnsafeBuffer metadataBuffer, final int value) + { + metadataBuffer.putInt(LOG_OS_MAX_SOCKET_RCVBUF_LENGTH_OFFSET, value); + } + /** * Get the socket send buffer length from the metadata. * @@ -1185,6 +1231,50 @@ public static void socketSndbufLength(final UnsafeBuffer metadataBuffer, final i metadataBuffer.putInt(LOG_SOCKET_SNDBUF_LENGTH_OFFSET, value); } + /** + * Get the default length in bytes for the socket send buffer as per OS configuration from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return the default length in bytes for the socket send buffer. + */ + public static int osDefaultSocketSndbufLength(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getInt(LOG_OS_DEFAULT_SOCKET_SNDBUF_LENGTH_OFFSET); + } + + /** + * Set the default length for the socket send buffer as per OS configuration in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value the default length in bytes for the socket send buffer. + */ + public static void osDefaultSocketSndbufLength(final UnsafeBuffer metadataBuffer, final int value) + { + metadataBuffer.putInt(LOG_OS_DEFAULT_SOCKET_SNDBUF_LENGTH_OFFSET, value); + } + + /** + * Get the maximum length in bytes for the socket send buffer as per OS configuration from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return the maximum allowed length in bytes for the socket send buffer. + */ + public static int osMaxSocketSndbufLength(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getInt(LOG_OS_MAX_SOCKET_SNDBUF_LENGTH_OFFSET); + } + + /** + * Set the maximum allowed length in bytes for the socket send buffer as per OS configuration in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value the maximum allowed length in bytes for the socket send buffer. + */ + public static void osMaxSocketSndbufLength(final UnsafeBuffer metadataBuffer, final int value) + { + metadataBuffer.putInt(LOG_OS_MAX_SOCKET_SNDBUF_LENGTH_OFFSET, value); + } + /** * Get the receiver window length from the metadata. * diff --git a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java index 7b21cc7f79..580d15a406 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java +++ b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java @@ -1916,30 +1916,35 @@ private void initLogMetadata( { final UnsafeBuffer logMetaData = rawLog.metaData(); - defaultDataHeader.sessionId(sessionId) + defaultDataHeader + .sessionId(sessionId) .streamId(streamId) .termId(initialTermId) .termOffset(termOffset); storeDefaultFrameHeader(logMetaData, defaultDataHeader); + correlationId(logMetaData, registrationId); initialTermId(logMetaData, initialTermId); mtuLength(logMetaData, mtuLength); termLength(logMetaData, rawLog.termLength()); pageSize(logMetaData, ctx.filePageSize()); - correlationId(logMetaData, registrationId); - socketRcvbufLength(logMetaData, socketRcvBufLength); - socketSndbufLength(logMetaData, socketSndbufLength); - receiverWindowLength(logMetaData, receiverWindowLength); publicationWindowLength(logMetaData, publicationWindowLength); + receiverWindowLength(logMetaData, receiverWindowLength); + socketSndbufLength(logMetaData, socketSndbufLength); + osDefaultSocketSndbufLength(logMetaData, ctx.osDefaultSocketSndbufLength()); + osMaxSocketSndbufLength(logMetaData, ctx.osMaxSocketSndbufLength()); + socketRcvbufLength(logMetaData, socketRcvBufLength); + osDefaultSocketRcvbufLength(logMetaData, ctx.osDefaultSocketRcvbufLength()); + osMaxSocketRcvbufLength(logMetaData, ctx.osMaxSocketRcvbufLength()); maxResend(logMetaData, maxResend); - spiesSimulateConnection(logMetaData, spiesSimulateConnection); - tether(logMetaData, tether); rejoin(logMetaData, rejoin); reliable(logMetaData, reliable); sparse(logMetaData, sparse); signalEos(logMetaData, signalEos); + spiesSimulateConnection(logMetaData, spiesSimulateConnection); + tether(logMetaData, tether); untetheredWindowLimitTimeoutNs(logMetaData, untetheredWindowLimitTimeoutNs); untetheredRestingTimeoutNs(logMetaData, untetheredRestingTimeoutNs);