Skip to content

Commit

Permalink
feat(stream-client): optimize getting objects from `StreamMetadataMan…
Browse files Browse the repository at this point in the history
…ager` (#64)

* refactor(s3): remove inflight wal objects

1. remove inflight wal objects
2. delete redundant classes

Signed-off-by: TheR1sing3un <[email protected]>

* feat(s3): support blocking `getObjects` in `StreamMetadataManager`

1. support blocking `getObjects` in `StreamMetadataManager`
2. refactor
`getObjects`
3. change the unit of `s3.cache.size` from `MB`
to `B`

Signed-off-by: TheR1sing3un <[email protected]>

* feat(s3): more suitable log level

1. more suitable log level

Signed-off-by: TheR1sing3un <[email protected]>

* fix(s3): add more concurrent protection

1. add more concurrent protection

Signed-off-by: TheR1sing3un <[email protected]>

---------

Signed-off-by: TheR1sing3un <[email protected]>
  • Loading branch information
TheR1sing3un authored Sep 7, 2023
1 parent 4e87747 commit c272cb2
Showing 1 changed file with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
import org.apache.kafka.metadata.stream.StreamOffsetRange;
import org.apache.kafka.metadata.stream.S3StreamConstant;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.metadata.stream.S3WALObject;
Expand Down Expand Up @@ -381,8 +381,8 @@ public ControllerResult<CommitWALObjectResponseData> commitWALObject(CommitWALOb
records.addAll(destroyResult.records());
}

List<S3ObjectStreamIndex> indexes = streamRanges.stream()
.map(range -> new S3ObjectStreamIndex(range.streamId(), range.startOffset(), range.endOffset()))
List<StreamOffsetRange> indexes = streamRanges.stream()
.map(range -> new StreamOffsetRange(range.streamId(), range.startOffset(), range.endOffset()))
.collect(Collectors.toList());
// update broker's wal object
BrokerS3WALMetadata brokerMetadata = this.brokersMetadata.get(brokerId);
Expand All @@ -398,7 +398,7 @@ public ControllerResult<CommitWALObjectResponseData> commitWALObject(CommitWALOb
.setBrokerId(brokerId)
.setStreamsIndex(
indexes.stream()
.map(S3ObjectStreamIndex::toRecordStreamIndex)
.map(StreamOffsetRange::toRecordStreamIndex)
.collect(Collectors.toList())), (short) 0));
// create stream object records
List<StreamObject> streamObjects = data.streamObjects();
Expand Down Expand Up @@ -504,10 +504,10 @@ public void replay(WALObjectRecord record) {
}

// create wal object
Map<Long, List<S3ObjectStreamIndex>> indexMap = streamIndexes
Map<Long, List<StreamOffsetRange>> indexMap = streamIndexes
.stream()
.map(S3ObjectStreamIndex::of)
.collect(Collectors.groupingBy(S3ObjectStreamIndex::getStreamId));
.map(StreamOffsetRange::of)
.collect(Collectors.groupingBy(StreamOffsetRange::getStreamId));
brokerMetadata.walObjects.put(objectId, new S3WALObject(objectId, brokerId, indexMap, orderId));

// update range
Expand Down

0 comments on commit c272cb2

Please sign in to comment.