feat(stream-client): integrate minor compact to WAL
Signed-off-by: Robin Han <[email protected]>
superhx committed Aug 28, 2023
1 parent f4fd334 commit 60c6553
Showing 11 changed files with 236 additions and 68 deletions.
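
For orientation: this commit wires minor compaction into the WAL write path. Once a WAL object upload is acked, the batch write task's tryComplete() hands the object id and its records to MinorCompactTask.tryCompact(); the task accumulates parts until a size threshold or compaction interval is hit, then regroups records by (stream, epoch) and commits a compact object. The following is a minimal, self-contained sketch of that hand-off only; it is not part of the commit, MinorCompactSketch and Part are illustrative stand-ins for the real classes, and the 5 GiB threshold is simply the value S3Wal passes in the diff below.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicLong;

    class MinorCompactSketch {
        // Stand-in for MinorCompactTask.MinorCompactPart: a finished WAL object and its byte size.
        static class Part {
            final long walObjectId;
            final long size;
            Part(long walObjectId, long size) { this.walObjectId = walObjectId; this.size = size; }
        }

        private final BlockingQueue<Part> waiting = new LinkedBlockingQueue<>();
        private final AtomicLong waitingBytes = new AtomicLong();
        // Same size threshold that S3Wal passes to MinorCompactTask in this commit (5 GiB).
        private final long compactSizeThreshold = 5L * 1024 * 1024 * 1024;

        // Called from the WAL ack path (tryComplete) after a WAL object upload is acked.
        void tryCompact(Part part) {
            waiting.add(part);
            if (waitingBytes.addAndGet(part.size) >= compactSizeThreshold) {
                compactNow(false);
            }
        }

        private void compactNow(boolean force) {
            // The real task drains the parts, groups records by (stream, epoch), writes large
            // streams to dedicated stream objects and the rest into one compact object,
            // then commits via ObjectManager -- mirroring tryCompact0 in the diff below.
            System.out.println("compacting " + waiting.size() + " parts, force=" + force);
            waiting.clear();
            waitingBytes.set(0);
        }

        public static void main(String[] args) {
            MinorCompactSketch task = new MinorCompactSketch();
            task.tryCompact(new Part(1L, 3L * 1024 * 1024 * 1024));
            task.tryCompact(new Part(2L, 3L * 1024 * 1024 * 1024)); // crosses the 5 GiB threshold
        }
    }
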
137 changes: 98 additions & 39 deletions core/src/main/scala/kafka/log/s3/MinorCompactTask.java
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.s3;

import kafka.log.s3.model.StreamRecordBatch;
@@ -6,23 +23,25 @@
import kafka.log.s3.objects.ObjectStreamRange;
import kafka.log.s3.objects.StreamObject;
import kafka.log.s3.operator.S3Operator;
import kafka.log.s3.utils.ObjectUtils;
import org.apache.kafka.common.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class MinorCompactTask implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(MinorCompactTask.class);
private static final long NOOP_TIMESTAMP = -1L;
private final long compactSizeThreshold;
private final long maxCompactInterval;
@@ -52,99 +71,138 @@ public void tryCompact(MinorCompactPart part) {
}
waitingCompactRecords.add(part);
if (waitingCompactRecordsBytesSize.addAndGet(part.size) >= compactSizeThreshold) {
schedule.execute(this::tryCompact0);
schedule.execute(() -> tryCompact0(false));
}
}

@Override
public void run() {
tryCompact0();
tryCompact0(false);
}

public void close() {
try {
schedule.submit(() -> tryCompact0(true)).get();
schedule.shutdown();
} catch (Throwable e) {
LOGGER.error("minor compact fail", e);
}
}

private void tryCompact0() {
private void tryCompact0(boolean force) {
long now = System.currentTimeMillis();
boolean timeout = lastCompactTimestamp != NOOP_TIMESTAMP && (now - lastCompactTimestamp) >= maxCompactInterval;
boolean sizeExceed = waitingCompactRecordsBytesSize.get() >= compactSizeThreshold;
if (!sizeExceed && !timeout) {
if (!force && !sizeExceed && !timeout) {
return;
}
try {
CommitCompactObjectRequest compactReq = new CommitCompactObjectRequest();
List<MinorCompactPart> parts = new ArrayList<>(waitingCompactRecords.size());

waitingCompactRecords.drainTo(parts);

lastCompactTimestamp = now;
waitingCompactRecordsBytesSize.getAndAdd(-parts.stream().mapToLong(r -> r.size).sum());
if (parts.isEmpty()) {
return;
}

compactReq.setCompactedObjectIds(parts.stream().map(p -> p.walObjectId).collect(Collectors.toList()));
CommitCompactObjectRequest compactRequest = new CommitCompactObjectRequest();
compactRequest.setCompactedObjectIds(parts.stream().map(p -> p.walObjectId).collect(Collectors.toList()));

long objectId = objectManager.prepareObject(1, TimeUnit.SECONDS.toMillis(30)).get();
compactReq.setObjectId(objectId);
String objectKey = ObjectUtils.genKey(0, "todocluster", objectId);
ObjectWriter minorCompactObject = new ObjectWriter(objectKey, s3Operator);
ObjectWriter minorCompactObject = new ObjectWriter(objectId, s3Operator);

List<CompletableFuture<StreamObject>> streamObjectCfList = new LinkedList<>();
List<List<StreamRecordBatch>> streamRecordsList = sortAndSplit(parts);
for (List<StreamRecordBatch> streamRecords : streamRecordsList) {
long streamSize = streamRecords.stream().mapToLong(r -> r.getRecordBatch().rawPayload().remaining()).sum();
if (streamSize >= streamSplitSizeThreshold) {
long streamObjectId = objectManager.prepareObject(1, TimeUnit.SECONDS.toMillis(30)).get();
String streamObjectKey = ObjectUtils.genKey(0, "todocluster", streamObjectId);
ObjectWriter streamObjectWriter = new ObjectWriter(streamObjectKey, s3Operator);
for (StreamRecordBatch record: streamRecords) {
streamObjectWriter.write(record);
}
// TODO: parallel
streamObjectWriter.close().get();
long streamId = streamRecords.get(0).getStreamId();
long startOffset = streamRecords.get(0).getBaseOffset();
long endOffset = streamRecords.get(streamRecords.size() -1).getLastOffset();
StreamObject streamObject = new StreamObject();
streamObject.setObjectId(streamObjectId);
streamObject.setObjectSize(streamObjectWriter.size());
streamObject.setStreamId(streamId);
streamObject.setStartOffset(startOffset);
streamObject.setEndOffset(endOffset);
compactReq.addStreamObject(streamObject);
streamObjectCfList.add(writeStreamObject(streamRecords));
} else {
for (StreamRecordBatch record: streamRecords) {
for (StreamRecordBatch record : streamRecords) {
minorCompactObject.write(record);
}
long streamId = streamRecords.get(0).getStreamId();
long startOffset = streamRecords.get(0).getBaseOffset();
long endOffset = streamRecords.get(streamRecords.size() -1).getLastOffset();
compactReq.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset));
long endOffset = streamRecords.get(streamRecords.size() - 1).getLastOffset();
compactRequest.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset));
// minor compact object blocks only contain a single stream's data.
minorCompactObject.closeCurrentBlock();
}
}
minorCompactObject.close().get();
compactReq.setObjectSize(minorCompactObject.size());

compactRequest.setObjectId(objectId);
compactRequest.setObjectSize(minorCompactObject.size());

CompletableFuture.allOf(streamObjectCfList.toArray(new CompletableFuture[0])).get();
for (CompletableFuture<StreamObject> cf : streamObjectCfList) {
compactRequest.addStreamObject(cf.get());
}

objectManager.commitMinorCompactObject(compactRequest).get();
} catch (Throwable e) {
//TODO: retry
//TODO: handle exceptions; failure is only expected on shutdown.
LOGGER.error("minor compact fail", e);
}

}

private CompletableFuture<StreamObject> writeStreamObject(List<StreamRecordBatch> streamRecords) {
CompletableFuture<Long> objectIdCf = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30));
return objectIdCf.thenCompose(objectId -> {
ObjectWriter streamObjectWriter = new ObjectWriter(objectId, s3Operator);
for (StreamRecordBatch record : streamRecords) {
streamObjectWriter.write(record);
}
long streamId = streamRecords.get(0).getStreamId();
long startOffset = streamRecords.get(0).getBaseOffset();
long endOffset = streamRecords.get(streamRecords.size() - 1).getLastOffset();
StreamObject streamObject = new StreamObject();
streamObject.setObjectId(objectId);
streamObject.setStreamId(streamId);
streamObject.setStartOffset(startOffset);
streamObject.setEndOffset(endOffset);
return streamObjectWriter.close().thenApply(nil -> {
streamObject.setObjectSize(streamObjectWriter.size());
return streamObject;
});
});
}

/**
Sort records and split them by (stream, epoch).
e.g.
part0: s1-e0-m1 s1-e0-m2 s2-e0-m1 s2-e0-m2
part1: s1-e0-m3 s1-e0-m4
part2: s1-e1-m10 s1-e1-m11
after split:
list0: s1-e0-m1 s1-e0-m2 s1-e0-m3 s1-e0-m4
list1: s1-e1-m10 s1-e1-m11
list2: s2-e0-m1 s2-e0-m2
*/
private List<List<StreamRecordBatch>> sortAndSplit(List<MinorCompactPart> parts) {
int count = parts.stream().mapToInt(p -> p.records.size()).sum();
// TODO: more efficient sort
List<StreamRecordBatch> sortedList = new ArrayList<>(count);
for (MinorCompactPart part: parts) {
for (MinorCompactPart part : parts) {
sortedList.addAll(part.records);
}
Collections.sort(sortedList);
List<List<StreamRecordBatch>> streamRecordsList = new ArrayList<>(1024);
long streamId = -1L;
long epoch = -1L;
List<StreamRecordBatch> streamRecords = null;
// TODO: separate different epochs
for (StreamRecordBatch record: sortedList) {
for (StreamRecordBatch record : sortedList) {
long recordStreamId = record.getStreamId();
if (recordStreamId != streamId) {
long recordEpoch = record.getEpoch();
if (recordStreamId != streamId || recordEpoch != epoch) {
if (streamRecords != null) {
streamRecordsList.add(streamRecords);
}
streamRecords = new LinkedList<>();
streamId = recordStreamId;
epoch = recordEpoch;
}
if (streamRecords != null) {
streamRecords.add(record);
@@ -156,6 +214,7 @@ private List<List<StreamRecordBatch>> sortAndSplit(List<MinorCompactPart> parts)
return streamRecordsList;
}


static class MinorCompactPart {
long walObjectId;
List<StreamRecordBatch> records;
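
As a standalone illustration of the grouping described in the sortAndSplit Javadoc above, here is a minimal sketch that reproduces the split-by-(stream, epoch) behaviour on plain tuples. It is not part of the commit; Rec is a stand-in for StreamRecordBatch, and the comparator assumes the batch's natural ordering is by stream id, then epoch, then base offset.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    class SortAndSplitSketch {
        // Stand-in for StreamRecordBatch: (streamId, epoch, baseOffset).
        record Rec(long streamId, long epoch, long baseOffset) { }

        static List<List<Rec>> sortAndSplit(List<Rec> records) {
            List<Rec> sorted = new ArrayList<>(records);
            sorted.sort(Comparator.comparingLong(Rec::streamId)
                    .thenComparingLong(Rec::epoch)
                    .thenComparingLong(Rec::baseOffset));
            List<List<Rec>> groups = new ArrayList<>();
            long streamId = -1L, epoch = -1L;
            List<Rec> current = null;
            for (Rec r : sorted) {
                // Start a new group whenever the (stream, epoch) pair changes.
                if (r.streamId() != streamId || r.epoch() != epoch) {
                    current = new ArrayList<>();
                    groups.add(current);
                    streamId = r.streamId();
                    epoch = r.epoch();
                }
                current.add(r);
            }
            return groups;
        }

        public static void main(String[] args) {
            List<Rec> parts = List.of(
                    new Rec(1, 0, 1), new Rec(1, 0, 2), new Rec(2, 0, 1), new Rec(2, 0, 2), // part0
                    new Rec(1, 0, 3), new Rec(1, 0, 4),                                     // part1
                    new Rec(1, 1, 10), new Rec(1, 1, 11));                                  // part2
            // Expected groups: [s1-e0: 1..4], [s1-e1: 10, 11], [s2-e0: 1, 2]
            sortAndSplit(parts).forEach(System.out::println);
        }
    }
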
16 changes: 12 additions & 4 deletions core/src/main/scala/kafka/log/s3/ObjectWriter.java
@@ -21,9 +21,11 @@
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import kafka.log.s3.model.StreamRecordBatch;
import kafka.log.s3.objects.ObjectManager;
import kafka.log.s3.objects.ObjectStreamRange;
import kafka.log.s3.operator.S3Operator;
import kafka.log.s3.operator.Writer;
import kafka.log.s3.utils.ObjectUtils;
import org.apache.kafka.common.compress.ZstdFactory;
import org.apache.kafka.common.utils.ByteBufferOutputStream;

@@ -42,15 +44,17 @@ public class ObjectWriter {
private final List<DataBlock> completedBlocks;
private IndexBlock indexBlock;
private final Writer writer;
private final long objectId;
private final String objectKey;
private long nextDataBlockPosition;

private long size;

private DataBlock dataBlock;

public ObjectWriter(String objectKey, S3Operator s3Operator, int blockSizeThreshold, int partSizeThreshold) {
this.objectKey = objectKey;
public ObjectWriter(long objectId, S3Operator s3Operator, int blockSizeThreshold, int partSizeThreshold) {
this.objectId = objectId;
this.objectKey = ObjectUtils.genKey(0, "todocluster", objectId);
this.s3Operator = s3Operator;
this.blockSizeThreshold = blockSizeThreshold;
this.partSizeThreshold = partSizeThreshold;
@@ -59,8 +63,8 @@ public ObjectWriter(String objectKey, S3Operator s3Operator, int blockSizeThresh
writer = s3Operator.writer(objectKey);
}

public ObjectWriter(String objectKey, S3Operator s3Operator) {
this(objectKey, s3Operator, 16 * 1024 * 1024, 32 * 1024 * 1024);
public ObjectWriter(long objectId, S3Operator s3Operator) {
this(objectId, s3Operator, 16 * 1024 * 1024, 32 * 1024 * 1024);
}

public void write(StreamRecordBatch record) {
@@ -134,6 +138,10 @@ public List<ObjectStreamRange> getStreamRanges() {
return streamRanges;
}

public long objectId() {
return objectId;
}

public long size() {
return size;
}
11 changes: 4 additions & 7 deletions core/src/main/scala/kafka/log/s3/S3Wal.java
@@ -20,40 +20,36 @@
import kafka.log.s3.model.StreamRecordBatch;
import kafka.log.s3.objects.ObjectManager;
import kafka.log.s3.operator.S3Operator;
import kafka.log.s3.utils.ObjectUtils;
import org.apache.kafka.common.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicLong;

public class S3Wal implements Wal {
private static final Logger LOGGER = LoggerFactory.getLogger(S3Wal.class);
private final int batchIntervalMs = 200;
private final BlockingQueue<WalWriteRequest> writeBuffer;
private final WalBatchWriteTask walBatchWriteTask;
private final MinorCompactTask minorCompactTask;
private final ObjectManager objectManager;
private final S3Operator s3Operator;


public S3Wal(ObjectManager objectManager, S3Operator s3Operator) {
writeBuffer = new ArrayBlockingQueue<>(16384);
walBatchWriteTask = new WalBatchWriteTask(objectManager, s3Operator);
minorCompactTask = new MinorCompactTask(5L * 1024 * 1024 * 1024, 60, 16 * 1024 * 1024
, objectManager, s3Operator);
this.objectManager = objectManager;
this.s3Operator = s3Operator;
}
@@ -131,6 +127,7 @@ void tryComplete() {
if (task.isDone()) {
writeTasks.poll();
task.ack();
minorCompactTask.tryCompact(new MinorCompactTask.MinorCompactPart(task.objectId(), task.records()));
} else {
return;
}
14 changes: 12 additions & 2 deletions core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java
@@ -19,6 +19,7 @@

import com.automq.elasticstream.client.api.ElasticStreamClientException;
import com.automq.elasticstream.client.flatc.header.ErrorCode;
import kafka.log.s3.model.StreamRecordBatch;
import kafka.log.s3.objects.CommitWalObjectRequest;
import kafka.log.s3.objects.CommitWalObjectResponse;
import kafka.log.s3.objects.ObjectManager;
@@ -31,6 +32,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class SingleWalObjectWriteTask {
private final List<WalWriteRequest> requests;
@@ -40,6 +42,7 @@ public class SingleWalObjectWriteTask {
private CommitWalObjectResponse response;
private volatile boolean isDone = false;


public SingleWalObjectWriteTask(List<WalWriteRequest> records, ObjectManager objectManager, S3Operator s3Operator) {
Collections.sort(records);
this.requests = records;
Expand All @@ -57,8 +60,7 @@ public CompletableFuture<Void> upload() {
CompletableFuture<Void> writeCf = objectIdCf.thenCompose(objectId -> {
context.objectId = objectId;
// TODO: fill cluster name
String objectKey = ObjectUtils.genKey(0, "todocluster", objectId);
objectWriter = new ObjectWriter(objectKey, s3Operator);
objectWriter = new ObjectWriter(objectId, s3Operator);
for (WalWriteRequest request : requests) {
objectWriter.write(request.record);
}
@@ -95,6 +97,14 @@ public void ack() {
}
}

public long objectId() {
return objectWriter.objectId();
}

public List<StreamRecordBatch> records() {
return requests.stream().map(r -> r.record).collect(Collectors.toList());
}

static class UploadContext {
long objectId;
}
@@ -54,6 +54,7 @@ private CompletableFuture<ReadDataBlock> read0(long streamId, long endOffset, Re
context.objects = objectManager.getObjects(streamId, context.nextStartOffset, endOffset, 2);
context.objectIndex = 0;
}
// TODO: handle the case where a minor/major compact object contains non-continuous stream records.
// The previous object has been read completely, or this is the first object.
context.reader = new ObjectReader(context.objects.get(context.objectIndex), s3Operator);
context.objectIndex++;
