Skip to content

Commit

Permalink
fix(s3stream): limit the expired objects delete count (#1050)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Mar 29, 2024
1 parent ee5d152 commit b651a01
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
* 2. Compact some stream objects with the same stream ID into bigger stream objects.
*/
public class StreamObjectCompactor {
public static final int EXPIRED_OBJECTS_CLEAN_UP_STEP = 1000;

/**
* max object count in one group, the group count will limit the compact request size to kraft and multipart object
* part count (less than {@code Writer.MAX_PART_COUNT}).
Expand Down Expand Up @@ -115,12 +117,20 @@ void compact0(boolean onlyCleanup) throws ExecutionException, InterruptedExcepti
// clean up the expired objects
if (!expiredObjects.isEmpty()) {
List<Long> compactedObjectIds = expiredObjects.stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList());
request = new CompactStreamObjectRequest(NOOP_OBJECT_ID, 0,
streamId, stream.streamEpoch(), NOOP_OFFSET, NOOP_OFFSET, compactedObjectIds);
objectManager.compactStreamObject(request).get();
if (s3ObjectLogger.isTraceEnabled()) {
s3ObjectLogger.trace("{}", request);
int expiredObjectCount = compactedObjectIds.size();
// limit the expired objects compaction step to EXPIRED_OBJECTS_CLEAN_UP_STEP
for (int i = 0; i < expiredObjectCount; ) {
int start = i;
int end = Math.min(i + EXPIRED_OBJECTS_CLEAN_UP_STEP, expiredObjectCount);
request = new CompactStreamObjectRequest(NOOP_OBJECT_ID, 0,
streamId, stream.streamEpoch(), NOOP_OFFSET, NOOP_OFFSET, new ArrayList<>(compactedObjectIds.subList(start, end)));
objectManager.compactStreamObject(request).get();
if (s3ObjectLogger.isTraceEnabled()) {
s3ObjectLogger.trace("{}", request);
}
i = end;
}

}

if (onlyCleanup) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -300,6 +301,44 @@ public void testGroup() {
assertEquals(List.of(5L, 6L), groups.get(2).stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList()));
}

@Test
public void testCleanup_byStep() throws ExecutionException, InterruptedException {
// prepare object
List<S3ObjectMetadata> objects = new LinkedList<>();
for (int i = 0; i < 1500; i++) {
ObjectWriter writer = ObjectWriter.writer(1, s3Operator, Integer.MAX_VALUE, Integer.MAX_VALUE);
writer.write(streamId, List.of(
newRecord(i, 1, 1)
));
writer.close().get();
objects.add(new S3ObjectMetadata(i, S3ObjectType.STREAM, List.of(new StreamOffsetRange(streamId, i, i + 1)),
System.currentTimeMillis(), System.currentTimeMillis(), writer.size(), 1));
}

when(objectManager.getStreamObjects(eq(streamId), eq(0L), eq(1500L), eq(Integer.MAX_VALUE)))
.thenReturn(CompletableFuture.completedFuture(objects));
AtomicLong nextObjectId = new AtomicLong(1501);
doAnswer(invocationOnMock -> CompletableFuture.completedFuture(nextObjectId.getAndIncrement())).when(objectManager).prepareObject(anyInt(), anyLong());
when(objectManager.compactStreamObject(any())).thenReturn(CompletableFuture.completedFuture(null));
when(stream.streamId()).thenReturn(streamId);
when(stream.startOffset()).thenReturn(1450L);
when(stream.confirmOffset()).thenReturn(1500L);

StreamObjectCompactor task = StreamObjectCompactor.builder().objectManager(objectManager).s3Operator(s3Operator)
.maxStreamObjectSize(1024 * 1024 * 1024).stream(stream).dataBlockGroupSizeThreshold(1).build();
task.cleanup();

ArgumentCaptor<CompactStreamObjectRequest> ac = ArgumentCaptor.forClass(CompactStreamObjectRequest.class);
verify(objectManager, times(2)).compactStreamObject(ac.capture());
CompactStreamObjectRequest clean = ac.getAllValues().get(0);
assertEquals(ObjectUtils.NOOP_OBJECT_ID, clean.getObjectId());
assertEquals(LongStream.range(0, StreamObjectCompactor.EXPIRED_OBJECTS_CLEAN_UP_STEP).boxed().collect(Collectors.toList()), clean.getSourceObjectIds());

clean = ac.getAllValues().get(1);
assertEquals(ObjectUtils.NOOP_OBJECT_ID, clean.getObjectId());
assertEquals(LongStream.range(StreamObjectCompactor.EXPIRED_OBJECTS_CLEAN_UP_STEP, 1450).boxed().collect(Collectors.toList()), clean.getSourceObjectIds());
}

StreamRecordBatch newRecord(long offset, int count, int payloadSize) {
return new StreamRecordBatch(streamId, 0, offset, count, TestUtils.random(payloadSize));
}
Expand Down

0 comments on commit b651a01

Please sign in to comment.