From 764a4cfca69d33d50d4f07db831836c4f66c52f3 Mon Sep 17 00:00:00 2001 From: Yu Ning <78631860+Chillax-0v0@users.noreply.github.com> Date: Mon, 23 Oct 2023 10:37:41 +0800 Subject: [PATCH] feat(s3stream/wal): group commit (#422) Signed-off-by: Ning Yu --- .../java/com/automq/stream/s3/Config.java | 10 - .../java/com/automq/stream/s3/S3Storage.java | 3 +- .../java/com/automq/stream/s3/wal/Block.java | 63 ++++ .../com/automq/stream/s3/wal/BlockImpl.java | 108 ++++++ .../automq/stream/s3/wal/BlockWALService.java | 112 +++--- .../stream/s3/wal/SlidingWindowService.java | 318 ++++++++++++------ .../automq/stream/s3/wal/WriteRecordTask.java | 38 --- .../stream/s3/wal/util/ThreadFactoryImpl.java | 54 --- .../com/automq/stream/utils/FutureUtil.java | 13 + .../java/com/automq/stream/utils/Threads.java | 14 +- .../stream/s3/wal/BlockWALServiceTest.java | 59 +++- 11 files changed, 504 insertions(+), 288 deletions(-) create mode 100644 s3stream/src/main/java/com/automq/stream/s3/wal/Block.java create mode 100644 s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java delete mode 100644 s3stream/src/main/java/com/automq/stream/s3/wal/WriteRecordTask.java delete mode 100644 s3stream/src/main/java/com/automq/stream/s3/wal/util/ThreadFactoryImpl.java diff --git a/s3stream/src/main/java/com/automq/stream/s3/Config.java b/s3stream/src/main/java/com/automq/stream/s3/Config.java index 3dabf2d9b3..d5be065bf0 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/Config.java +++ b/s3stream/src/main/java/com/automq/stream/s3/Config.java @@ -31,7 +31,6 @@ public class Config { private long s3WALCapacity = 1024L * 1024 * 1024; private int s3WALHeaderFlushIntervalSeconds = 10; private int s3WALThread = 8; - private int s3WALQueue = 10000; private long s3WALWindowInitial = 1048576L; private long s3WALWindowIncrement = 4194304L; private long s3WALWindowMax = 536870912L; @@ -100,10 +99,6 @@ public int s3WALThread() { return s3WALThread; } - public int s3WALQueue() { - return s3WALQueue; - } - public long s3WALWindowInitial() { return s3WALWindowInitial; } @@ -270,11 +265,6 @@ public Config s3WALThread(int s3WALThread) { return this; } - public Config s3WALQueue(int s3WALQueue) { - this.s3WALQueue = s3WALQueue; - return this; - } - public Config s3WALWindowInitial(long s3WALWindowInitial) { this.s3WALWindowInitial = s3WALWindowInitial; return this; diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 08aa0a7fd8..6bd9e51fc4 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -51,6 +51,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -81,7 +82,7 @@ public class S3Storage implements Storage { ThreadUtils.createThreadFactory("s3-storage-main-read", false), LOGGER); private final ScheduledExecutorService backgroundExecutor = Threads.newSingleThreadScheduledExecutor( ThreadUtils.createThreadFactory("s3-storage-background", true), LOGGER); - private final ScheduledExecutorService uploadWALExecutor = Threads.newFixedThreadPool( + private final ExecutorService uploadWALExecutor = Threads.newFixedThreadPool( 4, 
ThreadUtils.createThreadFactory("s3-storage-upload-wal", true), LOGGER); private final Queue backoffRecords = new LinkedBlockingQueue<>(); diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/Block.java b/s3stream/src/main/java/com/automq/stream/s3/wal/Block.java new file mode 100644 index 0000000000..75030d22fd --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/Block.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.stream.s3.wal; + + +import com.automq.stream.s3.wal.util.WALUtil; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import static com.automq.stream.s3.wal.WriteAheadLog.AppendResult; + +/** + * A Block contains multiple records, and will be written to the WAL in one batch. + */ +public interface Block { + /** + * The start offset of this block. + * Align to {@link WALUtil#BLOCK_SIZE} + */ + long startOffset(); + + /** + * Append a record to this block. + * + * @param recordSize The size of this record. + * @param recordSupplier The supplier of this record which receives the start offset of this record as the parameter. + * @param future The future of this record, which will be completed when the record is written to the WAL. + * @return The start offset of this record. If the size of this block exceeds the limit, return -1. + */ + long addRecord(long recordSize, Function recordSupplier, CompletableFuture future); + + /** + * Futures of all records in this block. + */ + List> futures(); + + default boolean isEmpty() { + return futures().isEmpty(); + } + + /** + * The content of this block, which contains multiple records. + */ + ByteBuffer data(); +} diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java new file mode 100644 index 0000000000..ba9251a7a0 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.stream.s3.wal; + +import com.automq.stream.s3.wal.util.WALUtil; + +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +public class BlockImpl implements Block { + + /** + * The soft limit of block size. (128 KiB) + * TODO make it configurable + */ + private static final long SOFT_BLOCK_SIZE_LIMIT = 1 << 17; + + private final long startOffset; + /** + * The max size of this block. + * Any try to add a record to this block will fail and throw {@link BlockFullException} if the size of this block + * exceeds this limit. + */ + private final long maxSize; + + /** + * The next offset to write in this block. + * Align to {@link WALUtil#BLOCK_SIZE} + */ + private long nextOffset = 0; + private ByteBuffer data = ByteBuffer.allocate(0); + private final List> futures = new LinkedList<>(); + + public BlockImpl(long startOffset, long maxSize) { + this.startOffset = startOffset; + this.maxSize = maxSize; + } + + @Override + public long startOffset() { + return startOffset; + } + + /** + * Note: this method is NOT thread safe. + */ + @Override + public long addRecord(long recordSize, Function recordSupplier, CompletableFuture future) { + // TODO no need to align to block size + long requiredSize = WALUtil.alignLargeByBlockSize(recordSize); + long requiredCapacity = nextOffset + requiredSize; + + if (requiredCapacity > maxSize) { + return -1; + } + // if there is no record in this block, we can write a record larger than SOFT_BLOCK_SIZE_LIMIT + if (requiredCapacity > SOFT_BLOCK_SIZE_LIMIT && !futures.isEmpty()) { + return -1; + } + + // scale up the buffer + // TODO use composite buffer + if (requiredCapacity > data.capacity()) { + int newCapacity = Math.max(data.capacity() * 2, (int) requiredCapacity); + ByteBuffer newData = ByteBuffer.allocate(newCapacity); + data.flip(); + newData.put(data); + data = newData; + } + + long offsetInBlock = nextOffset; + long recordOffset = startOffset + offsetInBlock; + nextOffset += requiredSize; + data.position((int) offsetInBlock); + data.put(recordSupplier.apply(recordOffset)); + futures.add(future); + + return recordOffset; + } + + @Override + public List> futures() { + return futures; + } + + @Override + public ByteBuffer data() { + return data.duplicate().flip(); + } +} diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java index 644b313d8e..20c508ac20 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java @@ -17,13 +17,14 @@ package com.automq.stream.s3.wal; +import com.automq.stream.s3.Config; import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.operations.S3Operation; import com.automq.stream.s3.metrics.stats.OperationMetricsStats; -import com.automq.stream.s3.Config; -import com.automq.stream.s3.wal.util.ThreadFactoryImpl; import com.automq.stream.s3.wal.util.WALChannel; import com.automq.stream.s3.wal.util.WALUtil; +import com.automq.stream.utils.ThreadUtils; +import com.automq.stream.utils.Threads; import io.netty.buffer.ByteBuf; import org.apache.commons.lang3.time.StopWatch; import org.slf4j.Logger; @@ -35,11 +36,11 @@ import java.util.NoSuchElementException; import 
java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; /** * /** @@ -143,7 +144,8 @@ public class BlockWALService implements WriteAheadLog { private WALHeaderCoreData walHeaderCoreData; private void startFlushWALHeaderScheduler() { - this.flushWALHeaderScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("flush-wal-header-thread-")); + this.flushWALHeaderScheduler = Threads.newSingleThreadScheduledExecutor( + ThreadUtils.createThreadFactory("flush-wal-header-thread-%d", true), LOGGER); this.flushWALHeaderScheduler.scheduleAtFixedRate(() -> { try { BlockWALService.this.flushWALHeader( @@ -401,60 +403,52 @@ public AppendResult append0(ByteBuf buf, int crc) throws OverCapacityException { TimerUtil timerUtil = new TimerUtil(); checkReadyToServe(); - ByteBuffer record = buf.nioBuffer(); - - final int recordBodyCRC = 0 == crc ? WALUtil.crc32(record) : crc; - final long expectedWriteOffset = slidingWindowService.allocateWriteOffset(record.limit(), walHeaderCoreData.getFlushedTrimOffset(), walHeaderCoreData.getCapacity() - WAL_HEADER_TOTAL_CAPACITY); + ByteBuffer body = buf.nioBuffer(); + final long recordSize = RECORD_HEADER_SIZE + body.limit(); + final int recordBodyCRC = 0 == crc ? WALUtil.crc32(body) : crc; final CompletableFuture appendResultFuture = new CompletableFuture<>(); - final AppendResult appendResult = new AppendResultImpl(expectedWriteOffset, appendResultFuture); - - // submit write task - slidingWindowService.submitWriteRecordTask(new WriteRecordTask() { - @Override - public long startOffset() { - return expectedWriteOffset; - } - - @Override - public CompletableFuture future() { - return appendResultFuture; - } + long expectedWriteOffset; - @Override - public ByteBuffer recordHeader() { - SlidingWindowService.RecordHeaderCoreData recordHeaderCoreData = new SlidingWindowService.RecordHeaderCoreData(); - recordHeaderCoreData - .setMagicCode(RECORD_HEADER_MAGIC_CODE) - .setRecordBodyLength(record.limit()) - .setRecordBodyOffset(expectedWriteOffset + RECORD_HEADER_SIZE) - .setRecordBodyCRC(recordBodyCRC); - return recordHeaderCoreData.marshal(); - } - - @Override - public ByteBuffer recordBody() { - return record; - } - - @Override - public void flushWALHeader(long windowMaxLength) throws IOException { - BlockWALService.this.flushWALHeader( - slidingWindowService.getWindowCoreData().getWindowStartOffset(), - windowMaxLength, - slidingWindowService.getWindowCoreData().getWindowNextWriteOffset(), - ShutdownType.UNGRACEFULLY - ); + Lock lock = slidingWindowService.getBlockLock(); + lock.lock(); + try { + Block block = slidingWindowService.getCurrentBlockLocked(); + expectedWriteOffset = block.addRecord(recordSize, (offset) -> record(body, recordBodyCRC, offset), appendResultFuture); + if (expectedWriteOffset < 0) { + // this block is full, create a new one + block = slidingWindowService.sealAndNewBlockLocked(block, recordSize, walHeaderCoreData.getFlushedTrimOffset(), walHeaderCoreData.getCapacity() - WAL_HEADER_TOTAL_CAPACITY); + expectedWriteOffset = block.addRecord(recordSize, (offset) -> record(body, recordBodyCRC, offset), appendResultFuture); } - }); + } finally { + lock.unlock(); + } + slidingWindowService.tryWriteBlock(); + final AppendResult appendResult = 
new AppendResultImpl(expectedWriteOffset, appendResultFuture); appendResult.future().whenComplete((nil, ex) -> { OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE_WAL).operationCount.inc(); OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE_WAL).operationTime.update(timerUtil.elapsed()); }); - return appendResult; } + private static ByteBuffer recordHeader(ByteBuffer body, int crc, long start) { + return new SlidingWindowService.RecordHeaderCoreData() + .setMagicCode(RECORD_HEADER_MAGIC_CODE) + .setRecordBodyLength(body.limit()) + .setRecordBodyOffset(start + RECORD_HEADER_SIZE) + .setRecordBodyCRC(crc) + .marshal(); + } + + private static ByteBuffer record(ByteBuffer body, int crc, long start) { + ByteBuffer record = ByteBuffer.allocate(RECORD_HEADER_SIZE + body.limit()); + record.put(recordHeader(body, crc, start)); + record.put(body); + record.flip(); + return record; + } + @Override public Iterator recover() { checkReadyToServe(); @@ -482,8 +476,7 @@ public CompletableFuture reset() { checkReadyToServe(); long previousNextWriteOffset = slidingWindowService.getWindowCoreData().getWindowNextWriteOffset(); - slidingWindowService.getWindowCoreData().setWindowStartOffset(previousNextWriteOffset + WALUtil.BLOCK_SIZE); - slidingWindowService.getWindowCoreData().setWindowNextWriteOffset(previousNextWriteOffset + WALUtil.BLOCK_SIZE); + slidingWindowService.resetWindow(previousNextWriteOffset + WALUtil.BLOCK_SIZE); LOGGER.info("reset sliding window and trim WAL to offset: {}", previousNextWriteOffset); return trim(previousNextWriteOffset); } @@ -516,6 +509,15 @@ private void checkReadyToServe() { } } + private SlidingWindowService.WALHeaderFlusher flusher() { + return windowMaxLength -> flushWALHeader( + slidingWindowService.getWindowCoreData().getWindowStartOffset(), + windowMaxLength, + slidingWindowService.getWindowCoreData().getWindowNextWriteOffset(), + ShutdownType.UNGRACEFULLY + ); + } + static class WALHeaderCoreData { private final AtomicLong trimOffset2 = new AtomicLong(0); private final AtomicLong flushedTrimOffset = new AtomicLong(0); @@ -684,7 +686,6 @@ public static class BlockWALServiceBuilder { private long slidingWindowInitialSize = 1 << 20; private long slidingWindowUpperLimit = 512 << 20; private long slidingWindowScaleUnit = 4 << 20; - private int writeQueueCapacity = 10000; BlockWALServiceBuilder(String blockDevicePath, long capacity) { this.blockDevicePath = blockDevicePath; @@ -697,8 +698,7 @@ public BlockWALServiceBuilder config(Config config) { .ioThreadNums(config.s3WALThread()) .slidingWindowInitialSize(config.s3WALWindowInitial()) .slidingWindowScaleUnit(config.s3WALWindowIncrement()) - .slidingWindowUpperLimit(config.s3WALWindowMax()) - .writeQueueCapacity(config.s3WALQueue()); + .slidingWindowUpperLimit(config.s3WALWindowMax()); } public BlockWALServiceBuilder flushHeaderIntervalSeconds(int flushHeaderIntervalSeconds) { @@ -726,11 +726,6 @@ public BlockWALServiceBuilder slidingWindowScaleUnit(long slidingWindowScaleUnit return this; } - public BlockWALServiceBuilder writeQueueCapacity(int writeQueueCapacity) { - this.writeQueueCapacity = writeQueueCapacity; - return this; - } - public BlockWALService build() { BlockWALService blockWALService = new BlockWALService(); @@ -751,7 +746,7 @@ public BlockWALService build() { ioThreadNums, slidingWindowUpperLimit, slidingWindowScaleUnit, - writeQueueCapacity + blockWALService.flusher() ); LOGGER.info("build BlockWALService: {}", this); @@ -769,7 +764,6 @@ public String 
toString() { + ", slidingWindowInitialSize=" + slidingWindowInitialSize + ", slidingWindowUpperLimit=" + slidingWindowUpperLimit + ", slidingWindowScaleUnit=" + slidingWindowScaleUnit - + ", writeQueueCapacity=" + writeQueueCapacity + '}'; } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java index 8ffb613498..48b6b1a57c 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java @@ -17,19 +17,21 @@ package com.automq.stream.s3.wal; -import com.automq.stream.s3.wal.util.ThreadFactoryImpl; import com.automq.stream.s3.wal.util.WALChannel; import com.automq.stream.s3.wal.util.WALUtil; +import com.automq.stream.utils.FutureUtil; +import com.automq.stream.utils.ThreadUtils; +import com.automq.stream.utils.Threads; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.TreeMap; -import java.util.concurrent.BlockingQueue; +import java.util.LinkedList; +import java.util.Queue; +import java.util.TreeSet; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -53,23 +55,38 @@ public class SlidingWindowService { private final int ioThreadNums; private final long upperLimit; private final long scaleUnit; - private final int writeRecordTaskWorkQueueCapacity; private final WALChannel walChannel; + private final WALHeaderFlusher walHeaderFlusher; private final WindowCoreData windowCoreData = new WindowCoreData(); private ExecutorService executorService; + private Semaphore semaphore; - public SlidingWindowService(WALChannel walChannel, int ioThreadNums, long upperLimit, long scaleUnit, int queueCapacity) { + /** + * The lock of {@link #pendingBlocks}, {@link #writingBlocks}, {@link #currentBlock}. + */ + private final Lock blockLock = new ReentrantLock(); + + /** + * Blocks that are waiting to be written. + */ + private final Queue pendingBlocks = new LinkedList<>(); + + /** + * Blocks that are being written. + */ + private final TreeSet writingBlocks = new TreeSet<>(); + + /** + * The current block, records are added to this block. 
+ */ + private Block currentBlock; + + public SlidingWindowService(WALChannel walChannel, int ioThreadNums, long upperLimit, long scaleUnit, WALHeaderFlusher flusher) { this.walChannel = walChannel; this.ioThreadNums = ioThreadNums; this.upperLimit = upperLimit; this.scaleUnit = scaleUnit; - this.writeRecordTaskWorkQueueCapacity = queueCapacity; - } - - private ExecutorService newCachedThreadPool(int nThreads) { - BlockingQueue workQueue = new LinkedBlockingQueue<>(writeRecordTaskWorkQueueCapacity); - ThreadFactoryImpl threadFactory = new ThreadFactoryImpl("block-wal-io-thread-"); - return new ThreadPoolExecutor(1, nThreads, 1L, TimeUnit.MINUTES, workQueue, threadFactory); + this.walHeaderFlusher = flusher; } public void resetWindowWhenRecoverOver(long startOffset, long nextWriteOffset, long maxLength) { @@ -78,76 +95,194 @@ public void resetWindowWhenRecoverOver(long startOffset, long nextWriteOffset, l windowCoreData.setWindowMaxLength(maxLength); } + public void resetWindow(long offset) { + windowCoreData.setWindowStartOffset(offset); + windowCoreData.setWindowNextWriteOffset(offset); + } + public WindowCoreData getWindowCoreData() { return windowCoreData; } public void start() throws IOException { - this.executorService = newCachedThreadPool(ioThreadNums); + this.executorService = Threads.newFixedThreadPool(ioThreadNums, + ThreadUtils.createThreadFactory("block-wal-io-thread-%d", false), LOGGER); + this.semaphore = new Semaphore(ioThreadNums); } public boolean shutdown(long timeout, TimeUnit unit) { + boolean gracefulShutdown; + this.executorService.shutdown(); try { - return this.executorService.awaitTermination(timeout, unit); + gracefulShutdown = this.executorService.awaitTermination(timeout, unit); } catch (InterruptedException e) { this.executorService.shutdownNow(); - return false; + gracefulShutdown = false; + } + + if (gracefulShutdown) { + this.getWindowCoreData().setWindowNextWriteOffset(this.getWindowCoreData().getWindowStartOffset()); } + + return gracefulShutdown; } - public long allocateWriteOffset(final int recordBodyLength, final long trimOffset, final long recordSectionCapacity) throws OverCapacityException { - int totalWriteSize = RECORD_HEADER_SIZE + recordBodyLength; + /** + * Try to write a block. If the semaphore is not available, it will return immediately. + */ + public void tryWriteBlock() { + if (semaphore.tryAcquire()) { + Block block = pollBlock(); + if (block != null) { + executorService.submit(new WriteBlockProcessor(block)); + } else { + semaphore.release(); + } + } + } - long lastWriteOffset; - long newWriteOffset; - long expectedWriteOffset; - do { - lastWriteOffset = windowCoreData.getWindowNextWriteOffset(); + public Lock getBlockLock() { + return blockLock; + } - expectedWriteOffset = WALUtil.alignLargeByBlockSize(lastWriteOffset); + /** + * Seal and create a new block. It + * - puts the previous block to the write queue + * - creates a new block, sets it as the current block and returns it + * Note: this method is NOT thread safe, and it should be called with {@link #blockLock} locked. 
+ */ + public Block sealAndNewBlockLocked(Block previousBlock, long minSize, long trimOffset, long recordSectionCapacity) throws OverCapacityException { + long startOffset = nextBlockStartOffset(previousBlock); + + // If the end of the physical device is insufficient for this block, jump to the start of the physical device + if ((recordSectionCapacity - startOffset % recordSectionCapacity) < minSize) { + startOffset = startOffset + recordSectionCapacity - startOffset % recordSectionCapacity; + } - // If the end of the physical device is insufficient for this write, jump to the start of the physical device - if ((recordSectionCapacity - expectedWriteOffset % recordSectionCapacity) < totalWriteSize) { - expectedWriteOffset = expectedWriteOffset + recordSectionCapacity - expectedWriteOffset % recordSectionCapacity; - } + // Not enough space for this block + if (startOffset + minSize - trimOffset > recordSectionCapacity) { + LOGGER.error("failed to allocate write offset as the ring buffer is full: startOffset: {}, minSize: {}, trimOffset: {}, recordSectionCapacity: {}", + startOffset, minSize, trimOffset, recordSectionCapacity); + throw new OverCapacityException(String.format("failed to allocate write offset: ring buffer is full: startOffset: %d, minSize: %d, trimOffset: %d, recordSectionCapacity: %d", + startOffset, minSize, trimOffset, recordSectionCapacity)); + } - if (expectedWriteOffset + totalWriteSize - trimOffset > recordSectionCapacity) { - // Not enough space for this write - LOGGER.error("failed to allocate write offset as the ring buffer is full: expectedWriteOffset: {}, totalWriteSize: {}, trimOffset: {}, recordSectionCapacity: {}", - expectedWriteOffset, totalWriteSize, trimOffset, recordSectionCapacity); - throw new OverCapacityException(String.format("failed to allocate write offset: ring buffer is full: expectedWriteOffset: %d, totalWriteSize: %d, trimOffset: %d, recordSectionCapacity: %d", - expectedWriteOffset, totalWriteSize, trimOffset, recordSectionCapacity)); - } + // The size of the block should not be larger than the end of the physical device + long maxSize = Math.min(recordSectionCapacity - startOffset + trimOffset, upperLimit); - newWriteOffset = WALUtil.alignLargeByBlockSize(expectedWriteOffset + totalWriteSize); - } while (!windowCoreData.compareAndSetWindowNextWriteOffset(lastWriteOffset, newWriteOffset)); + Block newBlock = new BlockImpl(startOffset, maxSize); + if (!previousBlock.isEmpty()) { + // There are some records to be written in the previous block + pendingBlocks.add(previousBlock); + } + setCurrentBlockLocked(newBlock); + return newBlock; + } - return expectedWriteOffset; + /** + * Get the current block. + * Note: this method is NOT thread safe, and it should be called with {@link #blockLock} locked. + */ + public Block getCurrentBlockLocked() { + // The current block is null only when no record has been written + if (null == currentBlock) { + currentBlock = nextBlock(windowCoreData.getWindowNextWriteOffset()); + } + return currentBlock; } - public void submitWriteRecordTask(WriteRecordTask ioTask) { - executorService.submit(new WriteRecordTaskProcessor(ioTask)); + /** + * Set the current block. + * Note: this method is NOT thread safe, and it should be called with {@link #blockLock} locked. + */ + private void setCurrentBlockLocked(Block block) { + this.currentBlock = block; } + /** + * Get the start offset of the next block. 
+ */ + private long nextBlockStartOffset(Block block) { + return block.startOffset() + WALUtil.alignLargeByBlockSize(block.data().limit()); + } - private void writeRecord(WriteRecordTask ioTask) throws IOException { - final ByteBuffer totalRecord = ByteBuffer.allocate(ioTask.recordHeader().limit() + ioTask.recordBody().limit()); + /** + * Create a new block with the given start offset. + * This method is only used when we don't know the maximum length of the new block. + */ + private Block nextBlock(long startOffset) { + // Trick: we cannot determine the maximum length of the block here, so we set it to 0 first. + // When we try to write a record, this block will be found full, and then a new block will be created. + return new BlockImpl(startOffset, 0); + } - totalRecord.put(ioTask.recordHeader()); + /** + * Get the first block to be written. If there is no non-empty block, return null. + */ + private Block pollBlock() { + blockLock.lock(); + try { + return pollBlockLocked(); + } finally { + blockLock.unlock(); + } + } - totalRecord.put(ioTask.recordBody()); + /** + * Get the first block to be written. If there is no non-empty block, return null. + * Note: this method is NOT thread safe, and it should be called with {@link #blockLock} locked. + */ + private Block pollBlockLocked() { + Block pendingBlock = pendingBlocks.poll(); + if (null != pendingBlock) { + writingBlocks.add(pendingBlock.startOffset()); + return pendingBlock; + } - totalRecord.position(0); + Block currentBlock = getCurrentBlockLocked(); + if (currentBlock.isEmpty()) { + // No record to be written + return null; + } + setCurrentBlockLocked(nextBlock(nextBlockStartOffset(currentBlock))); + return currentBlock; + } + + /** + * Calculate the start offset of the first block which has not been flushed yet. + */ + private long calculateStartOffset(Block wroteBlock) { + blockLock.lock(); + try { + return calculateStartOffsetLocked(wroteBlock); + } finally { + blockLock.unlock(); + } + } + + /** + * Calculate the start offset of the first block which has not been flushed yet. + * Note: this method is NOT thread safe, and it should be called with {@link #blockLock} locked. 
+ */ + private long calculateStartOffsetLocked(Block wroteBlock) { + writingBlocks.remove(wroteBlock.startOffset()); + if (writingBlocks.isEmpty()) { + return getCurrentBlockLocked().startOffset(); + } + return writingBlocks.first(); + } + + private void writeBlockData(Block block) throws IOException { // TODO: make this beautiful - long position = WALUtil.recordOffsetToPosition(ioTask.startOffset(), walChannel.capacity() - WAL_HEADER_TOTAL_CAPACITY, WAL_HEADER_TOTAL_CAPACITY); + long position = WALUtil.recordOffsetToPosition(block.startOffset(), walChannel.capacity() - WAL_HEADER_TOTAL_CAPACITY, WAL_HEADER_TOTAL_CAPACITY); - walChannel.write(totalRecord, position); + walChannel.write(block.data(), position); } - public boolean makeWriteOffsetMatchWindow(final WriteRecordTask writeRecordTask) throws IOException { - long newWindowEndOffset = writeRecordTask.startOffset() + writeRecordTask.recordHeader().limit() + writeRecordTask.recordBody().limit(); + private boolean makeWriteOffsetMatchWindow(final Block block) throws IOException { + long newWindowEndOffset = block.startOffset() + block.data().limit(); // align to block size newWindowEndOffset = WALUtil.alignLargeByBlockSize(newWindowEndOffset); long windowStartOffset = windowCoreData.getWindowStartOffset(); @@ -163,13 +298,12 @@ public boolean makeWriteOffsetMatchWindow(final WriteRecordTask writeRecordTask) // the new window length is bigger than upper limit, reject this write request LOGGER.error("new windows size {} exceeds upper limit {}, reject this write request, window start offset: {}, new window end offset: {}", newWindowMaxLength, upperLimit, windowStartOffset, newWindowEndOffset); - writeRecordTask.future().completeExceptionally( - new OverCapacityException(String.format("new windows size exceeds upper limit %d", upperLimit)) - ); + FutureUtil.completeExceptionally(block.futures(), + new OverCapacityException(String.format("new windows size exceeds upper limit %d", upperLimit))); return false; } } - windowCoreData.scaleOutWindow(writeRecordTask, newWindowMaxLength); + windowCoreData.scaleOutWindow(walHeaderFlusher, newWindowMaxLength); } return true; } @@ -264,8 +398,7 @@ public ByteBuffer marshal() { } public static class WindowCoreData { - private final Lock treeMapIOTaskRequestLock = new ReentrantLock(); - private final TreeMap treeMapWriteRecordTask = new TreeMap<>(); + private final Lock scaleOutLock = new ReentrantLock(); private final AtomicLong windowMaxLength = new AtomicLong(0); /** * Next write offset of sliding window, always aligned to the {@link WALUtil#BLOCK_SIZE}. 
@@ -305,44 +438,20 @@ public void setWindowStartOffset(long windowStartOffset) { this.windowStartOffset.set(windowStartOffset); } - public void putWriteRecordTask(WriteRecordTask writeRecordTask) { - this.treeMapIOTaskRequestLock.lock(); - try { - this.treeMapWriteRecordTask.put(writeRecordTask.startOffset(), writeRecordTask); - } finally { - this.treeMapIOTaskRequestLock.unlock(); - } - } - - public void calculateStartOffset(long wroteOffset) { - this.treeMapIOTaskRequestLock.lock(); - try { - treeMapWriteRecordTask.remove(wroteOffset); - - if (treeMapWriteRecordTask.isEmpty()) { - setWindowStartOffset(getWindowNextWriteOffset()); - } else { - setWindowStartOffset(treeMapWriteRecordTask.firstKey()); - } - } finally { - this.treeMapIOTaskRequestLock.unlock(); - } - } - - public void scaleOutWindow(WriteRecordTask writeRecordTask, long newWindowMaxLength) throws IOException { + public void scaleOutWindow(WALHeaderFlusher flusher, long newWindowMaxLength) throws IOException { boolean scaleWindowHappened = false; - treeMapIOTaskRequestLock.lock(); + scaleOutLock.lock(); try { - if (newWindowMaxLength < windowMaxLength.get()) { + if (newWindowMaxLength < getWindowMaxLength()) { // Another thread has already scaled out the window. return; } - writeRecordTask.flushWALHeader(newWindowMaxLength); + flusher.flush(newWindowMaxLength); setWindowMaxLength(newWindowMaxLength); scaleWindowHappened = true; } finally { - treeMapIOTaskRequestLock.unlock(); + scaleOutLock.unlock(); if (scaleWindowHappened) { LOGGER.info("window scale out to {}", newWindowMaxLength); } else { @@ -352,26 +461,32 @@ public void scaleOutWindow(WriteRecordTask writeRecordTask, long newWindowMaxLen } } - class WriteRecordTaskProcessor implements Runnable { - private final WriteRecordTask writeRecordTask; + class WriteBlockProcessor implements Runnable { + private final Block block; - public WriteRecordTaskProcessor(WriteRecordTask writeRecordTask) { - this.writeRecordTask = writeRecordTask; + public WriteBlockProcessor(Block block) { + this.block = block; } @Override public void run() { - try { - if (makeWriteOffsetMatchWindow(writeRecordTask)) { - - windowCoreData.putWriteRecordTask(writeRecordTask); + Block block = this.block; + do { + writeBlock(block); + block = pollBlock(); + } while (block != null); + semaphore.release(); + } - writeRecord(writeRecordTask); + private void writeBlock(Block block) { + try { + if (makeWriteOffsetMatchWindow(block)) { + writeBlockData(block); // Update the start offset of the sliding window after finishing writing the record. 
- windowCoreData.calculateStartOffset(writeRecordTask.startOffset()); + windowCoreData.setWindowStartOffset(calculateStartOffset(block)); - writeRecordTask.future().complete(new AppendResult.CallbackResult() { + FutureUtil.complete(block.futures(), new AppendResult.CallbackResult() { @Override public long flushedOffset() { return windowCoreData.getWindowStartOffset(); @@ -383,11 +498,14 @@ public String toString() { } }); } - - } catch (IOException e) { - writeRecordTask.future().completeExceptionally(e); - LOGGER.error(String.format("failed to write record, offset: %s", writeRecordTask.startOffset()), e); + } catch (Exception e) { + FutureUtil.completeExceptionally(block.futures(), e); + LOGGER.error(String.format("failed to write record, offset: %s", block.startOffset()), e); } } } + + interface WALHeaderFlusher { + void flush(long windowMaxLength) throws IOException; + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/WriteRecordTask.java b/s3stream/src/main/java/com/automq/stream/s3/wal/WriteRecordTask.java deleted file mode 100644 index cc3b72339a..0000000000 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/WriteRecordTask.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.automq.stream.s3.wal; - - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.concurrent.CompletableFuture; - -import static com.automq.stream.s3.wal.WriteAheadLog.AppendResult; - -public interface WriteRecordTask { - // Align to {@link WALUtil#BLOCK_SIZE} - long startOffset(); - - CompletableFuture future(); - - ByteBuffer recordHeader(); - - ByteBuffer recordBody(); - - void flushWALHeader(long windowMaxLength) throws IOException; -} diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/ThreadFactoryImpl.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/ThreadFactoryImpl.java deleted file mode 100644 index 1c42313536..0000000000 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/ThreadFactoryImpl.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.automq.stream.s3.wal.util; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicLong; - -public class ThreadFactoryImpl implements ThreadFactory { - - private static final Logger LOGGER = LoggerFactory.getLogger(ThreadFactoryImpl.class.getSimpleName()); - - private final AtomicLong threadIndex = new AtomicLong(0); - private final String threadNamePrefix; - private final boolean daemon; - - public ThreadFactoryImpl(final String threadNamePrefix) { - this(threadNamePrefix, false); - } - - public ThreadFactoryImpl(final String threadNamePrefix, boolean daemon) { - this.threadNamePrefix = threadNamePrefix; - this.daemon = daemon; - } - - - @Override - public Thread newThread(Runnable r) { - Thread thread = new Thread(r, threadNamePrefix + this.threadIndex.incrementAndGet()); - thread.setDaemon(daemon); - - // Log all uncaught exception - thread.setUncaughtExceptionHandler((t, e) -> LOGGER.error("Uncaught exception in thread '{}':", t.getName(), e)); - - return thread; - } -} diff --git a/s3stream/src/main/java/com/automq/stream/utils/FutureUtil.java b/s3stream/src/main/java/com/automq/stream/utils/FutureUtil.java index b8f3ecb41c..3017d5936c 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/FutureUtil.java +++ b/s3stream/src/main/java/com/automq/stream/utils/FutureUtil.java @@ -19,6 +19,7 @@ import org.slf4j.Logger; +import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; @@ -93,4 +94,16 @@ public static Throwable cause(Throwable ex) { return ex; } + public static void completeExceptionally(Collection> futures, Throwable ex) { + for (CompletableFuture future : futures) { + future.completeExceptionally(ex); + } + } + + public static void complete(Collection> futures, T value) { + for (CompletableFuture future : futures) { + future.complete(value); + } + } + } diff --git a/s3stream/src/main/java/com/automq/stream/utils/Threads.java b/s3stream/src/main/java/com/automq/stream/utils/Threads.java index 5a66cd42a6..3b9dd2ff3a 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/Threads.java +++ b/s3stream/src/main/java/com/automq/stream/utils/Threads.java @@ -19,17 +19,17 @@ import org.slf4j.Logger; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class Threads { - public static ScheduledThreadPoolExecutor newFixedThreadPool(int nThreads, ThreadFactory threadFactory, Logger logger) { - return newFixedThreadPool(nThreads, threadFactory, logger, false); - } - - public static ScheduledThreadPoolExecutor newFixedThreadPool(int nThreads, ThreadFactory threadFactory, Logger logger, boolean removeOnCancelPolicy) { - ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(nThreads, threadFactory) { + public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory, Logger logger) { + return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory) { @Override protected void afterExecute(Runnable r, Throwable t) { 
super.afterExecute(r, t); @@ -38,8 +38,6 @@ protected void afterExecute(Runnable r, Throwable t) { } } }; - executor.setRemoveOnCancelPolicy(removeOnCancelPolicy); - return executor; } public static ScheduledThreadPoolExecutor newSingleThreadScheduledExecutor(ThreadFactory threadFactory, Logger logger) { diff --git a/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java b/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java index cd1a0a8341..077f274d54 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java @@ -65,6 +65,8 @@ public void testSingleThreadAppendBasic() throws IOException, OverCapacityExcept .slidingWindowScaleUnit(4096) .build() .start(); + AtomicLong maxFlushedOffset = new AtomicLong(-1); + AtomicLong maxRecordOffset = new AtomicLong(-1); try { for (int i = 0; i < recordCount; i++) { ByteBuf data = TestUtils.random(recordSize); @@ -74,14 +76,22 @@ public void testSingleThreadAppendBasic() throws IOException, OverCapacityExcept final long expectedOffset = i * WALUtil.alignLargeByBlockSize(recordSize); assertEquals(expectedOffset, appendResult.recordOffset()); assertEquals(0, appendResult.recordOffset() % WALUtil.BLOCK_SIZE); - appendResult.future().whenCompleteAsync((callbackResult, throwable) -> { + appendResult.future().whenComplete((callbackResult, throwable) -> { assertNull(throwable); - assertTrue(callbackResult.flushedOffset() > expectedOffset, "flushedOffset: " + callbackResult.flushedOffset() + ", expectedOffset: " + expectedOffset); + maxFlushedOffset.accumulateAndGet(callbackResult.flushedOffset(), Math::max); + maxRecordOffset.accumulateAndGet(expectedOffset, Math::max); assertEquals(0, callbackResult.flushedOffset() % WALUtil.alignLargeByBlockSize(recordSize)); + }).whenComplete((callbackResult, throwable) -> { + if (null != throwable) { + throwable.printStackTrace(); + System.exit(1); + } }); } } finally { wal.shutdownGracefully(); + assertTrue(maxFlushedOffset.get() > maxRecordOffset.get(), + "maxFlushedOffset should be greater than maxRecordOffset. 
maxFlushedOffset: " + maxFlushedOffset.get() + ", maxRecordOffset: " + maxRecordOffset.get()); } } @@ -96,8 +106,10 @@ public void testSingleThreadAppendWhenOverCapacity() throws IOException, Interru .slidingWindowScaleUnit(4096) .build() .start(); + AtomicLong maxFlushedOffset = new AtomicLong(-1); + AtomicLong maxRecordOffset = new AtomicLong(-1); try { - AtomicLong appendedOffset = new AtomicLong(-1); + AtomicLong flushedOffset = new AtomicLong(-1); for (int i = 0; i < recordCount; i++) { ByteBuf data = TestUtils.random(recordSize); AppendResult appendResult; @@ -107,12 +119,12 @@ public void testSingleThreadAppendWhenOverCapacity() throws IOException, Interru appendResult = wal.append(data); } catch (OverCapacityException e) { Thread.yield(); - long appendedOffsetValue = appendedOffset.get(); - if (appendedOffsetValue < 0) { + long flushedOffsetValue = flushedOffset.get(); + if (flushedOffsetValue < 0) { Thread.sleep(100); continue; } - wal.trim(appendedOffsetValue).join(); + wal.trim(flushedOffsetValue).join(); continue; } break; @@ -120,23 +132,24 @@ public void testSingleThreadAppendWhenOverCapacity() throws IOException, Interru final long recordOffset = appendResult.recordOffset(); assertEquals(0, recordOffset % WALUtil.BLOCK_SIZE); - appendResult.future().whenCompleteAsync((callbackResult, throwable) -> { + appendResult.future().whenComplete((callbackResult, throwable) -> { assertNull(throwable); - assertTrue(callbackResult.flushedOffset() > recordOffset, "flushedOffset: " + callbackResult.flushedOffset() + ", recordOffset: " + recordOffset); + maxFlushedOffset.accumulateAndGet(callbackResult.flushedOffset(), Math::max); + maxRecordOffset.accumulateAndGet(recordOffset, Math::max); assertEquals(0, callbackResult.flushedOffset() % WALUtil.alignLargeByBlockSize(recordSize)); - // Update the appended offset. - long old; - do { - old = appendedOffset.get(); - if (old >= recordOffset) { - break; - } - } while (!appendedOffset.compareAndSet(old, recordOffset)); + flushedOffset.accumulateAndGet(recordOffset, Math::max); + }).whenComplete((callbackResult, throwable) -> { + if (null != throwable) { + throwable.printStackTrace(); + System.exit(1); + } }); } } finally { wal.shutdownGracefully(); + assertTrue(maxFlushedOffset.get() > maxRecordOffset.get(), + "maxFlushedOffset should be greater than maxRecordOffset. 
maxFlushedOffset: " + maxFlushedOffset.get() + ", maxRecordOffset: " + maxRecordOffset.get()); } } @@ -151,6 +164,8 @@ public void testMultiThreadAppend() throws InterruptedException, IOException { .build() .start(); ExecutorService executorService = Executors.newFixedThreadPool(threadCount); + AtomicLong maxFlushedOffset = new AtomicLong(-1); + AtomicLong maxRecordOffset = new AtomicLong(-1); try { for (int t = 0; t < threadCount; t++) { executorService.submit(() -> Assertions.assertDoesNotThrow(() -> { @@ -159,11 +174,17 @@ public void testMultiThreadAppend() throws InterruptedException, IOException { final AppendResult appendResult = wal.append(data); - appendResult.future().whenCompleteAsync((callbackResult, throwable) -> { + appendResult.future().whenComplete((callbackResult, throwable) -> { assertNull(throwable); assertEquals(0, appendResult.recordOffset() % WALUtil.BLOCK_SIZE); - assertTrue(callbackResult.flushedOffset() > appendResult.recordOffset(), "flushedOffset: " + callbackResult.flushedOffset() + ", recordOffset: " + appendResult.recordOffset()); + maxFlushedOffset.accumulateAndGet(callbackResult.flushedOffset(), Math::max); + maxRecordOffset.accumulateAndGet(appendResult.recordOffset(), Math::max); assertEquals(0, callbackResult.flushedOffset() % WALUtil.alignLargeByBlockSize(recordSize)); + }).whenComplete((callbackResult, throwable) -> { + if (null != throwable) { + throwable.printStackTrace(); + System.exit(1); + } }); } })); @@ -172,6 +193,8 @@ public void testMultiThreadAppend() throws InterruptedException, IOException { executorService.shutdown(); assertTrue(executorService.awaitTermination(15, TimeUnit.SECONDS)); wal.shutdownGracefully(); + assertTrue(maxFlushedOffset.get() > maxRecordOffset.get(), + "maxFlushedOffset should be greater than maxRecordOffset. maxFlushedOffset: " + maxFlushedOffset.get() + ", maxRecordOffset: " + maxRecordOffset.get()); } }