Skip to content

Commit

Permalink
test(controller): commit the same stream set object or stream object (#…
Browse files Browse the repository at this point in the history
…1064)

Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Mar 31, 2024
1 parent 4f0ea12 commit b5fe31c
Showing 1 changed file with 92 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,12 @@
import org.mockito.Mockito;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OBJECT_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -492,6 +494,96 @@ public void testCommitStreamSetObject_compactWithDeletedStream() {
assertEquals(1, ((RemoveStreamSetObjectRecord) records.get(6).message()).objectId());
}

@Test
public void testCommitStreamSetObject_theSameStreamSetObject() {
List<Long> committed = new LinkedList<>();
when(objectControlManager.commitObject(anyLong(), anyLong(), anyLong())).then(args -> {
long objectId = args.getArgument(0);
if (committed.contains(objectId)) {
return ControllerResult.of(Collections.emptyList(), Errors.REDUNDANT_OPERATION);
}
committed.add(objectId);
return ControllerResult.of(Collections.emptyList(), Errors.NONE);
});
registerAlwaysSuccessEpoch(BROKER0);

// 1. create and open stream_0
CreateStreamRequest request0 = new CreateStreamRequest();
ControllerResult<CreateStreamResponse> result0 = manager.createStream(BROKER0, BROKER_EPOCH0, request0);
replay(manager, result0.records());
ControllerResult<OpenStreamResponse> result2 = manager.openStream(BROKER0, 0,
new OpenStreamRequest().setStreamId(STREAM0).setStreamEpoch(EPOCH0));
verifyFirstTimeOpenStreamResult(result2, EPOCH0, BROKER0);
replay(manager, result2.records());

// 2. commit valid stream set object
List<ObjectStreamRange> streamRanges0 = List.of(new ObjectStreamRange()
.setStreamId(STREAM0)
.setStreamEpoch(EPOCH0)
.setStartOffset(0L)
.setEndOffset(100L));
CommitStreamSetObjectRequestData commitRequest0 = new CommitStreamSetObjectRequestData()
.setObjectId(0L)
.setNodeId(BROKER0)
.setObjectSize(999)
.setObjectStreamRanges(streamRanges0);
ControllerResult<CommitStreamSetObjectResponseData> result3 = manager.commitStreamSetObject(commitRequest0);
assertEquals(Errors.NONE.code(), result3.response().errorCode());
replay(manager, result3.records());

// 3. re-commit the same object
ControllerResult<CommitStreamSetObjectResponseData> result4 = manager.commitStreamSetObject(commitRequest0);
assertEquals(Errors.NONE.code(), result4.response().errorCode());
assertTrue(result4.records().isEmpty());
}

@Test
public void testCommitStreamSetObject_theSameStreamObject() {
List<Long> committed = new LinkedList<>();
when(objectControlManager.commitObject(anyLong(), anyLong(), anyLong())).then(args -> {
long objectId = args.getArgument(0);
if (objectId == NOOP_OBJECT_ID) {
return ControllerResult.of(Collections.emptyList(), Errors.NONE);
}
if (committed.contains(objectId)) {
return ControllerResult.of(Collections.emptyList(), Errors.REDUNDANT_OPERATION);
}
committed.add(objectId);
return ControllerResult.of(Collections.emptyList(), Errors.NONE);
});
registerAlwaysSuccessEpoch(BROKER0);

// 1. create and open stream_0
CreateStreamRequest request0 = new CreateStreamRequest();
ControllerResult<CreateStreamResponse> result0 = manager.createStream(BROKER0, BROKER_EPOCH0, request0);
replay(manager, result0.records());
ControllerResult<OpenStreamResponse> result2 = manager.openStream(BROKER0, 0,
new OpenStreamRequest().setStreamId(STREAM0).setStreamEpoch(EPOCH0));
verifyFirstTimeOpenStreamResult(result2, EPOCH0, BROKER0);
replay(manager, result2.records());

// 2. commit valid stream set object
List<StreamObject> streamObjects = List.of(new StreamObject()
.setStreamId(STREAM0)
.setObjectId(0L)
.setObjectSize(999)
.setStartOffset(0L)
.setEndOffset(100L));
CommitStreamSetObjectRequestData commitRequest0 = new CommitStreamSetObjectRequestData()
.setObjectId(-1L)
.setNodeId(BROKER0)
.setObjectSize(0)
.setStreamObjects(streamObjects);
ControllerResult<CommitStreamSetObjectResponseData> result3 = manager.commitStreamSetObject(commitRequest0);
assertEquals(Errors.NONE.code(), result3.response().errorCode());
replay(manager, result3.records());

// 3. re-commit the same object
ControllerResult<CommitStreamSetObjectResponseData> result4 = manager.commitStreamSetObject(commitRequest0);
assertEquals(Errors.NONE.code(), result4.response().errorCode());
assertTrue(result4.records().isEmpty());
}

private long createStream() {
CreateStreamRequest request0 = new CreateStreamRequest();
ControllerResult<CreateStreamResponse> result0 = manager.createStream(BROKER0, BROKER_EPOCH0, request0);
Expand Down

0 comments on commit b5fe31c

Please sign in to comment.