-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
dynamic chunk sizing for v4 raw forward index #12945
Changes from 12 commits
3e8c3ba
d16cfd6
a4cae70
c529619
3d0b1c1
7ecd0d0
9a884f8
ee7c4ba
e8b09c8
5efa50e
a392023
06f35f9
b8a6173
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pinot.segment.local.segment.creator.impl.fwd; | ||
|
||
/**
 * Static helpers for choosing the chunk size of a raw forward index.
 */
public class ForwardIndexUtils {
  // Smallest chunk size this utility will ever return, regardless of the configured target.
  private static final int TARGET_MIN_CHUNK_SIZE = 4 * 1024;

  private ForwardIndexUtils() {
  }

  /**
   * Get the dynamic target chunk size based on the maximum length of the values and the target number of documents
   * per chunk.
   *
   * <p>The desired size is {@code maxLength * targetDocsPerChunk}, capped at {@code targetMaxChunkSizeBytes}. If
   * {@code targetDocsPerChunk} is negative the chunk size is not dynamically chosen and
   * {@code targetMaxChunkSizeBytes} is used instead. In every case the result is raised to at least
   * {@code TARGET_MIN_CHUNK_SIZE} (4KB), so both code paths honour the same lower bound.
   *
   * @param maxLength max length of the values
   * @param targetDocsPerChunk target number of documents to store per chunk; negative disables dynamic sizing
   * @param targetMaxChunkSizeBytes target max chunk size in bytes
   * @return the target chunk size in bytes, never smaller than {@code TARGET_MIN_CHUNK_SIZE}
   */
  public static int getDynamicTargetChunkSize(int maxLength, int targetDocsPerChunk, int targetMaxChunkSizeBytes) {
    // Compute the product once in 64 bits so it cannot wrap around for large inputs.
    long desiredChunkSize = (long) maxLength * targetDocsPerChunk;
    if (targetDocsPerChunk < 0 || desiredChunkSize > targetMaxChunkSizeBytes) {
      // Dynamic sizing disabled, or the desired size exceeds the cap: use the cap, but keep the same lower bound
      // that the dynamic path applies.
      return Math.max(targetMaxChunkSizeBytes, TARGET_MIN_CHUNK_SIZE);
    }
    // Here desiredChunkSize <= targetMaxChunkSizeBytes, so the narrowing cast is safe.
    return (int) Math.max(desiredChunkSize, TARGET_MIN_CHUNK_SIZE);
  }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,14 +31,11 @@ | |
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; | ||
import org.apache.pinot.spi.data.FieldSpec.DataType; | ||
|
||
|
||
/** | ||
* Raw (non-dictionary-encoded) forward index creator for single-value column of variable length data type (BIG_DECIMAL, | ||
* STRING, BYTES). | ||
*/ | ||
public class SingleValueVarByteRawIndexCreator implements ForwardIndexCreator { | ||
private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000; | ||
private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024; | ||
|
||
private final VarByteChunkWriter _indexWriter; | ||
private final DataType _valueType; | ||
|
@@ -57,7 +54,8 @@ public SingleValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType | |
int totalDocs, DataType valueType, int maxLength) | ||
throws IOException { | ||
this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLength, false, | ||
ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION); | ||
ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION, ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE, | ||
ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK); | ||
} | ||
|
||
/** | ||
|
@@ -70,23 +68,32 @@ public SingleValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType | |
* @param maxLength length of longest entry (in bytes) | ||
* @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk | ||
* @param writerVersion writer format version | ||
* @param targetMaxChunkSizeBytes target max chunk size in bytes, applicable only for V4 or when | ||
* deriveNumDocsPerChunk is true | ||
* @param targetDocsPerChunk target number of docs per chunk | ||
* @throws IOException | ||
*/ | ||
public SingleValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column, | ||
int totalDocs, DataType valueType, int maxLength, boolean deriveNumDocsPerChunk, int writerVersion) | ||
int totalDocs, DataType valueType, int maxLength, boolean deriveNumDocsPerChunk, int writerVersion, | ||
int targetMaxChunkSizeBytes, int targetDocsPerChunk) | ||
throws IOException { | ||
File file = new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION); | ||
int numDocsPerChunk = deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK; | ||
int numDocsPerChunk = | ||
deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength, targetMaxChunkSizeBytes) : targetDocsPerChunk; | ||
|
||
// For columns with very small max value, target chunk size should also be capped to reduce memory during read | ||
int dynamicTargetChunkSize = | ||
ForwardIndexUtils.getDynamicTargetChunkSize(maxLength, targetDocsPerChunk, targetMaxChunkSizeBytes); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should this method take numDocsPerChunk instead of targetDocsPerChunk here? Or we could check deriveNumDocsPerChunk: if it's true, we also derive dynamicTargetChunkSize; otherwise we use targetMaxChunkSizeBytes instead? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think it is correct. If not configured, There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That is clearer 🙂 |
||
_indexWriter = writerVersion < VarByteChunkForwardIndexWriterV4.VERSION ? new VarByteChunkForwardIndexWriter(file, | ||
compressionType, totalDocs, numDocsPerChunk, maxLength, writerVersion) | ||
: new VarByteChunkForwardIndexWriterV4(file, compressionType, TARGET_MAX_CHUNK_SIZE); | ||
: new VarByteChunkForwardIndexWriterV4(file, compressionType, dynamicTargetChunkSize); | ||
_valueType = valueType; | ||
} | ||
|
||
@VisibleForTesting | ||
public static int getNumDocsPerChunk(int lengthOfLongestEntry) { | ||
public static int getNumDocsPerChunk(int lengthOfLongestEntry, int targetMaxChunkSizeBytes) { | ||
int overheadPerEntry = lengthOfLongestEntry + VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; | ||
return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1); | ||
return Math.max(targetMaxChunkSizeBytes / overheadPerEntry, 1); | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe also put a lower bound to this?