Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dynamic chunk sizing for v4 raw forward index #12945

Merged
merged 13 commits into from
May 3, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.segment.creator.impl.fwd;

/**
 * Utility methods for sizing raw forward index chunks.
 */
public class ForwardIndexUtils {
  // Lower bound on the target chunk size so columns with very small values don't produce degenerate, tiny chunks
  private static final int TARGET_MIN_CHUNK_SIZE = 4 * 1024;

  private ForwardIndexUtils() {
  }

  /**
   * Gets the dynamic target chunk size based on the maximum length of the values and the target number of documents
   * per chunk, clamped to the range [TARGET_MIN_CHUNK_SIZE, targetMaxChunkSizeBytes].
   *
   * If targetDocsPerChunk is negative, the chunk size is not dynamically chosen and targetMaxChunkSizeBytes is used
   * (still floored at TARGET_MIN_CHUNK_SIZE).
   *
   * @param maxLength max length of the values
   * @param targetDocsPerChunk target number of documents to store per chunk, or negative to disable dynamic sizing
   * @param targetMaxChunkSizeBytes target max chunk size in bytes
   * @return the target chunk size in bytes
   */
  public static int getDynamicTargetChunkSize(int maxLength, int targetDocsPerChunk, int targetMaxChunkSizeBytes) {
    // Multiply in long to guard against int overflow of maxLength * targetDocsPerChunk
    if (targetDocsPerChunk < 0 || (long) maxLength * targetDocsPerChunk > Integer.MAX_VALUE) {
      // Apply the same lower bound as the dynamic path so a tiny configured max size never yields a
      // smaller-than-minimum chunk (previously this branch returned targetMaxChunkSizeBytes unbounded)
      return Math.max(targetMaxChunkSizeBytes, TARGET_MIN_CHUNK_SIZE);
    }
    return Math.max(Math.min(maxLength * targetDocsPerChunk, targetMaxChunkSizeBytes), TARGET_MIN_CHUNK_SIZE);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,10 @@
* FLOAT, DOUBLE).
*/
public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;

private final VarByteChunkWriter _indexWriter;
private final DataType _valueType;

/**
* Create a var-byte raw index creator for the given column
*
* @param baseIndexDir Index directory
* @param compressionType Type of compression to use
* @param column Name of column to index
* @param totalDocs Total number of documents to index
* @param valueType Type of the values
*/
public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
int totalDocs, DataType valueType, int maxNumberOfMultiValueElements)
throws IOException {
this(baseIndexDir, compressionType, column, totalDocs, valueType, maxNumberOfMultiValueElements, false,
ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION);
}

/**
* Create a var-byte raw index creator for the given column
*
Expand All @@ -68,27 +50,43 @@ public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionTyp
* @param valueType Type of the values
* @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
* @param writerVersion writer format version
* @param targetMaxChunkSizeBytes target max chunk size in bytes, applicable only for V4 or when
* deriveNumDocsPerChunk is true
*/
public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
int totalDocs, DataType valueType, int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk,
int writerVersion)
int writerVersion, int targetMaxChunkSizeBytes, int targetDocsPerChunk)
throws IOException {
this(new File(baseIndexDir, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION), compressionType, totalDocs,
valueType, maxNumberOfMultiValueElements, deriveNumDocsPerChunk, writerVersion);
valueType, maxNumberOfMultiValueElements, deriveNumDocsPerChunk, writerVersion, targetMaxChunkSizeBytes,
targetDocsPerChunk);
}

/**
 * Creates a multi-value fixed-byte raw index creator writing directly to the given index file, delegating to the
 * full constructor with the default target max chunk size and default target docs per chunk from
 * {@link ForwardIndexConfig}.
 *
 * @param indexFile index file to write to
 * @param compressionType type of compression to use
 * @param totalDocs total number of documents to index
 * @param valueType type of the values
 * @param maxNumberOfMultiValueElements max number of elements in a multi-value entry
 * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
 * @param writerVersion writer format version
 * @throws IOException if the underlying writer cannot be created
 */
public MultiValueFixedByteRawIndexCreator(File indexFile, ChunkCompressionType compressionType, int totalDocs,
DataType valueType, int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk, int writerVersion)
throws IOException {
this(indexFile, compressionType, totalDocs, valueType, maxNumberOfMultiValueElements, deriveNumDocsPerChunk,
writerVersion, ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE,
ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
}


public MultiValueFixedByteRawIndexCreator(File indexFile, ChunkCompressionType compressionType, int totalDocs,
DataType valueType, int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk, int writerVersion,
int targetMaxChunkSizeBytes, int targetDocsPerChunk)
throws IOException {
// Store the length followed by the values
int totalMaxLength = Integer.BYTES + (maxNumberOfMultiValueElements * valueType.getStoredType().size());
int numDocsPerChunk = deriveNumDocsPerChunk ? Math.max(
TARGET_MAX_CHUNK_SIZE / (totalMaxLength + VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1)
: DEFAULT_NUM_DOCS_PER_CHUNK;
targetMaxChunkSizeBytes / (totalMaxLength + VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE),
1) : targetDocsPerChunk;
// For columns with very small max value, target chunk size should also be capped to reduce memory during read
int dynamicTargetChunkSize =
ForwardIndexUtils.getDynamicTargetChunkSize(totalMaxLength, targetDocsPerChunk, targetMaxChunkSizeBytes);
_indexWriter =
writerVersion < VarByteChunkForwardIndexWriterV4.VERSION ? new VarByteChunkForwardIndexWriter(indexFile,
compressionType, totalDocs, numDocsPerChunk, totalMaxLength, writerVersion)
: new VarByteChunkForwardIndexWriterV4(indexFile, compressionType, TARGET_MAX_CHUNK_SIZE);
: new VarByteChunkForwardIndexWriterV4(indexFile, compressionType, dynamicTargetChunkSize);
_valueType = valueType;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
* BYTES).
*/
public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;

private final VarByteChunkWriter _indexWriter;
private final DataType _valueType;
Expand All @@ -56,7 +55,8 @@ public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType
int totalDocs, DataType valueType, int maxRowLengthInBytes, int maxNumberOfElements)
throws IOException {
this(baseIndexDir, compressionType, column, totalDocs, valueType, ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION,
maxRowLengthInBytes, maxNumberOfElements);
maxRowLengthInBytes, maxNumberOfElements, ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE,
ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
}

/**
Expand All @@ -72,18 +72,22 @@ public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType
* @param writerVersion writer format version
*/
public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
int totalDocs, DataType valueType, int writerVersion, int maxRowLengthInBytes, int maxNumberOfElements)
int totalDocs, DataType valueType, int writerVersion, int maxRowLengthInBytes, int maxNumberOfElements,
int targetMaxChunkSizeBytes, int targetDocsPerChunk)
throws IOException {
//we will prepend the actual content with numElements and length array containing length of each element
int totalMaxLength = getTotalRowStorageBytes(maxNumberOfElements, maxRowLengthInBytes);

File file = new File(baseIndexDir, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
int numDocsPerChunk = Math.max(
TARGET_MAX_CHUNK_SIZE / (totalMaxLength + VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE),
targetMaxChunkSizeBytes / (totalMaxLength + VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE),
1);
// For columns with very small max value, target chunk size should also be capped to reduce memory during read
int dynamicTargetChunkSize =
ForwardIndexUtils.getDynamicTargetChunkSize(totalMaxLength, targetDocsPerChunk, targetMaxChunkSizeBytes);
_indexWriter = writerVersion < VarByteChunkForwardIndexWriterV4.VERSION ? new VarByteChunkForwardIndexWriter(file,
compressionType, totalDocs, numDocsPerChunk, totalMaxLength, writerVersion)
: new VarByteChunkForwardIndexWriterV4(file, compressionType, TARGET_MAX_CHUNK_SIZE);
: new VarByteChunkForwardIndexWriterV4(file, compressionType, dynamicTargetChunkSize);
_valueType = valueType;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
* FLOAT, DOUBLE).
*/
public class SingleValueFixedByteRawIndexCreator implements ForwardIndexCreator {
private static final int NUM_DOCS_PER_CHUNK = 1000; // TODO: Auto-derive this based on metadata.

private final FixedByteChunkForwardIndexWriter _indexWriter;
private final DataType _valueType;

Expand All @@ -51,7 +49,8 @@ public class SingleValueFixedByteRawIndexCreator implements ForwardIndexCreator
public SingleValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
int totalDocs, DataType valueType)
throws IOException {
this(baseIndexDir, compressionType, column, totalDocs, valueType, ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION);
this(baseIndexDir, compressionType, column, totalDocs, valueType, ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION,
ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
}

/**
Expand All @@ -66,11 +65,11 @@ public SingleValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionTy
* @throws IOException
*/
public SingleValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
int totalDocs, DataType valueType, int writerVersion)
int totalDocs, DataType valueType, int writerVersion, int targetDocsPerChunk)
throws IOException {
File file = new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
_indexWriter =
new FixedByteChunkForwardIndexWriter(file, compressionType, totalDocs, NUM_DOCS_PER_CHUNK, valueType.size(),
new FixedByteChunkForwardIndexWriter(file, compressionType, totalDocs, targetDocsPerChunk, valueType.size(),
writerVersion);
_valueType = valueType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,11 @@
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
import org.apache.pinot.spi.data.FieldSpec.DataType;


/**
* Raw (non-dictionary-encoded) forward index creator for single-value column of variable length data type (BIG_DECIMAL,
* STRING, BYTES).
*/
public class SingleValueVarByteRawIndexCreator implements ForwardIndexCreator {
private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;

private final VarByteChunkWriter _indexWriter;
private final DataType _valueType;
Expand All @@ -57,7 +54,8 @@ public SingleValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType
int totalDocs, DataType valueType, int maxLength)
throws IOException {
this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLength, false,
ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION);
ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION, ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE,
ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
}

/**
Expand All @@ -70,23 +68,32 @@ public SingleValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType
* @param maxLength length of longest entry (in bytes)
* @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
* @param writerVersion writer format version
* @param targetMaxChunkSizeBytes target max chunk size in bytes, applicable only for V4 or when
* deriveNumDocsPerChunk is true
* @param targetDocsPerChunk target number of docs per chunk
* @throws IOException
*/
public SingleValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
int totalDocs, DataType valueType, int maxLength, boolean deriveNumDocsPerChunk, int writerVersion)
int totalDocs, DataType valueType, int maxLength, boolean deriveNumDocsPerChunk, int writerVersion,
int targetMaxChunkSizeBytes, int targetDocsPerChunk)
throws IOException {
File file = new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
int numDocsPerChunk = deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
int numDocsPerChunk =
deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength, targetMaxChunkSizeBytes) : targetDocsPerChunk;

// For columns with very small max value, target chunk size should also be capped to reduce memory during read
int dynamicTargetChunkSize =
ForwardIndexUtils.getDynamicTargetChunkSize(maxLength, targetDocsPerChunk, targetMaxChunkSizeBytes);
Copy link
Contributor

@klsince klsince May 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this method take numDocsPerChunk instead of targetDocsPerChunk here?

or we can check deriveNumDocsPerChunk, if it's true we also derive dynamicTargetChunkSize otherwise use targetMaxChunkSizeBytes instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is correct. If not configured, targetDocsPerChunk should be 1000 by default.
Made a small cleanup PR #13093 to clarify the logic a little bit

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is clearer 🙂

_indexWriter = writerVersion < VarByteChunkForwardIndexWriterV4.VERSION ? new VarByteChunkForwardIndexWriter(file,
compressionType, totalDocs, numDocsPerChunk, maxLength, writerVersion)
: new VarByteChunkForwardIndexWriterV4(file, compressionType, TARGET_MAX_CHUNK_SIZE);
: new VarByteChunkForwardIndexWriterV4(file, compressionType, dynamicTargetChunkSize);
_valueType = valueType;
}

@VisibleForTesting
public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
public static int getNumDocsPerChunk(int lengthOfLongestEntry, int targetMaxChunkSizeBytes) {
int overheadPerEntry = lengthOfLongestEntry + VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
return Math.max(targetMaxChunkSizeBytes / overheadPerEntry, 1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,16 @@ public static ForwardIndexCreator createIndexCreator(IndexCreationContext contex
}
boolean deriveNumDocsPerChunk = indexConfig.isDeriveNumDocsPerChunk();
int writerVersion = indexConfig.getRawIndexWriterVersion();
int targetMaxChunkSize = indexConfig.getTargetMaxChunkSizeBytes();
int targetDocsPerChunk = indexConfig.getTargetDocsPerChunk();
if (fieldSpec.isSingleValueField()) {
return getRawIndexCreatorForSVColumn(indexDir, chunkCompressionType, columnName, storedType, numTotalDocs,
context.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion);
context.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion, targetMaxChunkSize,
targetDocsPerChunk);
} else {
return getRawIndexCreatorForMVColumn(indexDir, chunkCompressionType, columnName, storedType, numTotalDocs,
context.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk, writerVersion,
context.getMaxRowLengthInBytes());
context.getMaxRowLengthInBytes(), targetMaxChunkSize, targetDocsPerChunk);
}
}
}
Expand All @@ -97,20 +100,20 @@ public static ForwardIndexCreator createIndexCreator(IndexCreationContext contex
*/
public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File indexDir, ChunkCompressionType compressionType,
String column, DataType storedType, int numTotalDocs, int lengthOfLongestEntry, boolean deriveNumDocsPerChunk,
int writerVersion)
int writerVersion, int targetMaxChunkSize, int targetDocsPerChunk)
throws IOException {
switch (storedType) {
case INT:
case LONG:
case FLOAT:
case DOUBLE:
return new SingleValueFixedByteRawIndexCreator(indexDir, compressionType, column, numTotalDocs, storedType,
writerVersion);
writerVersion, targetDocsPerChunk);
case BIG_DECIMAL:
case STRING:
case BYTES:
return new SingleValueVarByteRawIndexCreator(indexDir, compressionType, column, numTotalDocs, storedType,
lengthOfLongestEntry, deriveNumDocsPerChunk, writerVersion);
lengthOfLongestEntry, deriveNumDocsPerChunk, writerVersion, targetMaxChunkSize, targetDocsPerChunk);
default:
throw new IllegalStateException("Unsupported stored type: " + storedType);
}
Expand All @@ -122,19 +125,21 @@ public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File indexDir, C
*/
public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File indexDir, ChunkCompressionType compressionType,
String column, DataType storedType, int numTotalDocs, int maxNumberOfMultiValueElements,
boolean deriveNumDocsPerChunk, int writerVersion, int maxRowLengthInBytes)
boolean deriveNumDocsPerChunk, int writerVersion, int maxRowLengthInBytes, int targetMaxChunkSize,
int targetDocsPerChunk)
throws IOException {
switch (storedType) {
case INT:
case LONG:
case FLOAT:
case DOUBLE:
return new MultiValueFixedByteRawIndexCreator(indexDir, compressionType, column, numTotalDocs, storedType,
maxNumberOfMultiValueElements, deriveNumDocsPerChunk, writerVersion);
maxNumberOfMultiValueElements, deriveNumDocsPerChunk, writerVersion, targetMaxChunkSize,
targetDocsPerChunk);
case STRING:
case BYTES:
return new MultiValueVarByteRawIndexCreator(indexDir, compressionType, column, numTotalDocs, storedType,
writerVersion, maxRowLengthInBytes, maxNumberOfMultiValueElements);
writerVersion, maxRowLengthInBytes, maxNumberOfMultiValueElements, targetMaxChunkSize, targetDocsPerChunk);
default:
throw new IllegalStateException("Unsupported stored type: " + storedType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public <T> void testMV(DataType dataType, List<T> inputs, ToIntFunction<T> sizeo
file.delete();
MultiValueFixedByteRawIndexCreator creator =
new MultiValueFixedByteRawIndexCreator(new File(OUTPUT_DIR), compressionType, column, numDocs, dataType,
maxElements, false, writerVersion);
maxElements, false, writerVersion, 1024 * 1024, 1000);
inputs.forEach(input -> injector.inject(creator, input));
creator.close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void testMVString(ChunkCompressionType compressionType, boolean useFullSi
inputs.add(values);
}
try (MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(OUTPUT_DIR, compressionType,
column, numDocs, DataType.STRING, maxTotalLength, maxElements, writerVersion)) {
column, numDocs, DataType.STRING, maxTotalLength, maxElements, writerVersion, 1024 * 1024, 1000)) {
for (String[] input : inputs) {
creator.putStringMV(input);
}
Expand Down Expand Up @@ -171,7 +171,7 @@ public void testMVBytes(ChunkCompressionType compressionType, boolean useFullSiz
inputs.add(values);
}
try (MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(OUTPUT_DIR, compressionType,
column, numDocs, DataType.BYTES, writerVersion, maxTotalLength, maxElements)) {
column, numDocs, DataType.BYTES, writerVersion, maxTotalLength, maxElements, 1024 * 1024, 1000)) {
for (byte[][] input : inputs) {
creator.putBytesMV(input);
}
Expand Down
Loading
Loading