diff --git a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java index 82a041a149..71452c3afb 100644 --- a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java @@ -30,8 +30,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; -import kafka.log.s3.model.RangeMetadata; -import kafka.log.s3.model.StreamMetadata; import kafka.log.s3.objects.CommitCompactObjectRequest; import kafka.log.s3.objects.CommitStreamObjectRequest; import kafka.log.s3.objects.CommitWalObjectRequest; @@ -47,6 +45,7 @@ import org.apache.kafka.metadata.stream.S3Object; import org.apache.kafka.metadata.stream.S3ObjectState; import org.apache.kafka.metadata.stream.S3ObjectStreamIndex; +import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.S3WALObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,16 +55,32 @@ public class MemoryMetadataManager implements StreamManager, ObjectManager { private static final int MOCK_BROKER_ID = 0; private static final Logger LOGGER = LoggerFactory.getLogger(MemoryMetadataManager.class); private final EventDriver eventDriver; - private final Map objectsMetadata; - private volatile long nextAssignedObjectId = 0; + private final Map objectsMetadata; + private volatile long nextAssignedStreamId = 0; + private final Map streamsMetadata; + private final Map brokerWALMetadata; + + private static class MemoryStreamMetadata { + private long streamId; + private long epoch; + private long startOffset; + private long endOffset; + private List streamObjects; + + public MemoryStreamMetadata(long streamId, long epoch, long startOffset, long endOffset) { + this.streamId = streamId; + this.epoch = epoch; + this.startOffset = startOffset; + this.endOffset = endOffset; + } - private final Map streamsMetadata; - - private final Map brokerWALMetadata; + public void addStreamObject(S3StreamObject object) { + streamObjects.add(object); + } + } private static class MemoryBrokerWALMetadata { - private final int brokerId; private final List walObjects; @@ -75,8 +90,6 @@ public MemoryBrokerWALMetadata(int brokerId) { } } - private volatile long nextAssignedStreamId = 0; - public MemoryMetadataManager() { this.eventDriver = new EventDriver(); this.objectsMetadata = new HashMap<>(); @@ -145,15 +158,15 @@ public CompletableFuture commitWalObject(CommitWalObjec // build metadata MemoryBrokerWALMetadata walMetadata = this.brokerWALMetadata.computeIfAbsent(MOCK_BROKER_ID, k -> new MemoryBrokerWALMetadata(k)); - Map index = new HashMap<>(); + Map> index = new HashMap<>(); streamRanges.stream().forEach(range -> { long streamId = range.getStreamId(); long startOffset = range.getStartOffset(); long endOffset = range.getEndOffset(); - index.put(streamId, new S3ObjectStreamIndex(streamId, startOffset, endOffset)); + index.put(streamId, List.of(new S3ObjectStreamIndex(streamId, startOffset, endOffset))); // update range endOffset - StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); - streamMetadata.getRanges().get(streamMetadata.getRanges().size() - 1).setEndOffset(endOffset); + MemoryStreamMetadata streamMetadata = this.streamsMetadata.get(streamId); + streamMetadata.endOffset = endOffset; }); S3WALObject walObject = new S3WALObject(objectId, MOCK_BROKER_ID, index); walMetadata.walObjects.add(walObject); @@ -165,15 +178,15 @@ private boolean verifyWalStreamRanges(ObjectStreamRange range) { long streamId = range.getStreamId(); long epoch = range.getEpoch(); // verify - StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); + MemoryStreamMetadata streamMetadata = this.streamsMetadata.get(streamId); if (streamMetadata == null) { return false; } // compare epoch - if (streamMetadata.getEpoch() > epoch) { + if (streamMetadata.epoch > epoch) { return false; } - if (streamMetadata.getEpoch() < epoch) { + if (streamMetadata.epoch < epoch) { return false; } return true; @@ -216,35 +229,26 @@ public List getObjects(long streamId, long startOffset, long e CompletableFuture> future = this.submitEvent(() -> { int need = limit; List objs = new ArrayList<>(); - StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); - if (endOffset <= streamMetadata.getStartOffset()) { + MemoryStreamMetadata streamMetadata = this.streamsMetadata.get(streamId); + if (endOffset <= streamMetadata.startOffset) { + return objs; + } + MemoryBrokerWALMetadata metadata = this.brokerWALMetadata.get(MOCK_BROKER_ID); + if (metadata == null) { return objs; } - List ranges = streamMetadata.getRanges(); - for (RangeMetadata range : ranges) { - if (endOffset < range.getStartOffset() || need <= 0) { + for (S3WALObject walObject : metadata.walObjects) { + if (need <= 0) { break; } - if (startOffset >= range.getEndOffset()) { + if (!walObject.intersect(streamId, startOffset, endOffset)) { continue; } - // find range, get wal objects - int brokerId = range.getBrokerId(); - MemoryBrokerWALMetadata walMetadata = this.brokerWALMetadata.get(brokerId); - for (S3WALObject walObject : walMetadata.walObjects) { - if (need <= 0) { - break; - } - // TODO: speed up query - if (!walObject.intersect(streamId, startOffset, endOffset)) { - continue; - } - // find stream index, get object - S3Object object = this.objectsMetadata.get(walObject.objectId()); - S3ObjectMetadata obj = new S3ObjectMetadata(walObject.objectId(), object.getObjectSize(), walObject.objectType()); - objs.add(obj); - need--; - } + // find stream index, get object + S3Object object = this.objectsMetadata.get(walObject.objectId()); + S3ObjectMetadata obj = new S3ObjectMetadata(walObject.objectId(), object.getObjectSize(), walObject.objectType()); + objs.add(obj); + need--; } return objs; }); @@ -261,7 +265,7 @@ public CompletableFuture createStream() { return this.submitEvent(() -> { long streamId = this.nextAssignedStreamId++; this.streamsMetadata.put(streamId, - new StreamMetadata(streamId, 0, -1, 0, new ArrayList<>())); + new MemoryStreamMetadata(streamId, 0, 0, 0)); return streamId; }); } @@ -275,34 +279,17 @@ public CompletableFuture openStream(long streamId, long epoc throw new StreamNotExistException("Stream " + streamId + " does not exist"); } // verify epoch match - StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); - if (streamMetadata.getEpoch() > epoch) { + MemoryStreamMetadata streamMetadata = this.streamsMetadata.get(streamId); + if (streamMetadata.epoch > epoch) { throw new StreamFencedException("Stream " + streamId + " is fenced"); } - if (streamMetadata.getEpoch() == epoch) { - // get active range - int rangesCount = streamMetadata.getRanges().size(); - long endOffset = 0; - if (rangesCount != 0) { - endOffset = streamMetadata.getRanges().get(streamMetadata.getRanges().size() - 1).getEndOffset(); - } else { - streamMetadata.getRanges().add(new RangeMetadata(0, 0, 0, MOCK_BROKER_ID)); - } - return new OpenStreamMetadata(streamId, epoch, streamMetadata.getStartOffset(), endOffset); - } - // create new range - long newEpoch = epoch; - int newRangeIndex = streamMetadata.getRangeIndex() + 1; - long startOffset = 0; - if (newRangeIndex > 0) { - startOffset = streamMetadata.getRanges().get(streamMetadata.getRanges().size() - 1).getEndOffset(); + if (streamMetadata.epoch == epoch) { + return new OpenStreamMetadata(streamId, epoch, streamMetadata.startOffset, streamMetadata.endOffset); } - RangeMetadata rangeMetadata = new RangeMetadata(newRangeIndex, startOffset, startOffset, MOCK_BROKER_ID); - streamMetadata.getRanges().add(rangeMetadata); + // update epoch // update epoch and rangeIndex - streamMetadata.setRangeIndex(newRangeIndex); - streamMetadata.setEpoch(newEpoch); - return new OpenStreamMetadata(streamId, newEpoch, startOffset, startOffset); + streamMetadata.epoch = epoch; + return new OpenStreamMetadata(streamId, epoch, streamMetadata.startOffset, streamMetadata.endOffset); }); } @@ -380,7 +367,13 @@ public MemoryMetadataEvent(CompletableFuture cb, Supplier eventHandler) { } public void done() { - cb.complete(eventHandler.get()); + try { + T value = eventHandler.get(); + cb.complete(value); + } catch (Exception e) { + LOGGER.error("Failed to execute event", e); + cb.completeExceptionally(e); + } } } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java index 64ce5f1df5..26f6c8324c 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java @@ -17,11 +17,11 @@ package org.apache.kafka.metadata.stream; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; import org.apache.kafka.common.metadata.WALObjectRecord; -import org.apache.kafka.common.metadata.WALObjectRecord.StreamIndex; import org.apache.kafka.server.common.ApiMessageAndVersion; public class S3WALObject { @@ -29,22 +29,24 @@ public class S3WALObject { private final long objectId; private final int brokerId; - private final Map streamsIndex; + private final Map> streamsIndex; private final S3ObjectType objectType = S3ObjectType.UNKNOWN; - public S3WALObject(long objectId, int brokerId, final Map streamsIndex) { + public S3WALObject(long objectId, int brokerId, final Map> streamsIndex) { this.objectId = objectId; this.brokerId = brokerId; this.streamsIndex = streamsIndex; } public boolean intersect(long streamId, long startOffset, long endOffset) { - S3ObjectStreamIndex streamIndex = streamsIndex.get(streamId); - if (streamIndex == null) { + List indices = streamsIndex.get(streamId); + if (indices == null || indices.isEmpty()) { return false; } - if (endOffset <= streamIndex.getStartOffset() || startOffset >= streamIndex.getEndOffset()) { + S3ObjectStreamIndex firstIndex = indices.get(0); + S3ObjectStreamIndex lastIndex = indices.get(indices.size() - 1); + if (endOffset <= firstIndex.getStartOffset() || startOffset >= lastIndex.getEndOffset()) { return false; } return true; @@ -55,15 +57,17 @@ public ApiMessageAndVersion toRecord() { .setObjectId(objectId) .setBrokerId(brokerId) .setStreamsIndex( - streamsIndex.values().stream() + streamsIndex.values().stream().flatMap(List::stream) .map(S3ObjectStreamIndex::toRecordStreamIndex) .collect(Collectors.toList())), (short) 0); } public static S3WALObject of(WALObjectRecord record) { + Map> collect = record.streamsIndex().stream() + .map(index -> new S3ObjectStreamIndex(index.streamId(), index.startOffset(), index.endOffset())) + .collect(Collectors.groupingBy(S3ObjectStreamIndex::getStreamId)); S3WALObject s3WalObject = new S3WALObject(record.objectId(), record.brokerId(), - record.streamsIndex().stream().collect(Collectors.toMap( - StreamIndex::streamId, S3ObjectStreamIndex::of))); + collect); return s3WalObject; } @@ -71,10 +75,6 @@ public Integer getBrokerId() { return brokerId; } - public Map getStreamsIndex() { - return streamsIndex; - } - public Long objectId() { return objectId; } diff --git a/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java index af5d2016f9..e8b4b0fb92 100644 --- a/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java @@ -77,10 +77,10 @@ public void testS3WALObjects() { // verify delta and check image's write BrokerS3WALMetadataImage image1 = new BrokerS3WALMetadataImage(BROKER0, List.of( new S3WALObject(0L, BROKER0, Map.of( - STREAM0, new S3ObjectStreamIndex(STREAM0, 0L, 100L), - STREAM1, new S3ObjectStreamIndex(STREAM1, 0L, 200L))), + STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 0L, 100L)), + STREAM1, List.of(new S3ObjectStreamIndex(STREAM1, 0L, 200L)))), new S3WALObject(1L, BROKER0, Map.of( - STREAM0, new S3ObjectStreamIndex(STREAM0, 101L, 200L))))); + STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 101L, 200L)))))); assertEquals(image1, delta0.apply()); testToImageAndBack(image1); @@ -93,7 +93,7 @@ STREAM1, new S3ObjectStreamIndex(STREAM1, 0L, 200L))), // verify delta and check image's write BrokerS3WALMetadataImage image2 = new BrokerS3WALMetadataImage(BROKER0, List.of( new S3WALObject(1L, BROKER0, Map.of( - STREAM0, new S3ObjectStreamIndex(STREAM0, 101L, 200L))))); + STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 101L, 200L)))))); assertEquals(image2, delta1.apply()); testToImageAndBack(image2); }