Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3): remove range concept in MemoryMetadataManager #30

Merged
merged 2 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 59 additions & 66 deletions core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import kafka.log.s3.model.RangeMetadata;
import kafka.log.s3.model.StreamMetadata;
import kafka.log.s3.objects.CommitCompactObjectRequest;
import kafka.log.s3.objects.CommitStreamObjectRequest;
import kafka.log.s3.objects.CommitWalObjectRequest;
Expand All @@ -47,6 +45,7 @@
import org.apache.kafka.metadata.stream.S3Object;
import org.apache.kafka.metadata.stream.S3ObjectState;
import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.metadata.stream.S3WALObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -56,16 +55,32 @@ public class MemoryMetadataManager implements StreamManager, ObjectManager {
private static final int MOCK_BROKER_ID = 0;
private static final Logger LOGGER = LoggerFactory.getLogger(MemoryMetadataManager.class);
private final EventDriver eventDriver;
private final Map<Long/*objectId*/, S3Object> objectsMetadata;

private volatile long nextAssignedObjectId = 0;
private final Map<Long/*objectId*/, S3Object> objectsMetadata;
private volatile long nextAssignedStreamId = 0;
private final Map<Long/*streamId*/, MemoryStreamMetadata> streamsMetadata;
private final Map<Integer/*brokerId*/, MemoryBrokerWALMetadata> brokerWALMetadata;

private static class MemoryStreamMetadata {
private long streamId;
private long epoch;
private long startOffset;
private long endOffset;
private List<S3StreamObject> streamObjects;

public MemoryStreamMetadata(long streamId, long epoch, long startOffset, long endOffset) {
this.streamId = streamId;
this.epoch = epoch;
this.startOffset = startOffset;
this.endOffset = endOffset;
}

private final Map<Long, StreamMetadata> streamsMetadata;

private final Map<Integer, MemoryBrokerWALMetadata> brokerWALMetadata;
public void addStreamObject(S3StreamObject object) {
streamObjects.add(object);
}
}

private static class MemoryBrokerWALMetadata {

private final int brokerId;
private final List<S3WALObject> walObjects;

Expand All @@ -75,8 +90,6 @@ public MemoryBrokerWALMetadata(int brokerId) {
}
}

private volatile long nextAssignedStreamId = 0;

public MemoryMetadataManager() {
this.eventDriver = new EventDriver();
this.objectsMetadata = new HashMap<>();
Expand Down Expand Up @@ -145,15 +158,15 @@ public CompletableFuture<CommitWalObjectResponse> commitWalObject(CommitWalObjec
// build metadata
MemoryBrokerWALMetadata walMetadata = this.brokerWALMetadata.computeIfAbsent(MOCK_BROKER_ID,
k -> new MemoryBrokerWALMetadata(k));
Map<Long, S3ObjectStreamIndex> index = new HashMap<>();
Map<Long, List<S3ObjectStreamIndex>> index = new HashMap<>();
streamRanges.stream().forEach(range -> {
long streamId = range.getStreamId();
long startOffset = range.getStartOffset();
long endOffset = range.getEndOffset();
index.put(streamId, new S3ObjectStreamIndex(streamId, startOffset, endOffset));
index.put(streamId, List.of(new S3ObjectStreamIndex(streamId, startOffset, endOffset)));
// update range endOffset
StreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
streamMetadata.getRanges().get(streamMetadata.getRanges().size() - 1).setEndOffset(endOffset);
MemoryStreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
streamMetadata.endOffset = endOffset;
});
S3WALObject walObject = new S3WALObject(objectId, MOCK_BROKER_ID, index);
walMetadata.walObjects.add(walObject);
Expand All @@ -165,15 +178,15 @@ private boolean verifyWalStreamRanges(ObjectStreamRange range) {
long streamId = range.getStreamId();
long epoch = range.getEpoch();
// verify
StreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
MemoryStreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
if (streamMetadata == null) {
return false;
}
// compare epoch
if (streamMetadata.getEpoch() > epoch) {
if (streamMetadata.epoch > epoch) {
return false;
}
if (streamMetadata.getEpoch() < epoch) {
if (streamMetadata.epoch < epoch) {
return false;
}
return true;
Expand Down Expand Up @@ -216,35 +229,26 @@ public List<S3ObjectMetadata> getObjects(long streamId, long startOffset, long e
CompletableFuture<List<S3ObjectMetadata>> future = this.submitEvent(() -> {
int need = limit;
List<S3ObjectMetadata> objs = new ArrayList<>();
StreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
if (endOffset <= streamMetadata.getStartOffset()) {
MemoryStreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
if (endOffset <= streamMetadata.startOffset) {
return objs;
}
MemoryBrokerWALMetadata metadata = this.brokerWALMetadata.get(MOCK_BROKER_ID);
if (metadata == null) {
return objs;
}
List<RangeMetadata> ranges = streamMetadata.getRanges();
for (RangeMetadata range : ranges) {
if (endOffset < range.getStartOffset() || need <= 0) {
for (S3WALObject walObject : metadata.walObjects) {
if (need <= 0) {
break;
}
if (startOffset >= range.getEndOffset()) {
if (!walObject.intersect(streamId, startOffset, endOffset)) {
continue;
}
// find range, get wal objects
int brokerId = range.getBrokerId();
MemoryBrokerWALMetadata walMetadata = this.brokerWALMetadata.get(brokerId);
for (S3WALObject walObject : walMetadata.walObjects) {
if (need <= 0) {
break;
}
// TODO: speed up query
if (!walObject.intersect(streamId, startOffset, endOffset)) {
continue;
}
// find stream index, get object
S3Object object = this.objectsMetadata.get(walObject.objectId());
S3ObjectMetadata obj = new S3ObjectMetadata(walObject.objectId(), object.getObjectSize(), walObject.objectType());
objs.add(obj);
need--;
}
// find stream index, get object
S3Object object = this.objectsMetadata.get(walObject.objectId());
S3ObjectMetadata obj = new S3ObjectMetadata(walObject.objectId(), object.getObjectSize(), walObject.objectType());
objs.add(obj);
need--;
}
return objs;
});
Expand All @@ -261,7 +265,7 @@ public CompletableFuture<Long> createStream() {
return this.submitEvent(() -> {
long streamId = this.nextAssignedStreamId++;
this.streamsMetadata.put(streamId,
new StreamMetadata(streamId, 0, -1, 0, new ArrayList<>()));
new MemoryStreamMetadata(streamId, 0, 0, 0));
return streamId;
});
}
Expand All @@ -275,34 +279,17 @@ public CompletableFuture<OpenStreamMetadata> openStream(long streamId, long epoc
throw new StreamNotExistException("Stream " + streamId + " does not exist");
}
// verify epoch match
StreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
if (streamMetadata.getEpoch() > epoch) {
MemoryStreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
if (streamMetadata.epoch > epoch) {
throw new StreamFencedException("Stream " + streamId + " is fenced");
}
if (streamMetadata.getEpoch() == epoch) {
// get active range
int rangesCount = streamMetadata.getRanges().size();
long endOffset = 0;
if (rangesCount != 0) {
endOffset = streamMetadata.getRanges().get(streamMetadata.getRanges().size() - 1).getEndOffset();
} else {
streamMetadata.getRanges().add(new RangeMetadata(0, 0, 0, MOCK_BROKER_ID));
}
return new OpenStreamMetadata(streamId, epoch, streamMetadata.getStartOffset(), endOffset);
}
// create new range
long newEpoch = epoch;
int newRangeIndex = streamMetadata.getRangeIndex() + 1;
long startOffset = 0;
if (newRangeIndex > 0) {
startOffset = streamMetadata.getRanges().get(streamMetadata.getRanges().size() - 1).getEndOffset();
if (streamMetadata.epoch == epoch) {
return new OpenStreamMetadata(streamId, epoch, streamMetadata.startOffset, streamMetadata.endOffset);
}
RangeMetadata rangeMetadata = new RangeMetadata(newRangeIndex, startOffset, startOffset, MOCK_BROKER_ID);
streamMetadata.getRanges().add(rangeMetadata);
// update epoch
// update epoch and rangeIndex
streamMetadata.setRangeIndex(newRangeIndex);
streamMetadata.setEpoch(newEpoch);
return new OpenStreamMetadata(streamId, newEpoch, startOffset, startOffset);
streamMetadata.epoch = epoch;
return new OpenStreamMetadata(streamId, epoch, streamMetadata.startOffset, streamMetadata.endOffset);
});
}

Expand Down Expand Up @@ -380,7 +367,13 @@ public MemoryMetadataEvent(CompletableFuture<T> cb, Supplier<T> eventHandler) {
}

public void done() {
cb.complete(eventHandler.get());
try {
T value = eventHandler.get();
cb.complete(value);
} catch (Exception e) {
LOGGER.error("Failed to execute event", e);
cb.completeExceptionally(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,36 @@

package org.apache.kafka.metadata.stream;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.kafka.common.metadata.WALObjectRecord;
import org.apache.kafka.common.metadata.WALObjectRecord.StreamIndex;
import org.apache.kafka.server.common.ApiMessageAndVersion;

public class S3WALObject {

private final long objectId;

private final int brokerId;
private final Map<Long/*streamId*/, S3ObjectStreamIndex> streamsIndex;
private final Map<Long/*streamId*/, List<S3ObjectStreamIndex>> streamsIndex;

private final S3ObjectType objectType = S3ObjectType.UNKNOWN;

public S3WALObject(long objectId, int brokerId, final Map<Long, S3ObjectStreamIndex> streamsIndex) {
public S3WALObject(long objectId, int brokerId, final Map<Long, List<S3ObjectStreamIndex>> streamsIndex) {
this.objectId = objectId;
this.brokerId = brokerId;
this.streamsIndex = streamsIndex;
}

public boolean intersect(long streamId, long startOffset, long endOffset) {
S3ObjectStreamIndex streamIndex = streamsIndex.get(streamId);
if (streamIndex == null) {
List<S3ObjectStreamIndex> indices = streamsIndex.get(streamId);
if (indices == null || indices.isEmpty()) {
return false;
}
if (endOffset <= streamIndex.getStartOffset() || startOffset >= streamIndex.getEndOffset()) {
S3ObjectStreamIndex firstIndex = indices.get(0);
S3ObjectStreamIndex lastIndex = indices.get(indices.size() - 1);
if (endOffset <= firstIndex.getStartOffset() || startOffset >= lastIndex.getEndOffset()) {
return false;
}
return true;
Expand All @@ -55,26 +57,24 @@ public ApiMessageAndVersion toRecord() {
.setObjectId(objectId)
.setBrokerId(brokerId)
.setStreamsIndex(
streamsIndex.values().stream()
streamsIndex.values().stream().flatMap(List::stream)
.map(S3ObjectStreamIndex::toRecordStreamIndex)
.collect(Collectors.toList())), (short) 0);
}

public static S3WALObject of(WALObjectRecord record) {
Map<Long, List<S3ObjectStreamIndex>> collect = record.streamsIndex().stream()
.map(index -> new S3ObjectStreamIndex(index.streamId(), index.startOffset(), index.endOffset()))
.collect(Collectors.groupingBy(S3ObjectStreamIndex::getStreamId));
S3WALObject s3WalObject = new S3WALObject(record.objectId(), record.brokerId(),
record.streamsIndex().stream().collect(Collectors.toMap(
StreamIndex::streamId, S3ObjectStreamIndex::of)));
collect);
return s3WalObject;
}

public Integer getBrokerId() {
return brokerId;
}

public Map<Long, S3ObjectStreamIndex> getStreamsIndex() {
return streamsIndex;
}

public Long objectId() {
return objectId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ public void testS3WALObjects() {
// verify delta and check image's write
BrokerS3WALMetadataImage image1 = new BrokerS3WALMetadataImage(BROKER0, List.of(
new S3WALObject(0L, BROKER0, Map.of(
STREAM0, new S3ObjectStreamIndex(STREAM0, 0L, 100L),
STREAM1, new S3ObjectStreamIndex(STREAM1, 0L, 200L))),
STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 0L, 100L)),
STREAM1, List.of(new S3ObjectStreamIndex(STREAM1, 0L, 200L)))),
new S3WALObject(1L, BROKER0, Map.of(
STREAM0, new S3ObjectStreamIndex(STREAM0, 101L, 200L)))));
STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 101L, 200L))))));
assertEquals(image1, delta0.apply());
testToImageAndBack(image1);

Expand All @@ -93,7 +93,7 @@ STREAM1, new S3ObjectStreamIndex(STREAM1, 0L, 200L))),
// verify delta and check image's write
BrokerS3WALMetadataImage image2 = new BrokerS3WALMetadataImage(BROKER0, List.of(
new S3WALObject(1L, BROKER0, Map.of(
STREAM0, new S3ObjectStreamIndex(STREAM0, 101L, 200L)))));
STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 101L, 200L))))));
assertEquals(image2, delta1.apply());
testToImageAndBack(image2);
}
Expand Down