From 07794064b4d81876127ddd6d6ad86476fe7597c2 Mon Sep 17 00:00:00 2001 From: Christopher Peck Date: Thu, 23 May 2024 15:04:27 -0700 Subject: [PATCH 1/3] overflow should not create size based on current value --- .../io/writer/impl/MutableOffHeapByteArrayStore.java | 4 +++- .../io/writer/impl/MutableOffHeapByteArrayStoreTest.java | 8 ++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStore.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStore.java index 54323e8fc3c6..3d21dcac0b2e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStore.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStore.java @@ -178,7 +178,9 @@ public MutableOffHeapByteArrayStore(PinotDataBufferMemoryManager memoryManager, int numArrays, int avgArrayLen) { _memoryManager = memoryManager; _allocationContext = allocationContext; - _startSize = numArrays * (avgArrayLen + 4); // For each array, we store the array and its startoffset (4 bytes) + int estimatedSize = + numArrays * (avgArrayLen + 4); // For each array, we store the array and its startoffset (4 bytes) + _startSize = estimatedSize > 0 ? estimatedSize : Integer.MAX_VALUE; expand(_startSize); } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStoreTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStoreTest.java index 049bad01ef1f..29099b39e847 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStoreTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStoreTest.java @@ -56,6 +56,14 @@ public void maxValueTest() store.close(); } + @Test + public void startSizeOverflowTest() + throws Exception { + MutableOffHeapByteArrayStore store = + new MutableOffHeapByteArrayStore(_memoryManager, "stringColumn", 3, 1024 * 1024 * 1024); + store.close(); + } + @Test public void overflowTest() throws Exception { From 93c9508e1c0e9dfb4f19ae335fe86a1fa52d3e68 Mon Sep 17 00:00:00 2001 From: Christopher Peck Date: Fri, 24 May 2024 16:27:23 -0700 Subject: [PATCH 2/3] handle huge overflow, refactor getStartSize --- .../impl/MutableOffHeapByteArrayStore.java | 19 ++++++++------- .../MutableOffHeapByteArrayStoreTest.java | 24 ++++++++++++------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStore.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStore.java index 3d21dcac0b2e..d738a9b30dbd 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStore.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStore.java @@ -170,17 +170,20 @@ public void close() private final int _startSize; @VisibleForTesting - public int getStartSize() { - return _startSize; + public static int getStartSize(int numArrays, int avgArrayLen) { + // For each array, we store the array and its startoffset (4 bytes) + long estimatedSize = numArrays * ((long) avgArrayLen + 4); + if (estimatedSize > 0 && estimatedSize <= Integer.MAX_VALUE) { + return (int) estimatedSize; + } + return Integer.MAX_VALUE; } public MutableOffHeapByteArrayStore(PinotDataBufferMemoryManager memoryManager, String allocationContext, int numArrays, int avgArrayLen) { _memoryManager = memoryManager; _allocationContext = allocationContext; - int estimatedSize = - numArrays * (avgArrayLen + 4); // For each array, we store the array and its startoffset (4 bytes) - _startSize = estimatedSize > 0 ? estimatedSize : Integer.MAX_VALUE; + _startSize = getStartSize(numArrays, avgArrayLen); expand(_startSize); } @@ -220,10 +223,10 @@ public int add(byte[] value) { int index = buffer.add(value); if (index < 0) { // Need to expand the buffer - int currentBufferSize = buffer.getSize(); - if ((currentBufferSize << 1) >= 0) { + long nextBufferSize = buffer.getSize() << 1; + if (nextBufferSize > 0 && nextBufferSize <= Integer.MAX_VALUE) { // The expanded buffer size should be enough for the current value - buffer = expand(Math.max(currentBufferSize << 1, valueLength + Integer.BYTES)); + buffer = expand(Math.max((int) nextBufferSize, valueLength + Integer.BYTES)); } else { // Int overflow buffer = expand(Integer.MAX_VALUE); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStoreTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStoreTest.java index 29099b39e847..f74757943144 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStoreTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStoreTest.java @@ -29,6 +29,7 @@ public class MutableOffHeapByteArrayStoreTest { private PinotDataBufferMemoryManager _memoryManager; + private static final int ONE_GB = 1024 * 1024 * 1024; @BeforeClass public void setUp() { @@ -44,8 +45,11 @@ public void tearDown() @Test public void maxValueTest() throws Exception { - MutableOffHeapByteArrayStore store = new MutableOffHeapByteArrayStore(_memoryManager, "stringColumn", 1024, 32); - final int arrSize = store.getStartSize(); + int numArrays = 1024; + int avgArrayLen = 32; + MutableOffHeapByteArrayStore store = + new MutableOffHeapByteArrayStore(_memoryManager, "stringColumn", numArrays, avgArrayLen); + final int arrSize = MutableOffHeapByteArrayStore.getStartSize(numArrays, avgArrayLen); byte[] dataIn = new byte[arrSize - 4]; for (int i = 0; i < dataIn.length; i++) { dataIn[i] = (byte) (i % Byte.MAX_VALUE); @@ -57,18 +61,20 @@ public void maxValueTest() } @Test - public void startSizeOverflowTest() - throws Exception { - MutableOffHeapByteArrayStore store = - new MutableOffHeapByteArrayStore(_memoryManager, "stringColumn", 3, 1024 * 1024 * 1024); - store.close(); + public void startSizeTest() { + Assert.assertEquals(MutableOffHeapByteArrayStore.getStartSize(1, ONE_GB), ONE_GB + 4); + Assert.assertEquals(MutableOffHeapByteArrayStore.getStartSize(3, ONE_GB), Integer.MAX_VALUE); + Assert.assertEquals(MutableOffHeapByteArrayStore.getStartSize(5, ONE_GB), Integer.MAX_VALUE); } @Test public void overflowTest() throws Exception { - MutableOffHeapByteArrayStore store = new MutableOffHeapByteArrayStore(_memoryManager, "stringColumn", 1024, 32); - final int maxSize = store.getStartSize() - 4; + int numArrays = 1024; + int avgArrayLen = 32; + MutableOffHeapByteArrayStore store = + new MutableOffHeapByteArrayStore(_memoryManager, "stringColumn", numArrays, avgArrayLen); + final int maxSize = MutableOffHeapByteArrayStore.getStartSize(numArrays, avgArrayLen) - 4; byte[] b1 = new byte[3]; for (int i = 0; i < b1.length; i++) { From a421c37e3b1e88029831e45455ea249a39193130 Mon Sep 17 00:00:00 2001 From: Christopher Peck Date: Sat, 1 Jun 2024 15:50:57 -0700 Subject: [PATCH 3/3] limit change to init logic --- .../local/io/writer/impl/MutableOffHeapByteArrayStore.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStore.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStore.java index d738a9b30dbd..8ddd6d6a87ef 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStore.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/MutableOffHeapByteArrayStore.java @@ -223,10 +223,10 @@ public int add(byte[] value) { int index = buffer.add(value); if (index < 0) { // Need to expand the buffer - long nextBufferSize = buffer.getSize() << 1; - if (nextBufferSize > 0 && nextBufferSize <= Integer.MAX_VALUE) { + int currentBufferSize = buffer.getSize(); + if ((currentBufferSize << 1) >= 0) { // The expanded buffer size should be enough for the current value - buffer = expand(Math.max((int) nextBufferSize, valueLength + Integer.BYTES)); + buffer = expand(Math.max(currentBufferSize << 1, valueLength + Integer.BYTES)); } else { // Int overflow buffer = expand(Integer.MAX_VALUE);