From 9cb04b5f084bdaf5c5345f755fe1b4c7250369af Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Fri, 17 Jan 2025 20:19:15 +0100 Subject: [PATCH] [Java] Speedup `purgeSegments/deleteDetachedSegments` operations by only deleting files in a range between the current startPosition and the previous startPosition (purge) or the oldest existing segment file position (detached files). --- .../io/aeron/archive/ArchiveConductor.java | 54 ++++++++--- .../archive/ManageRecordingHistoryTest.java | 97 ++++++++++++------- 2 files changed, 106 insertions(+), 45 deletions(-) diff --git a/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java b/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java index 1a0e0c94b0..98607f973b 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java +++ b/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java @@ -33,6 +33,7 @@ import org.agrona.collections.Int2ObjectHashMap; import org.agrona.collections.Long2LongCounterMap; import org.agrona.collections.Long2ObjectHashMap; +import org.agrona.collections.MutableLong; import org.agrona.collections.Object2ObjectHashMap; import org.agrona.concurrent.*; import org.agrona.concurrent.status.CountersReader; @@ -50,6 +51,7 @@ import java.util.Random; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import static io.aeron.Aeron.NULL_VALUE; import static io.aeron.CommonContext.*; @@ -66,6 +68,7 @@ import static java.lang.Math.min; import static java.nio.file.StandardOpenOption.READ; import static java.nio.file.StandardOpenOption.WRITE; +import static org.agrona.AsciiEncoding.digitCount; import static org.agrona.concurrent.status.CountersReader.METADATA_LENGTH; abstract class ArchiveConductor @@ -1046,7 +1049,7 @@ void truncateRecording( final ArrayDeque files = new ArrayDeque<>(); if (startPosition == position) { - listSegmentFiles(recordingId, files); + listSegmentFiles(recordingId, files::addLast); } else { @@ -1086,7 +1089,7 @@ void purgeRecording(final long correlationId, final long recordingId, final Cont catalog.changeState(recordingId, DELETED); final ArrayDeque files = new ArrayDeque<>(); - listSegmentFiles(recordingId, files); + listSegmentFiles(recordingId, files::addLast); deleteSegments(correlationId, recordingId, controlSession, files); } @@ -1341,8 +1344,23 @@ void deleteDetachedSegments(final long correlationId, final long recordingId, fi if (hasRecording(recordingId, correlationId, controlSession) && isDeleteAllowed(recordingId, correlationId, controlSession)) { + final MutableLong minPosition = new MutableLong(Long.MAX_VALUE); + final int prefixLength = digitCount(recordingId) + 1; + listSegmentFiles( + recordingId, + (segmentFile) -> + { + final int dotIndex = segmentFile.indexOf('.'); + final long filePosition = + AsciiEncoding.parseLongAscii(segmentFile, prefixLength, dotIndex - prefixLength); + minPosition.set(Math.min(minPosition.get(), filePosition)); + }); + final ArrayDeque files = new ArrayDeque<>(); - findDetachedSegments(recordingId, files); + if (Long.MAX_VALUE != minPosition.get()) + { + findDetachedSegments(recordingId, files, minPosition.get()); + } deleteSegments(correlationId, recordingId, controlSession, files); } } @@ -1357,10 +1375,13 @@ void purgeSegments( isValidDetach(correlationId, controlSession, recordingId, newStartPosition) && isDeleteAllowed(recordingId, correlationId, controlSession)) { + catalog.recordingSummary(recordingId, recordingSummary); + final long oldStartPosition = recordingSummary.startPosition; + catalog.startPosition(recordingId, newStartPosition); final ArrayDeque files = new ArrayDeque<>(); - findDetachedSegments(recordingId, files); + findDetachedSegments(recordingId, files, oldStartPosition); deleteSegments(correlationId, recordingId, controlSession, files); } } @@ -1524,17 +1545,28 @@ void removeDeleteSegmentsSession(final DeleteSegmentsSession deleteSegmentsSessi deleteSegmentsSessionByIdMap.remove(deleteSegmentsSession.sessionId()); } - private void findDetachedSegments(final long recordingId, final ArrayDeque files) + private void findDetachedSegments( + final long recordingId, final ArrayDeque files, final long prevStartPosition) { catalog.recordingSummary(recordingId, recordingSummary); - final int segmentFile = recordingSummary.segmentFileLength; - long filenamePosition = recordingSummary.startPosition - segmentFile; + final int segmentFileLength = recordingSummary.segmentFileLength; + + final long prevSegmentFilePosition = segmentFileBasePosition( + prevStartPosition, prevStartPosition, recordingSummary.termBufferLength, segmentFileLength); + + final long startSegmentFilePosition = segmentFileBasePosition( + recordingSummary.startPosition, + recordingSummary.startPosition, + recordingSummary.termBufferLength, + segmentFileLength); + + long filenamePosition = startSegmentFilePosition - segmentFileLength; - while (filenamePosition >= 0) + while (filenamePosition >= prevSegmentFilePosition) { final String segmentFileName = segmentFileName(recordingId, filenamePosition); files.addFirst(segmentFileName); - filenamePosition -= segmentFile; + filenamePosition -= segmentFileLength; } } @@ -1763,7 +1795,7 @@ private boolean isDeleteAllowed( return true; } - private void listSegmentFiles(final long recordingId, final ArrayDeque files) + private void listSegmentFiles(final long recordingId, final Consumer segmentFileConsumer) { final String prefix = recordingId + "-"; final String[] recordingFiles = archiveDir.list(); @@ -1774,7 +1806,7 @@ private void listSegmentFiles(final long recordingId, final ArrayDeque f if (name.startsWith(prefix) && (name.endsWith(RECORDING_SEGMENT_SUFFIX) || name.endsWith(DELETE_SUFFIX))) { - files.addLast(name); + segmentFileConsumer.accept(name); } } } diff --git a/aeron-system-tests/src/test/java/io/aeron/archive/ManageRecordingHistoryTest.java b/aeron-system-tests/src/test/java/io/aeron/archive/ManageRecordingHistoryTest.java index b8fb348e7a..3f99015636 100644 --- a/aeron-system-tests/src/test/java/io/aeron/archive/ManageRecordingHistoryTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/archive/ManageRecordingHistoryTest.java @@ -39,6 +39,8 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.io.IOException; @@ -48,11 +50,10 @@ import static io.aeron.archive.ArchiveSystemTests.awaitSignal; import static io.aeron.archive.ArchiveSystemTests.injectRecordingSignalConsumer; import static io.aeron.archive.ArchiveSystemTests.offerToPosition; +import static io.aeron.archive.client.AeronArchive.*; import static io.aeron.logbuffer.FrameDescriptor.FRAME_ALIGNMENT; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.arrayContaining; -import static org.hamcrest.Matchers.arrayContainingInAnyOrder; -import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.*; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -104,7 +105,7 @@ void before(@TempDir final Path tempDir) aeron = Aeron.connect(); - aeronArchive = AeronArchive.connect( + aeronArchive = connect( TestContexts.localhostAeronArchive() .aeron(aeron)); @@ -135,7 +136,7 @@ void shouldPurgeForStreamJoinedAtTheBeginning() Tests.awaitPosition(counters, counterId, publication.position()); final long startPosition = 0L; - final long segmentFileBasePosition = AeronArchive.segmentFileBasePosition( + final long segmentFileBasePosition = segmentFileBasePosition( startPosition, SEGMENT_LENGTH * 2L, TERM_LENGTH, SEGMENT_LENGTH); signalConsumer.reset(); @@ -157,7 +158,7 @@ void shouldPurgeForLateJoinedStream() throws IOException { final String messagePrefix = "Message-Prefix-"; final int initialTermId = 7; - final long targetPosition = (SEGMENT_LENGTH * 13L) + 1; + final long targetPosition = (SEGMENT_LENGTH * 15L) + 100; final long startPosition = (SEGMENT_LENGTH * 10L) + 7 * FRAME_ALIGNMENT; uriBuilder.initialPosition(startPosition, initialTermId, TERM_LENGTH); @@ -174,30 +175,13 @@ void shouldPurgeForLateJoinedStream() throws IOException Tests.awaitPosition(counters, counterId, publication.position()); final File archiveDir = archive.context().archiveDir(); - long position = 0; - boolean deleted = false; - while (position < startPosition - SEGMENT_LENGTH) - { - final String segmentFileName = Archive.segmentFileName(recordingId, position); - if (deleted) - { - assertTrue(new File(archiveDir, segmentFileName + ".del").createNewFile()); - } - else - { - assertTrue(new File(archiveDir, segmentFileName).createNewFile()); - } - deleted = !deleted; - position += SEGMENT_LENGTH; - } - final String fileNamePrefix = recordingId + "-"; final String[] recordingFiles = archiveDir.list((dir, name) -> name.startsWith(fileNamePrefix)); - assertThat(recordingFiles, arrayWithSize(14)); + assertThat(recordingFiles, arrayWithSize(6)); - final long segmentFileBasePosition = AeronArchive.segmentFileBasePosition( + final long segmentFileBasePosition = segmentFileBasePosition( startPosition, - startPosition + SEGMENT_LENGTH + TERM_LENGTH + FRAME_ALIGNMENT * 5, + startPosition + 2 * SEGMENT_LENGTH + TERM_LENGTH + FRAME_ALIGNMENT * 5, TERM_LENGTH, SEGMENT_LENGTH); @@ -205,7 +189,7 @@ void shouldPurgeForLateJoinedStream() throws IOException final long purgeSegments = aeronArchive.purgeSegments(recordingId, segmentFileBasePosition); awaitSignal(aeronArchive, signalConsumer, RecordingSignal.DELETE); assertEquals(recordingId, signalConsumer.recordingId); - assertEquals(11L, purgeSegments); + assertEquals(2, purgeSegments); assertEquals(segmentFileBasePosition, aeronArchive.getStartPosition(recordingId)); signalConsumer.reset(); @@ -214,9 +198,10 @@ void shouldPurgeForLateJoinedStream() throws IOException final String[] files = archiveDir.list((dir, name) -> name.startsWith(fileNamePrefix)); assertThat(files, arrayContainingInAnyOrder( - Archive.segmentFileName(recordingId, SEGMENT_LENGTH * 13L), Archive.segmentFileName(recordingId, SEGMENT_LENGTH * 12L), - Archive.segmentFileName(recordingId, SEGMENT_LENGTH * 11L))); + Archive.segmentFileName(recordingId, SEGMENT_LENGTH * 13L), + Archive.segmentFileName(recordingId, SEGMENT_LENGTH * 14L), + Archive.segmentFileName(recordingId, SEGMENT_LENGTH * 15L))); } } @@ -243,7 +228,7 @@ void shouldDetachThenAttachFullSegments() assertEquals(recordingId, signalConsumer.recordingId); final long startPosition = 0L; - final long segmentFileBasePosition = AeronArchive.segmentFileBasePosition( + final long segmentFileBasePosition = segmentFileBasePosition( startPosition, SEGMENT_LENGTH * 2L, TERM_LENGTH, SEGMENT_LENGTH); aeronArchive.detachSegments(recordingId, segmentFileBasePosition); @@ -282,7 +267,7 @@ void shouldDetachThenAttachWhenStartNotSegmentAligned() awaitSignal(aeronArchive, signalConsumer, RecordingSignal.STOP); assertEquals(recordingId, signalConsumer.recordingId); - final long segmentFileBasePosition = AeronArchive.segmentFileBasePosition( + final long segmentFileBasePosition = segmentFileBasePosition( startPosition, startPosition + (SEGMENT_LENGTH * 2L), TERM_LENGTH, SEGMENT_LENGTH); aeronArchive.detachSegments(recordingId, segmentFileBasePosition); @@ -322,7 +307,7 @@ void shouldDeleteDetachedFullSegments() assertThat(files, arrayWithSize(4)); final long startPosition = 0L; - final long segmentFileBasePosition = AeronArchive.segmentFileBasePosition( + final long segmentFileBasePosition = segmentFileBasePosition( startPosition, SEGMENT_LENGTH * 2L, TERM_LENGTH, SEGMENT_LENGTH); aeronArchive.detachSegments(recordingId, segmentFileBasePosition); @@ -372,7 +357,7 @@ void shouldDeleteDetachedSegmentsWhenStartNotSegmentAligned() final String[] files = archive.context().archiveDir().list((dir, name) -> name.startsWith(prefix)); assertThat(files, arrayWithSize(3)); - final long segmentFileBasePosition = AeronArchive.segmentFileBasePosition( + final long segmentFileBasePosition = segmentFileBasePosition( startPosition, startPosition + (SEGMENT_LENGTH * 2L), TERM_LENGTH, SEGMENT_LENGTH); aeronArchive.detachSegments(recordingId, segmentFileBasePosition); @@ -381,7 +366,7 @@ void shouldDeleteDetachedSegmentsWhenStartNotSegmentAligned() signalConsumer.reset(); final long deletedSegments = aeronArchive.deleteDetachedSegments(recordingId); awaitSignal(aeronArchive, signalConsumer, recordingId, RecordingSignal.DELETE); - assertEquals(3L, deletedSegments); // non-existing file `0-0.rec` is counted as being deleted + assertEquals(2, deletedSegments); assertEquals(segmentFileBasePosition, aeronArchive.getStartPosition(recordingId)); final String[] updatedFiles = archive.context().archiveDir() @@ -392,6 +377,50 @@ void shouldDeleteDetachedSegmentsWhenStartNotSegmentAligned() } } + @ParameterizedTest + @ValueSource(longs = { 0, (TERM_LENGTH * 2L) + (FRAME_ALIGNMENT * 2L)}) + @InterruptAfter(10) + void deleteDetachedSegmentsIsANoOpIfNoFilesWereDetached(final long startPosition) + { + final String messagePrefix = "Message-Prefix-"; + final int initialTermId = 19; + final long firstSegmentFilePosition = + segmentFileBasePosition(startPosition, startPosition, TERM_LENGTH, SEGMENT_LENGTH); + final long targetPosition = firstSegmentFilePosition + (SEGMENT_LENGTH * 3L) + 139; + uriBuilder.initialPosition(startPosition, initialTermId, TERM_LENGTH); + + try (Publication publication = aeronArchive.addRecordedExclusivePublication(uriBuilder.build(), STREAM_ID)) + { + assertEquals(startPosition, publication.position()); + + final CountersReader counters = aeron.countersReader(); + final int counterId = + Tests.awaitRecordingCounterId(counters, publication.sessionId(), aeronArchive.archiveId()); + final long recordingId = RecordingPos.getRecordingId(counters, counterId); + + offerToPosition(publication, messagePrefix, targetPosition); + Tests.awaitPosition(counters, counterId, publication.position()); + + signalConsumer.reset(); + aeronArchive.stopRecording(publication); + awaitSignal(aeronArchive, signalConsumer, recordingId, RecordingSignal.STOP); + + final String prefix = recordingId + "-"; + final String[] files = archive.context().archiveDir().list((dir, name) -> name.startsWith(prefix)); + assertThat(files, arrayWithSize(4)); + + signalConsumer.reset(); + final long deletedSegments = aeronArchive.deleteDetachedSegments(recordingId); + awaitSignal(aeronArchive, signalConsumer, recordingId, RecordingSignal.DELETE); + assertEquals(0, deletedSegments); + assertEquals(startPosition, aeronArchive.getStartPosition(recordingId)); + + final String[] updatedFiles = + archive.context().archiveDir().list((dir, name) -> name.startsWith(prefix)); + assertThat(updatedFiles, is(files)); + } + } + @Test @InterruptAfter(10) void shouldPurgeRecording() throws IOException