feat(stream-client): minor compact #28

Merged: 4 commits, Aug 28, 2023
229 changes: 229 additions & 0 deletions core/src/main/scala/kafka/log/s3/MinorCompactTask.java
@@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.s3;

import kafka.log.s3.model.StreamRecordBatch;
import kafka.log.s3.objects.CommitCompactObjectRequest;
import kafka.log.s3.objects.ObjectManager;
import kafka.log.s3.objects.ObjectStreamRange;
import kafka.log.s3.objects.StreamObject;
import kafka.log.s3.operator.S3Operator;
import org.apache.kafka.common.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

class MinorCompactTask implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(MinorCompactTask.class);
private static final long NOOP_TIMESTAMP = -1L;
private final long compactSizeThreshold;
private final long maxCompactInterval;
private final int streamSplitSizeThreshold;
private final BlockingQueue<MinorCompactPart> waitingCompactRecords = new LinkedBlockingQueue<>();
private final AtomicLong waitingCompactRecordsBytesSize = new AtomicLong();
private volatile long lastCompactTimestamp = NOOP_TIMESTAMP;
private final ObjectManager objectManager;
private final S3Operator s3Operator;
private final ScheduledExecutorService schedule = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("minor compact", true));

public MinorCompactTask(long compactSizeThreshold, long maxCompactInterval, int streamSplitSizeThreshold, ObjectManager objectManager, S3Operator s3Operator) {
this.compactSizeThreshold = compactSizeThreshold;
this.maxCompactInterval = maxCompactInterval;
this.streamSplitSizeThreshold = streamSplitSizeThreshold;
// TODO: close
schedule.scheduleAtFixedRate(this, 1, 1, TimeUnit.SECONDS);
this.objectManager = objectManager;
this.s3Operator = s3Operator;
}

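// Queue one WAL object's records for minor compaction; once the accumulated size reaches
// compactSizeThreshold, schedule a compaction run immediately instead of waiting for the next tick.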
public void tryCompact(MinorCompactPart part) {
// TODO: back pressure
if (lastCompactTimestamp == NOOP_TIMESTAMP) {
lastCompactTimestamp = System.currentTimeMillis();
}
waitingCompactRecords.add(part);
if (waitingCompactRecordsBytesSize.addAndGet(part.size) >= compactSizeThreshold) {
schedule.execute(() -> tryCompact0(false));
}
}

@Override
public void run() {
tryCompact0(false);
}

public void close() {
try {
schedule.submit(() -> tryCompact0(true)).get();
schedule.shutdown();
} catch (Throwable e) {
LOGGER.error("minor compact fail", e);
}
}

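// Drain all queued parts and compact them: small streams are packed into a shared minor compact
// object (one stream per block), large streams are split out into dedicated stream objects, and
// the result is committed through the object manager.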
private void tryCompact0(boolean force) {
long now = System.currentTimeMillis();
boolean timeout = lastCompactTimestamp != NOOP_TIMESTAMP && (now - lastCompactTimestamp) >= maxCompactInterval;
boolean sizeExceed = waitingCompactRecordsBytesSize.get() >= compactSizeThreshold;
if (!force && !sizeExceed && !timeout) {
return;
}
try {
List<MinorCompactPart> parts = new ArrayList<>(waitingCompactRecords.size());

waitingCompactRecords.drainTo(parts);
lastCompactTimestamp = now;
waitingCompactRecordsBytesSize.getAndAdd(-parts.stream().mapToLong(r -> r.size).sum());
if (parts.isEmpty()) {
return;
}

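// Track which WAL objects this compaction replaces so the commit can supersede them.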
CommitCompactObjectRequest compactRequest = new CommitCompactObjectRequest();
compactRequest.setCompactedObjectIds(parts.stream().map(p -> p.walObjectId).collect(Collectors.toList()));

long objectId = objectManager.prepareObject(1, TimeUnit.SECONDS.toMillis(30)).get();
ObjectWriter minorCompactObject = new ObjectWriter(objectId, s3Operator);

List<CompletableFuture<StreamObject>> streamObjectCfList = new LinkedList<>();
List<List<StreamRecordBatch>> streamRecordsList = sortAndSplit(parts);
for (List<StreamRecordBatch> streamRecords : streamRecordsList) {
long streamSize = streamRecords.stream().mapToLong(r -> r.getRecordBatch().rawPayload().remaining()).sum();
if (streamSize >= streamSplitSizeThreshold) {
streamObjectCfList.add(writeStreamObject(streamRecords));
} else {
for (StreamRecordBatch record : streamRecords) {
minorCompactObject.write(record);
}
long streamId = streamRecords.get(0).getStreamId();
long startOffset = streamRecords.get(0).getBaseOffset();
long endOffset = streamRecords.get(streamRecords.size() - 1).getLastOffset();
compactRequest.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset));
// A minor compact object block only contains a single stream's data.
minorCompactObject.closeCurrentBlock();
}
}
minorCompactObject.close().get();

compactRequest.setObjectId(objectId);
compactRequest.setObjectSize(minorCompactObject.size());

CompletableFuture.allOf(streamObjectCfList.toArray(new CompletableFuture[0])).get();
for (CompletableFuture<StreamObject> cf : streamObjectCfList) {
compactRequest.addStreamObject(cf.get());
}

objectManager.commitMinorCompactObject(compactRequest).get();
} catch (Throwable e) {
// TODO: handle exceptions; failures are only expected during shutdown.
LOGGER.error("minor compact fail", e);
}

}

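// Write all records of one stream into a dedicated stream object and return its metadata
// (object id, stream id, offset range, size).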
private CompletableFuture<StreamObject> writeStreamObject(List<StreamRecordBatch> streamRecords) {
CompletableFuture<Long> objectIdCf = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30));
return objectIdCf.thenCompose(objectId -> {
ObjectWriter streamObjectWriter = new ObjectWriter(objectId, s3Operator);
for (StreamRecordBatch record : streamRecords) {
streamObjectWriter.write(record);
}
long streamId = streamRecords.get(0).getStreamId();
long startOffset = streamRecords.get(0).getBaseOffset();
long endOffset = streamRecords.get(streamRecords.size() - 1).getLastOffset();
StreamObject streamObject = new StreamObject();
streamObject.setObjectId(objectId);
streamObject.setStreamId(streamId);
streamObject.setStartOffset(startOffset);
streamObject.setEndOffset(endOffset);
return streamObjectWriter.close().thenApply(nil -> {
streamObject.setObjectSize(streamObjectWriter.size());
return streamObject;
});
});
}

/**
* Sort records and split them by (stream, epoch).
* e.g.
* part0: s1-e0-m1 s1-e0-m2 s2-e0-m1 s2-e0-m2
* part1: s1-e0-m3 s1-e0-m4
* part2: s1-e1-m10 s1-e1-m11
* after split:
* list0: s1-e0-m1 s1-e0-m2 s1-e0-m3 s1-e0-m4
* list1: s1-e1-m10 s1-e1-m11
* list2: s2-e0-m1 s2-e0-m2
*/
private List<List<StreamRecordBatch>> sortAndSplit(List<MinorCompactPart> parts) {
int count = parts.stream().mapToInt(p -> p.records.size()).sum();
// TODO: more efficient sort
List<StreamRecordBatch> sortedList = new ArrayList<>(count);
for (MinorCompactPart part : parts) {
sortedList.addAll(part.records);
}
Collections.sort(sortedList);
List<List<StreamRecordBatch>> streamRecordsList = new ArrayList<>(1024);
long streamId = -1L;
long epoch = -1L;
List<StreamRecordBatch> streamRecords = null;
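// Group the sorted records, starting a new list whenever the (streamId, epoch) key changes.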
for (StreamRecordBatch record : sortedList) {
long recordStreamId = record.getStreamId();
long recordEpoch = record.getEpoch();
if (recordStreamId != streamId || recordEpoch != epoch) {
if (streamRecords != null) {
streamRecordsList.add(streamRecords);
}
streamRecords = new LinkedList<>();
streamId = recordStreamId;
epoch = recordEpoch;
}
if (streamRecords != null) {
streamRecords.add(record);
}
}
if (streamRecords != null) {
streamRecordsList.add(streamRecords);
}
return streamRecordsList;
}


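// The records of a single WAL object that are waiting to be compacted.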
static class MinorCompactPart {
long walObjectId;
List<StreamRecordBatch> records;
long size;

public MinorCompactPart(long walObjectId, List<StreamRecordBatch> records) {
this.walObjectId = walObjectId;
this.records = new ArrayList<>(records);
this.size = records.stream().mapToLong(r -> r.getRecordBatch().rawPayload().remaining()).sum();
}
}
}
29 changes: 20 additions & 9 deletions core/src/main/scala/kafka/log/s3/ObjectWriter.java
@@ -24,6 +24,7 @@
import kafka.log.s3.objects.ObjectStreamRange;
import kafka.log.s3.operator.S3Operator;
import kafka.log.s3.operator.Writer;
import kafka.log.s3.utils.ObjectUtils;
import org.apache.kafka.common.compress.ZstdFactory;
import org.apache.kafka.common.utils.ByteBufferOutputStream;

@@ -35,32 +36,31 @@

// TODO: memory optimization
public class ObjectWriter {
private int blockSizeThreshold;
private int partSizeThreshold;
private final S3Operator s3Operator;
private final int blockSizeThreshold;
private final int partSizeThreshold;
private final List<DataBlock> waitingUploadBlocks;
private final List<DataBlock> completedBlocks;
private IndexBlock indexBlock;
private final Writer writer;
private final String objectKey;
private final long objectId;
private long nextDataBlockPosition;

private long size;

private DataBlock dataBlock;

public ObjectWriter(String objectKey, S3Operator s3Operator, int blockSizeThreshold, int partSizeThreshold) {
this.objectKey = objectKey;
this.s3Operator = s3Operator;
public ObjectWriter(long objectId, S3Operator s3Operator, int blockSizeThreshold, int partSizeThreshold) {
this.objectId = objectId;
String objectKey = ObjectUtils.genKey(0, "todocluster", objectId);
this.blockSizeThreshold = blockSizeThreshold;
this.partSizeThreshold = partSizeThreshold;
waitingUploadBlocks = new LinkedList<>();
completedBlocks = new LinkedList<>();
writer = s3Operator.writer(objectKey);
}

public ObjectWriter(String objectKey, S3Operator s3Operator) {
this(objectKey, s3Operator, 16 * 1024 * 1024, 32 * 1024 * 1024);
public ObjectWriter(long objectId, S3Operator s3Operator) {
this(objectId, s3Operator, 16 * 1024 * 1024, 32 * 1024 * 1024);
}

public void write(StreamRecordBatch record) {
@@ -75,6 +75,13 @@ public void write(StreamRecordBatch record) {
}
}

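// Seal the current data block so the next record starts a new block.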
public void closeCurrentBlock() {
if (dataBlock != null) {
dataBlock.close();
dataBlock = null;
}
}

private void tryUploadPart() {
long waitingUploadSize = waitingUploadBlocks.stream().mapToLong(DataBlock::size).sum();
if (waitingUploadSize >= partSizeThreshold) {
@@ -127,6 +134,10 @@ public List<ObjectStreamRange> getStreamRanges() {
return streamRanges;
}

public long objectId() {
return objectId;
}

public long size() {
return size;
}
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/log/s3/S3Wal.java
@@ -40,13 +40,15 @@ public class S3Wal implements Wal {
private final int batchIntervalMs = 200;
private final BlockingQueue<WalWriteRequest> writeBuffer;
private final WalBatchWriteTask walBatchWriteTask;
private final MinorCompactTask minorCompactTask;
private final ObjectManager objectManager;
private final S3Operator s3Operator;


public S3Wal(ObjectManager objectManager, S3Operator s3Operator) {
writeBuffer = new ArrayBlockingQueue<>(16384);
walBatchWriteTask = new WalBatchWriteTask(objectManager, s3Operator);
minorCompactTask = new MinorCompactTask(5L * 1024 * 1024 * 1024, 60, 16 * 1024 * 1024, objectManager, s3Operator);
this.objectManager = objectManager;
this.s3Operator = s3Operator;
}
@@ -59,7 +61,7 @@ public void close() {
@Override
public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
CompletableFuture<Void> cf = new CompletableFuture<>();
//TODO: copy to pooled bytebuffer to reduce gc
// TODO: copy to a pooled ByteBuffer to reduce GC; convert to a flat record
try {
writeBuffer.put(new WalWriteRequest(streamRecord, cf));
} catch (InterruptedException e) {
@@ -124,6 +126,7 @@ void tryComplete() {
if (task.isDone()) {
writeTasks.poll();
task.ack();
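// Hand the acked WAL object's records over to the minor compact task.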
minorCompactTask.tryCompact(new MinorCompactTask.MinorCompactPart(task.objectId(), task.records()));
} else {
return;
}
15 changes: 12 additions & 3 deletions core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java
@@ -19,18 +19,19 @@

import com.automq.elasticstream.client.api.ElasticStreamClientException;
import com.automq.elasticstream.client.flatc.header.ErrorCode;
import kafka.log.s3.model.StreamRecordBatch;
import kafka.log.s3.objects.CommitWalObjectRequest;
import kafka.log.s3.objects.CommitWalObjectResponse;
import kafka.log.s3.objects.ObjectManager;
import kafka.log.s3.operator.S3Operator;
import kafka.log.s3.utils.ObjectUtils;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class SingleWalObjectWriteTask {
private final List<WalWriteRequest> requests;
@@ -40,6 +41,7 @@ public class SingleWalObjectWriteTask {
private CommitWalObjectResponse response;
private volatile boolean isDone = false;


public SingleWalObjectWriteTask(List<WalWriteRequest> records, ObjectManager objectManager, S3Operator s3Operator) {
Collections.sort(records);
this.requests = records;
@@ -57,8 +59,7 @@ public CompletableFuture<Void> upload() {
CompletableFuture<Void> writeCf = objectIdCf.thenCompose(objectId -> {
context.objectId = objectId;
// TODO: fill cluster name
String objectKey = ObjectUtils.genKey(0, "todocluster", objectId);
objectWriter = new ObjectWriter(objectKey, s3Operator);
objectWriter = new ObjectWriter(objectId, s3Operator);
for (WalWriteRequest request : requests) {
objectWriter.write(request.record);
}
@@ -95,6 +96,14 @@ public void ack() {
}
}

public long objectId() {
return objectWriter.objectId();
}

public List<StreamRecordBatch> records() {
return requests.stream().map(r -> r.record).collect(Collectors.toList());
}

static class UploadContext {
long objectId;
}
@@ -54,6 +54,7 @@ private CompletableFuture<ReadDataBlock> read0(long streamId, long endOffset, Re
context.objects = objectManager.getObjects(streamId, context.nextStartOffset, endOffset, 2);
context.objectIndex = 0;
}
// TODO: handle the case where a minor/major compact object may contain non-contiguous stream records.
// The previous object has been read completely, or this is the first object.
context.reader = new ObjectReader(context.objects.get(context.objectIndex), s3Operator);
context.objectIndex++;