From de6e1270cba01eafdd0685c536faa0e82f0b357d Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Wed, 15 Jan 2025 18:54:55 +0200 Subject: [PATCH] Adding missing fields to the logbuffer metadata The following fields are missing: int64_t entity_tag; int64_t response_correlation_id; uint8_t group; uint8_t is_response; --- .../aeron/logbuffer/LogBufferDescriptor.java | 89 +++++++++++++++++++ .../java/io/aeron/driver/DriverConductor.java | 20 +++++ 2 files changed, 109 insertions(+) diff --git a/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferDescriptor.java b/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferDescriptor.java index a0c9daa717..ae28d40a07 100644 --- a/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferDescriptor.java +++ b/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferDescriptor.java @@ -1099,6 +1099,51 @@ public static void tether(final UnsafeBuffer metadataBuffer, final boolean value metadataBuffer.putByte(LOG_TETHER_OFFSET, (byte)(value ? 1 : 0)); } + /** + * Get whether the log is group from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return true if the log is group, otherwise false. + */ + public static boolean group(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getByte(LOG_GROUP_OFFSET) == 1; + } + + /** + * Set whether the log is group in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value true if the log is group, otherwise false. + */ + public static void group(final UnsafeBuffer metadataBuffer, final boolean value) + { + metadataBuffer.putByte(LOG_GROUP_OFFSET, (byte)(value ? 1 : 0)); + } + + /** + * Get whether the log is response from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return true if the log is group, otherwise false. + */ + public static boolean isResponse(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getByte(LOG_IS_RESPONSE_OFFSET) == 1; + } + + /** + * Set whether the log is response in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value true if the log is group, otherwise false. + */ + public static void isResponse(final UnsafeBuffer metadataBuffer, final boolean value) + { + metadataBuffer.putByte(LOG_IS_RESPONSE_OFFSET, (byte)(value ? 1 : 0)); + } + + /** * Get whether the log is rejoining from the metadata. * @@ -1407,6 +1452,50 @@ public static void lingerTimeoutNs(final UnsafeBuffer metadataBuffer, final long metadataBuffer.putLong(LOG_LINGER_TIMEOUT_NS_OFFSET, value); } + /** + * Get the entity tag from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return the entity tag in nanoseconds. + */ + public static long entityTag(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getLong(LOG_ENTITY_TAG_OFFSET); + } + + /** + * Set the entity tag in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value the entity tag to set. + */ + public static void entityTag(final UnsafeBuffer metadataBuffer, final long value) + { + metadataBuffer.putLong(LOG_ENTITY_TAG_OFFSET, value); + } + + /** + * Get the response correlation id from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return the entity tag in nanoseconds. + */ + public static long responseCorrelationId(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getLong(LOG_RESPONSE_CORRELATION_ID_OFFSET); + } + + /** + * Set the response correlation id in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value the resonse correlation id to set. + */ + public static void responseCorrelationId(final UnsafeBuffer metadataBuffer, final long value) + { + metadataBuffer.putLong(LOG_RESPONSE_CORRELATION_ID_OFFSET, value); + } + /** * Get whether the signal EOS is enabled from the metadata. * diff --git a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java index 580d15a406..a90c9ec8f3 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java +++ b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java @@ -1835,6 +1835,8 @@ private RawLog newNetworkPublicationLog( rejoin, reliable, params.isSparse, + false,//todo + params.isResponse, params.publicationWindowLength, params.untetheredWindowLimitTimeoutNs, params.untetheredRestingTimeoutNs, @@ -1842,6 +1844,8 @@ private RawLog newNetworkPublicationLog( params.lingerTimeoutNs, params.signalEos, params.spiesSimulateConnection, + params.entityTag, + params.responseCorrelationId, rawLog); initialisePositionCounters(initialTermId, params, rawLog.metaData()); @@ -1878,6 +1882,8 @@ private RawLog newIpcPublicationLog( rejoin, reliable, params.isSparse, + false,// todo: group is missing + params.isResponse, params.publicationWindowLength, params.untetheredWindowLimitTimeoutNs, params.untetheredRestingTimeoutNs, @@ -1885,6 +1891,8 @@ private RawLog newIpcPublicationLog( params.lingerTimeoutNs, params.signalEos, params.spiesSimulateConnection, + params.entityTag, + params.responseCorrelationId, rawLog); initialisePositionCounters(initialTermId, params, rawLog.metaData()); @@ -1905,6 +1913,8 @@ private void initLogMetadata( final boolean rejoin, final boolean reliable, final boolean sparse, + final boolean group, + final boolean isResponse, final int publicationWindowLength, final long untetheredWindowLimitTimeoutNs, final long untetheredRestingTimeoutNs, @@ -1912,6 +1922,8 @@ private void initLogMetadata( final long lingerTimeoutNs, final boolean signalEos, final boolean spiesSimulateConnection, + final long entityTag, + final long responseCorrelationId, final RawLog rawLog) { final UnsafeBuffer logMetaData = rawLog.metaData(); @@ -1945,7 +1957,11 @@ private void initLogMetadata( signalEos(logMetaData, signalEos); spiesSimulateConnection(logMetaData, spiesSimulateConnection); tether(logMetaData, tether); + group(logMetaData, group); + isResponse(logMetaData, isResponse); + entityTag(logMetaData, entityTag); + responseCorrelationId(logMetaData, responseCorrelationId); untetheredWindowLimitTimeoutNs(logMetaData, untetheredWindowLimitTimeoutNs); untetheredRestingTimeoutNs(logMetaData, untetheredRestingTimeoutNs); lingerTimeoutNs(logMetaData, lingerTimeoutNs); @@ -2022,6 +2038,8 @@ private RawLog newPublicationImageLog( params.isRejoin, params.isReliable, params.isSparse, + false,// todo: group is missing + params.isResponse, publicationWindowLength, untetheredWindowLimitTimeoutNs, untetheredRestingTimeoutNs, @@ -2029,6 +2047,8 @@ private RawLog newPublicationImageLog( lingerTimeoutNs, signalEos, spiesSimulateConnection, + 0, // todo: entity tag is missing + 0, // todo: responseCorrelationId is missing rawLog); return rawLog;