Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Adding missing fields to the logbuffer metadata #1718

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,51 @@ public static void tether(final UnsafeBuffer metadataBuffer, final boolean value
metadataBuffer.putByte(LOG_TETHER_OFFSET, (byte)(value ? 1 : 0));
}

/**
* Get whether the log is group from the metadata.
*
* @param metadataBuffer containing the meta data.
* @return true if the log is group, otherwise false.
*/
public static boolean group(final UnsafeBuffer metadataBuffer)
{
return metadataBuffer.getByte(LOG_GROUP_OFFSET) == 1;
}

/**
* Set whether the log is group in the metadata.
*
* @param metadataBuffer containing the meta data.
* @param value true if the log is group, otherwise false.
*/
public static void group(final UnsafeBuffer metadataBuffer, final boolean value)
{
metadataBuffer.putByte(LOG_GROUP_OFFSET, (byte)(value ? 1 : 0));
}

/**
* Get whether the log is response from the metadata.
*
* @param metadataBuffer containing the meta data.
* @return true if the log is group, otherwise false.
*/
public static boolean isResponse(final UnsafeBuffer metadataBuffer)
{
return metadataBuffer.getByte(LOG_IS_RESPONSE_OFFSET) == 1;
}

/**
* Set whether the log is response in the metadata.
*
* @param metadataBuffer containing the meta data.
* @param value true if the log is group, otherwise false.
*/
public static void isResponse(final UnsafeBuffer metadataBuffer, final boolean value)
{
metadataBuffer.putByte(LOG_IS_RESPONSE_OFFSET, (byte)(value ? 1 : 0));
}


/**
* Get whether the log is rejoining from the metadata.
*
Expand Down Expand Up @@ -1407,6 +1452,50 @@ public static void lingerTimeoutNs(final UnsafeBuffer metadataBuffer, final long
metadataBuffer.putLong(LOG_LINGER_TIMEOUT_NS_OFFSET, value);
}

/**
* Get the entity tag from the metadata.
*
* @param metadataBuffer containing the meta data.
* @return the entity tag in nanoseconds.
*/
public static long entityTag(final UnsafeBuffer metadataBuffer)
{
return metadataBuffer.getLong(LOG_ENTITY_TAG_OFFSET);
}

/**
* Set the entity tag in the metadata.
*
* @param metadataBuffer containing the meta data.
* @param value the entity tag to set.
*/
public static void entityTag(final UnsafeBuffer metadataBuffer, final long value)
{
metadataBuffer.putLong(LOG_ENTITY_TAG_OFFSET, value);
}

/**
* Get the response correlation id from the metadata.
*
* @param metadataBuffer containing the meta data.
* @return the entity tag in nanoseconds.
*/
public static long responseCorrelationId(final UnsafeBuffer metadataBuffer)
{
return metadataBuffer.getLong(LOG_RESPONSE_CORRELATION_ID_OFFSET);
}

/**
* Set the response correlation id in the metadata.
*
* @param metadataBuffer containing the meta data.
* @param value the resonse correlation id to set.
*/
public static void responseCorrelationId(final UnsafeBuffer metadataBuffer, final long value)
{
metadataBuffer.putLong(LOG_RESPONSE_CORRELATION_ID_OFFSET, value);
}

/**
* Get whether the signal EOS is enabled from the metadata.
*
Expand Down
20 changes: 20 additions & 0 deletions aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1835,13 +1835,17 @@ private RawLog newNetworkPublicationLog(
rejoin,
reliable,
params.isSparse,
false,//todo
params.isResponse,
params.publicationWindowLength,
params.untetheredWindowLimitTimeoutNs,
params.untetheredRestingTimeoutNs,
params.maxResend,
params.lingerTimeoutNs,
params.signalEos,
params.spiesSimulateConnection,
params.entityTag,
params.responseCorrelationId,
rawLog);
initialisePositionCounters(initialTermId, params, rawLog.metaData());

Expand Down Expand Up @@ -1878,13 +1882,17 @@ private RawLog newIpcPublicationLog(
rejoin,
reliable,
params.isSparse,
false,// todo: group is missing
params.isResponse,
params.publicationWindowLength,
params.untetheredWindowLimitTimeoutNs,
params.untetheredRestingTimeoutNs,
params.maxResend,
params.lingerTimeoutNs,
params.signalEos,
params.spiesSimulateConnection,
params.entityTag,
params.responseCorrelationId,
rawLog);
initialisePositionCounters(initialTermId, params, rawLog.metaData());

Expand All @@ -1905,13 +1913,17 @@ private void initLogMetadata(
final boolean rejoin,
final boolean reliable,
final boolean sparse,
final boolean group,
final boolean isResponse,
final int publicationWindowLength,
final long untetheredWindowLimitTimeoutNs,
final long untetheredRestingTimeoutNs,
final int maxResend,
final long lingerTimeoutNs,
final boolean signalEos,
final boolean spiesSimulateConnection,
final long entityTag,
final long responseCorrelationId,
final RawLog rawLog)
{
final UnsafeBuffer logMetaData = rawLog.metaData();
Expand Down Expand Up @@ -1945,7 +1957,11 @@ private void initLogMetadata(
signalEos(logMetaData, signalEos);
spiesSimulateConnection(logMetaData, spiesSimulateConnection);
tether(logMetaData, tether);
group(logMetaData, group);
isResponse(logMetaData, isResponse);

entityTag(logMetaData, entityTag);
responseCorrelationId(logMetaData, responseCorrelationId);
untetheredWindowLimitTimeoutNs(logMetaData, untetheredWindowLimitTimeoutNs);
untetheredRestingTimeoutNs(logMetaData, untetheredRestingTimeoutNs);
lingerTimeoutNs(logMetaData, lingerTimeoutNs);
Expand Down Expand Up @@ -2022,13 +2038,17 @@ private RawLog newPublicationImageLog(
params.isRejoin,
params.isReliable,
params.isSparse,
false,// todo: group is missing
params.isResponse,
publicationWindowLength,
untetheredWindowLimitTimeoutNs,
untetheredRestingTimeoutNs,
maxResend,
lingerTimeoutNs,
signalEos,
spiesSimulateConnection,
0, // todo: entity tag is missing
0, // todo: responseCorrelationId is missing
rawLog);

return rawLog;
Expand Down
Loading