diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/ClusterBackup.java b/aeron-cluster/src/main/java/io/aeron/cluster/ClusterBackup.java index add1c5b73e..a95a5ace71 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/ClusterBackup.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/ClusterBackup.java @@ -2021,11 +2021,7 @@ public String toString() private void concludeMarkFile() { ClusterMarkFile.checkHeaderLength( - aeron.context().aeronDirectoryName(), - null, - null, - null, - null); + aeron.context().aeronDirectoryName(), null, null, null, null); markFile.encoder() .archiveStreamId(archiveContext.controlRequestStreamId()) diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java b/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java index bca8dbde2d..f5532213d6 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java @@ -4301,11 +4301,7 @@ private void concludeMarkFile() final String aeronDirectory = aeron.context().aeronDirectoryName(); final String authenticatorClassName = authenticatorSupplier.getClass().getName(); ClusterMarkFile.checkHeaderLength( - aeronDirectory, - controlChannel(), - ingressChannel, - null, - authenticatorClassName); + aeronDirectory, controlChannel(), ingressChannel, null, authenticatorClassName); markFile.encoder() .archiveStreamId(archiveContext.controlRequestStreamId()) diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusterMarkFile.java b/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusterMarkFile.java index fec82fe915..6f17455ff9 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusterMarkFile.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusterMarkFile.java @@ -21,8 +21,11 @@ import io.aeron.cluster.codecs.mark.ClusterComponentType; import io.aeron.cluster.codecs.mark.MarkFileHeaderDecoder; import io.aeron.cluster.codecs.mark.MarkFileHeaderEncoder; +import io.aeron.cluster.codecs.mark.MessageHeaderDecoder; +import io.aeron.cluster.codecs.mark.MessageHeaderEncoder; import io.aeron.cluster.codecs.mark.VarAsciiEncodingEncoder; import org.agrona.CloseHelper; +import org.agrona.IoUtil; import org.agrona.MarkFile; import org.agrona.SemanticVersion; import org.agrona.SystemUtil; @@ -98,11 +101,14 @@ public final class ClusterMarkFile implements AutoCloseable */ public static final String SERVICE_FILENAME_PREFIX = "cluster-mark-service-"; + private static final int HEADER_OFFSET = MessageHeaderDecoder.ENCODED_LENGTH; + private final MarkFileHeaderDecoder headerDecoder = new MarkFileHeaderDecoder(); private final MarkFileHeaderEncoder headerEncoder = new MarkFileHeaderEncoder(); private final MarkFile markFile; private final UnsafeBuffer buffer; private final UnsafeBuffer errorBuffer; + private final int headerOffset; /** * Create new {@link MarkFile} for a cluster service but check if an existing service is active. @@ -128,70 +134,121 @@ public ClusterMarkFile( final boolean markFileExists = file.exists(); final int totalFileLength = HEADER_LENGTH + errorBufferLength; - markFile = new MarkFile( - file, - markFileExists, - MarkFileHeaderDecoder.versionEncodingOffset(), - MarkFileHeaderDecoder.activityTimestampEncodingOffset(), - totalFileLength, - timeoutMs, - epochClock, - (version) -> - { - if (VERSION_FAILED == version && markFileExists) - { - System.err.println("mark file version -1 indicates error on previous startup."); - } - else if (SemanticVersion.major(version) != MAJOR_VERSION) - { - throw new ClusterException("mark file major version " + SemanticVersion.major(version) + - " does not match software: " + MAJOR_VERSION); - } - }, - null); - - buffer = markFile.buffer(); - errorBuffer = new UnsafeBuffer(this.buffer, HEADER_LENGTH, errorBufferLength); - - headerEncoder.wrap(buffer, 0); - headerDecoder.wrap(buffer, 0, MarkFileHeaderDecoder.BLOCK_LENGTH, MarkFileHeaderDecoder.SCHEMA_VERSION); + final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder(); + final long candidateTermId; if (markFileExists) { + final int headerOffset = headerOffset(file); + final MarkFile markFile = new MarkFile( + file, + true, + headerOffset + MarkFileHeaderDecoder.versionEncodingOffset(), + headerOffset + MarkFileHeaderDecoder.activityTimestampEncodingOffset(), + totalFileLength, + timeoutMs, + epochClock, + (version) -> + { + if (VERSION_FAILED == version) + { + System.err.println("mark file version -1 indicates error on previous startup."); + } + else if (SemanticVersion.major(version) != MAJOR_VERSION) + { + throw new ClusterException("mark file major version " + SemanticVersion.major(version) + + " does not match software: " + MAJOR_VERSION); + } + }, + null); + final UnsafeBuffer buffer = markFile.buffer(); + if (buffer.capacity() != totalFileLength) { throw new ClusterException( "ClusterMarkFile capacity=" + buffer.capacity() + " < expectedCapacity=" + totalFileLength); } + if (0 != headerOffset) + { + headerDecoder.wrapAndApplyHeader(buffer, 0, messageHeaderDecoder); + } + else + { + headerDecoder.wrap(buffer, 0, MarkFileHeaderDecoder.BLOCK_LENGTH, MarkFileHeaderDecoder.SCHEMA_VERSION); + } + + final ClusterComponentType existingType = headerDecoder.componentType(); + + if (existingType != ClusterComponentType.UNKNOWN && existingType != type) + { + if (existingType != ClusterComponentType.BACKUP || ClusterComponentType.CONSENSUS_MODULE != type) + { + throw new ClusterException( + "existing Mark file type " + existingType + " not same as required type " + type); + } + } + final int existingErrorBufferLength = headerDecoder.errorBufferLength(); final UnsafeBuffer existingErrorBuffer = new UnsafeBuffer( buffer, headerDecoder.headerLength(), existingErrorBufferLength); saveExistingErrors(file, existingErrorBuffer, type, CommonContext.fallbackLogger()); existingErrorBuffer.setMemory(0, existingErrorBufferLength, (byte)0); + + candidateTermId = headerDecoder.candidateTermId(); + + if (0 != headerOffset) + { + this.markFile = markFile; + this.buffer = buffer; + } + else + { + this.markFile = new MarkFile( + file, + false, + HEADER_OFFSET + MarkFileHeaderDecoder.versionEncodingOffset(), + HEADER_OFFSET + MarkFileHeaderDecoder.activityTimestampEncodingOffset(), + totalFileLength, + timeoutMs, + epochClock, + null, + null); + this.buffer = markFile.buffer(); + this.buffer.setMemory(0, headerDecoder.headerLength(), (byte)0); + } } else { - headerEncoder.candidateTermId(NULL_VALUE); + markFile = new MarkFile( + file, + false, + HEADER_OFFSET + MarkFileHeaderDecoder.versionEncodingOffset(), + HEADER_OFFSET + MarkFileHeaderDecoder.activityTimestampEncodingOffset(), + totalFileLength, + timeoutMs, + epochClock, + null, + null); + buffer = markFile.buffer(); + candidateTermId = NULL_VALUE; } - final ClusterComponentType existingType = headerDecoder.componentType(); + headerOffset = HEADER_OFFSET; - if (existingType != ClusterComponentType.UNKNOWN && existingType != type) - { - if (existingType != ClusterComponentType.BACKUP || ClusterComponentType.CONSENSUS_MODULE != type) - { - throw new ClusterException( - "existing Mark file type " + existingType + " not same as required type " + type); - } - } + errorBuffer = new UnsafeBuffer(buffer, HEADER_LENGTH, errorBufferLength); + + headerEncoder + .wrapAndApplyHeader(buffer, 0, new MessageHeaderEncoder()) + .componentType(type) + .startTimestamp(epochClock.time()) + .pid(SystemUtil.getPid()) + .candidateTermId(candidateTermId) + .headerLength(HEADER_LENGTH) + .errorBufferLength(errorBufferLength); - headerEncoder.componentType(type); - headerEncoder.headerLength(HEADER_LENGTH); - headerEncoder.errorBufferLength(errorBufferLength); - headerEncoder.pid(SystemUtil.getPid()); - headerEncoder.startTimestamp(epochClock.time()); + headerDecoder.wrapAndApplyHeader(buffer, 0, messageHeaderDecoder); } /** @@ -210,30 +267,26 @@ public ClusterMarkFile( final long timeoutMs, final Consumer logger) { - this(new MarkFile( - directory, - filename, - MarkFileHeaderDecoder.versionEncodingOffset(), - MarkFileHeaderDecoder.activityTimestampEncodingOffset(), - timeoutMs, - epochClock, - (version) -> - { - if (SemanticVersion.major(version) != MAJOR_VERSION) - { - throw new ClusterException( - "mark file major version " + SemanticVersion.major(version) + - " does not match software: " + MAJOR_VERSION); - } - }, - logger)); + this(openExistingMarkFile(directory, filename, epochClock, timeoutMs, logger)); } ClusterMarkFile(final MarkFile markFile) { this.markFile = markFile; buffer = markFile.buffer(); - headerDecoder.wrap(buffer, 0, MarkFileHeaderDecoder.BLOCK_LENGTH, MarkFileHeaderDecoder.SCHEMA_VERSION); + + headerOffset = headerOffset(buffer); + if (0 != headerOffset) + { + headerEncoder.wrap(buffer, headerOffset); + headerDecoder.wrapAndApplyHeader(buffer, 0, new MessageHeaderDecoder()); + } + else + { + headerEncoder.wrap(buffer, 0); + headerDecoder.wrap(buffer, 0, MarkFileHeaderDecoder.BLOCK_LENGTH, MarkFileHeaderDecoder.SCHEMA_VERSION); + } + errorBuffer = new UnsafeBuffer(buffer, headerDecoder.headerLength(), headerDecoder.errorBufferLength()); } @@ -292,14 +345,14 @@ public boolean isClosed() } /** - * Get the current value of a candidate term id if a vote is placed in an election. + * Get the current value of a candidate term id if a vote is placed in an election with volatile semantics. * * @return the current candidate term id within an election after voting or {@link Aeron#NULL_VALUE} if * no voting phase of an election is currently active. */ public long candidateTermId() { - return buffer.getLongVolatile(MarkFileHeaderDecoder.candidateTermIdEncodingOffset()); + return buffer.getLongVolatile(headerOffset + MarkFileHeaderDecoder.candidateTermIdEncodingOffset()); } /** @@ -309,7 +362,7 @@ public long candidateTermId() */ public int memberId() { - return buffer.getInt(MarkFileHeaderDecoder.memberIdEncodingOffset()); + return headerDecoder.memberId(); } /** @@ -319,7 +372,7 @@ public int memberId() */ public void memberId(final int memberId) { - buffer.putInt(MarkFileHeaderEncoder.memberIdEncodingOffset(), memberId); + headerEncoder.memberId(memberId); } /** @@ -329,7 +382,7 @@ public void memberId(final int memberId) */ public int clusterId() { - return buffer.getInt(MarkFileHeaderDecoder.clusterIdEncodingOffset()); + return headerDecoder.clusterId(); } /** @@ -339,7 +392,7 @@ public int clusterId() */ public void clusterId(final int clusterId) { - buffer.putInt(MarkFileHeaderEncoder.clusterIdEncodingOffset(), clusterId); + headerEncoder.clusterId(clusterId); } /** @@ -445,6 +498,7 @@ public static void checkHeaderLength( final String authenticator) { final int length = + HEADER_OFFSET + MarkFileHeaderEncoder.BLOCK_LENGTH + (5 * VarAsciiEncodingEncoder.lengthEncodingLength()) + (null == aeronDirectory ? 0 : aeronDirectory.length()) + @@ -489,24 +543,19 @@ public static String linkFilenameForService(final int serviceId) */ public ClusterNodeControlProperties loadControlProperties() { - final MarkFileHeaderDecoder decoder = new MarkFileHeaderDecoder(); - decoder.wrap( - headerDecoder.buffer(), - headerDecoder.offset(), - MarkFileHeaderDecoder.BLOCK_LENGTH, - MarkFileHeaderDecoder.SCHEMA_VERSION); - + headerDecoder.sbeRewind(); return new ClusterNodeControlProperties( - decoder.memberId(), - decoder.serviceStreamId(), - decoder.consensusModuleStreamId(), - decoder.aeronDirectory(), - decoder.controlChannel()); + headerDecoder.memberId(), + headerDecoder.serviceStreamId(), + headerDecoder.consensusModuleStreamId(), + headerDecoder.aeronDirectory(), + headerDecoder.controlChannel()); } /** * Forces any changes made to the mark file's content to be written to the storage device containing the mapped * file. + * * @since 1.44.0 */ public void force() @@ -528,4 +577,54 @@ public String toString() ", markFile=" + markFile.markFile() + '}'; } + + private static int headerOffset(final File file) + { + final MappedByteBuffer mappedByteBuffer = IoUtil.mapExistingFile(file, FILENAME); + try + { + final UnsafeBuffer unsafeBuffer = + new UnsafeBuffer(mappedByteBuffer, 0, HEADER_OFFSET); + return headerOffset(unsafeBuffer); + } + finally + { + IoUtil.unmap(mappedByteBuffer); + } + } + + private static int headerOffset(final UnsafeBuffer headerBuffer) + { + final MessageHeaderDecoder decoder = new MessageHeaderDecoder(); + decoder.wrap(headerBuffer, 0); + return MarkFileHeaderDecoder.TEMPLATE_ID == decoder.templateId() && + MarkFileHeaderDecoder.SCHEMA_ID == decoder.schemaId() ? HEADER_OFFSET : 0; + } + + private static MarkFile openExistingMarkFile( + final File directory, + final String filename, + final EpochClock epochClock, + final long timeoutMs, + final Consumer logger) + { + final int headerOffset = headerOffset(new File(directory, filename)); + return new MarkFile( + directory, + filename, + headerOffset + MarkFileHeaderDecoder.versionEncodingOffset(), + headerOffset + MarkFileHeaderDecoder.activityTimestampEncodingOffset(), + timeoutMs, + epochClock, + (version) -> + { + if (SemanticVersion.major(version) != MAJOR_VERSION) + { + throw new ClusterException( + "mark file major version " + SemanticVersion.major(version) + + " does not match software: " + MAJOR_VERSION); + } + }, + logger); + } } diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceContainer.java b/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceContainer.java index 33ff0b0de5..002cdc79d3 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceContainer.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceContainer.java @@ -2032,11 +2032,7 @@ CountDownLatch abortLatch() private void concludeMarkFile() { ClusterMarkFile.checkHeaderLength( - aeron.context().aeronDirectoryName(), - controlChannel(), - null, - serviceName, - null); + aeron.context().aeronDirectoryName(), controlChannel(), null, serviceName, null); final MarkFileHeaderEncoder encoder = markFile.encoder(); diff --git a/aeron-cluster/src/test/java/io/aeron/cluster/service/ClusterMarkFileTest.java b/aeron-cluster/src/test/java/io/aeron/cluster/service/ClusterMarkFileTest.java index f1bc9b56aa..ebcf95e91b 100644 --- a/aeron-cluster/src/test/java/io/aeron/cluster/service/ClusterMarkFileTest.java +++ b/aeron-cluster/src/test/java/io/aeron/cluster/service/ClusterMarkFileTest.java @@ -15,8 +15,15 @@ */ package io.aeron.cluster.service; +import io.aeron.Aeron; import io.aeron.cluster.codecs.mark.ClusterComponentType; +import io.aeron.cluster.codecs.mark.MarkFileHeaderDecoder; +import io.aeron.cluster.codecs.mark.MarkFileHeaderEncoder; +import org.agrona.IoUtil; import org.agrona.MarkFile; +import org.agrona.SemanticVersion; +import org.agrona.SystemUtil; +import org.agrona.concurrent.CachedEpochClock; import org.agrona.concurrent.SystemEpochClock; import org.agrona.concurrent.UnsafeBuffer; import org.junit.jupiter.api.Test; @@ -24,28 +31,35 @@ import org.junit.jupiter.api.condition.JRE; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.InOrder; +import java.io.File; +import java.io.IOException; import java.nio.MappedByteBuffer; +import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import static io.aeron.cluster.service.ClusterMarkFile.ERROR_BUFFER_MAX_LENGTH; import static io.aeron.cluster.service.ClusterMarkFile.ERROR_BUFFER_MIN_LENGTH; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrowsExactly; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; class ClusterMarkFileTest { + @TempDir + private Path tempDir; + @ParameterizedTest @ValueSource(ints = { Integer.MIN_VALUE, -100, ERROR_BUFFER_MIN_LENGTH - 1, ERROR_BUFFER_MAX_LENGTH + 1 }) - void throwsExceptionIfErrorBufferLengthIsInvalid(final int errorBufferLength, final @TempDir Path dir) + void throwsExceptionIfErrorBufferLengthIsInvalid(final int errorBufferLength) { final IllegalArgumentException exception = assertThrowsExactly( IllegalArgumentException.class, () -> new ClusterMarkFile( - dir.resolve("test.cfg").toFile(), + tempDir.resolve("test.cfg").toFile(), ClusterComponentType.CONSENSUS_MODULE, errorBufferLength, SystemEpochClock.INSTANCE, @@ -91,4 +105,361 @@ void shouldNotCallForceIfMarkFileIsClosed() inOrder.verifyNoMoreInteractions(); } } + + @ParameterizedTest + @EnumSource(ClusterComponentType.class) + void shouldCreateNewMarkFile(final ClusterComponentType componentType) + { + final File file = tempDir.resolve(ClusterMarkFile.FILENAME).toFile(); + assertFalse(file.exists()); + + final CachedEpochClock epochClock = new CachedEpochClock(); + epochClock.advance(35984758934759843L); + + try (ClusterMarkFile clusterMarkFile = + new ClusterMarkFile(file, componentType, ERROR_BUFFER_MIN_LENGTH, epochClock, 1000)) + { + assertTrue(file.exists()); + assertEquals(ClusterMarkFile.HEADER_LENGTH + ERROR_BUFFER_MIN_LENGTH, file.length()); + + clusterMarkFile.signalReady(); + + verifyMarkFileContents( + clusterMarkFile, + ClusterMarkFile.SEMANTIC_VERSION, + componentType, + 0, + epochClock.time(), + SystemUtil.getPid(), + Aeron.NULL_VALUE, + 0, + 0, + 0, + 0, + 0, + 0, + ClusterMarkFile.HEADER_LENGTH, + ERROR_BUFFER_MIN_LENGTH, + 0, + "", + "", + "", + "", + "", + ""); + + assertInstanceOf(MarkFileHeaderEncoder.class, clusterMarkFile.encoder()); + assertInstanceOf(MarkFileHeaderDecoder.class, clusterMarkFile.decoder()); + } + } + + @Test + void shouldUpdateExistingMarkFile() + { + final long activityTimestamp = 112211443311L; + final long candidateTermId = 753475487L; + final int archiveStreamId = 4; + final int serviceStreamId = 5; + final int consensusModuleStreamId = 108; + final int insgresStreamId = 101; + final int memberId = 8; + final int serviceId = 1; + final int clusterId = -9; + final String aeronDir = tempDir.resolve("aeron").toString(); + final String controlChannel = "aeron:ipc"; + final String ingressChannel = "aeron:udp?alias=ingress"; + final String serviceName = "io.aeron.cluster.TestService"; + final String authenticator = "authenticator"; + final String clusterDir = "cluster dir"; + + final File file = tempDir.resolve(ClusterMarkFile.FILENAME).toFile(); + assertFalse(file.exists()); + + final CachedEpochClock epochClock = new CachedEpochClock(); + epochClock.update(123456L); + + try (ClusterMarkFile clusterMarkFile = + new ClusterMarkFile(file, ClusterComponentType.BACKUP, ERROR_BUFFER_MIN_LENGTH, epochClock, 1000)) + { + clusterMarkFile.signalReady(); + + clusterMarkFile.encoder() + .activityTimestamp(activityTimestamp) + .startTimestamp(-900000) + .pid(Long.MIN_VALUE) + .candidateTermId(candidateTermId) + .archiveStreamId(archiveStreamId) + .serviceStreamId(serviceStreamId) + .consensusModuleStreamId(consensusModuleStreamId) + .ingressStreamId(insgresStreamId) + .memberId(memberId) + .serviceId(serviceId) + .headerLength(444444) + .errorBufferLength(555555) + .clusterId(clusterId) + .aeronDirectory(aeronDir) + .controlChannel(controlChannel) + .ingressChannel(ingressChannel) + .serviceName(serviceName) + .authenticator(authenticator) + .servicesClusterDir(clusterDir); + } + + epochClock.update(753498573948593L); + try (ClusterMarkFile clusterMarkFile = new ClusterMarkFile( + file, ClusterComponentType.CONSENSUS_MODULE, ERROR_BUFFER_MIN_LENGTH * 2, epochClock, 2222)) + { + verifyMarkFileContents( + clusterMarkFile, + ClusterMarkFile.SEMANTIC_VERSION, + ClusterComponentType.CONSENSUS_MODULE, + activityTimestamp, + epochClock.time(), + SystemUtil.getPid(), + candidateTermId, + archiveStreamId, + serviceStreamId, + consensusModuleStreamId, + insgresStreamId, + memberId, + serviceId, + ClusterMarkFile.HEADER_LENGTH, + ERROR_BUFFER_MIN_LENGTH * 2, + clusterId, + aeronDir, + controlChannel, + ingressChannel, + serviceName, + authenticator, + clusterDir); + } + } + + @ParameterizedTest + @EnumSource(io.aeron.cluster.codecs.mark.v0.ClusterComponentType.class) + @SuppressWarnings("MethodLength") + void shouldHandleExistingMarkFileV0(final io.aeron.cluster.codecs.mark.v0.ClusterComponentType componentType) + throws IOException + { + final int version = SemanticVersion.compose(ClusterMarkFile.MAJOR_VERSION, 98, 157); + final ClusterComponentType currentComponentType = ClusterComponentType.get(componentType.value()); + final int activityTimestamp = 89898989; + final int startTimestamp = -94237423; + final long pid = 42; + final long candidateTermId = -78; + final int archiveStreamId = 33; + final int serviceStreamId = 777; + final int consensusModuleStreamId = -87; + final int ingressStreamId = 5; + final int memberId = 16; + final int serviceId = 6; + final int headerLength = 2048; + final int errorBufferLength = 1500; + final int clusterId = 3; + final String aeronDir = tempDir.resolve("path/to/dev/shm").toString(); + final String controlChannel = "control"; + final String ingressChannel = "aeron:udp?endpoint=9999"; + final String serviceName = "service name"; + final String authenticator = "auth"; + final String clusterDir = tempDir.resolve("cluster").toString(); + + final Path file = + Files.write(tempDir.resolve("test.txt"), new byte[4096], StandardOpenOption.CREATE_NEW); + + final MarkFile markFile = new MarkFile( + IoUtil.mapExistingFile(file.toFile(), ClusterMarkFile.FILENAME), + io.aeron.cluster.codecs.mark.v0.MarkFileHeaderDecoder.versionEncodingOffset(), + io.aeron.cluster.codecs.mark.v0.MarkFileHeaderDecoder.activityTimestampEncodingOffset()); + + final io.aeron.cluster.codecs.mark.v0.MarkFileHeaderEncoder encoder = + new io.aeron.cluster.codecs.mark.v0.MarkFileHeaderEncoder(); + encoder.wrap(markFile.buffer(), 0); + + encoder + .version(version) + .componentType(componentType) + .activityTimestamp(activityTimestamp) + .startTimestamp(startTimestamp) + .pid(pid) + .candidateTermId(candidateTermId) + .archiveStreamId(archiveStreamId) + .serviceStreamId(serviceStreamId) + .consensusModuleStreamId(consensusModuleStreamId) + .ingressStreamId(ingressStreamId) + .memberId(memberId) + .serviceId(serviceId) + .headerLength(headerLength) + .errorBufferLength(errorBufferLength) + .clusterId(clusterId) + .aeronDirectory(aeronDir) + .controlChannel(controlChannel) + .ingressChannel(ingressChannel) + .serviceName(serviceName) + .authenticator(authenticator); + + markFile.buffer().putStringAscii(encoder.encodedLength(), clusterDir); + + try (ClusterMarkFile clusterMarkFile = new ClusterMarkFile(markFile)) + { + verifyMarkFileContents( + clusterMarkFile, + version, + currentComponentType, + activityTimestamp, + startTimestamp, + pid, + candidateTermId, + archiveStreamId, + serviceStreamId, + consensusModuleStreamId, + ingressStreamId, + memberId, + serviceId, + headerLength, + errorBufferLength, + clusterId, + aeronDir, + controlChannel, + ingressChannel, + serviceName, + authenticator, + clusterDir); + + clusterMarkFile.signalFailedStart(); + assertEquals(ClusterMarkFile.VERSION_FAILED, clusterMarkFile.decoder().version()); + + clusterMarkFile.signalReady(); + assertEquals(ClusterMarkFile.SEMANTIC_VERSION, clusterMarkFile.decoder().version()); + + clusterMarkFile.memberId(42); + assertEquals(42, clusterMarkFile.memberId()); + + clusterMarkFile.clusterId(8888888); + assertEquals(8888888, clusterMarkFile.clusterId()); + + assertEquals(candidateTermId, clusterMarkFile.candidateTermId()); + assertEquals(componentType.value(), clusterMarkFile.decoder().componentType().value()); + + clusterMarkFile.decoder().sbeRewind(); + assertEquals(aeronDir, clusterMarkFile.decoder().aeronDirectory()); + } + + final CachedEpochClock epochClock = new CachedEpochClock(); + epochClock.advance(5436547234L); + + try (ClusterMarkFile clusterMarkFile = new ClusterMarkFile( + file.getParent().toFile(), + file.getFileName().toString(), + epochClock, + 50_000, + null)) + { + verifyMarkFileContents( + clusterMarkFile, + ClusterMarkFile.SEMANTIC_VERSION, + currentComponentType, + activityTimestamp, + startTimestamp, + pid, + candidateTermId, + archiveStreamId, + serviceStreamId, + consensusModuleStreamId, + ingressStreamId, + 42, + serviceId, + headerLength, + errorBufferLength, + 8888888, + aeronDir, + controlChannel, + ingressChannel, + serviceName, + authenticator, + clusterDir); + + clusterMarkFile.signalReady(); + assertEquals(ClusterMarkFile.SEMANTIC_VERSION, clusterMarkFile.decoder().version()); + } + + // should overwrite existing data when message header offset is being added + try (ClusterMarkFile clusterMarkFile = + new ClusterMarkFile(file.toFile(), currentComponentType, ERROR_BUFFER_MIN_LENGTH * 2, epochClock, 1000)) + { + verifyMarkFileContents( + clusterMarkFile, + 0, + currentComponentType, + 0, + epochClock.time(), + SystemUtil.getPid(), + candidateTermId, + 0, + 0, + 0, + 0, + 0, + 0, + ClusterMarkFile.HEADER_LENGTH, + ERROR_BUFFER_MIN_LENGTH * 2, + 0, + "", + "", + "", + "", + "", + ""); + } + } + + private static void verifyMarkFileContents( + final ClusterMarkFile clusterMarkFile, + final int version, + final ClusterComponentType clusterComponentType, + final long activityTimestamp, + final long startTimestamp, + final long pid, + final long candidateTermId, + final int archiveStreamId, + final int serviceStreamId, + final int consensusModuleStreamId, + final int ingressStreamId, + final int memberId, + final int serviceId, + final int headerLength, + final int errorBufferLength, + final int clusterId, + final String aeronDir, + final String controlChannel, + final String ingressChannel, + final String serviceName, + final String authenticator, + final String clusterDir) + { + assertEquals(version, clusterMarkFile.decoder().version()); + assertEquals(clusterComponentType, clusterMarkFile.decoder().componentType()); + assertEquals(activityTimestamp, clusterMarkFile.decoder().activityTimestamp()); + assertEquals(startTimestamp, clusterMarkFile.decoder().startTimestamp()); + assertEquals(pid, clusterMarkFile.decoder().pid()); + assertEquals(candidateTermId, clusterMarkFile.decoder().candidateTermId()); + assertEquals(archiveStreamId, clusterMarkFile.decoder().archiveStreamId()); + assertEquals(serviceStreamId, clusterMarkFile.decoder().serviceStreamId()); + assertEquals(consensusModuleStreamId, clusterMarkFile.decoder().consensusModuleStreamId()); + assertEquals(ingressStreamId, clusterMarkFile.decoder().ingressStreamId()); + assertEquals(memberId, clusterMarkFile.decoder().memberId()); + assertEquals(serviceId, clusterMarkFile.decoder().serviceId()); + assertEquals(headerLength, clusterMarkFile.decoder().headerLength()); + assertEquals(errorBufferLength, clusterMarkFile.decoder().errorBufferLength()); + assertEquals(clusterId, clusterMarkFile.decoder().clusterId()); + assertEquals(aeronDir, clusterMarkFile.decoder().aeronDirectory()); + assertEquals(controlChannel, clusterMarkFile.decoder().controlChannel()); + assertEquals(ingressChannel, clusterMarkFile.decoder().ingressChannel()); + assertEquals(serviceName, clusterMarkFile.decoder().serviceName()); + assertEquals(authenticator, clusterMarkFile.decoder().authenticator()); + assertEquals(clusterDir, clusterMarkFile.decoder().servicesClusterDir()); + + assertEquals(memberId, clusterMarkFile.memberId()); + assertEquals(clusterId, clusterMarkFile.clusterId()); + assertEquals(candidateTermId, clusterMarkFile.candidateTermId()); + } } diff --git a/aeron-cluster/src/test/resources/aeron-cluster-mark-codecs-v0.xml b/aeron-cluster/src/test/resources/aeron-cluster-mark-codecs-v0.xml new file mode 100644 index 0000000000..0e259c8355 --- /dev/null +++ b/aeron-cluster/src/test/resources/aeron-cluster-mark-codecs-v0.xml @@ -0,0 +1,59 @@ + + + + + + + + + + + + + + + + + + + + 0 + 1 + 2 + 3 + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/build.gradle b/build.gradle index a1b4e5e2c4..1e8ae7c617 100644 --- a/build.gradle +++ b/build.gradle @@ -627,7 +627,7 @@ project(':aeron-archive') { implementation project(':aeron-annotations') annotationProcessor project(':aeron-annotations') testImplementation project(':aeron-test-support') - testImplementation files('build/classes/java/generatedTest') + testImplementation files("${layout.buildDirectory.get()}/classes/java/generatedTest") } def generatedSrcDir = file("${layout.buildDirectory.get()}/generated-src") @@ -850,37 +850,64 @@ project(':aeron-cluster') { implementation project(':aeron-annotations') annotationProcessor project(':aeron-annotations') testImplementation project(':aeron-test-support') + testImplementation files("${layout.buildDirectory.get()}/classes/java/generatedTest") } - def generatedDir = file("${layout.buildDirectory.get()}/generated-src") + def generatedSrcDir = file("${layout.buildDirectory.get()}/generated-src") + def generatedTestDir = file("${layout.buildDirectory.get()}/generated-test") + sourceSets { generated { - java.srcDir generatedDir + java.srcDir generatedSrcDir + compileClasspath += configurations.codecGeneration + } + generatedTest { + java.srcDir generatedTestDir compileClasspath += configurations.codecGeneration } } - tasks.register('generateCodecs', JavaExec) { - def codecsFile = 'src/main/resources/cluster/aeron-cluster-codecs.xml' - def markCodecsFile = 'src/main/resources/cluster/aeron-cluster-mark-codecs.xml' - def nodeStateCodecsFile = 'src/main/resources/cluster/aeron-cluster-node-state-codecs.xml' - def sbeFile = 'src/main/resources/cluster/fpl/sbe.xsd' + def codecsFile = 'src/main/resources/cluster/aeron-cluster-codecs.xml' + def markCodecsFile = 'src/main/resources/cluster/aeron-cluster-mark-codecs.xml' + def nodeStateCodecsFile = 'src/main/resources/cluster/aeron-cluster-node-state-codecs.xml' + def sbeFile = 'src/main/resources/cluster/fpl/sbe.xsd' + tasks.register('generateCodecs', JavaExec) { inputs.files(codecsFile, markCodecsFile, sbeFile) - outputs.dir generatedDir + outputs.dir generatedSrcDir mainClass.set('uk.co.real_logic.sbe.SbeTool') classpath = configurations.codecGeneration systemProperties( - 'sbe.output.dir': generatedDir, + 'sbe.output.dir': generatedSrcDir, 'sbe.target.language': 'Java', 'sbe.validation.xsd': sbeFile, 'sbe.validation.stop.on.error': 'true') args = [codecsFile, markCodecsFile, nodeStateCodecsFile] } - compileJava.dependsOn compileGeneratedJava - compileGeneratedJava.dependsOn generateCodecs + def markCodecsFileV0 = 'src/test/resources/aeron-cluster-mark-codecs-v0.xml' + tasks.register('generateTestCodecsMarkFileV0', JavaExec) { + inputs.files(markCodecsFileV0, sbeFile) + outputs.dir generatedTestDir + + mainClass.set('uk.co.real_logic.sbe.SbeTool') + jvmArgs('--add-opens', 'java.base/jdk.internal.misc=ALL-UNNAMED') + classpath = configurations.codecGeneration + systemProperties( + 'sbe.output.dir': generatedTestDir, + 'sbe.target.language': 'Java', + 'sbe.validation.xsd': sbeFile, + 'sbe.validation.stop.on.error': 'true', + 'sbe.target.namespace': 'io.aeron.cluster.codecs.mark.v0', + 'sbe.schema.transform.version': '*:0') + args = [markCodecsFileV0] + } + + compileJava.dependsOn 'compileGeneratedJava' + compileGeneratedJava.dependsOn 'generateCodecs' + compileTestJava.dependsOn 'compileGeneratedTestJava' + compileGeneratedTestJava.dependsOn 'generateTestCodecsMarkFileV0' jar { from sourceSets.generated.output