Commit 23b78c0: use mmap for nested column value to dictionary id lookup for more chill heap usage during serialization (#14919)
clintropolis authored Sep 13, 2023
1 parent 286eeca commit 23b78c0
Showing 22 changed files with 560 additions and 394 deletions.
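
In outline, the change swaps on-heap value-to-dictionary-id lookups for file-backed, memory-mapped ones while a segment is being serialized, per the commit title. Before the hunks, here is a minimal, self-contained sketch of that general technique; every name in it is illustrative and this is not Druid's implementation:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative sketch only: fixed-width dictionary ids are written to a
// temporary file and memory-mapped for lookup, so the value-to-id table
// does not have to live on the JVM heap during serialization.
public class MmapIdLookupSketch
{
  public static void main(String[] args) throws IOException
  {
    Path tmp = Files.createTempFile("dict-ids", ".bin");
    try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
      // write phase: one int id per (sorted) value position
      ByteBuffer ids = ByteBuffer.allocate(3 * Integer.BYTES);
      ids.putInt(11).putInt(42).putInt(99).flip();
      channel.write(ids, 0);

      // lookup phase: map the file and read ids with absolute gets, off heap
      ByteBuffer mapped = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
      System.out.println(mapped.getInt(1 * Integer.BYTES)); // prints 42
    }
    Files.deleteIfExists(tmp);
  }
}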
@@ -228,13 +228,12 @@ public void writeMergedValueDictionary(List<IndexableAdapter> adapters) throws IOException
     } else {
       // all the bells and whistles
       logicalType = ColumnType.NESTED_DATA;
-      final NestedDataColumnSerializer defaultSerializer = new NestedDataColumnSerializer(
+      serializer = new NestedDataColumnSerializer(
           name,
           indexSpec,
           segmentWriteOutMedium,
           closer
       );
-      serializer = defaultSerializer;
     }

     serializer.openDictionaryWriter();
@@ -57,7 +57,7 @@ public class NestedDataColumnMergerV4 implements DimensionMergerV9
   private final Closer closer;

   private ColumnDescriptor.Builder descriptorBuilder;
-  private GenericColumnSerializer<?> serializer;
+  private NestedDataColumnSerializerV4 serializer;

   public NestedDataColumnMergerV4(
       String name,
@@ -111,13 +111,12 @@ public void writeMergedValueDictionary(List<IndexableAdapter> adapters) throws IOException

     descriptorBuilder = new ColumnDescriptor.Builder();

-    final NestedDataColumnSerializerV4 defaultSerializer = new NestedDataColumnSerializerV4(
+    serializer = new NestedDataColumnSerializerV4(
         name,
         indexSpec,
         segmentWriteOutMedium,
         closer
     );
-    serializer = defaultSerializer;

     final ComplexColumnPartSerde partSerde = ComplexColumnPartSerde.serializerBuilder()
         .withTypeName(NestedDataComplexTypeSerde.TYPE_NAME)
@@ -127,14 +126,14 @@ public void writeMergedValueDictionary(List<IndexableAdapter> adapters) throws IOException
         .setHasMultipleValues(false)
         .addSerde(partSerde);

-    defaultSerializer.open();
-    defaultSerializer.serializeFields(mergedFields);
+    serializer.open();
+    serializer.serializeFields(mergedFields);

     int stringCardinality;
     int longCardinality;
     int doubleCardinality;
     if (numMergeIndex == 1) {
-      defaultSerializer.serializeDictionaries(
+      serializer.serializeDictionaries(
           sortedLookup.getSortedStrings(),
           sortedLookup.getSortedLongs(),
           sortedLookup.getSortedDoubles()
@@ -155,7 +154,7 @@ public void writeMergedValueDictionary(List<IndexableAdapter> adapters) throws IOException
           sortedDoubleLookups,
           DOUBLE_MERGING_COMPARATOR
       );
-      defaultSerializer.serializeDictionaries(
+      serializer.serializeDictionaries(
          () -> stringIterator,
          () -> longIterator,
          () -> doubleIterator
@@ -19,12 +19,15 @@

 package org.apache.druid.segment.column;

+import com.google.common.base.Supplier;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.data.DictionaryWriter;
 import org.apache.druid.segment.data.EncodedStringDictionaryWriter;
+import org.apache.druid.segment.data.FrontCodedIndexed;
 import org.apache.druid.segment.data.FrontCodedIndexedWriter;
 import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.data.GenericIndexedWriter;
@@ -33,6 +36,7 @@

 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Iterator;

 public class StringEncodingStrategies
@@ -67,6 +71,39 @@ public static DictionaryWriter<String> getStringDictionaryWriter(
     }
   }

+  public static Supplier<? extends Indexed<ByteBuffer>> getStringDictionarySupplier(
+      SmooshedFileMapper mapper,
+      ByteBuffer stringDictionaryBuffer,
+      ByteOrder byteOrder
+  )
+  {
+    final int dictionaryStartPosition = stringDictionaryBuffer.position();
+    final byte dictionaryVersion = stringDictionaryBuffer.get();
+
+    if (dictionaryVersion == EncodedStringDictionaryWriter.VERSION) {
+      final byte encodingId = stringDictionaryBuffer.get();
+      if (encodingId == StringEncodingStrategy.FRONT_CODED_ID) {
+        return FrontCodedIndexed.read(
+            stringDictionaryBuffer,
+            byteOrder
+        );
+      } else if (encodingId == StringEncodingStrategy.UTF8_ID) {
+        // this cannot happen naturally right now since generic indexed is written in the 'legacy' format, but
+        // this provides backwards compatibility should we switch at some point in the future to always
+        // writing dictionaryVersion
+        return GenericIndexed.read(stringDictionaryBuffer, GenericIndexed.UTF8_STRATEGY, mapper)::singleThreaded;
+      } else {
+        throw new ISE("impossible, unknown encoding strategy id: %s", encodingId);
+      }
+    } else {
+      // legacy format that only supports plain utf8 encoding stored in GenericIndexed and the byte we are reading
+      // as dictionaryVersion is actually also the GenericIndexed version, so we reset start position so the
+      // GenericIndexed version can be correctly read
+      stringDictionaryBuffer.position(dictionaryStartPosition);
+      return GenericIndexed.read(stringDictionaryBuffer, GenericIndexed.UTF8_STRATEGY, mapper)::singleThreaded;
+    }
+  }
+
   /**
    * Adapter to convert {@link Indexed<ByteBuffer>} with utf8 encoded bytes into {@link Indexed<String>} to be friendly
    * to consumers.
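
For orientation, a caller of the new helper might look roughly like this; it is a sketch under assumptions, where the wrapper class, the buffer positioning, and where mapper comes from are illustrative, and only getStringDictionarySupplier itself is from the hunk above:

import com.google.common.base.Supplier;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.segment.column.StringEncodingStrategies;
import org.apache.druid.segment.data.Indexed;

// Sketch: open a column's string dictionary and fetch one entry. The mapper
// and buffer would come from the column's deserialization path.
class DictionaryLookupSketch
{
  static ByteBuffer firstValue(SmooshedFileMapper mapper, ByteBuffer stringDictionaryBuffer)
  {
    Supplier<? extends Indexed<ByteBuffer>> supplier = StringEncodingStrategies.getStringDictionarySupplier(
        mapper,
        stringDictionaryBuffer, // positioned at the dictionary's first byte
        ByteOrder.nativeOrder()
    );
    // each get() hands out a non-thread-safe view over the mapped dictionary
    return supplier.get().get(0); // raw utf8 bytes for dictionary id 0
  }
}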
@@ -155,7 +155,7 @@ public T get(int index) throws IOException
     if (index == 0 && hasNulls) {
       return null;
     }
-    int startOffset = index * width;
+    int startOffset = (hasNulls ? index - 1 : index) * width;
     readBuffer.clear();
     valuesOut.readFully(startOffset, readBuffer);
     readBuffer.clear();
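
The one-line fix above corrects an off-by-one: when the writer reserves logical index 0 for the null value, the stored values begin at logical index 1, so the byte offset must be computed from the stored row rather than the logical id. A tiny standalone illustration, with illustrative values:

public class NullSlotOffsetSketch
{
  public static void main(String[] args)
  {
    final int width = Long.BYTES; // bytes per fixed-width value
    final boolean hasNulls = true;
    final int index = 3; // logical dictionary id; id 0 is the null slot

    // the null slot is never written to valuesOut, so stored rows shift by one
    final int storedRow = hasNulls ? index - 1 : index;
    final long startOffset = (long) storedRow * width;
    System.out.println(startOffset); // 16, where index * width would wrongly give 24
  }
}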
@@ -197,14 +197,14 @@ private void readPage()
   {
     iteratorBuffer.clear();
     try {
-      if (totalCount - pos < PAGE_SIZE) {
-        int size = (totalCount - pos) * width;
+      if (numWritten - (pos - startPos) < PAGE_SIZE) {
+        int size = (numWritten - (pos - startPos)) * width;
         iteratorBuffer.limit(size);
-        valuesOut.readFully((long) pos * width, iteratorBuffer);
+        valuesOut.readFully((long) (pos - startPos) * width, iteratorBuffer);
       } else {
-        valuesOut.readFully((long) pos * width, iteratorBuffer);
+        valuesOut.readFully((long) (pos - startPos) * width, iteratorBuffer);
       }
-      iteratorBuffer.flip();
+      iteratorBuffer.clear();
     }
     catch (IOException e) {
       throw new RuntimeException(e);
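
Reading between the lines of this hunk, the iterator now works relative to a starting position: valuesOut appears to hold numWritten values beginning at logical position startPos, so both the remaining-count check and the byte offset subtract startPos. A worked example of the arithmetic with illustrative values; the field semantics here are inferred, not stated by the diff:

public class PagingOffsetSketch
{
  public static void main(String[] args)
  {
    final int PAGE_SIZE = 128; // values per page, illustrative
    final int width = Integer.BYTES;
    final int startPos = 100;  // logical position of the first value in valuesOut
    final int numWritten = 50; // values actually written out
    final int pos = 130;       // current logical read position

    final int remaining = numWritten - (pos - startPos);     // 20 values left to page in
    final long readOffset = (long) (pos - startPos) * width; // byte 120 within valuesOut
    System.out.println(remaining < PAGE_SIZE); // true: limit the buffer to remaining * width
    System.out.println(readOffset);            // 120
  }
}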
@@ -234,6 +234,7 @@ private long getBucketOffset(int index) throws IOException
   {
     getOffsetBuffer.clear();
     headerOut.readFully(index * (long) Integer.BYTES, getOffsetBuffer);
+    getOffsetBuffer.clear();
     return getOffsetBuffer.getInt(0);
   }

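
Context for the added clear(): readFully appears to fill the buffer and advance its position, and while getInt(0) is an absolute read that ignores position, resetting the buffer leaves it in a known state for the next call. The position mechanics in plain NIO:

import java.nio.ByteBuffer;

public class BufferResetSketch
{
  public static void main(String[] args)
  {
    ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES);
    buf.putInt(1234);          // a fill, like readFully, advances position to 4
    int value = buf.getInt(0); // absolute get: reads index 0 regardless of position
    buf.clear();               // position back to 0, limit to capacity, ready for reuse
    System.out.println(value + " " + buf.position()); // 1234 0
  }
}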
@@ -24,6 +24,7 @@
 import com.google.common.primitives.Ints;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 import it.unimi.dsi.fastutil.longs.LongList;
+import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -293,7 +294,16 @@ public T get(int index) throws IOException
     long endOffset = getOffset(index);
     int valueSize = checkedCastNonnegativeLongToInt(endOffset - startOffset);
     if (valueSize == 0) {
-      return null;
+      if (NullHandling.replaceWithDefault()) {
+        return null;
+      }
+      ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES);
+      valuesOut.readFully(startOffset - Integer.BYTES, bb);
+      bb.flip();
+      if (bb.getInt() < 0) {
+        return null;
+      }
+      return strategy.fromByteBuffer(bb, 0);
     }
     ByteBuffer bb = ByteBuffer.allocate(valueSize);
     valuesOut.readFully(startOffset, bb);
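
The new branch handles SQL-compatible null mode, where a zero-length entry could be either null or an empty value: the reader peeks at the int written just before the value's bytes, and a negative value there marks a stored null. That reading of the on-disk layout is inferred from the hunk rather than stated by it; a standalone sketch of the sentinel convention:

import java.nio.ByteBuffer;

// Illustrative layout only: each value is preceded by an int length word, a
// negative word marks null, and a zero-length value with a non-negative word
// is a genuine empty value. This mirrors the peek in the patched get().
public class NullSentinelSketch
{
  static boolean isStoredNull(ByteBuffer values, int valueStart)
  {
    return values.getInt(valueStart - Integer.BYTES) < 0;
  }

  public static void main(String[] args)
  {
    ByteBuffer values = ByteBuffer.allocate(2 * Integer.BYTES);
    values.putInt(-1);                  // entry 0: negative word, zero bytes follow => null
    int nullStart = values.position();  // 4
    values.putInt(0);                   // entry 1: zero word, zero bytes follow => empty value
    int emptyStart = values.position(); // 8
    System.out.println(isStoredNull(values, nullStart));  // true
    System.out.println(isStoredNull(values, emptyStart)); // false
  }
}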