Skip to content

Commit

Permalink
[Java] Fix listRecording to not return anything is recording is pur…
Browse files Browse the repository at this point in the history
…ged/invalidated.
  • Loading branch information
vyazelenko committed Jan 30, 2025
1 parent 6fb1090 commit 35bfe4d
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,10 @@ void listRecording(final long correlationId, final long recordingId, final Contr
final String msg = "active listing already in progress";
controlSession.sendErrorResponse(correlationId, ACTIVE_LISTING, msg);
}
else if (!catalog.hasRecording(recordingId))
{
controlSession.sendRecordingUnknown(correlationId, recordingId);
}
else
{
final ListRecordingByIdSession session =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,84 @@

import org.agrona.concurrent.UnsafeBuffer;

class ListRecordingByIdSession extends AbstractListRecordingsSession
class ListRecordingByIdSession implements Session
{
private final long correlationId;
private final long recordingId;
private final Catalog catalog;
private final ControlSession controlSession;
private final UnsafeBuffer descriptorBuffer;
private boolean isDone;

ListRecordingByIdSession(
final long correlationId,
final long recordingId,
final Catalog catalog,
final ControlSession controlSession,
final UnsafeBuffer descriptorBuffer)
{
super(correlationId, recordingId, 1, catalog, controlSession, descriptorBuffer);
this.correlationId = correlationId;
this.recordingId = recordingId;
this.catalog = catalog;
this.controlSession = controlSession;
this.descriptorBuffer = descriptorBuffer;
}

/**
* {@inheritDoc}
*/
public void abort(final String reason)
{
isDone = true;
}

/**
* {@inheritDoc}
*/
public boolean isDone()
{
return isDone;
}

/**
* {@inheritDoc}
*/
public int doWork()
{
if (isDone)
{
return 0;
}

if (catalog.wrapDescriptor(recordingId, descriptorBuffer))
{
if (controlSession.sendDescriptor(correlationId, descriptorBuffer))
{
isDone = true;
}
}
else
{
controlSession.sendRecordingUnknown(correlationId, recordingId);
isDone = true;
}

return 1;
}

/**
* {@inheritDoc}
*/
public long sessionId()
{
return correlationId;
}

boolean acceptDescriptor(final UnsafeBuffer descriptorBuffer)
/**
* {@inheritDoc}
*/
public void close()
{
return true;
controlSession.activeListing(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.aeron.archive.codecs.RecordingDescriptorDecoder;
import io.aeron.archive.codecs.RecordingDescriptorHeaderDecoder;
import io.aeron.archive.codecs.RecordingState;
import org.agrona.CloseHelper;
import org.agrona.IoUtil;
import org.agrona.concurrent.EpochClock;
Expand All @@ -28,6 +29,7 @@

import java.io.File;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

class ListRecordingByIdSessionTest
Expand Down Expand Up @@ -92,6 +94,79 @@ void shouldSendRecordingUnknownOnFirst()
verifyNoMoreInteractions(controlSession);
}

@Test
void shouldSendRecordingUnknownOnInvalidRecordingId()
{
final long correlationId = 42;
final long invalidatedRecordingId = recordingIds[1];
assertTrue(catalog.changeState(invalidatedRecordingId, RecordingState.INVALID));

final ListRecordingByIdSession session = new ListRecordingByIdSession(
correlationId, invalidatedRecordingId, catalog, controlSession, descriptorBuffer);

session.doWork();

verify(controlSession).sendRecordingUnknown(eq(correlationId), eq(invalidatedRecordingId));
verifyNoMoreInteractions(controlSession);
}

@Test
void shouldRetrySendingDescriptorUntilSuccess()
{
final long correlationId = 19;
final long recordingId = recordingIds[2];
when(controlSession.sendDescriptor(eq(correlationId), any())).thenReturn(false, false, false, true);

final ListRecordingByIdSession session =
new ListRecordingByIdSession(correlationId, recordingId, catalog, controlSession, descriptorBuffer);

while (!session.isDone())
{
assertEquals(1, session.doWork());
}
assertTrue(session.isDone());

assertEquals(0, session.doWork());

verify(controlSession, times(4)).sendDescriptor(eq(correlationId), any());
verifyNoMoreInteractions(controlSession);
}

@Test
void shouldSendRecordingDescriptorUnknownIfRecordingIsInvalidateBetweenRetries()
{
final long correlationId = 19;
final long recordingId = recordingIds[2];
when(controlSession.sendDescriptor(eq(correlationId), any())).thenReturn(false);

final ListRecordingByIdSession session =
new ListRecordingByIdSession(correlationId, recordingId, catalog, controlSession, descriptorBuffer);

assertEquals(1, session.doWork());
assertFalse(session.isDone());

assertTrue(catalog.changeState(recordingId, RecordingState.INVALID));

assertEquals(1, session.doWork());
assertTrue(session.isDone());

verify(controlSession).sendDescriptor(eq(correlationId), any());
verify(controlSession).sendRecordingUnknown(eq(correlationId), eq(recordingId));
verifyNoMoreInteractions(controlSession);
}

@Test
void shouldCloseActiveListing()
{
final ListRecordingByIdSession session =
new ListRecordingByIdSession(1, 111, catalog, controlSession, descriptorBuffer);

session.close();

verify(controlSession).activeListing(null);
verifyNoMoreInteractions(controlSession);
}

private Answer<Object> verifySendDescriptor()
{
return (invocation) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -791,4 +791,57 @@ void shouldErrorReplayFileIoMaxLengthLessThanMtu()
assertThat(error, Matchers.containsString("mtuLength"));
assertThat(error, Matchers.containsString("fileIoMaxLength"));
}

@Test
void shouldNotListRecordingThatWasPurged()
{
final RecordingResult recording1 = recordData(aeronArchive);
final RecordingResult recording2 = recordData(aeronArchive);
final RecordingResult recording3 = recordData(aeronArchive);

final RecordingDescriptorCollector collector = new RecordingDescriptorCollector(3);
assertEquals(3, aeronArchive.listRecordings(NULL_VALUE, 100, collector.reset()));
assertEquals(recording1.recordingId, collector.descriptors().get(0).recordingId());
assertEquals(recording2.recordingId, collector.descriptors().get(1).recordingId());
assertEquals(recording3.recordingId, collector.descriptors().get(2).recordingId());

final RecordingDescriptor descriptor = collector.descriptors().get(0);
final ChannelUri channelUri = ChannelUri.parse(descriptor.originalChannel());
channelUri.remove(CommonContext.SESSION_ID_PARAM_NAME);
final int streamId = descriptor.streamId();
assertEquals(3, aeronArchive.listRecordingsForUri(
NULL_VALUE, 100, channelUri.toString(), streamId, collector.reset()));
assertEquals(recording1.recordingId, collector.descriptors().get(0).recordingId());
assertEquals(recording2.recordingId, collector.descriptors().get(1).recordingId());
assertEquals(recording3.recordingId, collector.descriptors().get(2).recordingId());

assertEquals(1, aeronArchive.listRecording(recording1.recordingId, collector.reset()));
assertEquals(recording1.recordingId, collector.descriptors().get(0).recordingId());

assertEquals(1, aeronArchive.listRecording(recording2.recordingId, collector.reset()));
assertEquals(recording2.recordingId, collector.descriptors().get(0).recordingId());

assertEquals(1, aeronArchive.listRecording(recording3.recordingId, collector.reset()));
assertEquals(recording3.recordingId, collector.descriptors().get(0).recordingId());

assertNotEquals(0, aeronArchive.purgeRecording(recording2.recordingId));

assertEquals(1, aeronArchive.listRecording(recording1.recordingId, collector.reset()));
assertEquals(recording1.recordingId, collector.descriptors().get(0).recordingId());

assertEquals(0, aeronArchive.listRecording(recording2.recordingId, collector.reset()));
assertThat(collector.descriptors(), Matchers.hasSize(0));

assertEquals(1, aeronArchive.listRecording(recording3.recordingId, collector.reset()));
assertEquals(recording3.recordingId, collector.descriptors().get(0).recordingId());

assertEquals(2, aeronArchive.listRecordings(NULL_VALUE, 100, collector.reset()));
assertEquals(recording1.recordingId, collector.descriptors().get(0).recordingId());
assertEquals(recording3.recordingId, collector.descriptors().get(1).recordingId());

assertEquals(2, aeronArchive.listRecordingsForUri(
NULL_VALUE, 100, channelUri.toString(), streamId, collector.reset()));
assertEquals(recording1.recordingId, collector.descriptors().get(0).recordingId());
assertEquals(recording3.recordingId, collector.descriptors().get(1).recordingId());
}
}

0 comments on commit 35bfe4d

Please sign in to comment.