Skip to content

Commit

Permalink
fix(s3stream): resolve unintended buffer reuse
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits committed Aug 26, 2024
1 parent 738ae15 commit c1779c8
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class ByteBufAlloc {
public static final int STREAM_SET_OBJECT_COMPACTION_READ = 9;
public static final int STREAM_SET_OBJECT_COMPACTION_WRITE = 10;
public static final int BLOCK_CACHE = 11;
public static final int S3_WAL = 12;
public static ByteBufAllocMetric byteBufAllocMetric = null;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ public WriteOptions apiCallAttemptTimeout(long apiCallAttemptTimeout) {
return this;
}

// If enable the fast retry, the data buffer may be released after the write future is completed.
// Be careful to use this option, ensure that you reuse the data buffer only after
// it has been released by the writer.
public WriteOptions enableFastRetry(boolean enableFastRetry) {
this.enableFastRetry = enableFastRetry;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package com.automq.stream.s3.wal.impl.object;

import com.automq.stream.FixedSizeByteBufPool;
import com.automq.stream.ByteBufSeqAlloc;
import com.automq.stream.s3.ByteBufAlloc;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.s3.operator.ObjectStorage;
Expand Down Expand Up @@ -43,13 +43,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.automq.stream.s3.ByteBufAlloc.S3_WAL;
import static com.automq.stream.s3.wal.common.RecordHeader.RECORD_HEADER_SIZE;
import static com.automq.stream.s3.wal.common.RecordHeader.RECORD_HEADER_WITHOUT_CRC_SIZE;

public class ObjectWALService implements WriteAheadLog {
private static final Logger log = LoggerFactory.getLogger(ObjectWALService.class);

protected FixedSizeByteBufPool byteBufPool = new FixedSizeByteBufPool(RECORD_HEADER_SIZE, 1024 * Runtime.getRuntime().availableProcessors());
protected ByteBufSeqAlloc byteBufAlloc = new ByteBufSeqAlloc(S3_WAL, 8);
protected ObjectStorage objectStorage;
protected ObjectWALConfig config;

Expand Down Expand Up @@ -87,30 +88,24 @@ public WALMetadata metadata() {

@Override
public AppendResult append(TraceContext context, ByteBuf data, int crc) throws OverCapacityException {
ByteBuf header = byteBufPool.get();
ByteBuf header = byteBufAlloc.byteBuffer(RECORD_HEADER_SIZE);
assert header.refCnt() == 1;

final CompletableFuture<AppendResult.CallbackResult> appendResultFuture = new CompletableFuture<>();
appendResultFuture.whenComplete((result, cause) -> {
// Return the header buffer to the buffer pool.
assert header.refCnt() == 1;
byteBufPool.release(header);
});

try {
final long recordSize = RECORD_HEADER_SIZE + data.readableBytes();

long expectedWriteOffset = accumulator.append(recordSize, start -> {
CompositeByteBuf recordByteBuf = ByteBufAlloc.compositeByteBuffer();
Record record = WALUtil.generateRecord(data, header.retainedDuplicate(), 0, start, true);
Record record = WALUtil.generateRecord(data, header, 0, start, true);
recordByteBuf.addComponents(true, record.header(), record.body());
return recordByteBuf;
}, appendResultFuture);

return new AppendResultImpl(expectedWriteOffset, appendResultFuture);
} catch (Exception e) {
// Make sure the header buffer and data buffer is released.
if (header.refCnt() > 1) {
if (header.refCnt() > 0) {
header.release();
} else {
log.error("[Bug] The header buffer is already released.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,10 @@ assert record != null;
int objectLength = objectBuffer.readableBytes();
uploadMap.put(firstOffset, recordList);

// Enable fast retry.
ObjectStorage.WriteOptions writeOptions = new ObjectStorage.WriteOptions().enableFastRetry(true);
String path = objectPrefix + firstOffset;
CompletableFuture<ObjectStorage.WriteResult> uploadFuture = objectStorage.write(new ObjectStorage.WriteOptions().enableFastRetry(true), path, objectBuffer);
CompletableFuture<ObjectStorage.WriteResult> uploadFuture = objectStorage.write(writeOptions, path, objectBuffer);

CompletableFuture<Void> finalFuture = recordUploadMetrics(uploadFuture, startTime, objectLength)
.thenAccept(result -> {
Expand Down

0 comments on commit c1779c8

Please sign in to comment.