Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Archive]Enhance the ArchiveTool.delete-orphaned-segments method #1661

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 51 additions & 16 deletions aeron-archive/src/main/java/io/aeron/archive/ArchiveTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,13 +299,26 @@ else if (args.length == 2 && "compact".equals(args[1]))
compact(out, archiveDir);
}
}
else if (args.length == 2 && "delete-orphaned-segments".equals(args[1]))
else if (args.length >= 2 && "delete-orphaned-segments".equals(args[1]))
{
out.print("WARNING: All orphaned segment files will be deleted.");

if (readContinueAnswer("Continue? (y/n)"))
if (args.length == 2)
{
deleteOrphanedSegments(out, archiveDir);
out.print("WARNING: All orphaned segment files will be deleted.");
if (readContinueAnswer("Continue? (y/n)"))
{
deleteOrphanedSegments(out, archiveDir, null);
}
}
else
{
final long recordingId = Long.parseLong(args[2]);
out.print("WARNING: All orphaned segment files owned by the RecordingId[");
out.print(recordingId);
out.print("] will be deleted.");
if (readContinueAnswer("Continue? (y/n)"))
{
deleteOrphanedSegments(out, archiveDir, recordingId);
}
}
}
else if (args.length == 3 && "mark-valid".equals(args[1]))
Expand Down Expand Up @@ -789,25 +802,46 @@ public static void compact(final PrintStream out, final File archiveDir)
*
* @param out stream to print results and errors to.
* @param archiveDir that contains {@link MarkFile}, {@link Catalog}, and recordings.
* @param targetRecordingId optional recordingId to delete orphaned segments for a specific recording.
* If null, delete orphaned segments for all recordings.
*/
public static void deleteOrphanedSegments(final PrintStream out, final File archiveDir)
public static void deleteOrphanedSegments(
final PrintStream out,
final File archiveDir,
final Long targetRecordingId)
{
deleteOrphanedSegments(out, archiveDir, INSTANCE);
deleteOrphanedSegments(out, archiveDir, INSTANCE, targetRecordingId);
}

static void deleteOrphanedSegments(final PrintStream out, final File archiveDir, final EpochClock epochClock)
static void deleteOrphanedSegments(
final PrintStream out,
final File archiveDir,
final EpochClock epochClock,
final Long targetRecordingId)
{
try (Catalog catalog = openCatalogReadOnly(archiveDir, epochClock))
{
final Long2ObjectHashMap<List<String>> segmentFilesByRecordingId = indexSegmentFiles(archiveDir);

catalog.forEach(
(recordingDescriptorOffset, headerEncoder, headerDecoder, descriptorEncoder, descriptorDecoder) ->
{
final long recordingId = descriptorDecoder.recordingId();
final List<String> files = segmentFilesByRecordingId.getOrDefault(recordingId, emptyList());
deleteOrphanedSegmentFiles(out, archiveDir, descriptorDecoder, files);
});
final CatalogEntryProcessor processor = (recordingDescriptorOffset,
headerEncoder,
headerDecoder,
descriptorEncoder,
descriptorDecoder) ->
{
final long recordingId = descriptorDecoder.recordingId();
final List<String> files = segmentFilesByRecordingId.getOrDefault(recordingId, emptyList());
deleteOrphanedSegmentFiles(out, archiveDir, descriptorDecoder, files);
};

if (targetRecordingId != null)
{
catalog.forEntry(targetRecordingId, processor);
}
else
{
catalog.forEach(processor);
}
}
}

Expand Down Expand Up @@ -1711,7 +1745,8 @@ private static void printHelp()
" compact: compacts Catalog file by removing entries in non-valid state and deleting the%n" +
" corresponding segment files.%n%n" +
" count-entries: queries the number of `VALID` recording entries in the catalog.%n%n" +
" delete-orphaned-segments: deletes orphaned recording segments that have been detached,%n" +
" delete-orphaned-segments [recordingId]: deletes orphaned recording segments that have been detached,%n" +
" If recordingId is specified, only delete orphaned segments for that recording.%n" +
" i.e. outside the start and stop recording range, but are not deleted.%n%n" +
" describe: prints out descriptors for all valid recordings in the catalog.%n%n" +
" describe recordingId: prints out descriptor for the specified recording entry in the catalog.%n%n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1409,7 +1409,7 @@ void deleteOrphanedSegmentsDeletesSegmentFilesForAllRecordings() throws IOExcept
final File file25 = createFile(segmentFileName(
rec2, segmentFileBasePosition(1_000_000, Long.MAX_VALUE, TERM_LENGTH, SEGMENT_LENGTH)));

deleteOrphanedSegments(out, archiveDir, epochClock);
deleteOrphanedSegments(out, archiveDir, epochClock, null);

assertFileExists(file12, file13, file15, file17);
assertFileDoesNotExist(file11, file14, file16);
Expand Down