diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java index 1a6fc81e4eb8..bd203ea6fcb1 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java @@ -318,7 +318,7 @@ static ColumnarLongs createColumnarLongs(String encoding, ByteBuffer buffer) case "none-longs": case "zstd-auto": case "zstd-longs": - return CompressedColumnarLongsSupplier.fromByteBuffer(buffer, ByteOrder.LITTLE_ENDIAN).get(); + return CompressedColumnarLongsSupplier.fromByteBuffer(buffer, ByteOrder.LITTLE_ENDIAN, null).get(); } throw new IllegalArgumentException("unknown encoding"); diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/CompressedColumnarIntsBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/CompressedColumnarIntsBenchmark.java index 5db092ba02ab..d132d781a966 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/CompressedColumnarIntsBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/CompressedColumnarIntsBenchmark.java @@ -82,7 +82,8 @@ public void setup() throws IOException ); this.compressed = CompressedVSizeColumnarIntsSupplier.fromByteBuffer( bufferCompressed, - ByteOrder.nativeOrder() + ByteOrder.nativeOrder(), + null ).get(); final ByteBuffer bufferUncompressed = serialize(VSizeColumnarInts.fromArray(vals)); diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/CompressedVSizeColumnarMultiIntsBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/CompressedVSizeColumnarMultiIntsBenchmark.java index 87665ab9597b..e77c1f8fc7a2 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/CompressedVSizeColumnarMultiIntsBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/CompressedVSizeColumnarMultiIntsBenchmark.java @@ -95,7 +95,8 @@ public void setup() throws IOException ); this.compressed = CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer( bufferCompressed, - ByteOrder.nativeOrder() + ByteOrder.nativeOrder(), + null ).get(); final ByteBuffer bufferUncompressed = serialize( diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmark.java index c74415c5a43d..45f61e8d959b 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmark.java @@ -75,7 +75,7 @@ public void setup() throws Exception File compFile = new File(dir, file + "-" + strategy); bufferHandler = FileUtils.map(compFile); ByteBuffer buffer = bufferHandler.get(); - supplier = CompressedColumnarFloatsSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder()); + supplier = CompressedColumnarFloatsSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder(), null); } @TearDown diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmark.java index d327eaf91a05..d846c13b209c 100644 --- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmark.java @@ -79,7 +79,7 @@ public void setup() throws Exception File compFile = new File(dir, file + "-" + strategy + "-" + format); bufferHandler = FileUtils.map(compFile); ByteBuffer buffer = bufferHandler.get(); - supplier = CompressedColumnarLongsSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder()); + supplier = CompressedColumnarLongsSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder(), null); } @TearDown diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java index c51fbc3384e2..66e5e9afac42 100644 --- a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java +++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java @@ -22,6 +22,7 @@ import com.google.common.base.Supplier; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.column.ComplexColumn; import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier; @@ -40,10 +41,12 @@ public class CompressedBigDecimalColumnPartSupplier implements Supplier metrics = Maps.newLinkedHashMap(); @@ -385,7 +386,10 @@ public MMappedIndex mapDir(File inDir) throws IOException fileDimensionName ); - dimValueUtf8Lookups.put(dimension, GenericIndexed.read(dimBuffer, GenericIndexed.UTF8_STRATEGY)); + dimValueUtf8Lookups.put( + dimension, + GenericIndexed.read(dimBuffer, GenericIndexed.UTF8_STRATEGY, smooshedFiles) + ); dimColumns.put(dimension, VSizeColumnarMultiInts.readFromByteBuffer(dimBuffer)); } @@ -393,7 +397,7 @@ public MMappedIndex mapDir(File inDir) throws IOException for (int i = 0; i < availableDimensions.size(); ++i) { bitmaps.put( SERIALIZER_UTILS.readString(invertedBuffer), - GenericIndexed.read(invertedBuffer, bitmapSerdeFactory.getObjectStrategy()) + GenericIndexed.read(invertedBuffer, bitmapSerdeFactory.getObjectStrategy(), smooshedFiles) ); } diff --git a/processing/src/main/java/org/apache/druid/segment/MetricHolder.java b/processing/src/main/java/org/apache/druid/segment/MetricHolder.java index 11d4b688712d..bcf6753fde52 100644 --- a/processing/src/main/java/org/apache/druid/segment/MetricHolder.java +++ b/processing/src/main/java/org/apache/druid/segment/MetricHolder.java @@ -38,6 +38,9 @@ public class MetricHolder private static final byte[] VERSION = new byte[]{0x0}; private static final SerializerUtils SERIALIZER_UTILS = new SerializerUtils(); + /** + * Read a metric column from a legacy (v8) segment. 
+ */ public static MetricHolder fromByteBuffer(ByteBuffer buf) { final byte ver = buf.get(); @@ -51,7 +54,11 @@ public static MetricHolder fromByteBuffer(ByteBuffer buf) switch (holder.type) { case FLOAT: - holder.floatType = CompressedColumnarFloatsSupplier.fromByteBuffer(buf, ByteOrder.nativeOrder()); + holder.floatType = CompressedColumnarFloatsSupplier.fromByteBuffer( + buf, + ByteOrder.nativeOrder(), + null // OK since this method is only used for legacy segments, which always use version 1 indexed + ); break; case COMPLEX: final ComplexMetricSerde serdeForType = ComplexMetrics.getSerdeForType(holder.getTypeName()); @@ -72,7 +79,7 @@ public static MetricHolder fromByteBuffer(ByteBuffer buf) private static GenericIndexed read(ByteBuffer buf, ComplexMetricSerde serde) { - return GenericIndexed.read(buf, serde.getObjectStrategy()); + return GenericIndexed.read(buf, serde.getObjectStrategy(), null); } public enum MetricType diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java index 8c2dfb9c028b..6b323bf4b86a 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java @@ -26,7 +26,6 @@ import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import javax.annotation.Nullable; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -57,6 +56,7 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri String filenameBase, ByteOrder byteOrder, CompressionStrategy compression, + int fileSizeLimit, Closer closer ) { @@ -66,6 +66,7 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri filenameBase, compression, CompressedPools.BUFFER_SIZE, + fileSizeLimit, closer ); this.compression = compression; diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java index 010e0b698577..4e97bc29497c 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java @@ -21,6 +21,7 @@ import com.google.common.base.Supplier; import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -43,11 +44,16 @@ public BlockLayoutColumnarDoublesSupplier( int sizePer, ByteBuffer fromBuffer, ByteOrder byteOrder, - CompressionStrategy strategy + CompressionStrategy strategy, + SmooshedFileMapper smooshMapper ) { this.strategy = strategy; - this.baseDoubleBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(byteOrder, strategy)); + this.baseDoubleBuffers = GenericIndexed.read( + fromBuffer, + DecompressingByteBufferObjectStrategy.of(byteOrder, strategy), + smooshMapper + ); this.totalSize = totalSize; this.sizePer = sizePer; } diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java index 5640339a316e..c9c65b751f9e 100644 --- 
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java @@ -57,6 +57,7 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial String filenameBase, ByteOrder byteOrder, CompressionStrategy compression, + int fileSizeLimit, Closer closer ) { @@ -66,6 +67,7 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial filenameBase, compression, CompressedPools.BUFFER_SIZE, + fileSizeLimit, closer ); this.compression = compression; diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java index 383a99b3f473..1ce18fdbcd95 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java @@ -21,6 +21,7 @@ import com.google.common.base.Supplier; import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -42,10 +43,15 @@ public BlockLayoutColumnarFloatsSupplier( int sizePer, ByteBuffer fromBuffer, ByteOrder byteOrder, - CompressionStrategy strategy + CompressionStrategy strategy, + @Nullable SmooshedFileMapper smooshMapper ) { - baseFloatBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(byteOrder, strategy)); + baseFloatBuffers = GenericIndexed.read( + fromBuffer, + DecompressingByteBufferObjectStrategy.of(byteOrder, strategy), + smooshMapper + ); this.totalSize = totalSize; this.sizePer = sizePer; } diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java index 37d468d62e49..a0a65343b75d 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java @@ -60,6 +60,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ ByteOrder byteOrder, CompressionFactory.LongEncodingWriter writer, CompressionStrategy compression, + int fileSizeLimit, Closer closer ) { @@ -71,6 +72,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ filenameBase, compression, bufferSize, + fileSizeLimit, closer ); this.writer = writer; diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java index aa0346c6e34e..77714e18103e 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java @@ -22,6 +22,7 @@ import com.google.common.base.Supplier; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.common.semantic.SemanticUtils; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -51,11 +52,16 @@ public BlockLayoutColumnarLongsSupplier( ByteBuffer fromBuffer, 
ByteOrder order, CompressionFactory.LongEncodingReader reader, - CompressionStrategy strategy + CompressionStrategy strategy, + SmooshedFileMapper smooshMapper ) { this.strategy = strategy; - this.baseLongBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(order, strategy)); + this.baseLongBuffers = GenericIndexed.read( + fromBuffer, + DecompressingByteBufferObjectStrategy.of(order, strategy), + smooshMapper + ); this.totalSize = totalSize; this.sizePer = sizePer; this.baseReader = reader; diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarDoublesSuppliers.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarDoublesSuppliers.java index 86443f942cea..17d4fdf034bf 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarDoublesSuppliers.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarDoublesSuppliers.java @@ -21,6 +21,7 @@ import com.google.common.base.Supplier; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -34,9 +35,19 @@ private CompressedColumnarDoublesSuppliers() { } + /** + * Reads a column from a {@link ByteBuffer}, possibly using additional secondary files from a + * {@link SmooshedFileMapper}. + * + * @param buffer primary buffer to read from + * @param order byte order + * @param smooshMapper required for reading version 2 (multi-file) indexed. May be null if you know you are reading + * a single-file column. Generally, this should only be null in tests, not production code. + */ public static Supplier fromByteBuffer( ByteBuffer buffer, - ByteOrder order + ByteOrder order, + SmooshedFileMapper smooshMapper ) { byte versionFromBuffer = buffer.get(); @@ -54,7 +65,8 @@ public static Supplier fromByteBuffer( sizePer, buffer.asReadOnlyBuffer(), order, - compression + compression, + smooshMapper ); } throw new IAE("Unknown version[%s]", versionFromBuffer); diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarFloatsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarFloatsSupplier.java index 64b77f07aed8..282dd7b68a5c 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarFloatsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarFloatsSupplier.java @@ -23,9 +23,11 @@ import org.apache.druid.io.Channels; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.segment.serde.MetaSerdeHelper; import org.apache.druid.segment.serde.Serializer; +import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -82,7 +84,20 @@ public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws I Channels.writeFully(channel, buffer.asReadOnlyBuffer()); } - public static CompressedColumnarFloatsSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order) + /** + * Reads a column from a {@link ByteBuffer}, possibly using additional secondary files from a + * {@link SmooshedFileMapper}. + * + * @param buffer primary buffer to read from + * @param order byte order + * @param smooshMapper required for reading version 2 (multi-file) indexed. 
May be null if you know you are reading + * a single-file column. Generally, this should only be null in tests, not production code. + */ + public static CompressedColumnarFloatsSupplier fromByteBuffer( + ByteBuffer buffer, + ByteOrder order, + @Nullable SmooshedFileMapper smooshMapper + ) { byte versionFromBuffer = buffer.get(); @@ -99,7 +114,8 @@ public static CompressedColumnarFloatsSupplier fromByteBuffer(ByteBuffer buffer, sizePer, buffer.asReadOnlyBuffer(), order, - compression + compression, + smooshMapper ); return new CompressedColumnarFloatsSupplier( totalSize, diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java index cc724ba1cc7a..519360cf3bf2 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java @@ -59,6 +59,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer final int chunkFactor, final ByteOrder byteOrder, final CompressionStrategy compression, + final int fileSizeLimit, final Closer closer ) { @@ -72,6 +73,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer filenameBase, compression, chunkFactor * Integer.BYTES, + fileSizeLimit, closer ), closer diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java index 22b477c019cc..a481e897d9e1 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java @@ -114,25 +114,6 @@ GenericIndexed> getBaseIntBuffers() return baseIntBuffers; } - public static CompressedColumnarIntsSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order) - { - byte versionFromBuffer = buffer.get(); - - if (versionFromBuffer == VERSION) { - final int totalSize = buffer.getInt(); - final int sizePer = buffer.getInt(); - final CompressionStrategy compression = CompressionStrategy.forId(buffer.get()); - return new CompressedColumnarIntsSupplier( - totalSize, - sizePer, - GenericIndexed.read(buffer, DecompressingByteBufferObjectStrategy.of(order, compression)), - compression - ); - } - - throw new IAE("Unknown version[%s]", versionFromBuffer); - } - public static CompressedColumnarIntsSupplier fromByteBuffer( ByteBuffer buffer, ByteOrder order, diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarLongsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarLongsSupplier.java index 869e4495a32d..939b4e483925 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarLongsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarLongsSupplier.java @@ -23,9 +23,11 @@ import org.apache.druid.io.Channels; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.segment.serde.MetaSerdeHelper; import org.apache.druid.segment.serde.Serializer; +import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ 
-97,7 +99,20 @@ public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws I Channels.writeFully(channel, buffer.asReadOnlyBuffer()); } - public static CompressedColumnarLongsSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order) + /** + * Reads a column from a {@link ByteBuffer}, possibly using additional secondary files from a + * {@link SmooshedFileMapper}. + * + * @param buffer primary buffer to read from + * @param order byte order + * @param smooshMapper required for reading version 2 (multi-file) indexed. May be null if you know you are reading + * a single-file column. Generally, this should only be null in tests, not production code. + */ + public static CompressedColumnarLongsSupplier fromByteBuffer( + ByteBuffer buffer, + ByteOrder order, + @Nullable SmooshedFileMapper smooshMapper + ) { byte versionFromBuffer = buffer.get(); @@ -120,7 +135,8 @@ public static CompressedColumnarLongsSupplier fromByteBuffer(ByteBuffer buffer, buffer.asReadOnlyBuffer(), order, encoding, - compression + compression, + smooshMapper ); return new CompressedColumnarLongsSupplier( totalSize, diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java index 84f4799e6d26..0bf216e1cb6e 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java @@ -63,6 +63,7 @@ public static CompressedVSizeColumnarIntsSerializer create( CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue), IndexIO.BYTE_ORDER, compression, + GenericIndexedWriter.MAX_FILE_SIZE, closer ); } @@ -87,6 +88,7 @@ public static CompressedVSizeColumnarIntsSerializer create( final int chunkFactor, final ByteOrder byteOrder, final CompressionStrategy compression, + final int fileSizeLimit, final Closer closer ) { @@ -101,6 +103,7 @@ public static CompressedVSizeColumnarIntsSerializer create( filenameBase, compression, sizePer(maxValue, chunkFactor), + fileSizeLimit, closer ), closer diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java index b02f4fd6c88b..bd541ea3aff7 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java @@ -140,33 +140,6 @@ GenericIndexed> getBaseBuffers() return baseBuffers; } - public static CompressedVSizeColumnarIntsSupplier fromByteBuffer( - ByteBuffer buffer, - ByteOrder order - ) - { - byte versionFromBuffer = buffer.get(); - - if (versionFromBuffer == VERSION) { - final int numBytes = buffer.get(); - final int totalSize = buffer.getInt(); - final int sizePer = buffer.getInt(); - - final CompressionStrategy compression = CompressionStrategy.forId(buffer.get()); - - return new CompressedVSizeColumnarIntsSupplier( - totalSize, - sizePer, - numBytes, - GenericIndexed.read(buffer, DecompressingByteBufferObjectStrategy.of(order, compression)), - compression - ); - - } - - throw new IAE("Unknown version[%s]", versionFromBuffer); - } - public static CompressedVSizeColumnarIntsSupplier fromByteBuffer( ByteBuffer buffer, ByteOrder order, diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplier.java index bd4ab0694016..9ca4eb0cd8cc 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplier.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import java.io.IOException; @@ -78,18 +79,24 @@ public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws I valueSupplier.writeTo(channel, smoosher); } - public static CompressedVSizeColumnarMultiIntsSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order) + public static CompressedVSizeColumnarMultiIntsSupplier fromByteBuffer( + ByteBuffer buffer, + ByteOrder order, + SmooshedFileMapper smooshMapper + ) { byte versionFromBuffer = buffer.get(); if (versionFromBuffer == VERSION) { CompressedVSizeColumnarIntsSupplier offsetSupplier = CompressedVSizeColumnarIntsSupplier.fromByteBuffer( buffer, - order + order, + smooshMapper ); CompressedVSizeColumnarIntsSupplier valueSupplier = CompressedVSizeColumnarIntsSupplier.fromByteBuffer( buffer, - order + order, + smooshMapper ); return new CompressedVSizeColumnarMultiIntsSupplier(offsetSupplier, valueSupplier); } diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java index 55d4c2d1f883..57dcea0db635 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java @@ -25,10 +25,12 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.segment.serde.MetaSerdeHelper; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.WriteOutBytes; +import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -306,13 +308,27 @@ public interface LongEncodingReader LongEncodingStrategy getStrategy(); } + /** + * Reads a column from a {@link ByteBuffer}, possibly using additional secondary files from a + * {@link SmooshedFileMapper}. + * + * @param totalSize number of rows in the column + * @param sizePer number of values per compression buffer, for compressed columns + * @param fromBuffer primary buffer to read from + * @param order byte order + * @param encodingFormat encoding of each long value + * @param strategy compression strategy, for compressed columns + * @param smooshMapper required for reading version 2 (multi-file) indexed. May be null if you know you are reading + * a single-file column. Generally, this should only be null in tests, not production code. 
+ */ public static Supplier getLongSupplier( int totalSize, int sizePer, ByteBuffer fromBuffer, ByteOrder order, LongEncodingFormat encodingFormat, - CompressionStrategy strategy + CompressionStrategy strategy, + @Nullable SmooshedFileMapper smooshMapper ) { if (strategy == CompressionStrategy.NONE) { @@ -324,7 +340,8 @@ public static Supplier getLongSupplier( fromBuffer, order, encodingFormat.getReader(fromBuffer, order), - strategy + strategy, + smooshMapper ); } } @@ -363,6 +380,7 @@ public static ColumnarLongsSerializer getLongSerializer( order, new LongsLongEncodingWriter(order), compressionStrategy, + GenericIndexedWriter.MAX_FILE_SIZE, closer ); } @@ -373,18 +391,31 @@ public static ColumnarLongsSerializer getLongSerializer( // Float currently does not support any encoding types, and stores values as 4 byte float + /** + * Reads a column from a {@link ByteBuffer}, possibly using additional secondary files from a + * {@link SmooshedFileMapper}. + * + * @param totalSize number of rows in the column + * @param sizePer number of values per compression buffer, for compressed columns + * @param fromBuffer primary buffer to read from + * @param order byte order + * @param strategy compression strategy, for compressed columns + * @param smooshMapper required for reading version 2 (multi-file) indexed. May be null if you know you are reading + * a single-file column. Generally, this should only be null in tests, not production code. + */ public static Supplier getFloatSupplier( int totalSize, int sizePer, ByteBuffer fromBuffer, ByteOrder order, - CompressionStrategy strategy + CompressionStrategy strategy, + @Nullable SmooshedFileMapper smooshMapper ) { if (strategy == CompressionStrategy.NONE) { return new EntireLayoutColumnarFloatsSupplier(totalSize, fromBuffer, order); } else { - return new BlockLayoutColumnarFloatsSupplier(totalSize, sizePer, fromBuffer, order, strategy); + return new BlockLayoutColumnarFloatsSupplier(totalSize, sizePer, fromBuffer, order, strategy, smooshMapper); } } @@ -406,26 +437,45 @@ public static ColumnarFloatsSerializer getFloatSerializer( filenameBase, order, compressionStrategy, + GenericIndexedWriter.MAX_FILE_SIZE, closer ); } } + /** + * Reads a column from a {@link ByteBuffer}, possibly using additional secondary files from a + * {@link SmooshedFileMapper}. + * + * @param totalSize number of rows in the column + * @param sizePer number of values per compression buffer, for compressed columns + * @param fromBuffer primary buffer to read from + * @param byteOrder byte order + * @param strategy compression strategy, for compressed columns + * @param smooshMapper required for reading version 2 (multi-file) indexed. May be null if you know you are reading + * a single-file column. Generally, this should only be null in tests, not production code. 
+ */ public static Supplier<ColumnarDoubles> getDoubleSupplier( int totalSize, int sizePer, ByteBuffer fromBuffer, ByteOrder byteOrder, - CompressionStrategy strategy + CompressionStrategy strategy, + SmooshedFileMapper smooshMapper ) { - switch (strategy) { - case NONE: - return new EntireLayoutColumnarDoublesSupplier(totalSize, fromBuffer, byteOrder); - default: - return new BlockLayoutColumnarDoublesSupplier(totalSize, sizePer, fromBuffer, byteOrder, strategy); + if (strategy == CompressionStrategy.NONE) { + return new EntireLayoutColumnarDoublesSupplier(totalSize, fromBuffer, byteOrder); + } else { + return new BlockLayoutColumnarDoublesSupplier( + totalSize, + sizePer, + fromBuffer, + byteOrder, + strategy, + smooshMapper + ); } - } public static ColumnarDoublesSerializer getDoubleSerializer( @@ -446,6 +496,7 @@ public static ColumnarDoublesSerializer getDoubleSerializer( filenameBase, byteOrder, compression, + GenericIndexedWriter.MAX_FILE_SIZE, closer ); } diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java index 2c61d85a2ed1..419f24a0922a 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java +++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java @@ -22,6 +22,7 @@ import com.google.common.primitives.Ints; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.common.utils.SerializerUtils; +import org.apache.druid.error.DruidException; import org.apache.druid.io.Channels; import org.apache.druid.java.util.common.ByteBufferUtils; import org.apache.druid.java.util.common.IAE; @@ -174,32 +175,36 @@ public boolean readRetainsBufferReference() } }; - public static <T> GenericIndexed<T> read(ByteBuffer buffer, ObjectStrategy<T> strategy) - { - byte versionFromBuffer = buffer.get(); - - if (VERSION_ONE == versionFromBuffer) { - return createGenericIndexedVersionOne(buffer, strategy); - } else if (VERSION_TWO == versionFromBuffer) { - throw new IAE( - "use read(ByteBuffer buffer, ObjectStrategy strategy, SmooshedFileMapper fileMapper)" + " to read version 2 indexed." - ); - } - throw new IAE("Unknown version[%d]", (int) versionFromBuffer); - } - - public static <T> GenericIndexed<T> read(ByteBuffer buffer, ObjectStrategy<T> strategy, SmooshedFileMapper fileMapper) + /** + * Reads a GenericIndexed from a {@link ByteBuffer}, possibly using additional secondary files from a + * {@link SmooshedFileMapper}. + * + * @param buffer primary buffer to read from + * @param strategy deserialization strategy + * @param fileMapper required for reading version 2 (multi-file) indexed. May be null if you know you are reading + * a version 1 indexed. + */ + public static <T> GenericIndexed<T> read( + ByteBuffer buffer, + ObjectStrategy<T> strategy, + @Nullable SmooshedFileMapper fileMapper + ) { byte versionFromBuffer = buffer.get(); if (VERSION_ONE == versionFromBuffer) { return createGenericIndexedVersionOne(buffer, strategy); } else if (VERSION_TWO == versionFromBuffer) { + if (fileMapper == null) { + throw DruidException.defensive( + "use read(ByteBuffer buffer, ObjectStrategy strategy, SmooshedFileMapper fileMapper)" + + " with non-null fileMapper to read version 2 indexed."
+ ); + } return createGenericIndexedVersionTwo(buffer, strategy, fileMapper); } - throw new IAE("Unknown version [%s]", versionFromBuffer); + throw new IAE("Unknown version[%s]", versionFromBuffer); } public static GenericIndexed fromArray(T[] objects, ObjectStrategy strategy) diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java index ddc6bbe88767..524eab0b515e 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java @@ -52,6 +52,7 @@ public class GenericIndexedWriter implements DictionaryWriter { private static final int PAGE_SIZE = 4096; + public static final int MAX_FILE_SIZE = Integer.MAX_VALUE - PAGE_SIZE; private static final MetaSerdeHelper SINGLE_FILE_META_SERDE_HELPER = MetaSerdeHelper .firstWriteByte((GenericIndexedWriter x) -> GenericIndexed.VERSION_ONE) @@ -72,18 +73,31 @@ public class GenericIndexedWriter implements DictionaryWriter .writeByteArray(x -> x.fileNameByteArray); + /** + * Creates a new writer that accepts byte buffers and compresses them. + * + * @param segmentWriteOutMedium supplier of temporary files + * @param filenameBase base filename to be used for secondary files, if multiple files are needed + * @param compressionStrategy compression strategy to apply + * @param bufferSize size of the buffers that will be passed in + * @param fileSizeLimit limit for files created by the writer. In production code, this should always be + * {@link GenericIndexedWriter#MAX_FILE_SIZE}. The parameter is exposed only for testing. + * @param closer closer to attach temporary compression buffers to + */ public static GenericIndexedWriter ofCompressedByteBuffers( final SegmentWriteOutMedium segmentWriteOutMedium, final String filenameBase, final CompressionStrategy compressionStrategy, final int bufferSize, + final int fileSizeLimit, final Closer closer ) { GenericIndexedWriter writer = new GenericIndexedWriter<>( segmentWriteOutMedium, filenameBase, - compressedByteBuffersWriteObjectStrategy(compressionStrategy, bufferSize, closer) + compressedByteBuffersWriteObjectStrategy(compressionStrategy, bufferSize, closer), + fileSizeLimit ); writer.objectsSorted = false; return writer; @@ -169,7 +183,7 @@ public GenericIndexedWriter( ObjectStrategy strategy ) { - this(segmentWriteOutMedium, filenameBase, strategy, Integer.MAX_VALUE & ~PAGE_SIZE); + this(segmentWriteOutMedium, filenameBase, strategy, MAX_FILE_SIZE); } public GenericIndexedWriter( diff --git a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java index 7403f8dfd20b..ae8cb15cee80 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java @@ -145,6 +145,7 @@ private void makeDelegate() throws IOException order, writer, compression, + GenericIndexedWriter.MAX_FILE_SIZE, closer ); } diff --git a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java index 0fac36399d1d..4c882d88f889 100644 --- 
a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java @@ -35,12 +35,24 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI { private static final byte VERSION = V3CompressedVSizeColumnarMultiIntsSupplier.VERSION; + /** + * Creates a new serializer. + * + * @param columnName name of the column to write + * @param segmentWriteOutMedium supplier of temporary files + * @param filenameBase base filename to be used for secondary files, if multiple files are needed + * @param maxValue maximum integer value that will be written to the column + * @param compression compression strategy to apply + * @param fileSizeLimit limit for files created by the writer. In production code, this should always be + * {@link GenericIndexedWriter#MAX_FILE_SIZE}. The parameter is exposed only for testing. + */ public static V3CompressedVSizeColumnarMultiIntsSerializer create( final String columnName, final SegmentWriteOutMedium segmentWriteOutMedium, final String filenameBase, final int maxValue, - final CompressionStrategy compression + final CompressionStrategy compression, + final int fileSizeLimit ) { return new V3CompressedVSizeColumnarMultiIntsSerializer( @@ -48,20 +60,22 @@ public static V3CompressedVSizeColumnarMultiIntsSerializer create( new CompressedColumnarIntsSerializer( columnName, segmentWriteOutMedium, - filenameBase, + filenameBase + ".offsets", CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER, IndexIO.BYTE_ORDER, compression, + fileSizeLimit, segmentWriteOutMedium.getCloser() ), new CompressedVSizeColumnarIntsSerializer( columnName, segmentWriteOutMedium, - filenameBase, + filenameBase + ".values", maxValue, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue), IndexIO.BYTE_ORDER, compression, + fileSizeLimit, segmentWriteOutMedium.getCloser() ) ); diff --git a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplier.java index 3bb934cd296c..c1d6d82f407a 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplier.java @@ -56,24 +56,6 @@ private V3CompressedVSizeColumnarMultiIntsSupplier( this.valueSupplier = valueSupplier; } - public static V3CompressedVSizeColumnarMultiIntsSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order) - { - byte versionFromBuffer = buffer.get(); - - if (versionFromBuffer == VERSION) { - CompressedColumnarIntsSupplier offsetSupplier = CompressedColumnarIntsSupplier.fromByteBuffer( - buffer, - order - ); - CompressedVSizeColumnarIntsSupplier valueSupplier = CompressedVSizeColumnarIntsSupplier.fromByteBuffer( - buffer, - order - ); - return new V3CompressedVSizeColumnarMultiIntsSupplier(offsetSupplier, valueSupplier); - } - throw new IAE("Unknown version[%s]", versionFromBuffer); - } - public static V3CompressedVSizeColumnarMultiIntsSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order, SmooshedFileMapper mapper) { byte versionFromBuffer = buffer.get(); diff --git a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java 
b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java index 311e72cdcfe7..5913425cc1a4 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java @@ -968,18 +968,20 @@ private ColumnHolder readNestedFieldColumn(String field, int fieldIndex) int pos = dataBuffer.position(); final Supplier longs = longsLength > 0 ? CompressedColumnarLongsSupplier.fromByteBuffer( dataBuffer, - byteOrder + byteOrder, + columnBuilder.getFileMapper() ) : () -> null; dataBuffer.position(pos + longsLength); pos = dataBuffer.position(); final Supplier doubles = doublesLength > 0 ? CompressedColumnarDoublesSuppliers.fromByteBuffer( dataBuffer, - byteOrder + byteOrder, + columnBuilder.getFileMapper() ) : () -> null; dataBuffer.position(pos + doublesLength); final WritableSupplier ints; if (version == DictionaryEncodedColumnPartSerde.VERSION.COMPRESSED) { - ints = CompressedVSizeColumnarIntsSupplier.fromByteBuffer(dataBuffer, byteOrder); + ints = CompressedVSizeColumnarIntsSupplier.fromByteBuffer(dataBuffer, byteOrder, columnBuilder.getFileMapper()); } else { ints = VSizeColumnarInts.readFromByteBuffer(dataBuffer); } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java index 6f7e2f8a2905..2723effb6ae6 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java @@ -142,7 +142,8 @@ public static ScalarDoubleColumnAndIndexSupplier read( final Supplier doubles = CompressedColumnarDoublesSuppliers.fromByteBuffer( doublesValueColumn, - byteOrder + byteOrder, + columnBuilder.getFileMapper() ); final ByteBuffer valueIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile( mapper, diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java index 063dc2bfb9c0..5cca18e4323a 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java @@ -151,7 +151,8 @@ public static ScalarLongColumnAndIndexSupplier read( final Supplier longs = CompressedColumnarLongsSupplier.fromByteBuffer( longsValueColumn, - byteOrder + byteOrder, + columnBuilder.getFileMapper() ); return new ScalarLongColumnAndIndexSupplier( longDictionarySupplier, diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java index ee7fe1475b73..386c6fba7852 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java @@ -85,7 +85,8 @@ public static ScalarStringColumnAndIndexSupplier read( ); final CompressedVSizeColumnarIntsSupplier ints = CompressedVSizeColumnarIntsSupplier.fromByteBuffer( encodedValueColumn, - byteOrder + byteOrder, + columnBuilder.getFileMapper() ); final ByteBuffer 
valueIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile( mapper, diff --git a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java index 6a2ec4769762..d3254c536f4a 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java @@ -163,7 +163,8 @@ public static VariantColumnAndIndexSupplier read( ); final CompressedVSizeColumnarIntsSupplier ints = CompressedVSizeColumnarIntsSupplier.fromByteBuffer( encodedValueColumn, - byteOrder + byteOrder, + fileMapper ); final ByteBuffer valueIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile( fileMapper, diff --git a/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java index 02e7f5b4d397..4dd4de0e42af 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java @@ -29,6 +29,7 @@ import org.apache.druid.io.Channels; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.segment.column.BaseColumn; import org.apache.druid.segment.column.ColumnBuilder; import org.apache.druid.segment.column.ColumnConfig; @@ -332,10 +333,10 @@ public void read( final WritableSupplier rMultiValuedColumn; if (hasMultipleValues) { - rMultiValuedColumn = readMultiValuedColumn(rVersion, buffer, rFlags); + rMultiValuedColumn = readMultiValuedColumn(rVersion, buffer, rFlags, builder.getFileMapper()); rSingleValuedColumn = null; } else { - rSingleValuedColumn = readSingleValuedColumn(rVersion, buffer); + rSingleValuedColumn = readSingleValuedColumn(rVersion, buffer, builder.getFileMapper()); rMultiValuedColumn = null; } @@ -381,20 +382,29 @@ public void read( } } - private WritableSupplier readSingleValuedColumn(VERSION version, ByteBuffer buffer) + private WritableSupplier readSingleValuedColumn( + VERSION version, + ByteBuffer buffer, + SmooshedFileMapper smooshReader + ) { switch (version) { case UNCOMPRESSED_SINGLE_VALUE: case UNCOMPRESSED_WITH_FLAGS: return VSizeColumnarInts.readFromByteBuffer(buffer); case COMPRESSED: - return CompressedVSizeColumnarIntsSupplier.fromByteBuffer(buffer, byteOrder); + return CompressedVSizeColumnarIntsSupplier.fromByteBuffer(buffer, byteOrder, smooshReader); default: throw new IAE("Unsupported single-value version[%s]", version); } } - private WritableSupplier readMultiValuedColumn(VERSION version, ByteBuffer buffer, int flags) + private WritableSupplier readMultiValuedColumn( + VERSION version, + ByteBuffer buffer, + int flags, + SmooshedFileMapper smooshReader + ) { switch (version) { case UNCOMPRESSED_MULTI_VALUE: { @@ -409,9 +419,9 @@ private WritableSupplier readMultiValuedColumn(VERSION versio } case COMPRESSED: { if (Feature.MULTI_VALUE.isSet(flags)) { - return CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(buffer, byteOrder); + return CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(buffer, byteOrder, smooshReader); } else if (Feature.MULTI_VALUE_V3.isSet(flags)) { - return 
V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(buffer, byteOrder); + return V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(buffer, byteOrder, smooshReader); } else { throw new IAE("Unrecognized multi-value flag[%d] for version[%s]", flags, version); } diff --git a/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerde.java index 012b3ed77b05..a1611e4a73c3 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerde.java @@ -99,7 +99,8 @@ public Deserializer getDeserializer() return (buffer, builder, columnConfig, parent) -> { final Supplier column = CompressedColumnarDoublesSuppliers.fromByteBuffer( buffer, - byteOrder + byteOrder, + builder.getFileMapper() ); DoubleNumericColumnSupplier columnSupplier = new DoubleNumericColumnSupplier( column, diff --git a/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerdeV2.java b/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerdeV2.java index 520249b923e8..dd3dbdf37b18 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerdeV2.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerdeV2.java @@ -147,7 +147,8 @@ public Deserializer getDeserializer() int initialPos = buffer.position(); final Supplier column = CompressedColumnarDoublesSuppliers.fromByteBuffer( buffer, - byteOrder + byteOrder, + builder.getFileMapper() ); buffer.position(initialPos + offset); diff --git a/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerde.java index 441f774c7d17..e8f1e6c73dac 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerde.java @@ -99,7 +99,8 @@ public Deserializer getDeserializer() return (buffer, builder, columnConfig, parent) -> { final CompressedColumnarFloatsSupplier column = CompressedColumnarFloatsSupplier.fromByteBuffer( buffer, - byteOrder + byteOrder, + builder.getFileMapper() ); FloatNumericColumnSupplier columnSupplier = new FloatNumericColumnSupplier( column, diff --git a/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerdeV2.java b/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerdeV2.java index 116d2dec9b09..d79f0b1d11cf 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerdeV2.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerdeV2.java @@ -145,7 +145,8 @@ public Deserializer getDeserializer() int initialPos = buffer.position(); final CompressedColumnarFloatsSupplier column = CompressedColumnarFloatsSupplier.fromByteBuffer( buffer, - byteOrder + byteOrder, + builder.getFileMapper() ); buffer.position(initialPos + offset); final ImmutableBitmap bitmap; diff --git a/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerde.java index fa94b91ecd4e..be1050abc014 100644 --- 
a/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerde.java @@ -99,7 +99,8 @@ public Deserializer getDeserializer() return (buffer, builder, columnConfig, parent) -> { final CompressedColumnarLongsSupplier column = CompressedColumnarLongsSupplier.fromByteBuffer( buffer, - byteOrder + byteOrder, + builder.getFileMapper() ); LongNumericColumnSupplier columnSupplier = new LongNumericColumnSupplier( column, diff --git a/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerdeV2.java b/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerdeV2.java index 272670e88ef8..7c2e65478cf8 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerdeV2.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerdeV2.java @@ -147,7 +147,8 @@ public Deserializer getDeserializer() int initialPos = buffer.position(); final CompressedColumnarLongsSupplier column = CompressedColumnarLongsSupplier.fromByteBuffer( buffer, - byteOrder + byteOrder, + builder.getFileMapper() ); buffer.position(initialPos + offset); final ImmutableBitmap bitmap; diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java index 2d213eddb5f8..950c007e5c28 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.data; import com.google.common.base.Function; +import com.google.common.base.Supplier; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import it.unimi.dsi.fastutil.ints.IntArrayList; @@ -35,6 +36,8 @@ import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.WriteOutBytes; import org.apache.druid.utils.CloseableUtils; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -150,6 +153,60 @@ public void testMultiValueFileLargeData() throws Exception } } + @Test + public void testLargeColumn() throws IOException + { + final File columnDir = temporaryFolder.newFolder(); + final String columnName = "column"; + final long numRows = 500_000; // enough values that we expect to switch into large-column mode + + try ( + SegmentWriteOutMedium segmentWriteOutMedium = + TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder()); + FileSmoosher smoosher = new FileSmoosher(columnDir) + ) { + final Random random = new Random(0); + final int fileSizeLimit = 128_000; // limit to 128KB so we switch to large-column mode sooner + final CompressedColumnarIntsSerializer serializer = new CompressedColumnarIntsSerializer( + columnName, + segmentWriteOutMedium, + columnName, + CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER, + byteOrder, + compressionStrategy, + fileSizeLimit, + segmentWriteOutMedium.getCloser() + ); + serializer.open(); + + for (int i = 0; i < numRows; i++) { + serializer.addValue(random.nextInt() ^ Integer.MIN_VALUE); + } + + try (SmooshedWriter primaryWriter = smoosher.addWithSmooshedWriter(columnName, 
serializer.getSerializedSize())) { + serializer.writeTo(primaryWriter, smoosher); + } + } + + try (SmooshedFileMapper smooshMapper = SmooshedFileMapper.load(columnDir)) { + MatcherAssert.assertThat( + "Number of value parts written", // ensure the column actually ended up multi-part + smooshMapper.getInternalFilenames().stream().filter(s -> s.startsWith("column_value_")).count(), + Matchers.greaterThan(1L) + ); + + final Supplier columnSupplier = CompressedColumnarIntsSupplier.fromByteBuffer( + smooshMapper.mapFile(columnName), + byteOrder, + smooshMapper + ); + + try (final ColumnarInts column = columnSupplier.get()) { + Assert.assertEquals(numRows, column.size()); + } + } + } + // this test takes ~30 minutes to run @Ignore @Test @@ -168,6 +225,7 @@ public void testTooManyValues() throws IOException CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER, byteOrder, compressionStrategy, + GenericIndexedWriter.MAX_FILE_SIZE, segmentWriteOutMedium.getCloser() ); serializer.open(); @@ -198,6 +256,7 @@ private void checkSerializedSizeAndData(int chunkFactor) throws Exception chunkFactor, byteOrder, compressionStrategy, + GenericIndexedWriter.MAX_FILE_SIZE, segmentWriteOutMedium.getCloser() ); CompressedColumnarIntsSupplier supplierFromList = CompressedColumnarIntsSupplier.fromList( @@ -221,7 +280,8 @@ private void checkSerializedSizeAndData(int chunkFactor) throws Exception // read from ByteBuffer and check values CompressedColumnarIntsSupplier supplierFromByteBuffer = CompressedColumnarIntsSupplier.fromByteBuffer( ByteBuffer.wrap(IOUtils.toByteArray(writeOutBytes.asInputStream())), - byteOrder + byteOrder, + null ); ColumnarInts columnarInts = supplierFromByteBuffer.get(); Assert.assertEquals(vals.length, columnarInts.size()); @@ -247,6 +307,7 @@ private void checkV2SerializedSizeAndData(int chunkFactor) throws Exception "test", compressionStrategy, Long.BYTES * 10000, + GenericIndexedWriter.MAX_FILE_SIZE, segmentWriteOutMedium.getCloser() ), segmentWriteOutMedium.getCloser() @@ -264,7 +325,8 @@ private void checkV2SerializedSizeAndData(int chunkFactor) throws Exception // read from ByteBuffer and check values CompressedColumnarIntsSupplier supplierFromByteBuffer = CompressedColumnarIntsSupplier.fromByteBuffer( mapper.mapFile("test"), - byteOrder + byteOrder, + null ); ColumnarInts columnarInts = supplierFromByteBuffer.get(); Assert.assertEquals(vals.length, columnarInts.size()); diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java index 214da00365bc..25ad99dbd12a 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java @@ -109,7 +109,7 @@ private void makeWithSerde(final int chunkSize) throws IOException final byte[] bytes = baos.toByteArray(); Assert.assertEquals(theSupplier.getSerializedSize(), bytes.length); - supplier = CompressedColumnarIntsSupplier.fromByteBuffer(ByteBuffer.wrap(bytes), ByteOrder.nativeOrder()); + supplier = CompressedColumnarIntsSupplier.fromByteBuffer(ByteBuffer.wrap(bytes), ByteOrder.nativeOrder(), null); columnarInts = supplier.get(); } diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java index e4f416c31ee4..51b0c2a63e17 100644 --- 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java @@ -23,11 +23,18 @@ import com.google.common.primitives.Doubles; import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; +import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.utils.CloseableUtils; +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; import org.junit.Assert; +import org.junit.Assume; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -37,12 +44,14 @@ import org.junit.runners.Parameterized; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.Channels; import java.util.ArrayList; import java.util.List; +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; @@ -131,6 +140,63 @@ public void testChunkSerde() throws Exception testWithValues(chunk); } + @Test + public void testLargeColumn() throws IOException + { + // This test only makes sense if we can use BlockLayoutColumnarDoublesSerializer directly. + // Exclude incompatible compressionStrategy.
+ Assume.assumeThat(compressionStrategy, CoreMatchers.not(CoreMatchers.equalTo(CompressionStrategy.NONE))); + + final File columnDir = temporaryFolder.newFolder(); + final String columnName = "column"; + final long numRows = 500_000; // enough values that we expect to switch into large-column mode + + try ( + SegmentWriteOutMedium segmentWriteOutMedium = + TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder()); + FileSmoosher smoosher = new FileSmoosher(columnDir) + ) { + final Random random = new Random(0); + final int fileSizeLimit = 128_000; // limit to 128KB so we switch to large-column mode sooner + final ColumnarDoublesSerializer serializer = new BlockLayoutColumnarDoublesSerializer( + columnName, + segmentWriteOutMedium, + columnName, + order, + compressionStrategy, + fileSizeLimit, + segmentWriteOutMedium.getCloser() + ); + serializer.open(); + + for (int i = 0; i < numRows; i++) { + serializer.add(random.nextLong()); + } + + try (SmooshedWriter primaryWriter = smoosher.addWithSmooshedWriter(columnName, serializer.getSerializedSize())) { + serializer.writeTo(primaryWriter, smoosher); + } + } + + try (SmooshedFileMapper smooshMapper = SmooshedFileMapper.load(columnDir)) { + MatcherAssert.assertThat( + "Number of value parts written", // ensure the column actually ended up multi-part + smooshMapper.getInternalFilenames().stream().filter(s -> s.startsWith("column_value_")).count(), + Matchers.greaterThan(1L) + ); + + final Supplier columnSupplier = CompressedColumnarDoublesSuppliers.fromByteBuffer( + smooshMapper.mapFile(columnName), + order, + smooshMapper + ); + + try (final ColumnarDoubles column = columnSupplier.get()) { + Assert.assertEquals(numRows, column.size()); + } + } + } + // this test takes ~45 minutes to run @Ignore @Test @@ -179,7 +245,7 @@ public void testWithValues(double[] values) throws Exception serializer.writeTo(Channels.newChannel(baos), null); Assert.assertEquals(baos.size(), serializer.getSerializedSize()); Supplier supplier = CompressedColumnarDoublesSuppliers - .fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order); + .fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order, null); try (ColumnarDoubles doubles = supplier.get()) { assertIndexMatchesVals(doubles, values); for (int i = 0; i < 10; i++) { diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java index 1994e316de49..20a82cecbc6c 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java @@ -23,11 +23,18 @@ import com.google.common.primitives.Floats; import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; +import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.utils.CloseableUtils; +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; import org.junit.Assert; +import org.junit.Assume; import org.junit.Ignore; 
import org.junit.Rule; import org.junit.Test; @@ -37,12 +44,14 @@ import org.junit.runners.Parameterized; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.Channels; import java.util.ArrayList; import java.util.List; +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; @@ -138,6 +147,63 @@ public void testChunkSerde() throws Exception testWithValues(chunk); } + @Test + public void testLargeColumn() throws IOException + { + // This test only makes sense if we can use BlockLayoutColumnarFloatsSerializer directly. + // Exclude incompatible compressionStrategy. + Assume.assumeThat(compressionStrategy, CoreMatchers.not(CoreMatchers.equalTo(CompressionStrategy.NONE))); + + final File columnDir = temporaryFolder.newFolder(); + final String columnName = "column"; + final long numRows = 500_000; // enough values that we expect to switch into large-column mode + + try ( + SegmentWriteOutMedium segmentWriteOutMedium = + TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder()); + FileSmoosher smoosher = new FileSmoosher(columnDir) + ) { + final Random random = new Random(0); + final int fileSizeLimit = 128_000; // limit to 128KB so we switch to large-column mode sooner + final ColumnarFloatsSerializer serializer = new BlockLayoutColumnarFloatsSerializer( + columnName, + segmentWriteOutMedium, + columnName, + order, + compressionStrategy, + fileSizeLimit, + segmentWriteOutMedium.getCloser() + ); + serializer.open(); + + for (int i = 0; i < numRows; i++) { + serializer.add(random.nextLong()); + } + + try (SmooshedWriter primaryWriter = smoosher.addWithSmooshedWriter(columnName, serializer.getSerializedSize())) { + serializer.writeTo(primaryWriter, smoosher); + } + } + + try (SmooshedFileMapper smooshMapper = SmooshedFileMapper.load(columnDir)) { + MatcherAssert.assertThat( + "Number of value parts written", // ensure the column actually ended up multi-part + smooshMapper.getInternalFilenames().stream().filter(s -> s.startsWith("column_value_")).count(), + Matchers.greaterThan(1L) + ); + + final Supplier columnSupplier = CompressedColumnarFloatsSupplier.fromByteBuffer( + smooshMapper.mapFile(columnName), + order, + smooshMapper + ); + + try (final ColumnarFloats column = columnSupplier.get()) { + Assert.assertEquals(numRows, column.size()); + } + } + } + // this test takes ~30 minutes to run @Ignore @Test @@ -188,7 +254,7 @@ public void testWithValues(float[] values) throws Exception serializer.writeTo(Channels.newChannel(baos), null); Assert.assertEquals(baos.size(), serializer.getSerializedSize()); CompressedColumnarFloatsSupplier supplier = CompressedColumnarFloatsSupplier - .fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order); + .fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order, null); try (ColumnarFloats floats = supplier.get()) { assertIndexMatchesVals(floats, values); @@ -241,9 +307,8 @@ private void testSupplierSerde(CompressedColumnarFloatsSupplier supplier, float[ final byte[] bytes = baos.toByteArray(); Assert.assertEquals(supplier.getSerializedSize(), bytes.length); - CompressedColumnarFloatsSupplier anotherSupplier = CompressedColumnarFloatsSupplier.fromByteBuffer( - ByteBuffer.wrap(bytes), order - ); + CompressedColumnarFloatsSupplier anotherSupplier = + 
CompressedColumnarFloatsSupplier.fromByteBuffer(ByteBuffer.wrap(bytes), order, null); try (ColumnarFloats indexed = anotherSupplier.get()) { assertIndexMatchesVals(indexed, vals); } diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java index 4876a347fb21..db8babedd269 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java @@ -115,7 +115,7 @@ public void testValues(long[] values) throws Exception serializer.writeTo(Channels.newChannel(baos), null); Assert.assertEquals(baos.size(), serializer.getSerializedSize()); CompressedColumnarLongsSupplier supplier = - CompressedColumnarLongsSupplier.fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order); + CompressedColumnarLongsSupplier.fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order, null); ColumnarLongs longs = supplier.get(); assertIndexMatchesVals(longs, values); diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java index dfc457fe8b9e..0e55a0ca29fd 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java @@ -23,11 +23,18 @@ import com.google.common.primitives.Longs; import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; +import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.utils.CloseableUtils; +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; import org.junit.Assert; +import org.junit.Assume; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -37,12 +44,14 @@ import org.junit.runners.Parameterized; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.Channels; import java.util.ArrayList; import java.util.List; +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; @@ -166,6 +175,65 @@ public void testTooManyValues() throws IOException } } + @Test + public void testLargeColumn() throws IOException + { + // This test only makes sense if we can use BlockLayoutColumnarLongsSerializer directly. Exclude incompatible + // combinations of compressionStrategy, encodingStrategy. 
+ Assume.assumeThat(compressionStrategy, CoreMatchers.not(CoreMatchers.equalTo(CompressionStrategy.NONE))); + Assume.assumeThat(encodingStrategy, CoreMatchers.equalTo(CompressionFactory.LongEncodingStrategy.LONGS)); + + final File columnDir = temporaryFolder.newFolder(); + final String columnName = "column"; + final long numRows = 500_000; // enough values that we expect to switch into large-column mode + + try ( + SegmentWriteOutMedium segmentWriteOutMedium = + TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder()); + FileSmoosher smoosher = new FileSmoosher(columnDir) + ) { + final Random random = new Random(0); + final int fileSizeLimit = 128_000; // limit to 128KB so we switch to large-column mode sooner + final ColumnarLongsSerializer serializer = new BlockLayoutColumnarLongsSerializer( + columnName, + segmentWriteOutMedium, + columnName, + order, + new LongsLongEncodingWriter(order), + compressionStrategy, + fileSizeLimit, + segmentWriteOutMedium.getCloser() + ); + serializer.open(); + + for (int i = 0; i < numRows; i++) { + serializer.add(random.nextLong()); + } + + try (SmooshedWriter primaryWriter = smoosher.addWithSmooshedWriter(columnName, serializer.getSerializedSize())) { + serializer.writeTo(primaryWriter, smoosher); + } + } + + try (SmooshedFileMapper smooshMapper = SmooshedFileMapper.load(columnDir)) { + MatcherAssert.assertThat( + "Number of value parts written", // ensure the column actually ended up multi-part + smooshMapper.getInternalFilenames().stream().filter(s -> s.startsWith("column_value_")).count(), + Matchers.greaterThan(1L) + ); + + final CompressedColumnarLongsSupplier columnSupplier = CompressedColumnarLongsSupplier.fromByteBuffer( + smooshMapper.mapFile(columnName), + order, + smooshMapper + ); + + try (final ColumnarLongs column = columnSupplier.get()) { + Assert.assertEquals(numRows, column.size()); + } + } + } + public void testWithValues(long[] values) throws Exception { testValues(values); @@ -193,7 +261,7 @@ public void testValues(long[] values) throws Exception serializer.writeTo(Channels.newChannel(baos), null); Assert.assertEquals(baos.size(), serializer.getSerializedSize()); CompressedColumnarLongsSupplier supplier = CompressedColumnarLongsSupplier - .fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order); + .fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order, null); try (ColumnarLongs longs = supplier.get()) { assertIndexMatchesVals(longs, values); @@ -255,10 +323,8 @@ private void testSupplierSerde(CompressedColumnarLongsSupplier supplier, long[] final byte[] bytes = baos.toByteArray(); Assert.assertEquals(supplier.getSerializedSize(), bytes.length); - CompressedColumnarLongsSupplier anotherSupplier = CompressedColumnarLongsSupplier.fromByteBuffer( - ByteBuffer.wrap(bytes), - order - ); + CompressedColumnarLongsSupplier anotherSupplier = + CompressedColumnarLongsSupplier.fromByteBuffer(ByteBuffer.wrap(bytes), order, null); try (ColumnarLongs indexed = anotherSupplier.get()) { assertIndexMatchesVals(indexed, vals); } diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java index c06e11c90d94..f0208d791b30 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java @@ 
-20,6 +20,7 @@ package org.apache.druid.segment.data; import com.google.common.base.Function; +import com.google.common.base.Supplier; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; @@ -34,6 +35,8 @@ import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.WriteOutBytes; import org.apache.druid.utils.CloseableUtils; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -125,6 +128,7 @@ private void checkSerializedSizeAndData(int chunkSize) throws Exception chunkSize, byteOrder, compressionStrategy, + GenericIndexedWriter.MAX_FILE_SIZE, segmentWriteOutMedium.getCloser() ); CompressedVSizeColumnarIntsSupplier supplierFromList = CompressedVSizeColumnarIntsSupplier.fromList( @@ -149,7 +153,8 @@ private void checkSerializedSizeAndData(int chunkSize) throws Exception // read from ByteBuffer and check values CompressedVSizeColumnarIntsSupplier supplierFromByteBuffer = CompressedVSizeColumnarIntsSupplier.fromByteBuffer( ByteBuffer.wrap(IOUtils.toByteArray(writeOutBytes.asInputStream())), - byteOrder + byteOrder, + null ); ColumnarInts columnarInts = supplierFromByteBuffer.get(); for (int i = 0; i < vals.length; ++i) { @@ -199,6 +204,7 @@ public void testTooManyValues() throws IOException "test", compressionStrategy, Long.BYTES * 10000, + GenericIndexedWriter.MAX_FILE_SIZE, segmentWriteOutMedium.getCloser() ); CompressedVSizeColumnarIntsSerializer serializer = new CompressedVSizeColumnarIntsSerializer( @@ -236,6 +242,7 @@ private void checkV2SerializedSizeAndData(int chunkSize) throws Exception "test", compressionStrategy, Long.BYTES * 10000, + GenericIndexedWriter.MAX_FILE_SIZE, segmentWriteOutMedium.getCloser() ); CompressedVSizeColumnarIntsSerializer writer = new CompressedVSizeColumnarIntsSerializer( @@ -264,7 +271,8 @@ private void checkV2SerializedSizeAndData(int chunkSize) throws Exception CompressedVSizeColumnarIntsSupplier supplierFromByteBuffer = CompressedVSizeColumnarIntsSupplier.fromByteBuffer( mapper.mapFile("test"), - byteOrder + byteOrder, + null ); ColumnarInts columnarInts = supplierFromByteBuffer.get(); @@ -284,4 +292,59 @@ public void testMultiValueFileLargeData() throws Exception } } + @Test + public void testLargeColumn() throws IOException + { + final File columnDir = temporaryFolder.newFolder(); + final String columnName = "column"; + final int maxValue = Integer.MAX_VALUE; + final long numRows = 500_000; // enough values that we expect to switch into large-column mode + + try ( + SegmentWriteOutMedium segmentWriteOutMedium = + TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder()); + FileSmoosher smoosher = new FileSmoosher(columnDir) + ) { + final Random random = new Random(0); + final int fileSizeLimit = 128_000; // limit to 128KB so we switch to large-column mode sooner + final CompressedVSizeColumnarIntsSerializer serializer = new CompressedVSizeColumnarIntsSerializer( + columnName, + segmentWriteOutMedium, + columnName, + maxValue, + CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue), + byteOrder, + compressionStrategy, + fileSizeLimit, + segmentWriteOutMedium.getCloser() + ); + serializer.open(); + + for (int i = 0; i < numRows; i++) { + serializer.addValue(random.nextInt() ^ Integer.MIN_VALUE); + } + + try (SmooshedWriter primaryWriter = 
smoosher.addWithSmooshedWriter(columnName, serializer.getSerializedSize())) { + serializer.writeTo(primaryWriter, smoosher); + } + } + + try (SmooshedFileMapper smooshMapper = SmooshedFileMapper.load(columnDir)) { + MatcherAssert.assertThat( + "Number of value parts written", // ensure the column actually ended up multi-part + smooshMapper.getInternalFilenames().stream().filter(s -> s.startsWith("column_value_")).count(), + Matchers.greaterThan(1L) + ); + + final Supplier columnSupplier = CompressedVSizeColumnarIntsSupplier.fromByteBuffer( + smooshMapper.mapFile(columnName), + byteOrder, + smooshMapper + ); + + try (final ColumnarInts column = columnSupplier.get()) { + Assert.assertEquals(numRows, column.size()); + } + } + } } diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplierTest.java index 48a443ef6610..6231d73a14f0 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplierTest.java @@ -136,7 +136,7 @@ private void makeWithSerde(final int chunkSize) throws IOException final byte[] bytes = baos.toByteArray(); Assert.assertEquals(theSupplier.getSerializedSize(), bytes.length); - supplier = CompressedVSizeColumnarIntsSupplier.fromByteBuffer(ByteBuffer.wrap(bytes), byteOrder); + supplier = CompressedVSizeColumnarIntsSupplier.fromByteBuffer(ByteBuffer.wrap(bytes), byteOrder, null); columnarInts = supplier.get(); } diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplierTest.java index 60de4d78ed55..2798cf7a5d32 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplierTest.java @@ -87,7 +87,8 @@ public WritableSupplier fromByteBuffer(ByteBuffer buffer) return wrapSupplier( CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer( buffer, - ByteOrder.nativeOrder() + ByteOrder.nativeOrder(), + null ), closer ); diff --git a/processing/src/test/java/org/apache/druid/segment/data/GenericIndexedTest.java b/processing/src/test/java/org/apache/druid/segment/data/GenericIndexedTest.java index 52e5087f0f37..ad0c39f98a88 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/GenericIndexedTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/GenericIndexedTest.java @@ -126,7 +126,7 @@ private GenericIndexed serializeAndDeserialize(GenericIndexed in final ByteBuffer byteBuffer = ByteBuffer.wrap(baos.toByteArray()); Assert.assertEquals(indexed.getSerializedSize(), byteBuffer.remaining()); - GenericIndexed deserialized = GenericIndexed.read(byteBuffer, GenericIndexed.STRING_STRATEGY); + GenericIndexed deserialized = GenericIndexed.read(byteBuffer, GenericIndexed.STRING_STRATEGY, null); Assert.assertEquals(0, byteBuffer.remaining()); return deserialized; } diff --git a/processing/src/test/java/org/apache/druid/segment/data/TestColumnCompression.java b/processing/src/test/java/org/apache/druid/segment/data/TestColumnCompression.java index ec745ffa2301..6458db738f9b 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/TestColumnCompression.java +++ 
b/processing/src/test/java/org/apache/druid/segment/data/TestColumnCompression.java @@ -92,7 +92,8 @@ public void setUp() throws Exception ); this.compressed = CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer( buffer, - ByteOrder.nativeOrder() + ByteOrder.nativeOrder(), + null ).get(); filter = new BitSet(); diff --git a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java index 29ba49913c44..246b32b2ac00 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.data; import com.google.common.base.Function; +import com.google.common.base.Supplier; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import org.apache.commons.io.IOUtils; @@ -34,6 +35,8 @@ import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.WriteOutBytes; import org.apache.druid.utils.CloseableUtils; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; @@ -45,6 +48,7 @@ import org.junit.runners.Parameterized; import java.io.File; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.ArrayList; @@ -151,6 +155,65 @@ public void testMultiValueFileLargeData() throws Exception } } + @Test + public void testLargeColumn() throws IOException + { + final File columnDir = temporaryFolder.newFolder(); + final String columnName = "column"; + final long numRows = 500_000; // enough values that we expect to switch into large-column mode + + try ( + SegmentWriteOutMedium segmentWriteOutMedium = + TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder()); + FileSmoosher smoosher = new FileSmoosher(columnDir) + ) { + final Random random = new Random(0); + final int fileSizeLimit = 128_000; // limit to 128KB so we switch to large-column mode sooner + final V3CompressedVSizeColumnarMultiIntsSerializer serializer = + V3CompressedVSizeColumnarMultiIntsSerializer.create( + columnName, + segmentWriteOutMedium, + columnName, + Integer.MAX_VALUE, + compressionStrategy, + fileSizeLimit + ); + serializer.open(); + + for (int i = 0; i < numRows; i++) { + serializer.addValues(new ArrayBasedIndexedInts(new int[]{random.nextInt() ^ Integer.MIN_VALUE})); + } + + try (SmooshedWriter primaryWriter = smoosher.addWithSmooshedWriter(columnName, serializer.getSerializedSize())) { + serializer.writeTo(primaryWriter, smoosher); + } + } + + try (SmooshedFileMapper smooshMapper = SmooshedFileMapper.load(columnDir)) { + MatcherAssert.assertThat( + "Number of offset parts written", // ensure the offsets subcolumn actually ended up multi-part + smooshMapper.getInternalFilenames().stream().filter(s -> s.startsWith("column.offsets_value_")).count(), + Matchers.greaterThan(1L) + ); + + MatcherAssert.assertThat( + "Number of value parts written", // ensure the values subcolumn actually ended up multi-part + smooshMapper.getInternalFilenames().stream().filter(s -> s.startsWith("column.values_value_")).count(), + Matchers.greaterThan(1L) + ); + + final Supplier columnSupplier = 
V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer( + smooshMapper.mapFile(columnName), + byteOrder, + smooshMapper + ); + + try (final ColumnarMultiInts column = columnSupplier.get()) { + Assert.assertEquals(numRows, column.size()); + } + } + } + // this test takes ~30 minutes to run @Ignore @Test @@ -207,6 +270,7 @@ private void checkSerializedSizeAndData(int offsetChunkFactor, int valueChunkFac offsetChunkFactor, byteOrder, compressionStrategy, + GenericIndexedWriter.MAX_FILE_SIZE, segmentWriteOutMedium.getCloser() ); CompressedVSizeColumnarIntsSerializer valueWriter = new CompressedVSizeColumnarIntsSerializer( @@ -217,6 +281,7 @@ private void checkSerializedSizeAndData(int offsetChunkFactor, int valueChunkFac valueChunkFactor, byteOrder, compressionStrategy, + GenericIndexedWriter.MAX_FILE_SIZE, segmentWriteOutMedium.getCloser() ); V3CompressedVSizeColumnarMultiIntsSerializer writer = @@ -244,7 +309,8 @@ private void checkSerializedSizeAndData(int offsetChunkFactor, int valueChunkFac // read from ByteBuffer and check values V3CompressedVSizeColumnarMultiIntsSupplier supplierFromByteBuffer = V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer( ByteBuffer.wrap(IOUtils.toByteArray(writeOutBytes.asInputStream())), - byteOrder + byteOrder, + null ); try (final ColumnarMultiInts columnarMultiInts = supplierFromByteBuffer.get()) { @@ -281,6 +347,7 @@ private void checkV2SerializedSizeAndData(int offsetChunkFactor, int valueChunkF "offset", compressionStrategy, Long.BYTES * 250000, + GenericIndexedWriter.MAX_FILE_SIZE, segmentWriteOutMedium.getCloser() ), segmentWriteOutMedium.getCloser() @@ -291,6 +358,7 @@ private void checkV2SerializedSizeAndData(int offsetChunkFactor, int valueChunkF "value", compressionStrategy, Long.BYTES * 250000, + GenericIndexedWriter.MAX_FILE_SIZE, segmentWriteOutMedium.getCloser() ); CompressedVSizeColumnarIntsSerializer valueWriter = new CompressedVSizeColumnarIntsSerializer( @@ -316,7 +384,7 @@ private void checkV2SerializedSizeAndData(int offsetChunkFactor, int valueChunkF SmooshedFileMapper mapper = Smoosh.map(tmpDirectory); V3CompressedVSizeColumnarMultiIntsSupplier supplierFromByteBuffer = - V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(mapper.mapFile("test"), byteOrder); + V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(mapper.mapFile("test"), byteOrder, null); ColumnarMultiInts columnarMultiInts = supplierFromByteBuffer.get(); Assert.assertEquals(columnarMultiInts.size(), vals.size()); for (int i = 0; i < vals.size(); ++i) { @@ -359,6 +427,7 @@ private void generateV2SerializedSizeAndData( "offset", compressionStrategy, Long.BYTES * 250000, + GenericIndexedWriter.MAX_FILE_SIZE, segmentWriteOutMedium.getCloser() ), segmentWriteOutMedium.getCloser() @@ -369,6 +438,7 @@ private void generateV2SerializedSizeAndData( "value", compressionStrategy, Long.BYTES * 250000, + GenericIndexedWriter.MAX_FILE_SIZE, segmentWriteOutMedium.getCloser() ); CompressedVSizeColumnarIntsSerializer valueWriter = new CompressedVSizeColumnarIntsSerializer( diff --git a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplierTest.java index ad529e10bdbb..f6a0ccbaeb76 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplierTest.java @@ -85,7 
+85,8 @@ public WritableSupplier fromByteBuffer(ByteBuffer buffer) return wrapSupplier( V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer( buffer, - ByteOrder.nativeOrder() + ByteOrder.nativeOrder(), + null ), closer ); diff --git a/processing/src/test/java/org/apache/druid/segment/nested/NestedFieldColumnIndexSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/NestedFieldColumnIndexSupplierTest.java index c9d7e05c622d..e661b747a108 100644 --- a/processing/src/test/java/org/apache/druid/segment/nested/NestedFieldColumnIndexSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/nested/NestedFieldColumnIndexSupplierTest.java @@ -146,7 +146,7 @@ public void setup() throws IOException arrayWriter.open(); writeToBuffer(arrayBuffer, arrayWriter); - GenericIndexed strings = GenericIndexed.read(stringBuffer, GenericIndexed.UTF8_STRATEGY); + GenericIndexed strings = GenericIndexed.read(stringBuffer, GenericIndexed.UTF8_STRATEGY, null); globalStrings = () -> strings.singleThreaded(); globalLongs = FixedIndexed.read(longBuffer, TypeStrategies.LONG, ByteOrder.nativeOrder(), Long.BYTES); globalDoubles = FixedIndexed.read(doubleBuffer, TypeStrategies.DOUBLE, ByteOrder.nativeOrder(), Double.BYTES); @@ -1241,7 +1241,7 @@ public void testEnsureNoImproperSelectionFromAdjustedGlobals() throws IOExceptio doubleWriter.open(); writeToBuffer(doubleBuffer, doubleWriter); - GenericIndexed strings = GenericIndexed.read(stringBuffer, GenericIndexed.UTF8_STRATEGY); + GenericIndexed strings = GenericIndexed.read(stringBuffer, GenericIndexed.UTF8_STRATEGY, null); Supplier> stringIndexed = () -> strings.singleThreaded(); Supplier> longIndexed = FixedIndexed.read(longBuffer, TypeStrategies.LONG, ByteOrder.nativeOrder(), Long.BYTES); Supplier> doubleIndexed = FixedIndexed.read(doubleBuffer, TypeStrategies.DOUBLE, ByteOrder.nativeOrder(), Double.BYTES); @@ -1293,7 +1293,8 @@ public void testEnsureNoImproperSelectionFromAdjustedGlobals() throws IOExceptio Integer.BYTES ); - GenericIndexed bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy()); + GenericIndexed bitmaps = + GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy(), null); NestedFieldColumnIndexSupplier indexSupplier = new NestedFieldColumnIndexSupplier<>( new FieldTypeInfo.TypeSet( @@ -1397,7 +1398,8 @@ private NestedFieldColumnIndexSupplier makeSingleTypeStringSupplier(ColumnCon Integer.BYTES ); - GenericIndexed bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy()); + GenericIndexed bitmaps = + GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy(), null); return new NestedFieldColumnIndexSupplier<>( new FieldTypeInfo.TypeSet( @@ -1481,7 +1483,8 @@ private NestedFieldColumnIndexSupplier makeSingleTypeStringWithNullsSupplier( Integer.BYTES ); - GenericIndexed bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy()); + GenericIndexed bitmaps = + GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy(), null); return new NestedFieldColumnIndexSupplier<>( new FieldTypeInfo.TypeSet( @@ -1561,7 +1564,8 @@ private NestedFieldColumnIndexSupplier makeSingleTypeLongSupplier(ColumnConfi Integer.BYTES ); - GenericIndexed bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy()); + GenericIndexed bitmaps = + GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy(), null); return new NestedFieldColumnIndexSupplier<>( new FieldTypeInfo.TypeSet( @@ -1646,7 +1650,8 @@ 
private NestedFieldColumnIndexSupplier makeSingleTypeLongSupplierWithNull(Col Integer.BYTES ); - GenericIndexed bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy()); + GenericIndexed bitmaps = + GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy(), null); return new NestedFieldColumnIndexSupplier<>( new FieldTypeInfo.TypeSet( @@ -1726,7 +1731,8 @@ private NestedFieldColumnIndexSupplier makeSingleTypeDoubleSupplier(ColumnCon Integer.BYTES ); - GenericIndexed bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy()); + GenericIndexed bitmaps = + GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy(), null); return new NestedFieldColumnIndexSupplier<>( new FieldTypeInfo.TypeSet( @@ -1811,7 +1817,8 @@ private NestedFieldColumnIndexSupplier makeSingleTypeDoubleSupplierWithNull(C Integer.BYTES ); - GenericIndexed bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy()); + GenericIndexed bitmaps = + GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy(), null); return new NestedFieldColumnIndexSupplier<>( new FieldTypeInfo.TypeSet( @@ -1903,7 +1910,8 @@ private NestedFieldColumnIndexSupplier makeVariantSupplierWithNull(ColumnConf Integer.BYTES ); - GenericIndexed bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy()); + GenericIndexed bitmaps = + GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy(), null); return new NestedFieldColumnIndexSupplier<>( new FieldTypeInfo.TypeSet( diff --git a/processing/src/test/java/org/apache/druid/segment/serde/DictionaryEncodedStringIndexSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/serde/DictionaryEncodedStringIndexSupplierTest.java index 263b4132dd7d..74319c884a83 100644 --- a/processing/src/test/java/org/apache/druid/segment/serde/DictionaryEncodedStringIndexSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/serde/DictionaryEncodedStringIndexSupplierTest.java @@ -157,10 +157,11 @@ private StringUtf8ColumnIndexSupplier makeStringWithNullsSupplier() throws IO writeToBuffer(byteBuffer, stringWriter); writeToBuffer(bitmapsBuffer, bitmapWriter); - GenericIndexed bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy()); + GenericIndexed bitmaps = + GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy(), null); return new StringUtf8ColumnIndexSupplier<>( roaringFactory.getBitmapFactory(), - GenericIndexed.read(byteBuffer, GenericIndexed.UTF8_STRATEGY)::singleThreaded, + GenericIndexed.read(byteBuffer, GenericIndexed.UTF8_STRATEGY, null)::singleThreaded, bitmaps, null ); diff --git a/processing/src/test/java/org/apache/druid/segment/serde/HyperUniquesSerdeForTest.java b/processing/src/test/java/org/apache/druid/segment/serde/HyperUniquesSerdeForTest.java index cdc502c6c193..99f3c24a54a5 100644 --- a/processing/src/test/java/org/apache/druid/segment/serde/HyperUniquesSerdeForTest.java +++ b/processing/src/test/java/org/apache/druid/segment/serde/HyperUniquesSerdeForTest.java @@ -93,7 +93,7 @@ public void deserializeColumn(ByteBuffer byteBuffer, ColumnBuilder columnBuilder { final GenericIndexed column; if (columnBuilder.getFileMapper() == null) { - column = GenericIndexed.read(byteBuffer, getObjectStrategy()); + column = GenericIndexed.read(byteBuffer, getObjectStrategy(), null); } else { column = GenericIndexed.read(byteBuffer, getObjectStrategy(), columnBuilder.getFileMapper()); }
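Usage sketch (illustrative only, not part of the patch): the recurring change above is that the fromByteBuffer/read entry points now take a nullable SmooshedFileMapper, so a supplier backed by a multi-part ("large") column can map its additional "*_value_N" smoosh parts on demand, while callers reading a plain in-memory ByteBuffer pass null, as the updated tests do. A minimal sketch of the read path, assuming a hypothetical segment directory segmentDir containing a long column smooshed under the name "column" (LargeColumnReadSketch and readFirstValue are made-up names for illustration):

    import java.io.File;
    import java.io.IOException;
    import java.nio.ByteOrder;
    import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
    import org.apache.druid.segment.data.ColumnarLongs;
    import org.apache.druid.segment.data.CompressedColumnarLongsSupplier;

    public class LargeColumnReadSketch
    {
      static long readFirstValue(File segmentDir) throws IOException
      {
        try (SmooshedFileMapper mapper = SmooshedFileMapper.load(segmentDir)) {
          // Map the primary part of the column; passing the mapper lets the supplier reach any
          // additional "column_value_N" parts if the column was split across smoosh files.
          CompressedColumnarLongsSupplier supplier = CompressedColumnarLongsSupplier.fromByteBuffer(
              mapper.mapFile("column"),
              ByteOrder.nativeOrder(),
              mapper
          );
          try (ColumnarLongs longs = supplier.get()) {
            return longs.get(0);
          }
        }
      }
    }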