Various fixes for large columns. #17691

Merged · 6 commits · Feb 3, 2025
@@ -318,7 +318,7 @@ static ColumnarLongs createColumnarLongs(String encoding, ByteBuffer buffer)
       case "none-longs":
       case "zstd-auto":
       case "zstd-longs":
-        return CompressedColumnarLongsSupplier.fromByteBuffer(buffer, ByteOrder.LITTLE_ENDIAN).get();
+        return CompressedColumnarLongsSupplier.fromByteBuffer(buffer, ByteOrder.LITTLE_ENDIAN, null).get();
     }

     throw new IllegalArgumentException("unknown encoding");
@@ -82,7 +82,8 @@ public void setup() throws IOException
     );
     this.compressed = CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
         bufferCompressed,
-        ByteOrder.nativeOrder()
+        ByteOrder.nativeOrder(),
+        null
     ).get();

     final ByteBuffer bufferUncompressed = serialize(VSizeColumnarInts.fromArray(vals));
@@ -95,7 +95,8 @@ public void setup() throws IOException
     );
     this.compressed = CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(
         bufferCompressed,
-        ByteOrder.nativeOrder()
+        ByteOrder.nativeOrder(),
+        null
     ).get();

     final ByteBuffer bufferUncompressed = serialize(
@@ -75,7 +75,7 @@ public void setup() throws Exception
     File compFile = new File(dir, file + "-" + strategy);
     bufferHandler = FileUtils.map(compFile);
     ByteBuffer buffer = bufferHandler.get();
-    supplier = CompressedColumnarFloatsSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder());
+    supplier = CompressedColumnarFloatsSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder(), null);
   }

   @TearDown
@@ -79,7 +79,7 @@ public void setup() throws Exception
     File compFile = new File(dir, file + "-" + strategy + "-" + format);
     bufferHandler = FileUtils.map(compFile);
     ByteBuffer buffer = bufferHandler.get();
-    supplier = CompressedColumnarLongsSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder());
+    supplier = CompressedColumnarLongsSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder(), null);
   }

   @TearDown
@@ -22,6 +22,7 @@

 import com.google.common.base.Supplier;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.column.ComplexColumn;
 import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier;
@@ -40,10 +41,12 @@ public class CompressedBigDecimalColumnPartSupplier implements Supplier<ComplexColumn>
    * Compressed.
    *
    * @param buffer Byte buffer
+   * @param smooshMapper mapper for secondary files, in case of large columns
    * @return new instance of CompressedBigDecimalColumnPartSupplier
    */
   public static CompressedBigDecimalColumnPartSupplier fromByteBuffer(
-      ByteBuffer buffer
+      ByteBuffer buffer,
+      SmooshedFileMapper smooshMapper
   )
   {
     byte versionFromBuffer = buffer.get();
@@ -53,11 +56,12 @@ public static CompressedBigDecimalColumnPartSupplier fromByteBuffer(

     CompressedVSizeColumnarIntsSupplier scaleSupplier = CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
         buffer,
-        IndexIO.BYTE_ORDER
+        IndexIO.BYTE_ORDER,
+        smooshMapper
     );

     V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier =
-        V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(buffer, IndexIO.BYTE_ORDER);
+        V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(buffer, IndexIO.BYTE_ORDER, smooshMapper);

     return new CompressedBigDecimalColumnPartSupplier(
         buffer.position() - positionStart,
@@ -25,6 +25,7 @@
 import org.apache.druid.segment.data.ArrayBasedIndexedInts;
 import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
 import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.data.GenericIndexedWriter;
 import org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSerializer;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;

@@ -66,7 +67,8 @@ public static CompressedBigDecimalLongColumnSerializer create(
         segmentWriteOutMedium,
         String.format(Locale.ROOT, "%s.magnitude", filenameBase),
         Integer.MAX_VALUE,
-        CompressionStrategy.LZ4
+        CompressionStrategy.LZ4,
+        GenericIndexedWriter.MAX_FILE_SIZE
Review discussion:

Member: Is there any reason to call this in non-test code with any value other than GenericIndexedWriter.MAX_FILE_SIZE? Wondering if we should make a version of the creators that automatically passes this argument in, and mark the one that takes the size argument as for tests?

Contributor Author: I thought about this, but this seemed fine enough and makes the GenericIndexedWriter API less cluttered. Having two overloads of the method with slightly different arguments seemed like it might be confusing.

Member (@clintropolis, Jan 31, 2025): Yea, I guess new callers of these methods aren't very frequent, so maybe it's OK. I was just afraid it seems more confusing for the production code to be accepting an argument where there is basically one reasonable value to ever pass in, so any new callers should hopefully check around to see what the other callers are doing and pass in the same constant. Maybe we should add javadocs on these create methods to indicate that most callers should specify GenericIndexedWriter.MAX_FILE_SIZE as the argument, except for testing?

Contributor Author: I added a javadoc to the GenericIndexedWriter#ofCompressedByteBuffers and V3CompressedVSizeColumnarMultiIntsSerializer#create methods. There are some other methods that now accept fileSizeLimit, but they are package-private so are really only used by tests and by closely-related code. I didn't add javadocs to those since it didn't seem necessary.
         )
     );
   }
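To make the convention from this thread concrete, here is a minimal sketch of a production-side caller, assuming only the create signature visible in the hunk above (the writeout medium and base name are hypothetical, imports elided):

// Sketch: production callers always pass GenericIndexedWriter.MAX_FILE_SIZE;
// only tests pass a smaller limit, to exercise the multi-file layout cheaply.
V3CompressedVSizeColumnarMultiIntsSerializer serializer =
    V3CompressedVSizeColumnarMultiIntsSerializer.create(
        segmentWriteOutMedium,                                     // hypothetical SegmentWriteOutMedium
        String.format(Locale.ROOT, "%s.magnitude", filenameBase),  // hypothetical base name
        Integer.MAX_VALUE,                                         // max value storable in the column
        CompressionStrategy.LZ4,
        GenericIndexedWriter.MAX_FILE_SIZE                         // the one reasonable production value
    );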
@@ -74,7 +74,7 @@ public CompressedBigDecimal extractValue(InputRow inputRow, String metricName)
   public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
   {
     builder.setComplexColumnSupplier(
-        CompressedBigDecimalColumnPartSupplier.fromByteBuffer(buffer)
+        CompressedBigDecimalColumnPartSupplier.fromByteBuffer(buffer, builder.getFileMapper())
     );
   }
@@ -461,7 +461,8 @@ protected void setupEncodedValueWriter() throws IOException
             segmentWriteOutMedium,
             filenameBase,
             cardinality,
-            compressionStrategy
+            compressionStrategy,
+            GenericIndexedWriter.MAX_FILE_SIZE
         );
       } else {
         encodedValueSerializer =
processing/src/main/java/org/apache/druid/segment/IndexIO.java (7 additions, 3 deletions)
@@ -357,7 +357,8 @@ public MMappedIndex mapDir(File inDir) throws IOException

     CompressedColumnarLongsSupplier timestamps = CompressedColumnarLongsSupplier.fromByteBuffer(
         smooshedFiles.mapFile(makeTimeFile(inDir, BYTE_ORDER).getName()),
-        BYTE_ORDER
+        BYTE_ORDER,
+        smooshedFiles
     );

     Map<String, MetricHolder> metrics = Maps.newLinkedHashMap();
@@ -385,15 +386,18 @@ public MMappedIndex mapDir(File inDir) throws IOException
           fileDimensionName
       );

-      dimValueUtf8Lookups.put(dimension, GenericIndexed.read(dimBuffer, GenericIndexed.UTF8_STRATEGY));
+      dimValueUtf8Lookups.put(
+          dimension,
+          GenericIndexed.read(dimBuffer, GenericIndexed.UTF8_STRATEGY, smooshedFiles)
+      );
       dimColumns.put(dimension, VSizeColumnarMultiInts.readFromByteBuffer(dimBuffer));
     }

     ByteBuffer invertedBuffer = smooshedFiles.mapFile("inverted.drd");
     for (int i = 0; i < availableDimensions.size(); ++i) {
       bitmaps.put(
           SERIALIZER_UTILS.readString(invertedBuffer),
-          GenericIndexed.read(invertedBuffer, bitmapSerdeFactory.getObjectStrategy())
+          GenericIndexed.read(invertedBuffer, bitmapSerdeFactory.getObjectStrategy(), smooshedFiles)
       );
     }
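The mapper threading above is the substance of the fix. A minimal sketch of the read side, assuming SmooshedFileMapper.load and the three-argument GenericIndexed.read overload used in this PR (the segment directory and internal file name are hypothetical, imports elided):

// Sketch: open a mapped segment and read a column that may be split across
// multiple smoosh files. Version-1 (single-file) columns never consult the
// mapper; version-2 (large, multi-file) columns need it to resolve their
// secondary files.
SmooshedFileMapper smooshedFiles = SmooshedFileMapper.load(segmentDir);   // segmentDir is hypothetical
ByteBuffer timeBuffer = smooshedFiles.mapFile("time_LITTLE_ENDIAN.drd");  // hypothetical file name
CompressedColumnarLongsSupplier timestamps =
    CompressedColumnarLongsSupplier.fromByteBuffer(timeBuffer, IndexIO.BYTE_ORDER, smooshedFiles);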
@@ -38,6 +38,9 @@
   private static final byte[] VERSION = new byte[]{0x0};
   private static final SerializerUtils SERIALIZER_UTILS = new SerializerUtils();

+  /**
+   * Read a metric column from a legacy (v8) segment.
+   */
   public static MetricHolder fromByteBuffer(ByteBuffer buf)
   {
     final byte ver = buf.get();
@@ -51,7 +54,11 @@

     switch (holder.type) {
       case FLOAT:
-        holder.floatType = CompressedColumnarFloatsSupplier.fromByteBuffer(buf, ByteOrder.nativeOrder());
+        holder.floatType = CompressedColumnarFloatsSupplier.fromByteBuffer(
+            buf,
+            ByteOrder.nativeOrder(),
+            null // OK since this method is only used for legacy segments, which always use version 1 indexed
+        );
         break;
       case COMPLEX:
         final ComplexMetricSerde serdeForType = ComplexMetrics.getSerdeForType(holder.getTypeName());
@@ -72,7 +79,7 @@

   private static <T> GenericIndexed<T> read(ByteBuffer buf, ComplexMetricSerde serde)
   {
-    return GenericIndexed.read(buf, serde.getObjectStrategy());
+    return GenericIndexed.read(buf, serde.getObjectStrategy(), null);

[CodeQL notice on the line above: invoking ComplexMetricSerde.getObjectStrategy should be avoided because it has been deprecated.]
   }

   public enum MetricType
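To make the null-versus-mapper contract concrete, a sketch contrasting the two read paths, using only calls that appear elsewhere in this PR (buf and smooshedFiles are assumed to be in scope):

// Legacy v8 path (MetricHolder): columns are always version-1, single-file
// indexed, so no secondary files exist and null is safe.
CompressedColumnarFloatsSupplier legacy =
    CompressedColumnarFloatsSupplier.fromByteBuffer(buf, ByteOrder.nativeOrder(), null);

// Modern segment path (IndexIO and friends): pass the segment's
// SmooshedFileMapper so version-2 (multi-file) columns can be resolved.
CompressedColumnarFloatsSupplier modern =
    CompressedColumnarFloatsSupplier.fromByteBuffer(buf, ByteOrder.nativeOrder(), smooshedFiles);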
@@ -26,7 +26,6 @@
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;

 import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -57,6 +56,7 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSerializer
     String filenameBase,
     ByteOrder byteOrder,
     CompressionStrategy compression,
+    int fileSizeLimit,
     Closer closer
 )
 {
@@ -66,6 +66,7 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSerializer
         filenameBase,
         compression,
         CompressedPools.BUFFER_SIZE,
+        fileSizeLimit,
         closer
     );
     this.compression = compression;
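The same two-step threading repeats in the float and long serializers below: the constructor gains a fileSizeLimit parameter, which flows straight into GenericIndexedWriter.ofCompressedByteBuffers. A short sketch of the intent, assuming the call shape shown in the hunk above and the writer's generic return type:

// Sketch: fileSizeLimit caps how many bytes land in one smoosh part before
// the writer rolls over to a secondary file (the "version 2" layout).
// Production code passes GenericIndexedWriter.MAX_FILE_SIZE; tests pass a
// small limit so multi-file columns can be produced from small inputs.
GenericIndexedWriter<ByteBuffer> writer = GenericIndexedWriter.ofCompressedByteBuffers(
    segmentWriteOutMedium,
    filenameBase,
    compression,
    CompressedPools.BUFFER_SIZE,
    fileSizeLimit,  // new parameter threaded through this PR
    closer
);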
@@ -21,6 +21,7 @@

 import com.google.common.base.Supplier;
 import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;

 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
@@ -43,11 +44,16 @@ public BlockLayoutColumnarDoublesSupplier(
     int sizePer,
     ByteBuffer fromBuffer,
     ByteOrder byteOrder,
-    CompressionStrategy strategy
+    CompressionStrategy strategy,
+    SmooshedFileMapper smooshMapper
 )
 {
   this.strategy = strategy;
-  this.baseDoubleBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(byteOrder, strategy));
+  this.baseDoubleBuffers = GenericIndexed.read(
+      fromBuffer,
+      DecompressingByteBufferObjectStrategy.of(byteOrder, strategy),
+      smooshMapper
+  );
   this.totalSize = totalSize;
   this.sizePer = sizePer;
 }
@@ -57,6 +57,7 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerializer
     String filenameBase,
     ByteOrder byteOrder,
     CompressionStrategy compression,
+    int fileSizeLimit,
     Closer closer
 )
 {
@@ -66,6 +67,7 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerializer
         filenameBase,
         compression,
         CompressedPools.BUFFER_SIZE,
+        fileSizeLimit,
         closer
     );
     this.compression = compression;
@@ -21,6 +21,7 @@

 import com.google.common.base.Supplier;
 import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;

 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
@@ -42,10 +43,15 @@ public BlockLayoutColumnarFloatsSupplier(
     int sizePer,
     ByteBuffer fromBuffer,
     ByteOrder byteOrder,
-    CompressionStrategy strategy
+    CompressionStrategy strategy,
+    @Nullable SmooshedFileMapper smooshMapper
 )
 {
-  baseFloatBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(byteOrder, strategy));
+  baseFloatBuffers = GenericIndexed.read(
+      fromBuffer,
+      DecompressingByteBufferObjectStrategy.of(byteOrder, strategy),
+      smooshMapper
+  );
   this.totalSize = totalSize;
   this.sizePer = sizePer;
 }
@@ -60,6 +60,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializer
     ByteOrder byteOrder,
     CompressionFactory.LongEncodingWriter writer,
     CompressionStrategy compression,
+    int fileSizeLimit,
     Closer closer
 )
 {
@@ -71,6 +72,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializer
         filenameBase,
         compression,
         bufferSize,
+        fileSizeLimit,
         closer
     );
     this.writer = writer;
@@ -22,6 +22,7 @@
 import com.google.common.base.Supplier;
 import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.common.semantic.SemanticUtils;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;

 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
@@ -51,11 +52,16 @@ public BlockLayoutColumnarLongsSupplier(
     ByteBuffer fromBuffer,
     ByteOrder order,
     CompressionFactory.LongEncodingReader reader,
-    CompressionStrategy strategy
+    CompressionStrategy strategy,
+    SmooshedFileMapper smooshMapper
 )
 {
   this.strategy = strategy;
-  this.baseLongBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(order, strategy));
+  this.baseLongBuffers = GenericIndexed.read(
+      fromBuffer,
+      DecompressingByteBufferObjectStrategy.of(order, strategy),
+      smooshMapper
+  );
   this.totalSize = totalSize;
   this.sizePer = sizePer;
   this.baseReader = reader;
@@ -21,6 +21,7 @@

 import com.google.common.base.Supplier;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;

 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -34,9 +35,19 @@ private CompressedColumnarDoublesSuppliers()
   {
   }

+  /**
+   * Reads a column from a {@link ByteBuffer}, possibly using additional secondary files from a
+   * {@link SmooshedFileMapper}.
+   *
+   * @param buffer       primary buffer to read from
+   * @param order        byte order
+   * @param smooshMapper required for reading version 2 (multi-file) indexed. May be null if you know you are reading
+   *                     a single-file column. Generally, this should only be null in tests, not production code.
+   */
   public static Supplier<ColumnarDoubles> fromByteBuffer(
       ByteBuffer buffer,
-      ByteOrder order
+      ByteOrder order,
+      SmooshedFileMapper smooshMapper
   )
   {
     byte versionFromBuffer = buffer.get();
@@ -54,7 +65,8 @@ public static Supplier<ColumnarDoubles> fromByteBuffer(
           sizePer,
           buffer.asReadOnlyBuffer(),
           order,
-          compression
+          compression,
+          smooshMapper
       );
     }
     throw new IAE("Unknown version[%s]", versionFromBuffer);
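Since this javadoc states the new contract most completely, a minimal usage sketch may help; it assumes a segment already mapped with a SmooshedFileMapper, a hypothetical internal column file name, and that ColumnarDoubles is closeable with a get(int) accessor (imports elided):

// Production read path: pass the mapper so large (version-2) columns can
// locate their secondary files; tests reading small, single-file columns
// may pass null instead.
ByteBuffer buffer = smooshedFiles.mapFile("myDoubleColumn");  // hypothetical file name
Supplier<ColumnarDoubles> supplier =
    CompressedColumnarDoublesSuppliers.fromByteBuffer(buffer, ByteOrder.nativeOrder(), smooshedFiles);
try (ColumnarDoubles doubles = supplier.get()) {
  double first = doubles.get(0);  // assumed accessor, shown for illustration
}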