diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index f2628fe632..a68b78f1e6 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -17,6 +17,7 @@ package org.apache.kafka.image; +import com.automq.stream.s3.metadata.ObjectUtils; import com.automq.stream.s3.metadata.S3ObjectMetadata; import com.automq.stream.s3.metadata.S3ObjectType; import com.automq.stream.s3.metadata.StreamOffsetRange; @@ -72,8 +73,19 @@ public void write(ImageWriter writer, ImageWriterOptions options) { nodeStreamSetObjectMetadata.forEach((k, v) -> v.write(writer, options)); } + /** + * Get objects in range [startOffset, endOffset) with limit. + * + * @param streamId stream id + * @param startOffset inclusive start offset of the stream + * @param endOffset exclusive end offset of the stream. + * NOTE: NOOP_OFFSET means to retrieve as many objects as it's within the limit. + * @param limit max number of s3 objects to return + * @return s3 objects within the range + */ + @SuppressWarnings({"checkstyle:cyclomaticcomplexity", "checkstyle:npathcomplexity"}) public InRangeObjects getObjects(long streamId, long startOffset, long endOffset, int limit) { - if (streamId < 0 || startOffset > endOffset || limit < 0) { + if (streamId < 0 || limit < 0 || (endOffset != ObjectUtils.NOOP_OFFSET && startOffset > endOffset)) { return InRangeObjects.INVALID; } S3StreamMetadataImage stream = streamsMetadata.get(streamId); @@ -110,7 +122,7 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset } objects.add(streamObject.toMetadata()); nextStartOffset = streamObject.endOffset(); - if (objects.size() >= limit || nextStartOffset >= endOffset) { + if (objects.size() >= limit || (endOffset != ObjectUtils.NOOP_OFFSET && nextStartOffset >= endOffset)) { return new InRangeObjects(streamId, objects); } } @@ -141,7 +153,7 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset objects.add(new S3ObjectMetadata(streamSetObject.objectId(), S3ObjectType.STREAM_SET, List.of(streamOffsetRange), streamSetObject.dataTimeInMs())); nextStartOffset = streamOffsetRange.endOffset(); - if (objects.size() >= limit || nextStartOffset >= endOffset) { + if (objects.size() >= limit || (endOffset != ObjectUtils.NOOP_OFFSET && nextStartOffset >= endOffset)) { return new InRangeObjects(streamId, objects); } } else { diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java index e9554f55ff..0bb7c1f66c 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.image; +import com.automq.stream.s3.metadata.ObjectUtils; import com.automq.stream.s3.metadata.S3ObjectMetadata; import com.automq.stream.s3.metadata.S3StreamConstant; import com.automq.stream.s3.metadata.StreamOffsetRange; @@ -192,6 +193,12 @@ public void testGetObjects() { assertEquals(400L, objects.endOffset()); assertEquals(7, objects.objects().size()); assertEquals(expectedObjectIds.subList(1, 8), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList())); + + objects = streamsImage.getObjects(STREAM0, 10, ObjectUtils.NOOP_OFFSET, 9); + assertEquals(10, objects.startOffset()); + assertEquals(420, objects.endOffset()); + assertEquals(9, objects.objects().size()); + assertEquals(List.of(8L, 0L, 1L, 5L, 6L, 2L, 9L, 10L, 3L), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList())); } /**