Skip to content

Commit

Permalink
Adding missing fields to the logbuffer metadata
Browse files Browse the repository at this point in the history
The following fields are missing:

    int64_t entity_tag;
    int64_t response_correlation_id;
    uint8_t group;
    uint8_t is_response;
  • Loading branch information
pveentjer committed Jan 15, 2025
1 parent 3440bf0 commit de6e127
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,51 @@ public static void tether(final UnsafeBuffer metadataBuffer, final boolean value
metadataBuffer.putByte(LOG_TETHER_OFFSET, (byte)(value ? 1 : 0));
}

/**
* Get whether the log is group from the metadata.
*
* @param metadataBuffer containing the meta data.
* @return true if the log is group, otherwise false.
*/
public static boolean group(final UnsafeBuffer metadataBuffer)
{
return metadataBuffer.getByte(LOG_GROUP_OFFSET) == 1;
}

/**
* Set whether the log is group in the metadata.
*
* @param metadataBuffer containing the meta data.
* @param value true if the log is group, otherwise false.
*/
public static void group(final UnsafeBuffer metadataBuffer, final boolean value)
{
metadataBuffer.putByte(LOG_GROUP_OFFSET, (byte)(value ? 1 : 0));
}

/**
* Get whether the log is response from the metadata.
*
* @param metadataBuffer containing the meta data.
* @return true if the log is group, otherwise false.
*/
public static boolean isResponse(final UnsafeBuffer metadataBuffer)
{
return metadataBuffer.getByte(LOG_IS_RESPONSE_OFFSET) == 1;
}

/**
* Set whether the log is response in the metadata.
*
* @param metadataBuffer containing the meta data.
* @param value true if the log is group, otherwise false.
*/
public static void isResponse(final UnsafeBuffer metadataBuffer, final boolean value)
{
metadataBuffer.putByte(LOG_IS_RESPONSE_OFFSET, (byte)(value ? 1 : 0));
}


/**
* Get whether the log is rejoining from the metadata.
*
Expand Down Expand Up @@ -1407,6 +1452,50 @@ public static void lingerTimeoutNs(final UnsafeBuffer metadataBuffer, final long
metadataBuffer.putLong(LOG_LINGER_TIMEOUT_NS_OFFSET, value);
}

/**
* Get the entity tag from the metadata.
*
* @param metadataBuffer containing the meta data.
* @return the entity tag in nanoseconds.
*/
public static long entityTag(final UnsafeBuffer metadataBuffer)
{
return metadataBuffer.getLong(LOG_ENTITY_TAG_OFFSET);
}

/**
* Set the entity tag in the metadata.
*
* @param metadataBuffer containing the meta data.
* @param value the entity tag to set.
*/
public static void entityTag(final UnsafeBuffer metadataBuffer, final long value)
{
metadataBuffer.putLong(LOG_ENTITY_TAG_OFFSET, value);
}

/**
* Get the response correlation id from the metadata.
*
* @param metadataBuffer containing the meta data.
* @return the entity tag in nanoseconds.
*/
public static long responseCorrelationId(final UnsafeBuffer metadataBuffer)
{
return metadataBuffer.getLong(LOG_RESPONSE_CORRELATION_ID_OFFSET);
}

/**
* Set the response correlation id in the metadata.
*
* @param metadataBuffer containing the meta data.
* @param value the resonse correlation id to set.
*/
public static void responseCorrelationId(final UnsafeBuffer metadataBuffer, final long value)
{
metadataBuffer.putLong(LOG_RESPONSE_CORRELATION_ID_OFFSET, value);
}

/**
* Get whether the signal EOS is enabled from the metadata.
*
Expand Down
20 changes: 20 additions & 0 deletions aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1835,13 +1835,17 @@ private RawLog newNetworkPublicationLog(
rejoin,
reliable,
params.isSparse,
false,//todo
params.isResponse,
params.publicationWindowLength,
params.untetheredWindowLimitTimeoutNs,
params.untetheredRestingTimeoutNs,
params.maxResend,
params.lingerTimeoutNs,
params.signalEos,
params.spiesSimulateConnection,
params.entityTag,
params.responseCorrelationId,
rawLog);
initialisePositionCounters(initialTermId, params, rawLog.metaData());

Expand Down Expand Up @@ -1878,13 +1882,17 @@ private RawLog newIpcPublicationLog(
rejoin,
reliable,
params.isSparse,
false,// todo: group is missing
params.isResponse,
params.publicationWindowLength,
params.untetheredWindowLimitTimeoutNs,
params.untetheredRestingTimeoutNs,
params.maxResend,
params.lingerTimeoutNs,
params.signalEos,
params.spiesSimulateConnection,
params.entityTag,
params.responseCorrelationId,
rawLog);
initialisePositionCounters(initialTermId, params, rawLog.metaData());

Expand All @@ -1905,13 +1913,17 @@ private void initLogMetadata(
final boolean rejoin,
final boolean reliable,
final boolean sparse,
final boolean group,
final boolean isResponse,
final int publicationWindowLength,
final long untetheredWindowLimitTimeoutNs,
final long untetheredRestingTimeoutNs,
final int maxResend,
final long lingerTimeoutNs,
final boolean signalEos,
final boolean spiesSimulateConnection,
final long entityTag,
final long responseCorrelationId,
final RawLog rawLog)
{
final UnsafeBuffer logMetaData = rawLog.metaData();
Expand Down Expand Up @@ -1945,7 +1957,11 @@ private void initLogMetadata(
signalEos(logMetaData, signalEos);
spiesSimulateConnection(logMetaData, spiesSimulateConnection);
tether(logMetaData, tether);
group(logMetaData, group);
isResponse(logMetaData, isResponse);

entityTag(logMetaData, entityTag);
responseCorrelationId(logMetaData, responseCorrelationId);
untetheredWindowLimitTimeoutNs(logMetaData, untetheredWindowLimitTimeoutNs);
untetheredRestingTimeoutNs(logMetaData, untetheredRestingTimeoutNs);
lingerTimeoutNs(logMetaData, lingerTimeoutNs);
Expand Down Expand Up @@ -2022,13 +2038,17 @@ private RawLog newPublicationImageLog(
params.isRejoin,
params.isReliable,
params.isSparse,
false,// todo: group is missing
params.isResponse,
publicationWindowLength,
untetheredWindowLimitTimeoutNs,
untetheredRestingTimeoutNs,
maxResend,
lingerTimeoutNs,
signalEos,
spiesSimulateConnection,
0, // todo: entity tag is missing
0, // todo: responseCorrelationId is missing
rawLog);

return rawLog;
Expand Down

0 comments on commit de6e127

Please sign in to comment.