feat(stream-client): merge minor compact to WAL object #41

Merged · 1 commit · Sep 1, 2023
21 changes: 11 additions & 10 deletions core/src/main/scala/kafka/log/s3/S3Storage.java
@@ -42,8 +42,8 @@ public class S3Storage implements Storage {
     private final BlockingQueue<WalWriteRequest> waitingLogConfirmedRequests;
     private final WriteAheadLog log;
     private final LogCache logCache;
-    private final AtomicLong logConfirmPosition = new AtomicLong();
-    private final AtomicLong processedLogConfirmPosition = new AtomicLong();
+    private final AtomicLong logConfirmOffset = new AtomicLong();
+    private final AtomicLong processedLogConfirmOffset = new AtomicLong();
     private final ScheduledExecutorService mainExecutor = Executors.newSingleThreadScheduledExecutor(
             ThreadUtils.createThreadFactory("s3-storage-main", false));
     private final ScheduledExecutorService backgroundExecutor = Executors.newSingleThreadScheduledExecutor(
@@ -73,7 +73,7 @@ public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
         FlatStreamRecordBatch flatStreamRecordBatch = FlatStreamRecordBatch.from(streamRecord);
         WriteAheadLog.AppendResult appendResult = log.append(flatStreamRecordBatch.encodedBuf.duplicate());
         CompletableFuture<Void> cf = new CompletableFuture<>();
-        WalWriteRequest writeRequest = new WalWriteRequest(flatStreamRecordBatch, appendResult.endPosition, cf);
+        WalWriteRequest writeRequest = new WalWriteRequest(flatStreamRecordBatch, appendResult.offset, cf);
         try {
             waitingLogConfirmedRequests.put(writeRequest);
         } catch (InterruptedException e) {
@@ -82,7 +82,7 @@ public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
         appendResult.future.thenAccept(nil -> {
             // TODO: callback is out of order, we need reorder ack in stream dimension.
             // TODO: cache end offset update should consider log hollow.
-            logConfirmPosition.getAndUpdate(operand -> Math.max(operand, appendResult.endPosition));
+            logConfirmOffset.getAndUpdate(operand -> Math.max(operand, appendResult.offset));
             putToCache(writeRequest);
             tryCallback();
         });
@@ -93,6 +93,7 @@ public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
 
     @Override
     public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
+        // TODO: thread model to keep data safe.
         List<FlatStreamRecordBatch> records = logCache.get(streamId, startOffset, endOffset, maxBytes);
         if (!records.isEmpty()) {
             return CompletableFuture.completedFuture(new ReadDataBlock(StreamRecordBatchCodec.decode(records)));
@@ -110,29 +111,29 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
     }
 
     private void tryCallback() {
-        if (processedLogConfirmPosition.get() == logConfirmPosition.get()) {
+        if (processedLogConfirmOffset.get() == logConfirmOffset.get()) {
             return;
         }
         mainExecutor.execute(this::tryCallback0);
     }
 
     private void tryCallback0() {
-        long walConfirmOffset = this.logConfirmPosition.get();
+        long walConfirmOffset = this.logConfirmOffset.get();
         for (; ; ) {
             WalWriteRequest request = waitingLogConfirmedRequests.peek();
             if (request == null) break;
-            if (request.position <= walConfirmOffset) {
+            if (request.offset <= walConfirmOffset) {
                 waitingLogConfirmedRequests.poll();
                 request.cf.complete(null);
             } else {
                 break;
             }
         }
-        processedLogConfirmPosition.set(walConfirmOffset);
+        processedLogConfirmOffset.set(walConfirmOffset);
     }
 
     private void putToCache(WalWriteRequest request) {
-        if (logCache.put(request.record, request.position)) {
+        if (logCache.put(request.record, request.offset)) {
             // cache block is full, trigger WAL object upload.
             LogCache.LogCacheBlock logCacheBlock = logCache.archiveCurrentBlock();
             uploadWALObject(logCacheBlock);
@@ -150,7 +151,7 @@ private void uploadWALObject0(LogCache.LogCacheBlock logCacheBlock) {
             walObjectUploadTask.prepare().get();
             walObjectUploadTask.upload().get();
             walObjectUploadTask.commit().get();
-            log.trim(logCacheBlock.logEndPosition());
+            log.trim(logCacheBlock.maxOffset());
             freeCache(logCacheBlock.blockId());
         } catch (Throwable e) {
             LOGGER.error("unexpect upload wal object fail", e);
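Note on the ack path above: appends are released strictly by WAL offset, and the `Math.max` update tolerates out-of-order WAL confirmations, so the watermark only ever moves forward. A minimal, self-contained sketch of that pattern with hypothetical names (the PR itself uses a `BlockingQueue` named `waitingLogConfirmedRequests` plus the `AtomicLong` `logConfirmOffset`, and drains on a single executor):

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

class WalAckWatermark {
    record Pending(long offset, CompletableFuture<Void> cf) { }

    private final Queue<Pending> waiting = new ConcurrentLinkedQueue<>();
    private final AtomicLong confirmedOffset = new AtomicLong(-1L);

    // Register an append once the WAL has assigned it an offset.
    CompletableFuture<Void> append(long walOffset) {
        CompletableFuture<Void> cf = new CompletableFuture<>();
        waiting.add(new Pending(walOffset, cf));
        return cf;
    }

    // Called when the WAL confirms durability up to walOffset: completes
    // every pending append at or below the watermark, in queue order.
    void confirm(long walOffset) {
        confirmedOffset.getAndUpdate(prev -> Math.max(prev, walOffset));
        long watermark = confirmedOffset.get();
        for (Pending head = waiting.peek(); head != null && head.offset() <= watermark; head = waiting.peek()) {
            waiting.poll();
            head.cf().complete(null);
        }
    }
}
```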
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java
@@ -17,7 +17,7 @@
 
 package kafka.log.s3;
 
-import kafka.log.s3.objects.CommitCompactObjectRequest;
+import kafka.log.s3.objects.CommitWALObjectRequest;
 import kafka.log.s3.objects.ObjectManager;
 import kafka.log.s3.objects.ObjectStreamRange;
 import kafka.log.s3.objects.StreamObject;
@@ -40,7 +40,7 @@ public class WALObjectUploadTask {
     private final ObjectManager objectManager;
     private final S3Operator s3Operator;
     private final CompletableFuture<Long> prepareCf = new CompletableFuture<>();
-    private final CompletableFuture<CommitCompactObjectRequest> uploadCf = new CompletableFuture<>();
+    private final CompletableFuture<CommitWALObjectRequest> uploadCf = new CompletableFuture<>();
 
     public WALObjectUploadTask(Map<Long, List<FlatStreamRecordBatch>> streamRecordsMap, int streamSplitSizeThreshold, ObjectManager objectManager, S3Operator s3Operator) {
         this.streamRecordsMap = streamRecordsMap;
@@ -58,12 +58,11 @@ public CompletableFuture<Long> prepare() {
         return prepareCf;
     }
 
-    public CompletableFuture<CommitCompactObjectRequest> upload() {
+    public CompletableFuture<CommitWALObjectRequest> upload() {
         prepareCf.thenAccept(objectId -> {
             List<Long> streamIds = new ArrayList<>(streamRecordsMap.keySet());
             Collections.sort(streamIds);
-            CommitCompactObjectRequest compactRequest = new CommitCompactObjectRequest();
-            compactRequest.setCompactedObjectIds(Collections.emptyList());
+            CommitWALObjectRequest compactRequest = new CommitWALObjectRequest();
 
             ObjectWriter minorCompactObject = new ObjectWriter(objectId, s3Operator);
 
@@ -86,6 +85,7 @@ public CompletableFuture<CommitCompactObjectRequest> upload() {
                 }
             }
             compactRequest.setObjectId(objectId);
+            compactRequest.setOrderId(objectId);
             CompletableFuture<Void> minorCompactObjectCf = minorCompactObject.close().thenAccept(nil -> {
                 compactRequest.setObjectSize(minorCompactObject.size());
             });
@@ -107,7 +107,7 @@
     }
 
     public CompletableFuture<Void> commit() {
-        return uploadCf.thenCompose(objectManager::commitMinorCompactObject);
+        return uploadCf.thenCompose(request -> objectManager.commitWALObject(request).thenApply(resp -> null));
     }
 
     private CompletableFuture<StreamObject> writeStreamObject(List<FlatStreamRecordBatch> streamRecords) {
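One subtlety in the new `commit()`: `commitWALObject` returns a `CompletableFuture<CommitWALObjectResponse>`, while `commit()` keeps its `CompletableFuture<Void>` contract, hence the `thenCompose` plus `thenApply(resp -> null)` pair. A small stand-alone illustration of that composition (`String` stands in for the real request and response types):

```java
import java.util.concurrent.CompletableFuture;

class CommitAdapter {
    // Stand-in for objectManager.commitWALObject(request).
    static CompletableFuture<String> commitWALObject(String request) {
        return CompletableFuture.completedFuture("ok");
    }

    // thenCompose flattens the nested future; thenApply(resp -> null)
    // discards the response to satisfy the Void-typed contract.
    static CompletableFuture<Void> commit(CompletableFuture<String> uploadCf) {
        return uploadCf.thenCompose(request -> commitWALObject(request).thenApply(resp -> null));
    }

    public static void main(String[] args) {
        commit(CompletableFuture.completedFuture("request")).join();
        System.out.println("committed");
    }
}
```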
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/log/s3/WalWriteRequest.java
@@ -22,12 +22,12 @@
 
 public class WalWriteRequest implements Comparable<WalWriteRequest> {
     final FlatStreamRecordBatch record;
-    final long position;
+    final long offset;
     final CompletableFuture<Void> cf;
 
-    public WalWriteRequest(FlatStreamRecordBatch record, long position, CompletableFuture<Void> cf) {
+    public WalWriteRequest(FlatStreamRecordBatch record, long offset, CompletableFuture<Void> cf) {
         this.record = record;
-        this.position = position;
+        this.offset = offset;
         this.cf = cf;
     }
 
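`WalWriteRequest` implements `Comparable<WalWriteRequest>`, and after this rename its natural ordering is presumably keyed on the WAL offset. The `compareTo` body sits outside this diff, so the following is an assumption rather than the commit's code:

```java
@Override
public int compareTo(WalWriteRequest o) {
    // assumed: order pending writes by their WAL offset
    return Long.compare(this.offset, o.offset);
}
```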
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/log/s3/cache/LogCache.java
@@ -37,8 +37,8 @@ public LogCache(int cacheBlockMaxSize) {
         this.activeBlock = new LogCacheBlock(cacheBlockMaxSize);
     }
 
-    public boolean put(FlatStreamRecordBatch recordBatch, long endPosition) {
-        return activeBlock.put(recordBatch, endPosition);
+    public boolean put(FlatStreamRecordBatch recordBatch, long offset) {
+        return activeBlock.put(recordBatch, offset);
     }
 
     /**
@@ -84,7 +84,7 @@ public static class LogCacheBlock {
         private final int maxSize;
         private final Map<Long, List<FlatStreamRecordBatch>> map = new HashMap<>();
         private int size = 0;
-        private long logEndPosition;
+        private long maxOffset;
 
         public LogCacheBlock(int maxSize) {
             this.blockId = BLOCK_ID_ALLOC.getAndIncrement();
@@ -95,12 +95,12 @@ public long blockId() {
             return blockId;
         }
 
-        public boolean put(FlatStreamRecordBatch recordBatch, long endPosition) {
+        public boolean put(FlatStreamRecordBatch recordBatch, long offset) {
             List<FlatStreamRecordBatch> streamCache = map.computeIfAbsent(recordBatch.streamId, id -> new ArrayList<>());
             streamCache.add(recordBatch);
             int recordSize = recordBatch.encodedBuf.readableBytes();
             size += recordSize;
-            logEndPosition = endPosition;
+            maxOffset = offset;
             return size >= maxSize;
         }
 
@@ -136,8 +136,8 @@ public Map<Long, List<FlatStreamRecordBatch>> records() {
             return map;
         }
 
-        public long logEndPosition() {
-            return logEndPosition;
+        public long maxOffset() {
+            return maxOffset;
         }
 
     }
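The `LogCache` contract visible here is that `put()` doubles as a fullness signal: it records the batch plus the highest WAL offset seen so far, and returns `true` once the active block reaches `maxSize`, at which point `S3Storage` archives the block and uploads it as a WAL object. A stripped-down sketch of that pattern (`byte[]` stands in for `FlatStreamRecordBatch`):

```java
import java.util.ArrayList;
import java.util.List;

class SizeBoundedBlock {
    private final int maxSize;
    private final List<byte[]> records = new ArrayList<>();
    private int size = 0;
    private long maxOffset = -1L;

    SizeBoundedBlock(int maxSize) {
        this.maxSize = maxSize;
    }

    // Returns true when the block is full and should be archived for upload.
    boolean put(byte[] record, long walOffset) {
        records.add(record);
        size += record.length;
        maxOffset = walOffset; // highest WAL offset held by this block
        return size >= maxSize;
    }

    // Once the block is uploaded, the WAL can be trimmed up to this offset,
    // which is exactly what log.trim(logCacheBlock.maxOffset()) does above.
    long maxOffset() {
        return maxOffset;
    }
}
```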
48 changes: 22 additions & 26 deletions core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java
@@ -31,11 +31,12 @@
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+
 import kafka.log.s3.model.StreamOffset;
 import kafka.log.s3.objects.CommitCompactObjectRequest;
 import kafka.log.s3.objects.CommitStreamObjectRequest;
-import kafka.log.s3.objects.CommitWalObjectRequest;
-import kafka.log.s3.objects.CommitWalObjectResponse;
+import kafka.log.s3.objects.CommitWALObjectRequest;
+import kafka.log.s3.objects.CommitWALObjectResponse;
 import kafka.log.s3.objects.ObjectManager;
 import kafka.log.s3.objects.ObjectStreamRange;
 import kafka.log.s3.objects.OpenStreamMetadata;
@@ -145,11 +146,9 @@ public CompletableFuture<Long> prepareObject(int count, long ttl) {
     }
 
     @Override
-    public CompletableFuture<CommitWalObjectResponse> commitWalObject(CommitWalObjectRequest request) {
+    public CompletableFuture<CommitWALObjectResponse> commitWALObject(CommitWALObjectRequest request) {
         return this.submitEvent(() -> {
-            CommitWalObjectResponse resp = new CommitWalObjectResponse();
-            List<Long> failedStreamIds = new ArrayList<>();
-            resp.setFailedStreamIds(failedStreamIds);
+            CommitWALObjectResponse resp = new CommitWALObjectResponse();
             long objectId = request.getObjectId();
             long objectSize = request.getObjectSize();
             List<ObjectStreamRange> streamRanges = request.getStreamRanges();
@@ -160,21 +159,15 @@ public CompletableFuture<CommitWalObjectResponse> commitWalObject(CommitWalObjectRequest request) {
             if (object.getS3ObjectState() != S3ObjectState.PREPARED) {
                 throw new RuntimeException("Object " + objectId + " is not in prepared state");
             }
-            // verify the stream
-            streamRanges.stream().filter(range -> !verifyWalStreamRanges(range)).mapToLong(ObjectStreamRange::getStreamId)
-                .forEach(failedStreamIds::add);
-            if (!failedStreamIds.isEmpty()) {
-                return resp;
-            }
             // commit object
             this.objectsMetadata.put(objectId, new S3Object(
-                objectId, objectSize, object.getObjectKey(),
-                object.getPreparedTimeInMs(), object.getExpiredTimeInMs(), System.currentTimeMillis(), -1,
-                S3ObjectState.COMMITTED)
+                    objectId, objectSize, object.getObjectKey(),
+                    object.getPreparedTimeInMs(), object.getExpiredTimeInMs(), System.currentTimeMillis(), -1,
+                    S3ObjectState.COMMITTED)
             );
             // build metadata
             MemoryBrokerWALMetadata walMetadata = this.brokerWALMetadata.computeIfAbsent(MOCK_BROKER_ID,
-                k -> new MemoryBrokerWALMetadata(k));
+                    k -> new MemoryBrokerWALMetadata(k));
             Map<Long, List<S3ObjectStreamIndex>> index = new HashMap<>();
             streamRanges.stream().forEach(range -> {
                 long streamId = range.getStreamId();
@@ -185,7 +178,15 @@ public CompletableFuture<CommitWalObjectResponse> commitWalObject(CommitWalObjectRequest request) {
                 MemoryStreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
                 streamMetadata.endOffset = endOffset;
             });
-            S3WALObject walObject = new S3WALObject(objectId, MOCK_BROKER_ID, index);
+            request.getStreamObjects().forEach(streamObject -> {
+                MemoryStreamMetadata streamMetadata = this.streamsMetadata.get(streamObject.getStreamId());
+                S3StreamObject s3StreamObject = new S3StreamObject(streamObject.getObjectId(), streamObject.getObjectSize(),
+                        streamObject.getStreamId(), streamObject.getStartOffset(), streamObject.getEndOffset());
+                streamMetadata.addStreamObject(s3StreamObject);
+                streamMetadata.endOffset = Math.max(streamMetadata.endOffset, streamObject.getEndOffset());
+            });
+
+            S3WALObject walObject = new S3WALObject(objectId, MOCK_BROKER_ID, index, request.getOrderId());
             walMetadata.walObjects.add(walObject);
             return resp;
         });
@@ -209,11 +210,6 @@ private boolean verifyWalStreamRanges(ObjectStreamRange range) {
         return true;
     }
 
-    @Override
-    public CompletableFuture<Void> commitMinorCompactObject(CommitCompactObjectRequest request) {
-        return CompletableFuture.completedFuture(null);
-    }
-
     @Override
     public CompletableFuture<Void> commitMajorCompactObject(CommitCompactObjectRequest request) {
         return null;
@@ -282,7 +278,7 @@ public CompletableFuture<Long> createStream() {
         return this.submitEvent(() -> {
             long streamId = this.nextAssignedStreamId++;
             this.streamsMetadata.put(streamId,
-                new MemoryStreamMetadata(streamId));
+                    new MemoryStreamMetadata(streamId));
             return streamId;
         });
     }
@@ -373,9 +369,9 @@ private S3Object prepareObject(long objectId, long ttl) {
         long preparedTs = System.currentTimeMillis();
         String objectKey = ObjectUtils.genKey(0, "todocluster", objectId);
         return new S3Object(
-            objectId, -1, objectKey,
-            preparedTs, preparedTs + ttl, -1, -1,
-            S3ObjectState.PREPARED);
+                objectId, -1, objectKey,
+                preparedTs, preparedTs + ttl, -1, -1,
+                S3ObjectState.PREPARED);
     }
 
 
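With the verify step and `failedStreamIds` removed, the in-memory commit path becomes a straight prepare-then-commit flow. A usage sketch against the API as it appears in this diff; the `objectManager` instance and the literal sizes are illustrative, and error handling is omitted:

```java
// Assumes a MemoryMetadataManager instance bound as the ObjectManager.
long objectId = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)).get();

CommitWALObjectRequest request = new CommitWALObjectRequest();
request.setObjectId(objectId);
request.setOrderId(objectId); // upload() reuses the prepared object id as the order id
request.setObjectSize(1024);

// Commits the object, registers any split stream objects, and stores the
// S3WALObject together with its order id; no failedStreamIds come back.
CommitWALObjectResponse response = objectManager.commitWALObject(request).get();
```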
core/src/main/scala/kafka/log/s3/objects/CommitWALObjectRequest.java
@@ -17,16 +17,24 @@
 
 package kafka.log.s3.objects;
 
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 
-public class CommitWalObjectRequest {
+public class CommitWALObjectRequest {
     private long objectId;
     private long objectSize;
     /**
      * The stream ranges of the compacted object.
      */
     private List<ObjectStreamRange> streamRanges;
+
+    /**
+     * The stream objects which split from the compacted object.
+     */
+    private List<StreamObject> streamObjects;
+    private long orderId;
 
     public long getObjectId() {
         return objectId;
     }
@@ -44,10 +52,46 @@ public void setObjectSize(long objectSize) {
     }
 
     public List<ObjectStreamRange> getStreamRanges() {
+        if (streamRanges == null) {
+            return Collections.emptyList();
+        }
         return streamRanges;
     }
 
     public void setStreamRanges(List<ObjectStreamRange> streamRanges) {
         this.streamRanges = streamRanges;
     }
+
+    public void addStreamRange(ObjectStreamRange streamRange) {
+        if (streamRanges == null) {
+            streamRanges = new LinkedList<>();
+        }
+        streamRanges.add(streamRange);
+    }
+
+    public List<StreamObject> getStreamObjects() {
+        if (streamObjects == null) {
+            return Collections.emptyList();
+        }
+        return streamObjects;
+    }
+
+    public void setStreamObjects(List<StreamObject> streamObjects) {
+        this.streamObjects = streamObjects;
+    }
+
+    public void addStreamObject(StreamObject streamObject) {
+        if (streamObjects == null) {
+            streamObjects = new LinkedList<>();
+        }
+        streamObjects.add(streamObject);
+    }
+
+    public long getOrderId() {
+        return orderId;
+    }
+
+    public void setOrderId(long orderId) {
+        this.orderId = orderId;
+    }
 }
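A design note on the accessors above: the backing lists are allocated lazily on the add path and the getters fall back to `Collections.emptyList()`, so a request that carries no split stream objects (the common case) costs no extra allocation and is still safe to iterate:

```java
CommitWALObjectRequest request = new CommitWALObjectRequest();
// No addStreamObject(...) or setStreamRanges(...) calls were made, yet:
assert request.getStreamObjects().isEmpty(); // empty list, not null
assert request.getStreamRanges().isEmpty();  // empty list, not null
```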
core/src/main/scala/kafka/log/s3/objects/CommitWALObjectResponse.java
@@ -17,24 +17,8 @@
 
 package kafka.log.s3.objects;
 
-import java.util.Collections;
-import java.util.List;
-
 /**
- * Commit wal loose object response.
- * When stream is fenced, the stream id will be added to failedStreamIds.
+ * Commit WAL object response.
  */
-public class CommitWalObjectResponse {
-    private List<Long> failedStreamIds;
-
-    public List<Long> getFailedStreamIds() {
-        if (failedStreamIds == null) {
-            return Collections.emptyList();
-        }
-        return failedStreamIds;
-    }
-
-    public void setFailedStreamIds(List<Long> failedStreamIds) {
-        this.failedStreamIds = failedStreamIds;
-    }
+public class CommitWALObjectResponse {
 }