diff --git a/core/src/main/scala/kafka/log/s3/ObjectReader.java b/core/src/main/scala/kafka/log/s3/ObjectReader.java index a725c4ae0e..483d73a9ce 100644 --- a/core/src/main/scala/kafka/log/s3/ObjectReader.java +++ b/core/src/main/scala/kafka/log/s3/ObjectReader.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static kafka.log.s3.ObjectWriter.Footer.FOOTER_SIZE; +import static org.apache.kafka.metadata.stream.ObjectUtils.NOOP_OFFSET; public class ObjectReader implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(ObjectReader.class); @@ -225,7 +226,7 @@ public List find(long streamId, long startOffset, long endOffset nextMaxBytes -= Math.min(nextMaxBytes, blockSize); } matched = true; - if (nextStartOffset >= endOffset || nextMaxBytes == 0) { + if ((endOffset != NOOP_OFFSET && nextStartOffset >= endOffset) || nextMaxBytes == 0) { break; } } else if (matched) { diff --git a/core/src/main/scala/kafka/log/s3/cache/BlockCache.java b/core/src/main/scala/kafka/log/s3/cache/BlockCache.java index ad1db9369e..55ce124018 100644 --- a/core/src/main/scala/kafka/log/s3/cache/BlockCache.java +++ b/core/src/main/scala/kafka/log/s3/cache/BlockCache.java @@ -19,6 +19,8 @@ import kafka.log.s3.model.StreamRecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; @@ -35,8 +37,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public class BlockCache { + private static final Logger LOGGER = LoggerFactory.getLogger(BlockCache.class); + static final int BLOCK_SIZE = 1024 * 1024; + static final int MAX_READAHEAD_SIZE = 128 * 1024 * 1024; private final long maxSize; - private final Map> stream2cache = new HashMap<>(); + final Map stream2cache = new HashMap<>(); private final LRUCache inactive = new LRUCache<>(); private final LRUCache active = new LRUCache<>(); private final AtomicLong size = new AtomicLong(); @@ -62,56 +67,56 @@ public void put0(long streamId, List records) { records.forEach(StreamRecordBatch::release); return; } - boolean overlapped = false; records = new ArrayList<>(records); - NavigableMap streamCache = stream2cache.computeIfAbsent(streamId, id -> new TreeMap<>()); + StreamCache streamCache = stream2cache.computeIfAbsent(streamId, id -> new StreamCache()); long startOffset = records.get(0).getBaseOffset(); long endOffset = records.get(records.size() - 1).getLastOffset(); - // TODO: generate readahead. - Map.Entry floorEntry = streamCache.floorEntry(startOffset); - SortedMap tailMap = streamCache.tailMap(floorEntry != null ? floorEntry.getKey() : startOffset); + + // generate readahead. + Readahead readahead = genReadahead(streamId, records); + // remove overlapped part. + Map.Entry floorEntry = streamCache.blocks.floorEntry(startOffset); + SortedMap tailMap = streamCache.blocks.tailMap(floorEntry != null ? floorEntry.getKey() : startOffset); for (Map.Entry entry : tailMap.entrySet()) { CacheBlock cacheBlock = entry.getValue(); if (cacheBlock.firstOffset >= endOffset) { break; } // overlap is a rare case, so removeIf is fine for the performance. - if (records.removeIf(record -> { + records.removeIf(record -> { boolean remove = record.getLastOffset() > cacheBlock.firstOffset && record.getBaseOffset() < cacheBlock.lastOffset; if (remove) { record.release(); } return remove; - })) { - overlapped = true; - } + }); } // ensure the cache size. 
         int size = records.stream().mapToInt(StreamRecordBatch::size).sum();
         ensureCapacity(size);
-        // TODO: split records to 1MB blocks.
-        if (overlapped) {
-            // split to multiple cache blocks.
-            long expectStartOffset = -1L;
-            List<StreamRecordBatch> part = new ArrayList<>(records.size() / 2);
-            for (StreamRecordBatch record : records) {
-                if (expectStartOffset == -1L || record.getBaseOffset() == expectStartOffset) {
-                    part.add(record);
-                } else {
-                    put(streamId, streamCache, new CacheBlock(part));
-                    part = new ArrayList<>(records.size() / 2);
-                    part.add(record);
-                }
-                expectStartOffset = record.getLastOffset();
-            }
-            if (!part.isEmpty()) {
-                put(streamId, streamCache, new CacheBlock(part));
+        // split into 1MB cache blocks; each block holds sequential records.
+        long expectStartOffset = -1L;
+        List<StreamRecordBatch> part = new ArrayList<>(records.size() / 2);
+        int partSize = 0;
+        for (StreamRecordBatch record : records) {
+            if (expectStartOffset == -1L || record.getBaseOffset() == expectStartOffset || partSize >= BLOCK_SIZE) {
+                part.add(record);
+                partSize += record.size();
+            } else {
+                // attach the readahead marker to the first block.
+                put(streamId, streamCache, new CacheBlock(part, readahead));
+                readahead = null;
+                part = new ArrayList<>(records.size() / 2);
+                partSize = 0;
+                part.add(record);
             }
-            expectStartOffset = record.getLastOffset();
-        } else {
-            put(streamId, streamCache, new CacheBlock(records));
+            expectStartOffset = record.getLastOffset();
+        }
+        if (!part.isEmpty()) {
+            put(streamId, streamCache, new CacheBlock(part, readahead));
         }
     }
 
@@ -131,23 +136,24 @@ public GetCacheResult get(long streamId, long startOffset, long endOffset, int m
     }
 
     public GetCacheResult get0(long streamId, long startOffset, long endOffset, int maxBytes) {
-        NavigableMap<Long, CacheBlock> streamCache = stream2cache.get(streamId);
+        StreamCache streamCache = stream2cache.get(streamId);
         if (streamCache == null) {
             return GetCacheResult.empty();
         }
-        Map.Entry<Long, CacheBlock> floorEntry = streamCache.floorEntry(startOffset);
-        streamCache = streamCache.tailMap(floorEntry != null ? floorEntry.getKey() : startOffset, true);
+        Map.Entry<Long, CacheBlock> floorEntry = streamCache.blocks.floorEntry(startOffset);
+        NavigableMap<Long, CacheBlock> streamCacheBlocks = streamCache.blocks.tailMap(floorEntry != null ? floorEntry.getKey() : startOffset, true);
         long nextStartOffset = startOffset;
         int nextMaxBytes = maxBytes;
         Readahead readahead = null;
         LinkedList<StreamRecordBatch> records = new LinkedList<>();
-        for (Map.Entry<Long, CacheBlock> entry : streamCache.entrySet()) {
+        for (Map.Entry<Long, CacheBlock> entry : streamCacheBlocks.entrySet()) {
             CacheBlock cacheBlock = entry.getValue();
             if (cacheBlock.lastOffset < nextStartOffset || nextStartOffset < cacheBlock.firstOffset) {
                 break;
             }
             if (readahead == null && cacheBlock.readahead != null) {
                 readahead = cacheBlock.readahead;
+                cacheBlock.readahead = null;
             }
             nextMaxBytes = readFromCacheBlock(records, cacheBlock, nextStartOffset, endOffset, nextMaxBytes);
             nextStartOffset = records.getLast().getLastOffset();
@@ -201,7 +207,12 @@ private void ensureCapacity(int size) {
             if (entry == null) {
                 break;
             }
-            CacheBlock cacheBlock = stream2cache.get(entry.getKey().streamId).remove(entry.getKey().startOffset);
+            StreamCache streamCache = stream2cache.get(entry.getKey().streamId);
+            if (streamCache == null) {
+                LOGGER.error("[BUG] Stream cache not found for streamId: {}", entry.getKey().streamId);
+                continue;
+            }
+            CacheBlock cacheBlock = streamCache.blocks.remove(entry.getKey().startOffset);
             cacheBlock.free();
             if (maxSize - this.size.addAndGet(-entry.getValue()) >= size) {
                 return;
@@ -210,12 +221,55 @@
         }
     }
 
-    private void put(long streamId, NavigableMap<Long, CacheBlock> streamCache, CacheBlock cacheBlock) {
-        streamCache.put(cacheBlock.firstOffset, cacheBlock);
+    private void put(long streamId, StreamCache streamCache, CacheBlock cacheBlock) {
+        streamCache.blocks.put(cacheBlock.firstOffset, cacheBlock);
         active.put(new CacheKey(streamId, cacheBlock.firstOffset), cacheBlock.size);
         size.getAndAdd(cacheBlock.size);
     }
+
+    Readahead genReadahead(long streamId, List<StreamRecordBatch> records) {
+        if (records.isEmpty()) {
+            return null;
+        }
+        long startOffset = records.get(records.size() - 1).getLastOffset();
+        int size = records.stream().mapToInt(StreamRecordBatch::size).sum();
+        size = alignBlockSize(size);
+        StreamCache streamCache = stream2cache.get(streamId);
+        if (streamCache != null && streamCache.evict) {
+            // exponential fallback when cache is tight.
+ size = alignBlockSize(size / 2); + streamCache.evict = false; + } else { + if (size < MAX_READAHEAD_SIZE / 2) { + // exponential growth + size = size * 2; + } else { + // linear growth + size += BLOCK_SIZE; + } + } + size = Math.min(Math.max(size, BLOCK_SIZE), MAX_READAHEAD_SIZE); + return new + + Readahead(startOffset, size); + } + + int alignBlockSize(int size) { + return (size + BLOCK_SIZE - 1) / BLOCK_SIZE * BLOCK_SIZE; + } + + static class StreamCache { + NavigableMap blocks; + boolean evict; + + public StreamCache() { + blocks = new TreeMap<>(); + evict = false; + } + + } + static class CacheKey { final long streamId; final long startOffset; @@ -256,10 +310,6 @@ public CacheBlock(List records, Readahead readahead) { this.readahead = readahead; } - public CacheBlock(List records) { - this(records, null); - } - public void free() { records.forEach(StreamRecordBatch::release); records = null; diff --git a/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java b/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java index f6f7f2c710..f81c82023e 100644 --- a/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java +++ b/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java @@ -17,40 +17,72 @@ package kafka.log.s3.cache; -import kafka.log.es.FutureUtil; +import kafka.log.es.utils.Threads; import kafka.log.s3.ObjectReader; import kafka.log.s3.model.StreamRecordBatch; import kafka.log.s3.objects.ObjectManager; import kafka.log.s3.operator.S3Operator; import org.apache.kafka.common.utils.CloseableIterator; +import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.metadata.stream.S3ObjectMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; +import static kafka.log.es.FutureUtil.failedFuture; +import static kafka.log.es.FutureUtil.propagate; +import static org.apache.kafka.metadata.stream.ObjectUtils.NOOP_OFFSET; + public class DefaultS3BlockCache implements S3BlockCache { + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultS3BlockCache.class); private final LRUCache objectReaderLRU = new LRUCache<>(); + private final Map> readaheadTasks = new ConcurrentHashMap<>(); private final BlockCache cache; + private final ExecutorService backgroundExectutor; private final ObjectManager objectManager; private final S3Operator s3Operator; public DefaultS3BlockCache(long cacheBytesSize, ObjectManager objectManager, S3Operator s3Operator) { this.cache = new BlockCache(cacheBytesSize); + this.backgroundExectutor = Threads.newFixedThreadPool( + 1, + ThreadUtils.createThreadFactory("s3-block-cache-background-%d", false), + LOGGER); this.objectManager = objectManager; this.s3Operator = s3Operator; } @Override public CompletableFuture read(long streamId, long startOffset, long endOffset, int maxBytes) { + return read0(streamId, startOffset, endOffset, maxBytes, true); + } + + public CompletableFuture read0(long streamId, long startOffset, long endOffset, int maxBytes, boolean awaitReadahead) { if (startOffset >= endOffset || maxBytes <= 0) { return CompletableFuture.completedFuture(new ReadDataBlock(Collections.emptyList())); } + + if (awaitReadahead) { + // expect 
readahead will fill the cache with the data we need. + CompletableFuture readaheadCf = readaheadTasks.get(new ReadingTaskKey(streamId, startOffset)); + if (readaheadCf != null) { + CompletableFuture readCf = new CompletableFuture<>(); + readaheadCf.whenComplete((nil, ex) -> propagate(read0(streamId, startOffset, endOffset, maxBytes, false), readCf)); + return readCf; + } + } + long nextStartOffset = startOffset; int nextMaxBytes = maxBytes; // 1. get from cache @@ -60,13 +92,14 @@ public CompletableFuture read(long streamId, long startOffset, lo nextStartOffset = cacheRecords.get(cacheRecords.size() - 1).getLastOffset(); nextMaxBytes -= Math.min(nextMaxBytes, cacheRecords.stream().mapToInt(r -> r.getRecordBatch().rawPayload().remaining()).sum()); } + cacheRst.getReadahead().ifPresent(readahead -> backgroundReadahead(streamId, readahead)); if (nextStartOffset >= endOffset || nextMaxBytes == 0) { return CompletableFuture.completedFuture(new ReadDataBlock(cacheRecords)); } // 2. get from s3 List objects = objectManager.getObjects(streamId, nextStartOffset, endOffset, 2); ReadContext context = new ReadContext(objects, nextStartOffset, nextMaxBytes); - return read0(streamId, endOffset, context).thenApply(s3Rst -> { + return readFromS3(streamId, endOffset, context).thenApply(s3Rst -> { List records = new ArrayList<>(cacheRst.getRecords()); records.addAll(s3Rst.getRecords()); return new ReadDataBlock(records); @@ -78,10 +111,15 @@ public void put(Map> stream2records) { stream2records.forEach(cache::put); } - private CompletableFuture read0(long streamId, long endOffset, ReadContext context) { + private CompletableFuture readFromS3(long streamId, long endOffset, ReadContext context) { if (context.objectIndex >= context.objects.size()) { context.objects = objectManager.getObjects(streamId, context.nextStartOffset, endOffset, 2); context.objectIndex = 0; + if (context.objects.isEmpty()) { + LOGGER.error("[BUG] fail to read, expect objects not empty, streamId={}, startOffset={}, endOffset={}", + streamId, context.nextStartOffset, endOffset); + return failedFuture(new IllegalStateException("fail to read, expect objects not empty")); + } } ObjectReader reader = getObjectReader(context.objects.get(context.objectIndex)); context.objectIndex++; @@ -109,7 +147,7 @@ private CompletableFuture read0(long streamId, long endOffset, Re context.records.add(recordBatch); nextStartOffset = recordBatch.getLastOffset(); nextMaxBytes -= Math.min(nextMaxBytes, recordBatch.getRecordBatch().rawPayload().remaining()); - if (nextStartOffset >= endOffset || nextMaxBytes == 0) { + if ((endOffset != NOOP_OFFSET && nextStartOffset >= endOffset) || nextMaxBytes == 0) { fulfill = true; } } @@ -125,10 +163,10 @@ private CompletableFuture read0(long streamId, long endOffset, Re if (fulfill) { return CompletableFuture.completedFuture(new ReadDataBlock(context.records)); } else { - return read0(streamId, endOffset, context); + return readFromS3(streamId, endOffset, context); } } catch (Throwable e) { - return FutureUtil.failedFuture(e); + return failedFuture(e); } finally { reader.release(); } @@ -136,13 +174,27 @@ private CompletableFuture read0(long streamId, long endOffset, Re }); } + private void backgroundReadahead(long streamId, BlockCache.Readahead readahead) { + backgroundExectutor.execute(() -> { + List objects = objectManager.getObjects(streamId, readahead.getStartOffset(), NOOP_OFFSET, 2); + if (objects.isEmpty()) { + return; + } + CompletableFuture readaheadCf = readFromS3(streamId, NOOP_OFFSET, + new 
ReadContext(objects, readahead.getStartOffset(), readahead.getSize())); + ReadingTaskKey readingTaskKey = new ReadingTaskKey(streamId, readahead.getStartOffset()); + readaheadTasks.put(readingTaskKey, readaheadCf); + readaheadCf + .thenAccept(readDataBlock -> cache.put(streamId, readDataBlock.getRecords())) + .whenComplete((nil, ex) -> readaheadTasks.remove(readingTaskKey, readaheadCf)); + }); + } + private ObjectReader getObjectReader(S3ObjectMetadata metadata) { synchronized (objectReaderLRU) { // TODO: evict by object readers index cache size while (objectReaderLRU.size() > 128) { - Optional.ofNullable(objectReaderLRU.pop()).ifPresent(entry -> { - entry.getValue().close(); - }); + Optional.ofNullable(objectReaderLRU.pop()).ifPresent(entry -> entry.getValue().close()); } ObjectReader objectReader = objectReaderLRU.get(metadata.objectId()); if (objectReader == null) { @@ -169,5 +221,27 @@ public ReadContext(List objects, long startOffset, int maxByte } + static class ReadingTaskKey { + final long streamId; + final long startOffset; + + public ReadingTaskKey(long streamId, long startOffset) { + this.streamId = streamId; + this.startOffset = startOffset; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ReadingTaskKey that = (ReadingTaskKey) o; + return streamId == that.streamId && startOffset == that.startOffset; + } + + @Override + public int hashCode() { + return Objects.hash(streamId, startOffset); + } + } } diff --git a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java index 52977ad667..1d55ac5f0c 100644 --- a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java @@ -56,6 +56,10 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.apache.kafka.metadata.stream.ObjectUtils.NOOP_OFFSET; + + +// TODO: deprecate or fix it. public class MemoryMetadataManager implements StreamManager, ObjectManager { private static final int MOCK_BROKER_ID = 0; @@ -162,9 +166,9 @@ public CompletableFuture commitWALObject(CommitWALObjec } // commit object S3Object s3Object = new S3Object( - objectId, objectSize, object.getObjectKey(), - object.getPreparedTimeInMs(), object.getExpiredTimeInMs(), System.currentTimeMillis(), -1, - S3ObjectState.COMMITTED); + objectId, objectSize, object.getObjectKey(), + object.getPreparedTimeInMs(), object.getExpiredTimeInMs(), System.currentTimeMillis(), -1, + S3ObjectState.COMMITTED); this.objectsMetadata.put(objectId, s3Object ); // build metadata @@ -237,11 +241,13 @@ public List getServerObjects() { @Override public List getObjects(long streamId, long startOffset, long endOffset, int limit) { // TODO: support search not only in wal objects + endOffset = endOffset == NOOP_OFFSET ? 
Long.MAX_VALUE : endOffset; + long finalEndOffset = endOffset; CompletableFuture> future = this.submitEvent(() -> { int need = limit; List objs = new ArrayList<>(); MemoryStreamMetadata streamMetadata = this.streamsMetadata.get(streamId); - if (endOffset <= streamMetadata.startOffset) { + if (finalEndOffset <= streamMetadata.startOffset) { return objs; } MemoryBrokerWALMetadata metadata = this.brokerWALMetadata.get(MOCK_BROKER_ID); @@ -252,7 +258,7 @@ public List getObjects(long streamId, long startOffset, long e if (need <= 0) { break; } - if (!walObject.intersect(streamId, startOffset, endOffset)) { + if (!walObject.intersect(streamId, startOffset, finalEndOffset)) { continue; } // find stream index, get object diff --git a/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java b/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java index dc221e0d26..f008d7695f 100644 --- a/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java @@ -30,8 +30,8 @@ import org.apache.kafka.metadata.stream.S3Object; import org.apache.kafka.metadata.stream.S3ObjectMetadata; import org.apache.kafka.metadata.stream.S3StreamConstant; -import org.apache.kafka.metadata.stream.S3WALObjectMetadata; import org.apache.kafka.metadata.stream.S3StreamObject; +import org.apache.kafka.metadata.stream.S3WALObjectMetadata; import org.apache.kafka.metadata.stream.StreamOffsetRange; import org.apache.kafka.raft.OffsetAndEpoch; import org.slf4j.Logger; @@ -51,6 +51,8 @@ import java.util.concurrent.Executors; import java.util.stream.Collectors; +import static org.apache.kafka.metadata.stream.ObjectUtils.NOOP_OFFSET; + public class StreamMetadataManager implements InRangeObjectsFetcher { // TODO: optimize by more suitable concurrent protection @@ -174,6 +176,7 @@ public CompletableFuture fetch(long streamId, long startOffset, streamId, startOffset, endOffset, limit, streamStartOffset); return CompletableFuture.completedFuture(InRangeObjects.INVALID); } + endOffset = endOffset == NOOP_OFFSET ? streamEndOffset : endOffset; if (endOffset > streamEndOffset) { // lag behind, need to wait for cache catch up return pendingFetch(streamId, startOffset, endOffset, limit); diff --git a/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java index cbf588023d..0b0812626c 100644 --- a/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java +++ b/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java @@ -17,9 +17,10 @@ package kafka.log.s3.objects; +import org.apache.kafka.metadata.stream.S3ObjectMetadata; + import java.util.List; import java.util.concurrent.CompletableFuture; -import org.apache.kafka.metadata.stream.S3ObjectMetadata; /** * Object metadata registry. @@ -62,7 +63,7 @@ public interface ObjectManager { * * @param streamId stream id. * @param startOffset get range start offset. - * @param endOffset get range end offset. + * @param endOffset get range end offset. NOOP_OFFSET represent endOffset is unlimited. * @param limit max object range count. 
* @return {@link S3ObjectMetadata} */ diff --git a/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java b/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java index 2b6a025fb0..09ba084488 100644 --- a/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java +++ b/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java @@ -23,6 +23,7 @@ import kafka.log.s3.objects.ObjectManager; import kafka.log.s3.operator.MemoryS3Operator; import kafka.log.s3.operator.S3Operator; +import org.apache.kafka.metadata.stream.ObjectUtils; import org.apache.kafka.metadata.stream.S3ObjectMetadata; import org.apache.kafka.metadata.stream.S3ObjectType; import org.junit.jupiter.api.BeforeEach; @@ -30,11 +31,18 @@ import org.junit.jupiter.api.Test; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @Tag("S3Unit") @@ -85,6 +93,37 @@ public void testRead() throws Exception { assertEquals(60, rst.getRecords().get(4).getLastOffset()); } + @Test + public void testRead_readahead() throws ExecutionException, InterruptedException { + objectManager = mock(ObjectManager.class); + s3Operator = spy(new MemoryS3Operator()); + s3BlockCache = new DefaultS3BlockCache(1024 * 1024, objectManager, s3Operator); + + ObjectWriter objectWriter = ObjectWriter.writer(0, s3Operator, 1024, 1024); + objectWriter.write(233, List.of( + newRecord(233, 10, 5, 512), + newRecord(233, 15, 5, 512) + )); + objectWriter.close(); + S3ObjectMetadata metadata1 = new S3ObjectMetadata(0, objectWriter.size(), S3ObjectType.WAL); + + objectWriter = ObjectWriter.writer(1, s3Operator, 1024, 1024); + objectWriter.write(233, List.of(newRecord(233, 20, 10, 512))); + objectWriter.close(); + S3ObjectMetadata metadata2 = new S3ObjectMetadata(1, objectWriter.size(), S3ObjectType.WAL); + + when(objectManager.getObjects(eq(233L), eq(10L), eq(11L), eq(2))).thenReturn(List.of(metadata1)); + + s3BlockCache.read(233L, 10L, 11L, 10000).get(); + // range read index and range read data + verify(s3Operator, times(2)).rangeRead(eq(ObjectUtils.genKey(0, 0)), anyLong(), anyLong(), any()); + verify(s3Operator, times(0)).rangeRead(eq(ObjectUtils.genKey(0, 1)), anyLong(), anyLong(), any()); + // trigger readahead + when(objectManager.getObjects(eq(233L), eq(20L), eq(-1L), eq(2))).thenReturn(List.of(metadata2)); + s3BlockCache.read(233L, 15L, 16L, 10000).get(); + verify(s3Operator, timeout(1000).times(2)).rangeRead(eq(ObjectUtils.genKey(0, 1)), anyLong(), anyLong(), any()); + } + StreamRecordBatch newRecord(long streamId, long offset, int count, int payloadSize) { return new StreamRecordBatch(streamId, 0, offset, count, TestUtils.random(payloadSize)); } diff --git a/core/src/test/java/kafka/log/s3/cache/BlockCacheTest.java b/core/src/test/java/kafka/log/s3/cache/BlockCacheTest.java index 6f9a0bab12..de31b2cf14 100644 --- a/core/src/test/java/kafka/log/s3/cache/BlockCacheTest.java +++ b/core/src/test/java/kafka/log/s3/cache/BlockCacheTest.java @@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static 
org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; @Tag("S3Unit") public class BlockCacheTest { @@ -91,6 +92,55 @@ public void testLRU() { assertNull(lru.pop()); } + @Test + public void testReadahead() { + BlockCache blockCache = new BlockCache(16 * 1024 * 1024); + blockCache.put(233L, List.of( + newRecord(233L, 10, 1, 1024 * 1024), + newRecord(233L, 11, 1, 1024) + )); + + // first read the block + BlockCache.GetCacheResult rst = blockCache.get(233L, 10, 11, Integer.MAX_VALUE); + assertEquals(1, rst.getRecords().size()); + assertEquals(10L, rst.getRecords().get(0).getBaseOffset()); + assertEquals(12, rst.getReadahead().get().getStartOffset()); + assertEquals(1024 * 1024 * 2 * 2, rst.getReadahead().get().getSize()); + + // repeat read the block, the readahead mark is clear. + rst = blockCache.get(233L, 10, 11, Integer.MAX_VALUE); + assertTrue(rst.getReadahead().isEmpty()); + } + + @Test + public void testGenReadahead() { + BlockCache blockCache = new BlockCache(16 * 1024 * 1024); + BlockCache.Readahead readahead = blockCache.genReadahead(233L, List.of( + newRecord(233L, 10, 1, 1024 * 1024), + newRecord(233L, 11, 1, 1024) + )); + assertEquals(12, readahead.getStartOffset()); + // exponential growth + assertEquals(BlockCache.BLOCK_SIZE * 2 * 2, readahead.getSize()); + + + readahead = blockCache.genReadahead(233L, List.of( + newRecord(233L, 10, 1, BlockCache.MAX_READAHEAD_SIZE / 2 + 1) + )); + assertEquals(11, readahead.getStartOffset()); + // linear growth + assertEquals(BlockCache.MAX_READAHEAD_SIZE / 2 + BlockCache.BLOCK_SIZE * 2, readahead.getSize()); + + BlockCache.StreamCache streamCache = new BlockCache.StreamCache(); + streamCache.evict = true; + blockCache.stream2cache.put(233L, streamCache); + // exponential fallback + readahead = blockCache.genReadahead(233L, List.of( + newRecord(233L, 10, 1, 2 * BlockCache.BLOCK_SIZE) + )); + assertEquals(BlockCache.BLOCK_SIZE, readahead.getSize()); + } + private static StreamRecordBatch newRecord(long streamId, long offset, int count, int size) { return new StreamRecordBatch(streamId, 0, offset, count, TestUtils.random(size)); } diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index 30dc57b916..bebcd4a55d 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -17,30 +17,31 @@ package org.apache.kafka.image; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Queue; -import java.util.stream.Collectors; import org.apache.kafka.common.metadata.AssignedStreamIdRecord; import org.apache.kafka.image.writer.ImageWriter; import org.apache.kafka.image.writer.ImageWriterOptions; import org.apache.kafka.metadata.stream.InRangeObjects; import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3ObjectMetadata; -import org.apache.kafka.metadata.stream.StreamOffsetRange; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.metadata.stream.StreamOffsetRange; import org.apache.kafka.server.common.ApiMessageAndVersion; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; 
+import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.stream.Collectors; + public final class S3StreamsMetadataImage { public static final S3StreamsMetadataImage EMPTY = - new S3StreamsMetadataImage(-1, Collections.emptyMap(), Collections.emptyMap()); + new S3StreamsMetadataImage(-1, Collections.emptyMap(), Collections.emptyMap()); private long nextAssignedStreamId; @@ -49,9 +50,9 @@ public final class S3StreamsMetadataImage { private final Map brokerWALMetadata; public S3StreamsMetadataImage( - long assignedStreamId, - Map streamsMetadata, - Map brokerWALMetadata) { + long assignedStreamId, + Map streamsMetadata, + Map brokerWALMetadata) { this.nextAssignedStreamId = assignedStreamId + 1; this.streamsMetadata = streamsMetadata; this.brokerWALMetadata = brokerWALMetadata; @@ -64,8 +65,8 @@ boolean isEmpty() { public void write(ImageWriter writer, ImageWriterOptions options) { writer.write( - new ApiMessageAndVersion( - new AssignedStreamIdRecord().setAssignedStreamId(nextAssignedStreamId - 1), (short) 0)); + new ApiMessageAndVersion( + new AssignedStreamIdRecord().setAssignedStreamId(nextAssignedStreamId - 1), (short) 0)); streamsMetadata.values().forEach(image -> image.write(writer, options)); brokerWALMetadata.values().forEach(image -> image.write(writer, options)); } @@ -138,12 +139,7 @@ private List rangeSearchers(long streamId, long startOffset, long S3StreamMetadataImage streamMetadata = streamsMetadata.get(streamId); List rangeSearchers = new ArrayList<>(); // TODO: refactor to make ranges in order - List ranges = streamMetadata.getRanges().values().stream().sorted(new Comparator() { - @Override - public int compare(RangeMetadata o1, RangeMetadata o2) { - return o1.rangeIndex() - o2.rangeIndex(); - } - }).collect(Collectors.toList()); + List ranges = streamMetadata.getRanges().values().stream().sorted(Comparator.comparingInt(RangeMetadata::rangeIndex)).collect(Collectors.toList()); for (RangeMetadata range : ranges) { if (range.endOffset() <= startOffset) { continue; @@ -208,7 +204,7 @@ private Queue rangeOfStreamObjects() { long startOffset = obj.streamOffsetRange().getStartOffset(); long endOffset = obj.streamOffsetRange().getEndOffset(); S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata( - obj.objectId(), obj.objectType(), List.of(obj.streamOffsetRange()), obj.dataTimeInMs()); + obj.objectId(), obj.objectType(), List.of(obj.streamOffsetRange()), obj.dataTimeInMs()); return new S3ObjectMetadataWrapper(s3ObjectMetadata, startOffset, endOffset); }).collect(Collectors.toCollection(LinkedList::new)); } @@ -229,8 +225,8 @@ public InRangeObjects getObjects(int limit) { long nextStartOffset = startOffset; while (limit > 0 - && nextStartOffset < endOffset - && (!streamObjects.isEmpty() || !walObjects.isEmpty())) { + && nextStartOffset < endOffset + && (!streamObjects.isEmpty() || !walObjects.isEmpty())) { S3ObjectMetadataWrapper streamRange = null; if (walObjects.isEmpty() || (!streamObjects.isEmpty() && streamObjects.peek().startOffset() < walObjects.peek().startOffset())) { streamRange = streamObjects.poll(); @@ -289,8 +285,8 @@ public boolean equals(Object obj) { } S3StreamsMetadataImage other = (S3StreamsMetadataImage) obj; return this.nextAssignedStreamId == other.nextAssignedStreamId - && this.streamsMetadata.equals(other.streamsMetadata) - && this.brokerWALMetadata.equals(other.brokerWALMetadata); + && this.streamsMetadata.equals(other.streamsMetadata) + && 
this.brokerWALMetadata.equals(other.brokerWALMetadata); } @Override @@ -322,11 +318,11 @@ public long nextAssignedStreamId() { @Override public String toString() { return "S3StreamsMetadataImage{" + - "nextAssignedStreamId=" + nextAssignedStreamId + - ", streamsMetadata=" + streamsMetadata.entrySet().stream(). - map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) + - ", brokerWALMetadata=" + brokerWALMetadata.entrySet().stream(). - map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) + - '}'; + "nextAssignedStreamId=" + nextAssignedStreamId + + ", streamsMetadata=" + streamsMetadata.entrySet().stream(). + map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) + + ", brokerWALMetadata=" + brokerWALMetadata.entrySet().stream(). + map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) + + '}'; } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/ObjectUtils.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/ObjectUtils.java index 8dc7de6e96..49f3426875 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/ObjectUtils.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/ObjectUtils.java @@ -19,6 +19,7 @@ public class ObjectUtils { public static final long NOOP_OBJECT_ID = -1L; + public static final long NOOP_OFFSET = -1L; private static String namespace = "DEFAULT"; public static void setNamespace(String namespace) {
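
Note on the readahead sizing introduced in BlockCache#genReadahead: the window is aligned up to whole 1MB blocks, doubles while it stays below half of MAX_READAHEAD_SIZE, grows by a single block beyond that point, and is halved when the stream's cache blocks were recently evicted, with the result clamped to [BLOCK_SIZE, MAX_READAHEAD_SIZE]. Below is a minimal, self-contained sketch of that policy; the class name ReadaheadPolicySketch and the nextReadaheadSize helper are illustrative only and not part of the patch, while the constants, branch structure, and expected values mirror genReadahead and BlockCacheTest#testGenReadahead.

public class ReadaheadPolicySketch {
    // Constants mirror BlockCache in this patch.
    static final int BLOCK_SIZE = 1024 * 1024;               // 1MB cache block
    static final int MAX_READAHEAD_SIZE = 128 * 1024 * 1024; // 128MB cap

    // Round a byte count up to a whole number of cache blocks.
    static int alignBlockSize(int size) {
        return (size + BLOCK_SIZE - 1) / BLOCK_SIZE * BLOCK_SIZE;
    }

    // Compute the next readahead window size from the bytes just read and
    // whether this stream recently had cache blocks evicted (the StreamCache.evict flag).
    static int nextReadaheadSize(int lastReadSize, boolean evicted) {
        int size = alignBlockSize(lastReadSize);
        if (evicted) {
            // exponential fallback when the cache is under eviction pressure
            size = alignBlockSize(size / 2);
        } else if (size < MAX_READAHEAD_SIZE / 2) {
            // exponential growth
            size = size * 2;
        } else {
            // linear growth near the cap
            size += BLOCK_SIZE;
        }
        return Math.min(Math.max(size, BLOCK_SIZE), MAX_READAHEAD_SIZE);
    }

    public static void main(String[] args) {
        // 1MB + 1KB aligns to 2 blocks, then doubles -> 4MB
        System.out.println(nextReadaheadSize(BLOCK_SIZE + 1024, false));
        // just over half the cap aligns to 65MB, so it grows by one block -> 66MB
        System.out.println(nextReadaheadSize(MAX_READAHEAD_SIZE / 2 + 1, false));
        // eviction pressure: 2MB is halved and clamped -> 1MB
        System.out.println(nextReadaheadSize(2 * BLOCK_SIZE, true));
    }
}

In the patch itself the computed Readahead(startOffset, size) is attached to the first cache block written by put0 and cleared from the block once a get() returns it, which is what testReadahead exercises.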