Skip to content

Commit

Permalink
Various fixes for large columns. (apache#17691)
Browse files Browse the repository at this point in the history
* Various fixes for large columns.

This patch fixes a class of bugs where various primitive column readers were
not providing a SmooshedFileMapper to GenericIndexed, even though the corresponding
writer could potentially write multi-file columns. For example, apache#7943 is an
instance of this bug.

This patch also includes a fix for an issue in the writer for compressed
multi-value string columns, V3CompressedVSizeColumnarMultiIntsSerializer, where it
would use the same base filename for both the offsets and values sections. With the
default file size limit, this bug would only be triggered for segments in excess of
500 million rows. When a segment has fewer rows than that, it could potentially have
a values section that needs to be split over multiple files, but the offsets section
is never more than 4 bytes per row, so it stays within a single file and the shared
base filename does not cause a collision. This bug was triggered by the new tests,
which use a smaller fileSizeLimit.

* Use a Random seed.

* Remove erroneous test code.

* Fix two compilation problems.

* Add javadocs.

* Another javadoc.
  • Loading branch information
gianm authored Feb 3, 2025
1 parent 516531d commit 0be9815
Show file tree
Hide file tree
Showing 59 changed files with 731 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ static ColumnarLongs createColumnarLongs(String encoding, ByteBuffer buffer)
case "none-longs":
case "zstd-auto":
case "zstd-longs":
return CompressedColumnarLongsSupplier.fromByteBuffer(buffer, ByteOrder.LITTLE_ENDIAN).get();
return CompressedColumnarLongsSupplier.fromByteBuffer(buffer, ByteOrder.LITTLE_ENDIAN, null).get();
}

throw new IllegalArgumentException("unknown encoding");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public void setup() throws IOException
);
this.compressed = CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
bufferCompressed,
ByteOrder.nativeOrder()
ByteOrder.nativeOrder(),
null
).get();

final ByteBuffer bufferUncompressed = serialize(VSizeColumnarInts.fromArray(vals));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ public void setup() throws IOException
);
this.compressed = CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(
bufferCompressed,
ByteOrder.nativeOrder()
ByteOrder.nativeOrder(),
null
).get();

final ByteBuffer bufferUncompressed = serialize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void setup() throws Exception
File compFile = new File(dir, file + "-" + strategy);
bufferHandler = FileUtils.map(compFile);
ByteBuffer buffer = bufferHandler.get();
supplier = CompressedColumnarFloatsSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder());
supplier = CompressedColumnarFloatsSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder(), null);
}

@TearDown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void setup() throws Exception
File compFile = new File(dir, file + "-" + strategy + "-" + format);
bufferHandler = FileUtils.map(compFile);
ByteBuffer buffer = bufferHandler.get();
supplier = CompressedColumnarLongsSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder());
supplier = CompressedColumnarLongsSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder(), null);
}

@TearDown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.common.base.Supplier;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier;
Expand All @@ -40,10 +41,12 @@ public class CompressedBigDecimalColumnPartSupplier implements Supplier<ComplexC
* Compressed.
*
* @param buffer Byte buffer
* @param smooshMapper mapper for secondary files, in case of large columns
* @return new instance of CompressedBigDecimalColumnPartSupplier
*/
public static CompressedBigDecimalColumnPartSupplier fromByteBuffer(
ByteBuffer buffer
ByteBuffer buffer,
SmooshedFileMapper smooshMapper
)
{
byte versionFromBuffer = buffer.get();
Expand All @@ -53,11 +56,12 @@ public static CompressedBigDecimalColumnPartSupplier fromByteBuffer(

CompressedVSizeColumnarIntsSupplier scaleSupplier = CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
buffer,
IndexIO.BYTE_ORDER
IndexIO.BYTE_ORDER,
smooshMapper
);

V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier =
V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(buffer, IndexIO.BYTE_ORDER);
V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(buffer, IndexIO.BYTE_ORDER, smooshMapper);

return new CompressedBigDecimalColumnPartSupplier(
buffer.position() - positionStart,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.druid.segment.data.ArrayBasedIndexedInts;
import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.GenericIndexedWriter;
import org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;

Expand Down Expand Up @@ -66,7 +67,8 @@ public static CompressedBigDecimalLongColumnSerializer create(
segmentWriteOutMedium,
String.format(Locale.ROOT, "%s.magnitude", filenameBase),
Integer.MAX_VALUE,
CompressionStrategy.LZ4
CompressionStrategy.LZ4,
GenericIndexedWriter.MAX_FILE_SIZE
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public CompressedBigDecimal extractValue(InputRow inputRow, String metricName)
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
{
builder.setComplexColumnSupplier(
CompressedBigDecimalColumnPartSupplier.fromByteBuffer(buffer)
CompressedBigDecimalColumnPartSupplier.fromByteBuffer(buffer, builder.getFileMapper())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,8 @@ protected void setupEncodedValueWriter() throws IOException
segmentWriteOutMedium,
filenameBase,
cardinality,
compressionStrategy
compressionStrategy,
GenericIndexedWriter.MAX_FILE_SIZE
);
} else {
encodedValueSerializer =
Expand Down
10 changes: 7 additions & 3 deletions processing/src/main/java/org/apache/druid/segment/IndexIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ public MMappedIndex mapDir(File inDir) throws IOException

CompressedColumnarLongsSupplier timestamps = CompressedColumnarLongsSupplier.fromByteBuffer(
smooshedFiles.mapFile(makeTimeFile(inDir, BYTE_ORDER).getName()),
BYTE_ORDER
BYTE_ORDER,
smooshedFiles
);

Map<String, MetricHolder> metrics = Maps.newLinkedHashMap();
Expand Down Expand Up @@ -385,15 +386,18 @@ public MMappedIndex mapDir(File inDir) throws IOException
fileDimensionName
);

dimValueUtf8Lookups.put(dimension, GenericIndexed.read(dimBuffer, GenericIndexed.UTF8_STRATEGY));
dimValueUtf8Lookups.put(
dimension,
GenericIndexed.read(dimBuffer, GenericIndexed.UTF8_STRATEGY, smooshedFiles)
);
dimColumns.put(dimension, VSizeColumnarMultiInts.readFromByteBuffer(dimBuffer));
}

ByteBuffer invertedBuffer = smooshedFiles.mapFile("inverted.drd");
for (int i = 0; i < availableDimensions.size(); ++i) {
bitmaps.put(
SERIALIZER_UTILS.readString(invertedBuffer),
GenericIndexed.read(invertedBuffer, bitmapSerdeFactory.getObjectStrategy())
GenericIndexed.read(invertedBuffer, bitmapSerdeFactory.getObjectStrategy(), smooshedFiles)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public class MetricHolder
private static final byte[] VERSION = new byte[]{0x0};
private static final SerializerUtils SERIALIZER_UTILS = new SerializerUtils();

/**
* Read a metric column from a legacy (v8) segment.
*/
public static MetricHolder fromByteBuffer(ByteBuffer buf)
{
final byte ver = buf.get();
Expand All @@ -51,7 +54,11 @@ public static MetricHolder fromByteBuffer(ByteBuffer buf)

switch (holder.type) {
case FLOAT:
holder.floatType = CompressedColumnarFloatsSupplier.fromByteBuffer(buf, ByteOrder.nativeOrder());
holder.floatType = CompressedColumnarFloatsSupplier.fromByteBuffer(
buf,
ByteOrder.nativeOrder(),
null // OK since this method is only used for legacy segments, which always use version 1 indexed
);
break;
case COMPLEX:
final ComplexMetricSerde serdeForType = ComplexMetrics.getSerdeForType(holder.getTypeName());
Expand All @@ -72,7 +79,7 @@ public static MetricHolder fromByteBuffer(ByteBuffer buf)

private static <T> GenericIndexed<T> read(ByteBuffer buf, ComplexMetricSerde serde)
{
return GenericIndexed.read(buf, serde.getObjectStrategy());
return GenericIndexed.read(buf, serde.getObjectStrategy(), null);
}

public enum MetricType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;

import javax.annotation.Nullable;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand Down Expand Up @@ -57,6 +56,7 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri
String filenameBase,
ByteOrder byteOrder,
CompressionStrategy compression,
int fileSizeLimit,
Closer closer
)
{
Expand All @@ -66,6 +66,7 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri
filenameBase,
compression,
CompressedPools.BUFFER_SIZE,
fileSizeLimit,
closer
);
this.compression = compression;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Supplier;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
Expand All @@ -43,11 +44,16 @@ public BlockLayoutColumnarDoublesSupplier(
int sizePer,
ByteBuffer fromBuffer,
ByteOrder byteOrder,
CompressionStrategy strategy
CompressionStrategy strategy,
SmooshedFileMapper smooshMapper
)
{
this.strategy = strategy;
this.baseDoubleBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(byteOrder, strategy));
this.baseDoubleBuffers = GenericIndexed.read(
fromBuffer,
DecompressingByteBufferObjectStrategy.of(byteOrder, strategy),
smooshMapper
);
this.totalSize = totalSize;
this.sizePer = sizePer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial
String filenameBase,
ByteOrder byteOrder,
CompressionStrategy compression,
int fileSizeLimit,
Closer closer
)
{
Expand All @@ -66,6 +67,7 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial
filenameBase,
compression,
CompressedPools.BUFFER_SIZE,
fileSizeLimit,
closer
);
this.compression = compression;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Supplier;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
Expand All @@ -42,10 +43,15 @@ public BlockLayoutColumnarFloatsSupplier(
int sizePer,
ByteBuffer fromBuffer,
ByteOrder byteOrder,
CompressionStrategy strategy
CompressionStrategy strategy,
@Nullable SmooshedFileMapper smooshMapper
)
{
baseFloatBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(byteOrder, strategy));
baseFloatBuffers = GenericIndexed.read(
fromBuffer,
DecompressingByteBufferObjectStrategy.of(byteOrder, strategy),
smooshMapper
);
this.totalSize = totalSize;
this.sizePer = sizePer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
ByteOrder byteOrder,
CompressionFactory.LongEncodingWriter writer,
CompressionStrategy compression,
int fileSizeLimit,
Closer closer
)
{
Expand All @@ -71,6 +72,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
filenameBase,
compression,
bufferSize,
fileSizeLimit,
closer
);
this.writer = writer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Supplier;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -51,11 +52,16 @@ public BlockLayoutColumnarLongsSupplier(
ByteBuffer fromBuffer,
ByteOrder order,
CompressionFactory.LongEncodingReader reader,
CompressionStrategy strategy
CompressionStrategy strategy,
SmooshedFileMapper smooshMapper
)
{
this.strategy = strategy;
this.baseLongBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(order, strategy));
this.baseLongBuffers = GenericIndexed.read(
fromBuffer,
DecompressingByteBufferObjectStrategy.of(order, strategy),
smooshMapper
);
this.totalSize = totalSize;
this.sizePer = sizePer;
this.baseReader = reader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Supplier;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand All @@ -34,9 +35,19 @@ private CompressedColumnarDoublesSuppliers()
{
}

/**
* Reads a column from a {@link ByteBuffer}, possibly using additional secondary files from a
* {@link SmooshedFileMapper}.
*
* @param buffer primary buffer to read from
* @param order byte order
* @param smooshMapper required for reading version 2 (multi-file) GenericIndexed data. May be null if you know you
* are reading a single-file column. Generally, this should only be null in tests, not production code.
*/
public static Supplier<ColumnarDoubles> fromByteBuffer(
ByteBuffer buffer,
ByteOrder order
ByteOrder order,
SmooshedFileMapper smooshMapper
)
{
byte versionFromBuffer = buffer.get();
Expand All @@ -54,7 +65,8 @@ public static Supplier<ColumnarDoubles> fromByteBuffer(
sizePer,
buffer.asReadOnlyBuffer(),
order,
compression
compression,
smooshMapper
);
}
throw new IAE("Unknown version[%s]", versionFromBuffer);
Expand Down
Loading

0 comments on commit 0be9815

Please sign in to comment.