diff --git a/core/src/main/scala/kafka/log/s3/MinorCompactTask.java b/core/src/main/scala/kafka/log/s3/MinorCompactTask.java
new file mode 100644
index 0000000000..7250848096
--- /dev/null
+++ b/core/src/main/scala/kafka/log/s3/MinorCompactTask.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3;
+
+import kafka.log.s3.model.StreamRecordBatch;
+import kafka.log.s3.objects.CommitCompactObjectRequest;
+import kafka.log.s3.objects.ObjectManager;
+import kafka.log.s3.objects.ObjectStreamRange;
+import kafka.log.s3.objects.StreamObject;
+import kafka.log.s3.operator.S3Operator;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+class MinorCompactTask implements Runnable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(MinorCompactTask.class);
+    private static final long NOOP_TIMESTAMP = -1L;
+    private final long compactSizeThreshold;
+    private final long maxCompactInterval;
+    private final int streamSplitSizeThreshold;
+    private final BlockingQueue<MinorCompactPart> waitingCompactRecords = new LinkedBlockingQueue<>();
+    private final AtomicLong waitingCompactRecordsBytesSize = new AtomicLong();
+    private volatile long lastCompactTimestamp = NOOP_TIMESTAMP;
+    private final ObjectManager objectManager;
+    private final S3Operator s3Operator;
+    private final ScheduledExecutorService schedule = Executors.newSingleThreadScheduledExecutor(
+            ThreadUtils.createThreadFactory("minor compact", true));
+
+    public MinorCompactTask(long compactSizeThreshold, long maxCompactInterval, int streamSplitSizeThreshold, ObjectManager objectManager, S3Operator s3Operator) {
+        this.compactSizeThreshold = compactSizeThreshold;
+        this.maxCompactInterval = maxCompactInterval;
+        this.streamSplitSizeThreshold = streamSplitSizeThreshold;
+        // TODO: close
+        schedule.scheduleAtFixedRate(this, 1, 1, TimeUnit.SECONDS);
+        this.objectManager = objectManager;
+        this.s3Operator = s3Operator;
+    }
+
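+    /**
+     * Queue the records of a WAL object for minor compaction and trigger a compaction
+     * run once the accumulated size reaches {@code compactSizeThreshold}.
+     */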
+    public void tryCompact(MinorCompactPart part) {
+        // TODO: back pressure
+        if (lastCompactTimestamp == NOOP_TIMESTAMP) {
+            lastCompactTimestamp = System.currentTimeMillis();
+        }
+        waitingCompactRecords.add(part);
+        if (waitingCompactRecordsBytesSize.addAndGet(part.size) >= compactSizeThreshold) {
+            schedule.execute(() -> tryCompact0(false));
+        }
+    }
+
+    @Override
+    public void run() {
+        tryCompact0(false);
+    }
+
+    public void close() {
+        try {
+            schedule.submit(() -> tryCompact0(true)).get();
+            schedule.shutdown();
+        } catch (Throwable e) {
+            LOGGER.error("minor compact fail", e);
+        }
+    }
+
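+    /**
+     * Drain the queued WAL parts and compact them: streams below {@code streamSplitSizeThreshold}
+     * are merged into a single minor compact object, larger streams are split into dedicated
+     * stream objects, and the result is committed via {@link ObjectManager#commitMinorCompactObject}.
+     */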
+    private void tryCompact0(boolean force) {
+        long now = System.currentTimeMillis();
+        boolean timeout = lastCompactTimestamp != NOOP_TIMESTAMP && (now - lastCompactTimestamp) >= maxCompactInterval;
+        boolean sizeExceed = waitingCompactRecordsBytesSize.get() >= compactSizeThreshold;
+        if (!force && !sizeExceed && !timeout) {
+            return;
+        }
+        try {
+            List<MinorCompactPart> parts = new ArrayList<>(waitingCompactRecords.size());
+            waitingCompactRecords.drainTo(parts);
+            lastCompactTimestamp = now;
+            waitingCompactRecordsBytesSize.getAndAdd(-parts.stream().mapToLong(r -> r.size).sum());
+            if (parts.isEmpty()) {
+                return;
+            }
+
+            CommitCompactObjectRequest compactRequest = new CommitCompactObjectRequest();
+            compactRequest.setCompactedObjectIds(parts.stream().map(p -> p.walObjectId).collect(Collectors.toList()));
+
+            long objectId = objectManager.prepareObject(1, TimeUnit.SECONDS.toMillis(30)).get();
+            ObjectWriter minorCompactObject = new ObjectWriter(objectId, s3Operator);
+
+            List<CompletableFuture<StreamObject>> streamObjectCfList = new LinkedList<>();
+            List<List<StreamRecordBatch>> streamRecordsList = sortAndSplit(parts);
+            for (List<StreamRecordBatch> streamRecords : streamRecordsList) {
+                long streamSize = streamRecords.stream().mapToLong(r -> r.getRecordBatch().rawPayload().remaining()).sum();
+                if (streamSize >= streamSplitSizeThreshold) {
+                    streamObjectCfList.add(writeStreamObject(streamRecords));
+                } else {
+                    for (StreamRecordBatch record : streamRecords) {
+                        minorCompactObject.write(record);
+                    }
+                    long streamId = streamRecords.get(0).getStreamId();
+                    long startOffset = streamRecords.get(0).getBaseOffset();
+                    long endOffset = streamRecords.get(streamRecords.size() - 1).getLastOffset();
+                    compactRequest.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset));
+                    // a minor compact object block only contains a single stream's data.
+                    minorCompactObject.closeCurrentBlock();
+                }
+            }
+            minorCompactObject.close().get();
+
+            compactRequest.setObjectId(objectId);
+            compactRequest.setObjectSize(minorCompactObject.size());
+
+            CompletableFuture.allOf(streamObjectCfList.toArray(new CompletableFuture[0])).get();
+            for (CompletableFuture<StreamObject> cf : streamObjectCfList) {
+                compactRequest.addStreamObject(cf.get());
+            }
+
+            objectManager.commitMinorCompactObject(compactRequest).get();
+        } catch (Throwable e) {
+            // TODO: handle exception, only expect fail when quit.
+            LOGGER.error("minor compact fail", e);
+        }
+    }
+
+    private CompletableFuture<StreamObject> writeStreamObject(List<StreamRecordBatch> streamRecords) {
+        CompletableFuture<Long> objectIdCf = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30));
+        return objectIdCf.thenCompose(objectId -> {
+            ObjectWriter streamObjectWriter = new ObjectWriter(objectId, s3Operator);
+            for (StreamRecordBatch record : streamRecords) {
+                streamObjectWriter.write(record);
+            }
+            long streamId = streamRecords.get(0).getStreamId();
+            long startOffset = streamRecords.get(0).getBaseOffset();
+            long endOffset = streamRecords.get(streamRecords.size() - 1).getLastOffset();
+            StreamObject streamObject = new StreamObject();
+            streamObject.setObjectId(objectId);
+            streamObject.setStreamId(streamId);
+            streamObject.setStartOffset(startOffset);
+            streamObject.setEndOffset(endOffset);
+            return streamObjectWriter.close().thenApply(nil -> {
+                streamObject.setObjectSize(streamObjectWriter.size());
+                return streamObject;
+            });
+        });
+    }
+
+    /**
+     * Sort records and split them in (stream, epoch) dimension.
+     * ex.
+     * part0: s1-e0-m1 s1-e0-m2 s2-e0-m1 s2-e0-m2
+     * part1: s1-e0-m3 s1-e0-m4
+     * part2: s1-e1-m10 s1-e1-m11
+     * after split:
+     * list0: s1-e0-m1 s1-e0-m2 s1-e0-m3 s1-e0-m4
+     * list1: s1-e1-m10 s1-e1-m11
+     * list2: s2-e0-m1 s2-e0-m2
+     */
+    private List<List<StreamRecordBatch>> sortAndSplit(List<MinorCompactPart> parts) {
+        int count = parts.stream().mapToInt(p -> p.records.size()).sum();
+        // TODO: more efficient sort
+        List<StreamRecordBatch> sortedList = new ArrayList<>(count);
+        for (MinorCompactPart part : parts) {
+            sortedList.addAll(part.records);
+        }
+        Collections.sort(sortedList);
+        List<List<StreamRecordBatch>> streamRecordsList = new ArrayList<>(1024);
+        long streamId = -1L;
+        long epoch = -1L;
+        List<StreamRecordBatch> streamRecords = null;
+        for (StreamRecordBatch record : sortedList) {
+            long recordStreamId = record.getStreamId();
+            long recordEpoch = record.getEpoch();
+            if (recordStreamId != streamId || recordEpoch != epoch) {
+                if (streamRecords != null) {
+                    streamRecordsList.add(streamRecords);
+                }
+                streamRecords = new LinkedList<>();
+                streamId = recordStreamId;
+                epoch = recordEpoch;
+            }
+            if (streamRecords != null) {
+                streamRecords.add(record);
+            }
+        }
+        if (streamRecords != null) {
+            streamRecordsList.add(streamRecords);
+        }
+        return streamRecordsList;
+    }
+
+
+    static class MinorCompactPart {
+        long walObjectId;
+        List<StreamRecordBatch> records;
+        long size;
+
+        public MinorCompactPart(long walObjectId, List<StreamRecordBatch> records) {
+            this.walObjectId = walObjectId;
+            this.records = new ArrayList<>(records);
+            this.size = records.stream().mapToLong(r -> r.getRecordBatch().rawPayload().remaining()).sum();
+        }
+    }
+}
diff --git a/core/src/main/scala/kafka/log/s3/ObjectWriter.java b/core/src/main/scala/kafka/log/s3/ObjectWriter.java
index 62833973c9..891f94a723 100644
--- a/core/src/main/scala/kafka/log/s3/ObjectWriter.java
+++ b/core/src/main/scala/kafka/log/s3/ObjectWriter.java
@@ -24,6 +24,7 @@
 import kafka.log.s3.objects.ObjectStreamRange;
 import kafka.log.s3.operator.S3Operator;
 import kafka.log.s3.operator.Writer;
+import kafka.log.s3.utils.ObjectUtils;
 import org.apache.kafka.common.compress.ZstdFactory;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
@@ -35,23 +36,22 @@
 // TODO: memory optimization
 public class ObjectWriter {
-    private int blockSizeThreshold;
-    private int partSizeThreshold;
-    private final S3Operator s3Operator;
+    private final int blockSizeThreshold;
+    private final int partSizeThreshold;
     private final List<DataBlock> waitingUploadBlocks;
     private final List<DataBlock> completedBlocks;
     private IndexBlock indexBlock;
     private final Writer writer;
-    private final String objectKey;
+    private final long objectId;
     private long nextDataBlockPosition;
     private long size;
     private DataBlock dataBlock;
 
-    public ObjectWriter(String objectKey, S3Operator s3Operator, int blockSizeThreshold, int partSizeThreshold) {
-        this.objectKey = objectKey;
-        this.s3Operator = s3Operator;
+    public ObjectWriter(long objectId, S3Operator s3Operator, int blockSizeThreshold, int partSizeThreshold) {
+        this.objectId = objectId;
+        String objectKey = ObjectUtils.genKey(0, "todocluster", objectId);
         this.blockSizeThreshold = blockSizeThreshold;
         this.partSizeThreshold = partSizeThreshold;
         waitingUploadBlocks = new LinkedList<>();
@@ -59,8 +59,8 @@ public ObjectWriter(String objectKey, S3Operator s3Operator, int blockSizeThresh
         writer = s3Operator.writer(objectKey);
     }
 
-    public ObjectWriter(String objectKey, S3Operator s3Operator) {
-        this(objectKey, s3Operator, 16 * 1024 * 1024, 32 * 1024 * 1024);
+    public ObjectWriter(long objectId, S3Operator s3Operator) {
+        this(objectId, s3Operator, 16 * 1024 * 1024, 32 * 1024 * 1024);
     }
 
     public void write(StreamRecordBatch record) {
@@ -75,6 +75,13 @@ public void write(StreamRecordBatch record) {
         }
     }
 
+    public void closeCurrentBlock() {
+        if (dataBlock != null) {
+            dataBlock.close();
+            dataBlock = null;
+        }
+    }
+
     private void tryUploadPart() {
         long waitingUploadSize = waitingUploadBlocks.stream().mapToLong(DataBlock::size).sum();
         if (waitingUploadSize >= partSizeThreshold) {
@@ -127,6 +134,10 @@ public List<ObjectStreamRange> getStreamRanges() {
         return streamRanges;
     }
 
+    public long objectId() {
+        return objectId;
+    }
+
     public long size() {
         return size;
     }
diff --git a/core/src/main/scala/kafka/log/s3/S3Wal.java b/core/src/main/scala/kafka/log/s3/S3Wal.java
index 1a3b207720..f2b375ef04 100644
--- a/core/src/main/scala/kafka/log/s3/S3Wal.java
+++ b/core/src/main/scala/kafka/log/s3/S3Wal.java
@@ -40,6 +40,7 @@ public class S3Wal implements Wal {
     private final int batchIntervalMs = 200;
     private final BlockingQueue<WalWriteRequest> writeBuffer;
     private final WalBatchWriteTask walBatchWriteTask;
+    private final MinorCompactTask minorCompactTask;
     private final ObjectManager objectManager;
     private final S3Operator s3Operator;
 
@@ -47,6 +48,7 @@ public class S3Wal implements Wal {
     public S3Wal(ObjectManager objectManager, S3Operator s3Operator) {
         writeBuffer = new ArrayBlockingQueue<>(16384);
         walBatchWriteTask = new WalBatchWriteTask(objectManager, s3Operator);
+        minorCompactTask = new MinorCompactTask(5L * 1024 * 1024 * 1024, 60, 16 * 1024 * 1024, objectManager, s3Operator);
         this.objectManager = objectManager;
         this.s3Operator = s3Operator;
     }
@@ -59,7 +61,7 @@ public void close() {
     @Override
     public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
         CompletableFuture<Void> cf = new CompletableFuture<>();
-        //TODO: copy to pooled bytebuffer to reduce gc
+        //TODO: copy to pooled bytebuffer to reduce gc, convert to flat record
         try {
             writeBuffer.put(new WalWriteRequest(streamRecord, cf));
         } catch (InterruptedException e) {
@@ -124,6 +126,7 @@ void tryComplete() {
             if (task.isDone()) {
                 writeTasks.poll();
                 task.ack();
+                minorCompactTask.tryCompact(new MinorCompactTask.MinorCompactPart(task.objectId(), task.records()));
             } else {
                 return;
             }
diff --git a/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java b/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java
index 7a7fa69f8e..c2406b9a01 100644
--- a/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java
+++ b/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java
@@ -19,11 +19,11 @@
 import com.automq.elasticstream.client.api.ElasticStreamClientException;
 import com.automq.elasticstream.client.flatc.header.ErrorCode;
 
+import kafka.log.s3.model.StreamRecordBatch;
 import kafka.log.s3.objects.CommitWalObjectRequest;
 import kafka.log.s3.objects.CommitWalObjectResponse;
 import kafka.log.s3.objects.ObjectManager;
 import kafka.log.s3.operator.S3Operator;
-import kafka.log.s3.utils.ObjectUtils;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -31,6 +31,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 public class SingleWalObjectWriteTask {
     private final List<WalWriteRequest> requests;
@@ -40,6 +41,7 @@ public class SingleWalObjectWriteTask {
     private CommitWalObjectResponse response;
     private volatile boolean isDone = false;
 
+
     public SingleWalObjectWriteTask(List<WalWriteRequest> records, ObjectManager objectManager, S3Operator s3Operator) {
         Collections.sort(records);
         this.requests = records;
@@ -57,8 +59,7 @@ public CompletableFuture<Void> upload() {
         CompletableFuture<Void> writeCf = objectIdCf.thenCompose(objectId -> {
             context.objectId = objectId;
             // TODO: fill cluster name
-            String objectKey = ObjectUtils.genKey(0, "todocluster", objectId);
-            objectWriter = new ObjectWriter(objectKey, s3Operator);
+            objectWriter = new ObjectWriter(objectId, s3Operator);
             for (WalWriteRequest request : requests) {
                 objectWriter.write(request.record);
             }
@@ -95,6 +96,14 @@ public void ack() {
         }
     }
 
+    public long objectId() {
+        return objectWriter.objectId();
+    }
+
+    public List<StreamRecordBatch> records() {
+        return requests.stream().map(r -> r.record).collect(Collectors.toList());
+    }
+
     static class UploadContext {
         long objectId;
     }
diff --git a/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java b/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java
index b18c7aecd5..fd82c6f8f2 100644
--- a/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java
+++ b/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java
@@ -54,6 +54,7 @@ private CompletableFuture read0(long streamId, long endOffset, Re
             context.objects = objectManager.getObjects(streamId, context.nextStartOffset, endOffset, 2);
             context.objectIndex = 0;
         }
+        // TODO: handle minor/major compact object may contain non-continuous stream records.
         // previous object is completed read or is the first object.
        context.reader = new ObjectReader(context.objects.get(context.objectIndex), s3Operator);
        context.objectIndex++;
diff --git a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java
index c8c6fe1afb..82a041a149 100644
--- a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java
+++ b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java
@@ -181,7 +181,7 @@ private boolean verifyWalStreamRanges(ObjectStreamRange range) {
 
     @Override
     public CompletableFuture<Void> commitMinorCompactObject(CommitCompactObjectRequest request) {
-        return null;
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
diff --git a/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java b/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java
index 809090fd81..b88135fe69 100644
--- a/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java
+++ b/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java
@@ -58,6 +58,10 @@ public int compareTo(StreamRecordBatch o) {
         if (rst != 0) {
             return rst;
         }
+        rst = Long.compare(epoch, o.epoch);
+        if (rst != 0) {
+            return rst;
+        }
         return Long.compare(baseOffset, o.baseOffset);
     }
 }
diff --git a/core/src/main/scala/kafka/log/s3/objects/CommitCompactObjectRequest.java b/core/src/main/scala/kafka/log/s3/objects/CommitCompactObjectRequest.java
index aab72f9ad3..435de8b7f5 100644
--- a/core/src/main/scala/kafka/log/s3/objects/CommitCompactObjectRequest.java
+++ b/core/src/main/scala/kafka/log/s3/objects/CommitCompactObjectRequest.java
@@ -17,6 +17,7 @@
 
 package kafka.log.s3.objects;
 
+import java.util.LinkedList;
 import java.util.List;
 
 public class CommitCompactObjectRequest {
@@ -61,6 +62,13 @@ public void setStreamObjects(List<StreamObject> streamObjects) {
         this.streamObjects = streamObjects;
     }
 
+    public void addStreamObject(StreamObject streamObject) {
+        if (streamObjects == null) {
+            streamObjects = new LinkedList<>();
+        }
+        streamObjects.add(streamObject);
+    }
+
     public List<Long> getCompactedObjectIds() {
         return compactedObjectIds;
     }
@@ -69,6 +77,13 @@ public void setCompactedObjectIds(List<Long> compactedObjectIds) {
         this.compactedObjectIds = compactedObjectIds;
     }
 
+    public void addCompactedObjectId(long compactedObjectId) {
+        if (compactedObjectIds == null) {
+            compactedObjectIds = new LinkedList<>();
+        }
+        compactedObjectIds.add(compactedObjectId);
+    }
+
     public List<ObjectStreamRange> getStreamRanges() {
         return streamRanges;
     }
@@ -76,4 +91,11 @@ public List<ObjectStreamRange> getStreamRanges() {
     public void setStreamRanges(List<ObjectStreamRange> streamRanges) {
         this.streamRanges = streamRanges;
     }
+
+    public void addStreamRange(ObjectStreamRange streamRange) {
+        if (streamRanges == null) {
+            streamRanges = new LinkedList<>();
+        }
+        streamRanges.add(streamRange);
+    }
 }
diff --git a/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java b/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java
index 8d620a79bd..da86bbf87b 100644
--- a/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java
+++ b/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java
@@ -23,6 +23,15 @@ public class ObjectStreamRange {
     private long startOffset;
     private long endOffset;
 
+    public ObjectStreamRange() {}
+
+    public ObjectStreamRange(long streamId, long epoch, long startOffset, long endOffset) {
+        this.streamId = streamId;
+        this.epoch = epoch;
+        this.startOffset = startOffset;
+        this.endOffset = endOffset;
+    }
+
     public long getStreamId() {
         return streamId;
     }
diff --git a/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java b/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java
index ea6617d867..32bce40d99 100644
--- a/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java
+++ b/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java
@@ -53,26 +53,23 @@ public void setup() {
 
     @Test
     public void testRead() throws Exception {
-        S3ObjectMetadata metadata1 = new S3ObjectMetadata(0, 0, S3ObjectType.WAL_LOOSE);
-        ObjectWriter objectWriter = new ObjectWriter(metadata1.key(), s3Operator, 1024, 1024);
+        ObjectWriter objectWriter = new ObjectWriter(0, s3Operator, 1024, 1024);
         objectWriter.write(newRecord(233, 10, 5, 512));
         objectWriter.write(newRecord(233, 15, 10, 512));
         objectWriter.write(newRecord(233, 25, 5, 512));
         objectWriter.write(newRecord(234, 0, 5, 512));
         objectWriter.close();
-        metadata1 = new S3ObjectMetadata(0, objectWriter.size(), S3ObjectType.WAL_LOOSE);
+        S3ObjectMetadata metadata1 = new S3ObjectMetadata(0, objectWriter.size(), S3ObjectType.WAL_LOOSE);
 
-        S3ObjectMetadata metadata2 = new S3ObjectMetadata(1, 0, S3ObjectType.WAL_LOOSE);
-        objectWriter = new ObjectWriter(metadata2.key(), s3Operator, 1024, 1024);
+        objectWriter = new ObjectWriter(1, s3Operator, 1024, 1024);
         objectWriter.write(newRecord(233, 30, 10, 512));
         objectWriter.close();
-        metadata2 = new S3ObjectMetadata(1, objectWriter.size(), S3ObjectType.WAL_LOOSE);
+        S3ObjectMetadata metadata2 = new S3ObjectMetadata(1, objectWriter.size(), S3ObjectType.WAL_LOOSE);
 
-        S3ObjectMetadata metadata3 = new S3ObjectMetadata(2, 0, S3ObjectType.WAL_LOOSE);
-        objectWriter = new ObjectWriter(metadata3.key(), s3Operator, 1024, 1024);
+        objectWriter = new ObjectWriter(2, s3Operator, 1024, 1024);
         objectWriter.write(newRecord(233, 40, 20, 512));
         objectWriter.close();
-        metadata3 = new S3ObjectMetadata(2, objectWriter.size(), S3ObjectType.WAL_LOOSE);
+        S3ObjectMetadata metadata3 = new S3ObjectMetadata(2, objectWriter.size(), S3ObjectType.WAL_LOOSE);
 
         when(objectManager.getObjects(eq(233L), eq(11L), eq(60L), eq(2))).thenReturn(List.of(
             metadata1,
             metadata2
diff --git a/core/src/test/java/kafka/log/s3/MinorCompactTaskTest.java b/core/src/test/java/kafka/log/s3/MinorCompactTaskTest.java
new file mode 100644
index 0000000000..8c74ac112a
--- /dev/null
+++ b/core/src/test/java/kafka/log/s3/MinorCompactTaskTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3;
+
+import kafka.log.s3.model.StreamRecordBatch;
+import kafka.log.s3.objects.CommitCompactObjectRequest;
+import kafka.log.s3.objects.ObjectManager;
+import kafka.log.s3.objects.StreamObject;
+import kafka.log.s3.operator.MemoryS3Operator;
+import kafka.log.s3.operator.S3Operator;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class MinorCompactTaskTest {
+    ObjectManager objectManager;
+    S3Operator s3Operator;
+    MinorCompactTask minorCompactTask;
+
+    @BeforeEach
+    public void setup() {
+        objectManager = mock(ObjectManager.class);
+        s3Operator = new MemoryS3Operator();
+        minorCompactTask = new MinorCompactTask(128 * 1024, 1000, 1024, objectManager, s3Operator);
+    }
+
+    @Test
+    public void testTryCompact() {
+        AtomicLong objectIdAlloc = new AtomicLong(10);
+        doAnswer(invocation -> CompletableFuture.completedFuture(objectIdAlloc.getAndIncrement())).when(objectManager).prepareObject(anyInt(), anyLong());
+        when(objectManager.commitMinorCompactObject(any())).thenReturn(CompletableFuture.completedFuture(null));
+
+        List<StreamRecordBatch> records = List.of(
+                new StreamRecordBatch(233, 0, 10, DefaultRecordBatch.of(2, 512)),
+                new StreamRecordBatch(233, 0, 12, DefaultRecordBatch.of(2, 128)),
+                new StreamRecordBatch(234, 0, 20, DefaultRecordBatch.of(2, 128))
+        );
+        minorCompactTask.tryCompact(new MinorCompactTask.MinorCompactPart(0, records));
+
+        records = List.of(
+                new StreamRecordBatch(233, 0, 14, DefaultRecordBatch.of(2, 512)),
+                new StreamRecordBatch(234, 0, 22, DefaultRecordBatch.of(2, 128)),
+                new StreamRecordBatch(235, 0, 11, DefaultRecordBatch.of(2, 128))
+        );
+        minorCompactTask.tryCompact(new MinorCompactTask.MinorCompactPart(1, records));
+
+        records = List.of(
+                new StreamRecordBatch(235, 1, 30, DefaultRecordBatch.of(2, 128))
+        );
+        minorCompactTask.tryCompact(new MinorCompactTask.MinorCompactPart(2, records));
+
+        minorCompactTask.close();
+
+        ArgumentCaptor<CommitCompactObjectRequest> reqArg = ArgumentCaptor.forClass(CommitCompactObjectRequest.class);
+        verify(objectManager, times(1)).commitMinorCompactObject(reqArg.capture());
+        // expect
+        // - stream233 is split into a stream object
+        // - stream234 is written to one stream range
+        // - stream235 has two epochs, so it is written to two stream ranges
+        CommitCompactObjectRequest request = reqArg.getValue();
+        assertEquals(10, request.getObjectId());
+        assertEquals(List.of(0L, 1L, 2L), request.getCompactedObjectIds());
+        assertEquals(3, request.getStreamRanges().size());
+        assertEquals(234, request.getStreamRanges().get(0).getStreamId());
+        assertEquals(20, request.getStreamRanges().get(0).getStartOffset());
+        assertEquals(24, request.getStreamRanges().get(0).getEndOffset());
+        assertEquals(235, request.getStreamRanges().get(1).getStreamId());
+        assertEquals(11, request.getStreamRanges().get(1).getStartOffset());
+        assertEquals(13, request.getStreamRanges().get(1).getEndOffset());
+        assertEquals(235, request.getStreamRanges().get(2).getStreamId());
+        assertEquals(30, request.getStreamRanges().get(2).getStartOffset());
+        assertEquals(32, request.getStreamRanges().get(2).getEndOffset());
+
+        assertEquals(1, request.getStreamObjects().size());
+        StreamObject streamObject = request.getStreamObjects().get(0);
+        assertEquals(233, streamObject.getStreamId());
+        assertEquals(11, streamObject.getObjectId());
+        assertEquals(10, streamObject.getStartOffset());
+        assertEquals(16, streamObject.getEndOffset());
+    }
+
+}
diff --git a/core/src/test/java/kafka/log/s3/ObjectWriterTest.java b/core/src/test/java/kafka/log/s3/ObjectWriterTest.java
index 892191f18e..4f55c715b0 100644
--- a/core/src/test/java/kafka/log/s3/ObjectWriterTest.java
+++ b/core/src/test/java/kafka/log/s3/ObjectWriterTest.java
@@ -42,7 +42,7 @@ public void testWrite() throws ExecutionException, InterruptedException {
         S3ObjectMetadata metadata = new S3ObjectMetadata(1, 0, S3ObjectType.WAL_LOOSE);
         S3Operator s3Operator = new MemoryS3Operator();
-        ObjectWriter objectWriter = new ObjectWriter(metadata.key(), s3Operator, 1024, 1024);
+        ObjectWriter objectWriter = new ObjectWriter(1, s3Operator, 1024, 1024);
         StreamRecordBatch r1 = newRecord(233, 10, 5, 512);
         objectWriter.write(r1);
         StreamRecordBatch r2 = newRecord(233, 15, 10, 512);