diff --git a/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java b/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java index de2907378d..864cd8c87b 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java +++ b/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java @@ -609,6 +609,10 @@ void listRecording(final long correlationId, final long recordingId, final Contr final String msg = "active listing already in progress"; controlSession.sendErrorResponse(correlationId, ACTIVE_LISTING, msg); } + else if (!catalog.hasRecording(recordingId)) + { + controlSession.sendRecordingUnknown(correlationId, recordingId); + } else { final ListRecordingByIdSession session = diff --git a/aeron-archive/src/main/java/io/aeron/archive/ListRecordingByIdSession.java b/aeron-archive/src/main/java/io/aeron/archive/ListRecordingByIdSession.java index 67deb4af22..82794f2cfd 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/ListRecordingByIdSession.java +++ b/aeron-archive/src/main/java/io/aeron/archive/ListRecordingByIdSession.java @@ -17,8 +17,15 @@ import org.agrona.concurrent.UnsafeBuffer; -class ListRecordingByIdSession extends AbstractListRecordingsSession +class ListRecordingByIdSession implements Session { + private final long correlationId; + private final long recordingId; + private final Catalog catalog; + private final ControlSession controlSession; + private final UnsafeBuffer descriptorBuffer; + private boolean isDone; + ListRecordingByIdSession( final long correlationId, final long recordingId, @@ -26,11 +33,68 @@ class ListRecordingByIdSession extends AbstractListRecordingsSession final ControlSession controlSession, final UnsafeBuffer descriptorBuffer) { - super(correlationId, recordingId, 1, catalog, controlSession, descriptorBuffer); + this.correlationId = correlationId; + this.recordingId = recordingId; + this.catalog = catalog; + this.controlSession = controlSession; + this.descriptorBuffer = descriptorBuffer; + } + + /** + * {@inheritDoc} + */ + public void abort(final String reason) + { + isDone = true; + } + + /** + * {@inheritDoc} + */ + public boolean isDone() + { + return isDone; + } + + /** + * {@inheritDoc} + */ + public int doWork() + { + if (isDone) + { + return 0; + } + + if (catalog.wrapDescriptor(recordingId, descriptorBuffer)) + { + if (controlSession.sendDescriptor(correlationId, descriptorBuffer)) + { + isDone = true; + } + } + else + { + controlSession.sendRecordingUnknown(correlationId, recordingId); + isDone = true; + } + + return 1; + } + + /** + * {@inheritDoc} + */ + public long sessionId() + { + return correlationId; } - boolean acceptDescriptor(final UnsafeBuffer descriptorBuffer) + /** + * {@inheritDoc} + */ + public void close() { - return true; + controlSession.activeListing(null); } } diff --git a/aeron-archive/src/test/java/io/aeron/archive/ListRecordingByIdSessionTest.java b/aeron-archive/src/test/java/io/aeron/archive/ListRecordingByIdSessionTest.java index aa911840a4..db45cec488 100644 --- a/aeron-archive/src/test/java/io/aeron/archive/ListRecordingByIdSessionTest.java +++ b/aeron-archive/src/test/java/io/aeron/archive/ListRecordingByIdSessionTest.java @@ -17,6 +17,7 @@ import io.aeron.archive.codecs.RecordingDescriptorDecoder; import io.aeron.archive.codecs.RecordingDescriptorHeaderDecoder; +import io.aeron.archive.codecs.RecordingState; import org.agrona.CloseHelper; import org.agrona.IoUtil; import org.agrona.concurrent.EpochClock; @@ -28,6 +29,7 @@ import java.io.File; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; class ListRecordingByIdSessionTest @@ -92,6 +94,79 @@ void shouldSendRecordingUnknownOnFirst() verifyNoMoreInteractions(controlSession); } + @Test + void shouldSendRecordingUnknownOnInvalidRecordingId() + { + final long correlationId = 42; + final long invalidatedRecordingId = recordingIds[1]; + assertTrue(catalog.changeState(invalidatedRecordingId, RecordingState.INVALID)); + + final ListRecordingByIdSession session = new ListRecordingByIdSession( + correlationId, invalidatedRecordingId, catalog, controlSession, descriptorBuffer); + + session.doWork(); + + verify(controlSession).sendRecordingUnknown(eq(correlationId), eq(invalidatedRecordingId)); + verifyNoMoreInteractions(controlSession); + } + + @Test + void shouldRetrySendingDescriptorUntilSuccess() + { + final long correlationId = 19; + final long recordingId = recordingIds[2]; + when(controlSession.sendDescriptor(eq(correlationId), any())).thenReturn(false, false, false, true); + + final ListRecordingByIdSession session = + new ListRecordingByIdSession(correlationId, recordingId, catalog, controlSession, descriptorBuffer); + + while (!session.isDone()) + { + assertEquals(1, session.doWork()); + } + assertTrue(session.isDone()); + + assertEquals(0, session.doWork()); + + verify(controlSession, times(4)).sendDescriptor(eq(correlationId), any()); + verifyNoMoreInteractions(controlSession); + } + + @Test + void shouldSendRecordingDescriptorUnknownIfRecordingIsInvalidateBetweenRetries() + { + final long correlationId = 19; + final long recordingId = recordingIds[2]; + when(controlSession.sendDescriptor(eq(correlationId), any())).thenReturn(false); + + final ListRecordingByIdSession session = + new ListRecordingByIdSession(correlationId, recordingId, catalog, controlSession, descriptorBuffer); + + assertEquals(1, session.doWork()); + assertFalse(session.isDone()); + + assertTrue(catalog.changeState(recordingId, RecordingState.INVALID)); + + assertEquals(1, session.doWork()); + assertTrue(session.isDone()); + + verify(controlSession).sendDescriptor(eq(correlationId), any()); + verify(controlSession).sendRecordingUnknown(eq(correlationId), eq(recordingId)); + verifyNoMoreInteractions(controlSession); + } + + @Test + void shouldCloseActiveListing() + { + final ListRecordingByIdSession session = + new ListRecordingByIdSession(1, 111, catalog, controlSession, descriptorBuffer); + + session.close(); + + verify(controlSession).activeListing(null); + verifyNoMoreInteractions(controlSession); + } + private Answer verifySendDescriptor() { return (invocation) -> diff --git a/aeron-system-tests/src/test/java/io/aeron/archive/BasicArchiveTest.java b/aeron-system-tests/src/test/java/io/aeron/archive/BasicArchiveTest.java index f6a2cc4a27..cc89629a27 100644 --- a/aeron-system-tests/src/test/java/io/aeron/archive/BasicArchiveTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/archive/BasicArchiveTest.java @@ -791,4 +791,57 @@ void shouldErrorReplayFileIoMaxLengthLessThanMtu() assertThat(error, Matchers.containsString("mtuLength")); assertThat(error, Matchers.containsString("fileIoMaxLength")); } + + @Test + void shouldNotListRecordingThatWasPurged() + { + final RecordingResult recording1 = recordData(aeronArchive); + final RecordingResult recording2 = recordData(aeronArchive); + final RecordingResult recording3 = recordData(aeronArchive); + + final RecordingDescriptorCollector collector = new RecordingDescriptorCollector(3); + assertEquals(3, aeronArchive.listRecordings(NULL_VALUE, 100, collector.reset())); + assertEquals(recording1.recordingId, collector.descriptors().get(0).recordingId()); + assertEquals(recording2.recordingId, collector.descriptors().get(1).recordingId()); + assertEquals(recording3.recordingId, collector.descriptors().get(2).recordingId()); + + final RecordingDescriptor descriptor = collector.descriptors().get(0); + final ChannelUri channelUri = ChannelUri.parse(descriptor.originalChannel()); + channelUri.remove(CommonContext.SESSION_ID_PARAM_NAME); + final int streamId = descriptor.streamId(); + assertEquals(3, aeronArchive.listRecordingsForUri( + NULL_VALUE, 100, channelUri.toString(), streamId, collector.reset())); + assertEquals(recording1.recordingId, collector.descriptors().get(0).recordingId()); + assertEquals(recording2.recordingId, collector.descriptors().get(1).recordingId()); + assertEquals(recording3.recordingId, collector.descriptors().get(2).recordingId()); + + assertEquals(1, aeronArchive.listRecording(recording1.recordingId, collector.reset())); + assertEquals(recording1.recordingId, collector.descriptors().get(0).recordingId()); + + assertEquals(1, aeronArchive.listRecording(recording2.recordingId, collector.reset())); + assertEquals(recording2.recordingId, collector.descriptors().get(0).recordingId()); + + assertEquals(1, aeronArchive.listRecording(recording3.recordingId, collector.reset())); + assertEquals(recording3.recordingId, collector.descriptors().get(0).recordingId()); + + assertNotEquals(0, aeronArchive.purgeRecording(recording2.recordingId)); + + assertEquals(1, aeronArchive.listRecording(recording1.recordingId, collector.reset())); + assertEquals(recording1.recordingId, collector.descriptors().get(0).recordingId()); + + assertEquals(0, aeronArchive.listRecording(recording2.recordingId, collector.reset())); + assertThat(collector.descriptors(), Matchers.hasSize(0)); + + assertEquals(1, aeronArchive.listRecording(recording3.recordingId, collector.reset())); + assertEquals(recording3.recordingId, collector.descriptors().get(0).recordingId()); + + assertEquals(2, aeronArchive.listRecordings(NULL_VALUE, 100, collector.reset())); + assertEquals(recording1.recordingId, collector.descriptors().get(0).recordingId()); + assertEquals(recording3.recordingId, collector.descriptors().get(1).recordingId()); + + assertEquals(2, aeronArchive.listRecordingsForUri( + NULL_VALUE, 100, channelUri.toString(), streamId, collector.reset())); + assertEquals(recording1.recordingId, collector.descriptors().get(0).recordingId()); + assertEquals(recording3.recordingId, collector.descriptors().get(1).recordingId()); + } }