fix(s3stream): resolve unintended buffer reuse #1945

Merged 1 commit on Aug 26, 2024
@@ -51,6 +51,7 @@ public class ByteBufAlloc {
public static final int STREAM_SET_OBJECT_COMPACTION_READ = 9;
public static final int STREAM_SET_OBJECT_COMPACTION_WRITE = 10;
public static final int BLOCK_CACHE = 11;
public static final int S3_WAL = 12;
public static ByteBufAllocMetric byteBufAllocMetric = null;

/**
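To make the new constant's purpose concrete, here is a minimal sketch of tagging an S3 WAL allocation with it. It assumes the two-argument ByteBufAlloc.byteBuffer(size, type) overload that the existing type constants are used with; the class and buffer names are illustrative.

import com.automq.stream.s3.ByteBufAlloc;
import io.netty.buffer.ByteBuf;

public class S3WalAllocSketch {
    public static void main(String[] args) {
        // Allocate a buffer attributed to the S3 WAL, so the allocator's per-type
        // metrics (byteBufAllocMetric) can account for WAL memory separately.
        ByteBuf buf = ByteBufAlloc.byteBuffer(64, ByteBufAlloc.S3_WAL);
        try {
            buf.writeLong(0L); // placeholder payload
        } finally {
            buf.release();     // the caller owns the buffer and must release it
        }
    }
}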
@@ -135,6 +135,9 @@ public WriteOptions apiCallAttemptTimeout(long apiCallAttemptTimeout) {
return this;
}

// If fast retry is enabled, the data buffer may be released after the write future is completed.
// Use this option with care: reuse the data buffer only after it has been released by the writer.
public WriteOptions enableFastRetry(boolean enableFastRetry) {
this.enableFastRetry = enableFastRetry;
return this;
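A minimal sketch of the caller-side contract the new comment spells out; the helper method and parameter names are illustrative, while ObjectStorage.write(WriteOptions, String, ByteBuf) and WriteResult come from the diff below. The point: with fast retry enabled, the writer may still hold the data buffer after the returned future completes, so the caller must not recycle or overwrite it in a completion callback.

import com.automq.stream.s3.operator.ObjectStorage;
import io.netty.buffer.ByteBuf;
import java.util.concurrent.CompletableFuture;

public class FastRetryWriteSketch {
    // Illustrative helper: upload one buffer with fast retry enabled.
    static CompletableFuture<ObjectStorage.WriteResult> upload(ObjectStorage storage, String path, ByteBuf data) {
        ObjectStorage.WriteOptions options = new ObjectStorage.WriteOptions().enableFastRetry(true);
        CompletableFuture<ObjectStorage.WriteResult> future = storage.write(options, path, data);
        // Do NOT return `data` to a pool or write into it again when `future` completes:
        // a fast retry may still be reading it. Treat ownership as handed to the writer.
        return future;
    }
}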
@@ -11,7 +11,7 @@

package com.automq.stream.s3.wal.impl.object;

import com.automq.stream.FixedSizeByteBufPool;
import com.automq.stream.ByteBufSeqAlloc;
import com.automq.stream.s3.ByteBufAlloc;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.s3.operator.ObjectStorage;
@@ -43,13 +43,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.automq.stream.s3.ByteBufAlloc.S3_WAL;
import static com.automq.stream.s3.wal.common.RecordHeader.RECORD_HEADER_SIZE;
import static com.automq.stream.s3.wal.common.RecordHeader.RECORD_HEADER_WITHOUT_CRC_SIZE;

public class ObjectWALService implements WriteAheadLog {
private static final Logger log = LoggerFactory.getLogger(ObjectWALService.class);

protected FixedSizeByteBufPool byteBufPool = new FixedSizeByteBufPool(RECORD_HEADER_SIZE, 1024 * Runtime.getRuntime().availableProcessors());
protected ByteBufSeqAlloc byteBufAlloc = new ByteBufSeqAlloc(S3_WAL, 8);
protected ObjectStorage objectStorage;
protected ObjectWALConfig config;

@@ -87,30 +88,24 @@ public WALMetadata metadata() {

@Override
public AppendResult append(TraceContext context, ByteBuf data, int crc) throws OverCapacityException {
ByteBuf header = byteBufPool.get();
ByteBuf header = byteBufAlloc.byteBuffer(RECORD_HEADER_SIZE);
assert header.refCnt() == 1;

final CompletableFuture<AppendResult.CallbackResult> appendResultFuture = new CompletableFuture<>();
appendResultFuture.whenComplete((result, cause) -> {
// Return the header buffer to the buffer pool.
assert header.refCnt() == 1;
byteBufPool.release(header);
});

try {
final long recordSize = RECORD_HEADER_SIZE + data.readableBytes();

long expectedWriteOffset = accumulator.append(recordSize, start -> {
CompositeByteBuf recordByteBuf = ByteBufAlloc.compositeByteBuffer();
Record record = WALUtil.generateRecord(data, header.retainedDuplicate(), 0, start, true);
Record record = WALUtil.generateRecord(data, header, 0, start, true);
recordByteBuf.addComponents(true, record.header(), record.body());
return recordByteBuf;
}, appendResultFuture);

return new AppendResultImpl(expectedWriteOffset, appendResultFuture);
} catch (Exception e) {
// Make sure the header buffer and data buffer are released.
if (header.refCnt() > 1) {
if (header.refCnt() > 0) {
header.release();
} else {
log.error("[Bug] The header buffer is already released.", e);
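For context on why the pooled header was a problem, here is a small self-contained Netty sketch (illustrative only, not AutoMQ code): it mimics the old pattern of recycling the header buffer once the append future completed, while a fast retry still held a reference to the same memory.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class HeaderReuseHazardSketch {
    public static void main(String[] args) {
        ByteBuf header = Unpooled.buffer(16);
        header.writeLong(1L);                           // header bytes of record #1

        ByteBuf inFlight = header.retainedDuplicate();  // the writer keeps this for a fast retry

        // Old behaviour: the append future completes, the header goes back to the pool,
        // and the next append reuses the same memory for record #2.
        header.setLong(0, 2L);

        // The retry now uploads record #2's header bytes instead of record #1's.
        System.out.println(inFlight.getLong(0));        // prints 2, not 1

        inFlight.release();
        header.release();
    }
}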
@@ -490,8 +490,10 @@ assert record != null;
int objectLength = objectBuffer.readableBytes();
uploadMap.put(firstOffset, recordList);

// Enable fast retry.
ObjectStorage.WriteOptions writeOptions = new ObjectStorage.WriteOptions().enableFastRetry(true);
String path = objectPrefix + firstOffset;
CompletableFuture<ObjectStorage.WriteResult> uploadFuture = objectStorage.write(new ObjectStorage.WriteOptions().enableFastRetry(true), path, objectBuffer);
CompletableFuture<ObjectStorage.WriteResult> uploadFuture = objectStorage.write(writeOptions, path, objectBuffer);

CompletableFuture<Void> finalFuture = recordUploadMetrics(uploadFuture, startTime, objectLength)
.thenAccept(result -> {