Skip to content

Commit

Permalink
feat(s3stream/wal): group commit (#422)
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Oct 23, 2023
1 parent d403d26 commit d5504bf
Show file tree
Hide file tree
Showing 11 changed files with 504 additions and 288 deletions.
10 changes: 0 additions & 10 deletions s3stream/src/main/java/com/automq/stream/s3/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public class Config {
private long s3WALCapacity = 1024L * 1024 * 1024;
private int s3WALHeaderFlushIntervalSeconds = 10;
private int s3WALThread = 8;
private int s3WALQueue = 10000;
private long s3WALWindowInitial = 1048576L;
private long s3WALWindowIncrement = 4194304L;
private long s3WALWindowMax = 536870912L;
Expand Down Expand Up @@ -100,10 +99,6 @@ public int s3WALThread() {
return s3WALThread;
}

public int s3WALQueue() {
return s3WALQueue;
}

public long s3WALWindowInitial() {
return s3WALWindowInitial;
}
Expand Down Expand Up @@ -270,11 +265,6 @@ public Config s3WALThread(int s3WALThread) {
return this;
}

public Config s3WALQueue(int s3WALQueue) {
this.s3WALQueue = s3WALQueue;
return this;
}

public Config s3WALWindowInitial(long s3WALWindowInitial) {
this.s3WALWindowInitial = s3WALWindowInitial;
return this;
Expand Down
3 changes: 2 additions & 1 deletion s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -81,7 +82,7 @@ public class S3Storage implements Storage {
ThreadUtils.createThreadFactory("s3-storage-main-read", false), LOGGER);
private final ScheduledExecutorService backgroundExecutor = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("s3-storage-background", true), LOGGER);
private final ScheduledExecutorService uploadWALExecutor = Threads.newFixedThreadPool(
private final ExecutorService uploadWALExecutor = Threads.newFixedThreadPool(
4, ThreadUtils.createThreadFactory("s3-storage-upload-wal", true), LOGGER);

private final Queue<BackoffRecord> backoffRecords = new LinkedBlockingQueue<>();
Expand Down
63 changes: 63 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/wal/Block.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.s3.wal;


import com.automq.stream.s3.wal.util.WALUtil;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import static com.automq.stream.s3.wal.WriteAheadLog.AppendResult;

/**
* A Block contains multiple records, and will be written to the WAL in one batch.
*/
public interface Block {
/**
* The start offset of this block.
* Align to {@link WALUtil#BLOCK_SIZE}
*/
long startOffset();

/**
* Append a record to this block.
*
* @param recordSize The size of this record.
* @param recordSupplier The supplier of this record which receives the start offset of this record as the parameter.
* @param future The future of this record, which will be completed when the record is written to the WAL.
* @return The start offset of this record. If the size of this block exceeds the limit, return -1.
*/
long addRecord(long recordSize, Function<Long, ByteBuffer> recordSupplier, CompletableFuture<AppendResult.CallbackResult> future);

/**
* Futures of all records in this block.
*/
List<CompletableFuture<AppendResult.CallbackResult>> futures();

default boolean isEmpty() {
return futures().isEmpty();
}

/**
* The content of this block, which contains multiple records.
*/
ByteBuffer data();
}
108 changes: 108 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.s3.wal;

import com.automq.stream.s3.wal.util.WALUtil;

import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class BlockImpl implements Block {

/**
* The soft limit of block size. (128 KiB)
* TODO make it configurable
*/
private static final long SOFT_BLOCK_SIZE_LIMIT = 1 << 17;

private final long startOffset;
/**
* The max size of this block.
* Any try to add a record to this block will fail and throw {@link BlockFullException} if the size of this block
* exceeds this limit.
*/
private final long maxSize;

/**
* The next offset to write in this block.
* Align to {@link WALUtil#BLOCK_SIZE}
*/
private long nextOffset = 0;
private ByteBuffer data = ByteBuffer.allocate(0);
private final List<CompletableFuture<WriteAheadLog.AppendResult.CallbackResult>> futures = new LinkedList<>();

public BlockImpl(long startOffset, long maxSize) {
this.startOffset = startOffset;
this.maxSize = maxSize;
}

@Override
public long startOffset() {
return startOffset;
}

/**
* Note: this method is NOT thread safe.
*/
@Override
public long addRecord(long recordSize, Function<Long, ByteBuffer> recordSupplier, CompletableFuture<WriteAheadLog.AppendResult.CallbackResult> future) {
// TODO no need to align to block size
long requiredSize = WALUtil.alignLargeByBlockSize(recordSize);
long requiredCapacity = nextOffset + requiredSize;

if (requiredCapacity > maxSize) {
return -1;
}
// if there is no record in this block, we can write a record larger than SOFT_BLOCK_SIZE_LIMIT
if (requiredCapacity > SOFT_BLOCK_SIZE_LIMIT && !futures.isEmpty()) {
return -1;
}

// scale up the buffer
// TODO use composite buffer
if (requiredCapacity > data.capacity()) {
int newCapacity = Math.max(data.capacity() * 2, (int) requiredCapacity);
ByteBuffer newData = ByteBuffer.allocate(newCapacity);
data.flip();
newData.put(data);
data = newData;
}

long offsetInBlock = nextOffset;
long recordOffset = startOffset + offsetInBlock;
nextOffset += requiredSize;
data.position((int) offsetInBlock);
data.put(recordSupplier.apply(recordOffset));
futures.add(future);

return recordOffset;
}

@Override
public List<CompletableFuture<WriteAheadLog.AppendResult.CallbackResult>> futures() {
return futures;
}

@Override
public ByteBuffer data() {
return data.duplicate().flip();
}
}
Loading

0 comments on commit d5504bf

Please sign in to comment.