From c272cb23283ff252c25a02066d6852ca6f500c6c Mon Sep 17 00:00:00 2001 From: TheR1sing3un <87409330+TheR1sing3un@users.noreply.github.com> Date: Wed, 6 Sep 2023 20:24:11 -0500 Subject: [PATCH] feat(stream-client): optimize getting objects from `StreamMetadataManager` (#64) * refactor(s3): remove inflight wal objects 1. remove inflight wal objects 2. delete redundant classes Signed-off-by: TheR1sing3un * feat(s3): support blocking `getObjects` in `StreamMetadataManager` 1. support blocking `getObjects` in `StreamMetadataManager` 2. refactor `getObjects` 3. change the unit of `s3.cache.size` from `MB` to `B` Signed-off-by: TheR1sing3un * feat(s3): more suitable log level 1. more suitable log level Signed-off-by: TheR1sing3un * fix(s3): add more concurrent protection 1. add more concurrent protection Signed-off-by: TheR1sing3un --------- Signed-off-by: TheR1sing3un --- .../controller/stream/StreamControlManager.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index cadf060aaa..2b965a02f2 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -56,7 +56,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.ControllerResult; import org.apache.kafka.metadata.stream.RangeMetadata; -import org.apache.kafka.metadata.stream.S3ObjectStreamIndex; +import org.apache.kafka.metadata.stream.StreamOffsetRange; import org.apache.kafka.metadata.stream.S3StreamConstant; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.S3WALObject; @@ -381,8 +381,8 @@ public ControllerResult commitWALObject(CommitWALOb records.addAll(destroyResult.records()); } - List indexes = streamRanges.stream() - .map(range -> new S3ObjectStreamIndex(range.streamId(), range.startOffset(), range.endOffset())) + List indexes = streamRanges.stream() + .map(range -> new StreamOffsetRange(range.streamId(), range.startOffset(), range.endOffset())) .collect(Collectors.toList()); // update broker's wal object BrokerS3WALMetadata brokerMetadata = this.brokersMetadata.get(brokerId); @@ -398,7 +398,7 @@ public ControllerResult commitWALObject(CommitWALOb .setBrokerId(brokerId) .setStreamsIndex( indexes.stream() - .map(S3ObjectStreamIndex::toRecordStreamIndex) + .map(StreamOffsetRange::toRecordStreamIndex) .collect(Collectors.toList())), (short) 0)); // create stream object records List streamObjects = data.streamObjects(); @@ -504,10 +504,10 @@ public void replay(WALObjectRecord record) { } // create wal object - Map> indexMap = streamIndexes + Map> indexMap = streamIndexes .stream() - .map(S3ObjectStreamIndex::of) - .collect(Collectors.groupingBy(S3ObjectStreamIndex::getStreamId)); + .map(StreamOffsetRange::of) + .collect(Collectors.groupingBy(StreamOffsetRange::getStreamId)); brokerMetadata.walObjects.put(objectId, new S3WALObject(objectId, brokerId, indexMap, orderId)); // update range