Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use mmap for nested column value to dictionary id lookup for more chill heap usage during serialization #14919

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,12 @@ public void writeMergedValueDictionary(List<IndexableAdapter> adapters) throws I
} else {
// all the bells and whistles
logicalType = ColumnType.NESTED_DATA;
final NestedDataColumnSerializer defaultSerializer = new NestedDataColumnSerializer(
serializer = new NestedDataColumnSerializer(
name,
indexSpec,
segmentWriteOutMedium,
closer
);
serializer = defaultSerializer;
}

serializer.openDictionaryWriter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@

package org.apache.druid.segment.column;

import com.google.common.base.Supplier;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.data.DictionaryWriter;
import org.apache.druid.segment.data.EncodedStringDictionaryWriter;
import org.apache.druid.segment.data.FrontCodedIndexed;
import org.apache.druid.segment.data.FrontCodedIndexedWriter;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.GenericIndexedWriter;
Expand All @@ -33,6 +36,7 @@

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Iterator;

public class StringEncodingStrategies
Expand Down Expand Up @@ -67,6 +71,39 @@ public static DictionaryWriter<String> getStringDictionaryWriter(
}
}

public static Supplier<? extends Indexed<ByteBuffer>> getStringDictionarySupplier(
SmooshedFileMapper mapper,
ByteBuffer stringDictionaryBuffer,
ByteOrder byteOrder
)
{
final int dictionaryStartPosition = stringDictionaryBuffer.position();
final byte dictionaryVersion = stringDictionaryBuffer.get();

if (dictionaryVersion == EncodedStringDictionaryWriter.VERSION) {
final byte encodingId = stringDictionaryBuffer.get();
if (encodingId == StringEncodingStrategy.FRONT_CODED_ID) {
return FrontCodedIndexed.read(
stringDictionaryBuffer,
byteOrder
);
} else if (encodingId == StringEncodingStrategy.UTF8_ID) {
// this cannot happen naturally right now since generic indexed is written in the 'legacy' format, but
// this provides backwards compatibility should we switch at some point in the future to always
// writing dictionaryVersion
return GenericIndexed.read(stringDictionaryBuffer, GenericIndexed.UTF8_STRATEGY, mapper)::singleThreaded;
} else {
throw new ISE("impossible, unknown encoding strategy id: %s", encodingId);
}
} else {
// legacy format that only supports plain utf8 enoding stored in GenericIndexed and the byte we are reading
// as dictionaryVersion is actually also the GenericIndexed version, so we reset start position so the
// GenericIndexed version can be correctly read
stringDictionaryBuffer.position(dictionaryStartPosition);
return GenericIndexed.read(stringDictionaryBuffer, GenericIndexed.UTF8_STRATEGY, mapper)::singleThreaded;
}
}

/**
* Adapter to convert {@link Indexed<ByteBuffer>} with utf8 encoded bytes into {@link Indexed<String>} to be friendly
* to consumers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public T get(int index) throws IOException
if (index == 0 && hasNulls) {
return null;
}
int startOffset = index * width;
int startOffset = (hasNulls ? index - 1 : index) * width;
readBuffer.clear();
valuesOut.readFully(startOffset, readBuffer);
readBuffer.clear();
Expand Down Expand Up @@ -197,14 +197,14 @@ private void readPage()
{
iteratorBuffer.clear();
try {
if (totalCount - pos < PAGE_SIZE) {
int size = (totalCount - pos) * width;
if (numWritten - (pos - startPos) < PAGE_SIZE) {
int size = (numWritten - (pos - startPos)) * width;
iteratorBuffer.limit(size);
valuesOut.readFully((long) pos * width, iteratorBuffer);
valuesOut.readFully((long) (pos - startPos) * width, iteratorBuffer);
} else {
valuesOut.readFully((long) pos * width, iteratorBuffer);
valuesOut.readFully((long) (pos - startPos) * width, iteratorBuffer);
}
iteratorBuffer.flip();
iteratorBuffer.clear();
}
catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ private long getBucketOffset(int index) throws IOException
{
getOffsetBuffer.clear();
headerOut.readFully(index * (long) Integer.BYTES, getOffsetBuffer);
getOffsetBuffer.clear();
return getOffsetBuffer.getInt(0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongList;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -293,7 +294,16 @@ public T get(int index) throws IOException
long endOffset = getOffset(index);
int valueSize = checkedCastNonnegativeLongToInt(endOffset - startOffset);
if (valueSize == 0) {
return null;
if (NullHandling.replaceWithDefault()) {
return null;
}
ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES);
valuesOut.readFully(startOffset - Integer.BYTES, bb);
bb.flip();
if (bb.getInt() < 0) {
return null;
}
return strategy.fromByteBuffer(bb, 0);
}
ByteBuffer bb = ByteBuffer.allocate(valueSize);
valuesOut.readFully(startOffset, bb);
Expand Down
Loading