Skip to content

Commit

Permalink
feat: use a separate poolSize to limit the size of the pool
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 committed Aug 20, 2024
1 parent 12d2eba commit 336a625
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 3 deletions.
15 changes: 13 additions & 2 deletions s3stream/src/main/java/com/automq/stream/FixedSizeByteBufPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.netty.buffer.ByteBuf;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A pool of fixed-size {@link ByteBuf}.
Expand All @@ -31,6 +32,11 @@ public class FixedSizeByteBufPool {
*/
private final int maxPoolSize;
private final Queue<ByteBuf> pool = new ConcurrentLinkedQueue<>();
/**
* The current size of the pool.
* We use an {@link AtomicInteger} rather than {@link Queue#size()} to avoid the cost of traversing the queue.
*/
private final AtomicInteger poolSize = new AtomicInteger(0);

public FixedSizeByteBufPool(int bufferSize, int maxPoolSize) {
this.bufferSize = bufferSize;
Expand All @@ -43,7 +49,11 @@ public FixedSizeByteBufPool(int bufferSize, int maxPoolSize) {
*/
public ByteBuf get() {
ByteBuf buffer = pool.poll();
return buffer == null ? allocate() : buffer;
if (buffer == null) {
return allocate();
}
poolSize.decrementAndGet();
return buffer;
}

private ByteBuf allocate() {
Expand All @@ -57,12 +67,13 @@ private ByteBuf allocate() {
public void release(ByteBuf buffer) {
assert buffer.capacity() == bufferSize;

if (pool.size() >= maxPoolSize) {
if (poolSize.get() >= maxPoolSize) {
buffer.release();
return;
}

buffer.clear();
pool.offer(buffer);
poolSize.incrementAndGet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class FixedSizeByteBufPoolTest {

@BeforeEach
void setUp() {
pool = new FixedSizeByteBufPool(BUFFER_SIZE);
pool = new FixedSizeByteBufPool(BUFFER_SIZE, 2);
}

@Test
Expand Down Expand Up @@ -79,4 +79,23 @@ void testMultipleBuffers() {
assertTrue(buffer4 == buffer1 || buffer4 == buffer2, "Buffer4 should be one of the released buffers");
assertNotSame(buffer3, buffer4, "Buffer3 and Buffer4 should not be the same after release");
}

@Test
void testOverCapacity() {
ByteBuf buffer1 = pool.get();
ByteBuf buffer2 = pool.get();
ByteBuf buffer3 = pool.get();

pool.release(buffer1);
pool.release(buffer2);
pool.release(buffer3);

ByteBuf buffer4 = pool.get();
ByteBuf buffer5 = pool.get();
ByteBuf buffer6 = pool.get();

assertSame(buffer4, buffer1, "Buffer4 should be reused from the pool");
assertSame(buffer5, buffer2, "Buffer5 should be reused from the pool");
assertNotSame(buffer6, buffer3, "Buffer6 should not be reused from the pool");
}
}

0 comments on commit 336a625

Please sign in to comment.