Skip to content

Commit

Permalink
[Java] Speedup purgeSegments/deleteDetachedSegments operations by o…
Browse files Browse the repository at this point in the history
…nly deleting files in a range between the current startPosition and the previous startPosition (purge) or the oldest existing segment file position (detached files).
  • Loading branch information
vyazelenko committed Jan 17, 2025
1 parent 3617dd4 commit 9cb04b5
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 45 deletions.
54 changes: 43 additions & 11 deletions aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2LongCounterMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.collections.MutableLong;
import org.agrona.collections.Object2ObjectHashMap;
import org.agrona.concurrent.*;
import org.agrona.concurrent.status.CountersReader;
Expand All @@ -50,6 +51,7 @@
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static io.aeron.Aeron.NULL_VALUE;
import static io.aeron.CommonContext.*;
Expand All @@ -66,6 +68,7 @@
import static java.lang.Math.min;
import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.WRITE;
import static org.agrona.AsciiEncoding.digitCount;
import static org.agrona.concurrent.status.CountersReader.METADATA_LENGTH;

abstract class ArchiveConductor
Expand Down Expand Up @@ -1046,7 +1049,7 @@ void truncateRecording(
final ArrayDeque<String> files = new ArrayDeque<>();
if (startPosition == position)
{
listSegmentFiles(recordingId, files);
listSegmentFiles(recordingId, files::addLast);
}
else
{
Expand Down Expand Up @@ -1086,7 +1089,7 @@ void purgeRecording(final long correlationId, final long recordingId, final Cont
catalog.changeState(recordingId, DELETED);

final ArrayDeque<String> files = new ArrayDeque<>();
listSegmentFiles(recordingId, files);
listSegmentFiles(recordingId, files::addLast);

deleteSegments(correlationId, recordingId, controlSession, files);
}
Expand Down Expand Up @@ -1341,8 +1344,23 @@ void deleteDetachedSegments(final long correlationId, final long recordingId, fi
if (hasRecording(recordingId, correlationId, controlSession) &&
isDeleteAllowed(recordingId, correlationId, controlSession))
{
final MutableLong minPosition = new MutableLong(Long.MAX_VALUE);
final int prefixLength = digitCount(recordingId) + 1;
listSegmentFiles(
recordingId,
(segmentFile) ->
{
final int dotIndex = segmentFile.indexOf('.');
final long filePosition =
AsciiEncoding.parseLongAscii(segmentFile, prefixLength, dotIndex - prefixLength);
minPosition.set(Math.min(minPosition.get(), filePosition));
});

final ArrayDeque<String> files = new ArrayDeque<>();
findDetachedSegments(recordingId, files);
if (Long.MAX_VALUE != minPosition.get())
{
findDetachedSegments(recordingId, files, minPosition.get());
}
deleteSegments(correlationId, recordingId, controlSession, files);
}
}
Expand All @@ -1357,10 +1375,13 @@ void purgeSegments(
isValidDetach(correlationId, controlSession, recordingId, newStartPosition) &&
isDeleteAllowed(recordingId, correlationId, controlSession))
{
catalog.recordingSummary(recordingId, recordingSummary);
final long oldStartPosition = recordingSummary.startPosition;

catalog.startPosition(recordingId, newStartPosition);

final ArrayDeque<String> files = new ArrayDeque<>();
findDetachedSegments(recordingId, files);
findDetachedSegments(recordingId, files, oldStartPosition);
deleteSegments(correlationId, recordingId, controlSession, files);
}
}
Expand Down Expand Up @@ -1524,17 +1545,28 @@ void removeDeleteSegmentsSession(final DeleteSegmentsSession deleteSegmentsSessi
deleteSegmentsSessionByIdMap.remove(deleteSegmentsSession.sessionId());
}

private void findDetachedSegments(final long recordingId, final ArrayDeque<String> files)
private void findDetachedSegments(
final long recordingId, final ArrayDeque<String> files, final long prevStartPosition)
{
catalog.recordingSummary(recordingId, recordingSummary);
final int segmentFile = recordingSummary.segmentFileLength;
long filenamePosition = recordingSummary.startPosition - segmentFile;
final int segmentFileLength = recordingSummary.segmentFileLength;

final long prevSegmentFilePosition = segmentFileBasePosition(
prevStartPosition, prevStartPosition, recordingSummary.termBufferLength, segmentFileLength);

final long startSegmentFilePosition = segmentFileBasePosition(
recordingSummary.startPosition,
recordingSummary.startPosition,
recordingSummary.termBufferLength,
segmentFileLength);

long filenamePosition = startSegmentFilePosition - segmentFileLength;

while (filenamePosition >= 0)
while (filenamePosition >= prevSegmentFilePosition)
{
final String segmentFileName = segmentFileName(recordingId, filenamePosition);
files.addFirst(segmentFileName);
filenamePosition -= segmentFile;
filenamePosition -= segmentFileLength;
}
}

Expand Down Expand Up @@ -1763,7 +1795,7 @@ private boolean isDeleteAllowed(
return true;
}

private void listSegmentFiles(final long recordingId, final ArrayDeque<String> files)
private void listSegmentFiles(final long recordingId, final Consumer<String> segmentFileConsumer)
{
final String prefix = recordingId + "-";
final String[] recordingFiles = archiveDir.list();
Expand All @@ -1774,7 +1806,7 @@ private void listSegmentFiles(final long recordingId, final ArrayDeque<String> f
if (name.startsWith(prefix) &&
(name.endsWith(RECORDING_SEGMENT_SUFFIX) || name.endsWith(DELETE_SUFFIX)))
{
files.addLast(name);
segmentFileConsumer.accept(name);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.File;
import java.io.IOException;
Expand All @@ -48,11 +50,10 @@
import static io.aeron.archive.ArchiveSystemTests.awaitSignal;
import static io.aeron.archive.ArchiveSystemTests.injectRecordingSignalConsumer;
import static io.aeron.archive.ArchiveSystemTests.offerToPosition;
import static io.aeron.archive.client.AeronArchive.*;
import static io.aeron.logbuffer.FrameDescriptor.FRAME_ALIGNMENT;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -104,7 +105,7 @@ void before(@TempDir final Path tempDir)

aeron = Aeron.connect();

aeronArchive = AeronArchive.connect(
aeronArchive = connect(
TestContexts.localhostAeronArchive()
.aeron(aeron));

Expand Down Expand Up @@ -135,7 +136,7 @@ void shouldPurgeForStreamJoinedAtTheBeginning()
Tests.awaitPosition(counters, counterId, publication.position());

final long startPosition = 0L;
final long segmentFileBasePosition = AeronArchive.segmentFileBasePosition(
final long segmentFileBasePosition = segmentFileBasePosition(
startPosition, SEGMENT_LENGTH * 2L, TERM_LENGTH, SEGMENT_LENGTH);

signalConsumer.reset();
Expand All @@ -157,7 +158,7 @@ void shouldPurgeForLateJoinedStream() throws IOException
{
final String messagePrefix = "Message-Prefix-";
final int initialTermId = 7;
final long targetPosition = (SEGMENT_LENGTH * 13L) + 1;
final long targetPosition = (SEGMENT_LENGTH * 15L) + 100;
final long startPosition = (SEGMENT_LENGTH * 10L) + 7 * FRAME_ALIGNMENT;
uriBuilder.initialPosition(startPosition, initialTermId, TERM_LENGTH);

Expand All @@ -174,38 +175,21 @@ void shouldPurgeForLateJoinedStream() throws IOException
Tests.awaitPosition(counters, counterId, publication.position());

final File archiveDir = archive.context().archiveDir();
long position = 0;
boolean deleted = false;
while (position < startPosition - SEGMENT_LENGTH)
{
final String segmentFileName = Archive.segmentFileName(recordingId, position);
if (deleted)
{
assertTrue(new File(archiveDir, segmentFileName + ".del").createNewFile());
}
else
{
assertTrue(new File(archiveDir, segmentFileName).createNewFile());
}
deleted = !deleted;
position += SEGMENT_LENGTH;
}

final String fileNamePrefix = recordingId + "-";
final String[] recordingFiles = archiveDir.list((dir, name) -> name.startsWith(fileNamePrefix));
assertThat(recordingFiles, arrayWithSize(14));
assertThat(recordingFiles, arrayWithSize(6));

final long segmentFileBasePosition = AeronArchive.segmentFileBasePosition(
final long segmentFileBasePosition = segmentFileBasePosition(
startPosition,
startPosition + SEGMENT_LENGTH + TERM_LENGTH + FRAME_ALIGNMENT * 5,
startPosition + 2 * SEGMENT_LENGTH + TERM_LENGTH + FRAME_ALIGNMENT * 5,
TERM_LENGTH,
SEGMENT_LENGTH);

signalConsumer.reset();
final long purgeSegments = aeronArchive.purgeSegments(recordingId, segmentFileBasePosition);
awaitSignal(aeronArchive, signalConsumer, RecordingSignal.DELETE);
assertEquals(recordingId, signalConsumer.recordingId);
assertEquals(11L, purgeSegments);
assertEquals(2, purgeSegments);
assertEquals(segmentFileBasePosition, aeronArchive.getStartPosition(recordingId));

signalConsumer.reset();
Expand All @@ -214,9 +198,10 @@ void shouldPurgeForLateJoinedStream() throws IOException

final String[] files = archiveDir.list((dir, name) -> name.startsWith(fileNamePrefix));
assertThat(files, arrayContainingInAnyOrder(
Archive.segmentFileName(recordingId, SEGMENT_LENGTH * 13L),
Archive.segmentFileName(recordingId, SEGMENT_LENGTH * 12L),
Archive.segmentFileName(recordingId, SEGMENT_LENGTH * 11L)));
Archive.segmentFileName(recordingId, SEGMENT_LENGTH * 13L),
Archive.segmentFileName(recordingId, SEGMENT_LENGTH * 14L),
Archive.segmentFileName(recordingId, SEGMENT_LENGTH * 15L)));
}
}

Expand All @@ -243,7 +228,7 @@ void shouldDetachThenAttachFullSegments()
assertEquals(recordingId, signalConsumer.recordingId);

final long startPosition = 0L;
final long segmentFileBasePosition = AeronArchive.segmentFileBasePosition(
final long segmentFileBasePosition = segmentFileBasePosition(
startPosition, SEGMENT_LENGTH * 2L, TERM_LENGTH, SEGMENT_LENGTH);

aeronArchive.detachSegments(recordingId, segmentFileBasePosition);
Expand Down Expand Up @@ -282,7 +267,7 @@ void shouldDetachThenAttachWhenStartNotSegmentAligned()
awaitSignal(aeronArchive, signalConsumer, RecordingSignal.STOP);
assertEquals(recordingId, signalConsumer.recordingId);

final long segmentFileBasePosition = AeronArchive.segmentFileBasePosition(
final long segmentFileBasePosition = segmentFileBasePosition(
startPosition, startPosition + (SEGMENT_LENGTH * 2L), TERM_LENGTH, SEGMENT_LENGTH);

aeronArchive.detachSegments(recordingId, segmentFileBasePosition);
Expand Down Expand Up @@ -322,7 +307,7 @@ void shouldDeleteDetachedFullSegments()
assertThat(files, arrayWithSize(4));

final long startPosition = 0L;
final long segmentFileBasePosition = AeronArchive.segmentFileBasePosition(
final long segmentFileBasePosition = segmentFileBasePosition(
startPosition, SEGMENT_LENGTH * 2L, TERM_LENGTH, SEGMENT_LENGTH);

aeronArchive.detachSegments(recordingId, segmentFileBasePosition);
Expand Down Expand Up @@ -372,7 +357,7 @@ void shouldDeleteDetachedSegmentsWhenStartNotSegmentAligned()
final String[] files = archive.context().archiveDir().list((dir, name) -> name.startsWith(prefix));
assertThat(files, arrayWithSize(3));

final long segmentFileBasePosition = AeronArchive.segmentFileBasePosition(
final long segmentFileBasePosition = segmentFileBasePosition(
startPosition, startPosition + (SEGMENT_LENGTH * 2L), TERM_LENGTH, SEGMENT_LENGTH);

aeronArchive.detachSegments(recordingId, segmentFileBasePosition);
Expand All @@ -381,7 +366,7 @@ void shouldDeleteDetachedSegmentsWhenStartNotSegmentAligned()
signalConsumer.reset();
final long deletedSegments = aeronArchive.deleteDetachedSegments(recordingId);
awaitSignal(aeronArchive, signalConsumer, recordingId, RecordingSignal.DELETE);
assertEquals(3L, deletedSegments); // non-existing file `0-0.rec` is counted as being deleted
assertEquals(2, deletedSegments);
assertEquals(segmentFileBasePosition, aeronArchive.getStartPosition(recordingId));

final String[] updatedFiles = archive.context().archiveDir()
Expand All @@ -392,6 +377,50 @@ void shouldDeleteDetachedSegmentsWhenStartNotSegmentAligned()
}
}

@ParameterizedTest
@ValueSource(longs = { 0, (TERM_LENGTH * 2L) + (FRAME_ALIGNMENT * 2L)})
@InterruptAfter(10)
void deleteDetachedSegmentsIsANoOpIfNoFilesWereDetached(final long startPosition)
{
final String messagePrefix = "Message-Prefix-";
final int initialTermId = 19;
final long firstSegmentFilePosition =
segmentFileBasePosition(startPosition, startPosition, TERM_LENGTH, SEGMENT_LENGTH);
final long targetPosition = firstSegmentFilePosition + (SEGMENT_LENGTH * 3L) + 139;
uriBuilder.initialPosition(startPosition, initialTermId, TERM_LENGTH);

try (Publication publication = aeronArchive.addRecordedExclusivePublication(uriBuilder.build(), STREAM_ID))
{
assertEquals(startPosition, publication.position());

final CountersReader counters = aeron.countersReader();
final int counterId =
Tests.awaitRecordingCounterId(counters, publication.sessionId(), aeronArchive.archiveId());
final long recordingId = RecordingPos.getRecordingId(counters, counterId);

offerToPosition(publication, messagePrefix, targetPosition);
Tests.awaitPosition(counters, counterId, publication.position());

signalConsumer.reset();
aeronArchive.stopRecording(publication);
awaitSignal(aeronArchive, signalConsumer, recordingId, RecordingSignal.STOP);

final String prefix = recordingId + "-";
final String[] files = archive.context().archiveDir().list((dir, name) -> name.startsWith(prefix));
assertThat(files, arrayWithSize(4));

signalConsumer.reset();
final long deletedSegments = aeronArchive.deleteDetachedSegments(recordingId);
awaitSignal(aeronArchive, signalConsumer, recordingId, RecordingSignal.DELETE);
assertEquals(0, deletedSegments);
assertEquals(startPosition, aeronArchive.getStartPosition(recordingId));

final String[] updatedFiles =
archive.context().archiveDir().list((dir, name) -> name.startsWith(prefix));
assertThat(updatedFiles, is(files));
}
}

@Test
@InterruptAfter(10)
void shouldPurgeRecording() throws IOException
Expand Down

0 comments on commit 9cb04b5

Please sign in to comment.