From 351203e9faf1e419457dac043abf59f810421089 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 1 Aug 2022 17:29:22 -0700 Subject: [PATCH] specialized FixedIndexed implementations for java value types --- .../data/FixedIndexedDoubleWriter.java | 150 ++++++++ .../segment/data/FixedIndexedDoubles.java | 199 +++++++++++ .../segment/data/FixedIndexedIntWriter.java | 8 +- .../druid/segment/data/FixedIndexedInts.java | 199 +++++++++++ .../segment/data/FixedIndexedLongWriter.java | 150 ++++++++ .../druid/segment/data/FixedIndexedLongs.java | 200 +++++++++++ .../segment/data/FixedIndexedWriter.java | 9 +- .../CompressedNestedDataComplexColumn.java | 24 +- .../nested/NestedDataColumnSerializer.java | 17 +- .../nested/NestedDataColumnSupplier.java | 18 +- ...NestedFieldLiteralColumnIndexSupplier.java | 58 ++-- ...edFieldLiteralDictionaryEncodedColumn.java | 51 +-- .../druid/segment/data/FixedIndexedTest.java | 325 +++++++++++++++++- ...edFieldLiteralColumnIndexSupplierTest.java | 104 ++---- 14 files changed, 1340 insertions(+), 172 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/segment/data/FixedIndexedDoubleWriter.java create mode 100644 processing/src/main/java/org/apache/druid/segment/data/FixedIndexedDoubles.java create mode 100644 processing/src/main/java/org/apache/druid/segment/data/FixedIndexedInts.java create mode 100644 processing/src/main/java/org/apache/druid/segment/data/FixedIndexedLongWriter.java create mode 100644 processing/src/main/java/org/apache/druid/segment/data/FixedIndexedLongs.java diff --git a/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedDoubleWriter.java b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedDoubleWriter.java new file mode 100644 index 000000000000..79e3f4bddcd2 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedDoubleWriter.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.data; + +import it.unimi.dsi.fastutil.doubles.DoubleIterator; +import org.apache.druid.io.Channels; +import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.segment.serde.Serializer; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import org.apache.druid.segment.writeout.WriteOutBytes; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.WritableByteChannel; + +/** + * Specialized version of {@link FixedIndexedWriter} for writing double value types, with no support for null values, + * and no verification that data is actually sorted. The resulting data can be read into either + * {@link FixedIndexedDoubles} or a {@link FixedIndexed}, since the format is identical. + * + * Callers should be certain that the data written is in fact sorted if specifying it as such. If null values need + * to be stored then the generic {@link FixedIndexedWriter} should be used instead. + */ +public class FixedIndexedDoubleWriter implements Serializer +{ + private static final int PAGE_SIZE = 4096; + private final SegmentWriteOutMedium segmentWriteOutMedium; + private final ByteBuffer scratch; + private int numWritten; + @Nullable + private WriteOutBytes valuesOut = null; + + private final boolean isSorted; + + public FixedIndexedDoubleWriter(SegmentWriteOutMedium segmentWriteOutMedium, boolean sorted) + { + this.segmentWriteOutMedium = segmentWriteOutMedium; + // this is a matter of faith, nothing checks + this.isSorted = sorted; + this.scratch = ByteBuffer.allocate(Double.BYTES).order(ByteOrder.nativeOrder()); + } + + public void open() throws IOException + { + this.valuesOut = segmentWriteOutMedium.makeWriteOutBytes(); + } + + @Override + public long getSerializedSize() + { + return Byte.BYTES + Byte.BYTES + Integer.BYTES + valuesOut.size(); + } + + public void write(double objectToWrite) throws IOException + { + scratch.clear(); + scratch.putDouble(objectToWrite); + scratch.flip(); + Channels.writeFully(valuesOut, scratch); + numWritten++; + } + + @Override + public void writeTo( + WritableByteChannel channel, + FileSmoosher smoosher + ) throws IOException + { + scratch.clear(); + // version 0 + scratch.put((byte) 0); + // no flags, this thing is never sorted + byte flags = 0x00; + if (isSorted) { + flags = (byte) (flags | FixedIndexed.IS_SORTED_MASK); + } + scratch.put(flags); + scratch.flip(); + Channels.writeFully(channel, scratch); + scratch.clear(); + scratch.putInt(numWritten); + scratch.flip(); + Channels.writeFully(channel, scratch); + valuesOut.writeTo(channel); + } + + public DoubleIterator getIterator() + { + final ByteBuffer iteratorBuffer = ByteBuffer.allocate(Double.BYTES * PAGE_SIZE).order(ByteOrder.nativeOrder()); + + return new DoubleIterator() + { + @Override + public double nextDouble() + { + if (pos == 0 || iteratorBuffer.position() >= iteratorBuffer.limit()) { + readPage(); + } + final double value = iteratorBuffer.getDouble(); + pos++; + return value; + } + + int pos = 0; + + @Override + public boolean hasNext() + { + return pos < numWritten; + } + + private void readPage() + { + iteratorBuffer.clear(); + try { + if (numWritten - pos < PAGE_SIZE) { + int size = (numWritten - pos) * Double.BYTES; + iteratorBuffer.limit(size); + valuesOut.readFully((long) pos * Double.BYTES, iteratorBuffer); + } else { + valuesOut.readFully((long) pos * Double.BYTES, iteratorBuffer); + } + iteratorBuffer.flip(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedDoubles.java b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedDoubles.java new file mode 100644 index 000000000000..c08f78508373 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedDoubles.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.data; + +import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.doubles.DoubleComparator; +import it.unimi.dsi.fastutil.doubles.DoubleComparators; +import it.unimi.dsi.fastutil.doubles.DoubleIterator; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.monomorphicprocessing.HotLoopCallee; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Iterator; + +/** + * Specialized implementation for {@link FixedIndexed} which does not contain any null values, allowing it to + * deal in java double value types instead of {@link Double} objects, and utilize specialized {@link ByteBuffer} methods + * to more efficiently read data. + */ +public final class FixedIndexedDoubles implements Indexed, HotLoopCallee +{ + public static FixedIndexedDoubles read(ByteBuffer bb, ByteOrder byteOrder) + { + final ByteBuffer buffer = bb.asReadOnlyBuffer().order(byteOrder); + final byte version = buffer.get(); + Preconditions.checkState(version == 0, "Unknown version [%s]", version); + final byte flags = buffer.get(); + final boolean hasNull = (flags & NullHandling.IS_NULL_BYTE) == NullHandling.IS_NULL_BYTE ? true : false; + final boolean isSorted = (flags & FixedIndexed.IS_SORTED_MASK) == FixedIndexed.IS_SORTED_MASK ? true : false; + Preconditions.checkState(!hasNull, "Cannot use FixedIndexedInts for FixedIndex with null values"); + Preconditions.checkState(!(hasNull && !isSorted), "cannot have null values if not sorted"); + final int size = buffer.getInt() + (hasNull ? 1 : 0); + final int valuesOffset = buffer.position(); + final FixedIndexedDoubles fixedIndexed = new FixedIndexedDoubles( + buffer, + isSorted, + size, + valuesOffset + ); + bb.position(buffer.position() + (Double.BYTES * size)); + return fixedIndexed; + } + + private final ByteBuffer buffer; + private final int size; + private final int valuesOffset; + private final boolean isSorted; + private final DoubleComparator comparator; + + private FixedIndexedDoubles( + ByteBuffer buffer, + boolean isSorted, + int size, + int valuesOffset + ) + { + this.buffer = buffer; + this.size = size; + this.valuesOffset = valuesOffset; + this.isSorted = isSorted; + this.comparator = DoubleComparators.NATURAL_COMPARATOR; + } + + @Override + public int size() + { + return size; + } + + @Nullable + @Override + public Double get(int index) + { + return getDouble(index); + } + + @Override + public int indexOf(@Nullable Double value) + { + if (value == null) { + return -1; + } + return indexOf(value.doubleValue()); + } + + public double getDouble(int index) + { + return buffer.getDouble(valuesOffset + (index * Double.BYTES)); + } + + public int indexOf(double value) + { + if (!isSorted) { + throw new UnsupportedOperationException("Reverse lookup not allowed."); + } + int minIndex = 0; + int maxIndex = size - 1; + while (minIndex <= maxIndex) { + int currIndex = (minIndex + maxIndex) >>> 1; + + double currValue = getDouble(currIndex); + int comparison = comparator.compare(currValue, value); + if (comparison == 0) { + return currIndex; + } + + if (comparison < 0) { + minIndex = currIndex + 1; + } else { + maxIndex = currIndex - 1; + } + } + + return -(minIndex + 1); + } + + public DoubleIterator doubleIterator() + { + final ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order()); + copy.position(valuesOffset); + copy.limit(valuesOffset + (size * Double.BYTES)); + return new DoubleIterator() + { + @Override + public double nextDouble() + { + return copy.getDouble(); + } + + @Override + public boolean hasNext() + { + return copy.hasRemaining(); + } + }; + } + + @Override + public Iterator iterator() + { + final DoubleIterator doubleIterator = doubleIterator(); + return new Iterator() + { + @Override + public boolean hasNext() + { + return doubleIterator.hasNext(); + } + + @Override + public Double next() + { + return doubleIterator.nextDouble(); + } + }; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("buffer", buffer); + inspector.visit("comparator", comparator); + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder("FixedIndexedDoubles["); + if (size() > 0) { + for (int i = 0; i < size(); i++) { + double value = getDouble(i); + sb.append(value).append(',').append(' '); + } + sb.setLength(sb.length() - 2); + } + sb.append(']'); + return sb.toString(); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedIntWriter.java b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedIntWriter.java index 5a9a3b94e127..c50f73fdec5f 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedIntWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedIntWriter.java @@ -33,8 +33,12 @@ import java.nio.channels.WritableByteChannel; /** - * Specialized version of {@link FixedIndexedWriter} for writing ints, with no support for null values, and no - * verification that data is actually sorted, it just trusts you and takes your word for it + * Specialized version of {@link FixedIndexedWriter} for writing int value types, with no support for null values, + * and no verification that data is actually sorted. The resulting data can be read into either + * {@link FixedIndexedInts} or a {@link FixedIndexed}, since the format is identical. + * + * Callers should be certain that the data written is in fact sorted if specifying it as such. If null values need + * to be stored then the generic {@link FixedIndexedWriter} should be used instead. */ public final class FixedIndexedIntWriter implements Serializer { diff --git a/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedInts.java b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedInts.java new file mode 100644 index 000000000000..a3629c221433 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedInts.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.data; + +import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.ints.IntComparator; +import it.unimi.dsi.fastutil.ints.IntComparators; +import it.unimi.dsi.fastutil.ints.IntIterator; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.monomorphicprocessing.HotLoopCallee; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Iterator; + +/** + * Specialized implementation for {@link FixedIndexed} which does not contain any null values, allowing it to + * deal in java int value types instead of {@link Integer} objects, and utilize specialized {@link ByteBuffer} methods + * to more efficiently read data. + */ +public final class FixedIndexedInts implements Indexed, HotLoopCallee +{ + public static FixedIndexedInts read(ByteBuffer bb, ByteOrder byteOrder) + { + final ByteBuffer buffer = bb.asReadOnlyBuffer().order(byteOrder); + final byte version = buffer.get(); + Preconditions.checkState(version == 0, "Unknown version [%s]", version); + final byte flags = buffer.get(); + final boolean hasNull = (flags & NullHandling.IS_NULL_BYTE) == NullHandling.IS_NULL_BYTE ? true : false; + final boolean isSorted = (flags & FixedIndexed.IS_SORTED_MASK) == FixedIndexed.IS_SORTED_MASK ? true : false; + Preconditions.checkState(!hasNull, "Cannot use FixedIndexedInts for FixedIndex with null values"); + Preconditions.checkState(!(hasNull && !isSorted), "cannot have null values if not sorted"); + final int size = buffer.getInt() + (hasNull ? 1 : 0); + final int valuesOffset = buffer.position(); + final FixedIndexedInts fixedIndexed = new FixedIndexedInts( + buffer, + isSorted, + size, + valuesOffset + ); + bb.position(buffer.position() + (Integer.BYTES * size)); + return fixedIndexed; + } + + private final ByteBuffer buffer; + private final int size; + private final int valuesOffset; + private final boolean isSorted; + private final IntComparator comparator; + + private FixedIndexedInts( + ByteBuffer buffer, + boolean isSorted, + int size, + int valuesOffset + ) + { + this.buffer = buffer; + this.size = size; + this.valuesOffset = valuesOffset; + this.isSorted = isSorted; + this.comparator = IntComparators.NATURAL_COMPARATOR; + } + + @Override + public int size() + { + return size; + } + + @Nullable + @Override + public Integer get(int index) + { + return getInt(index); + } + + @Override + public int indexOf(@Nullable Integer value) + { + if (value == null) { + return -1; + } + return indexOf(value.intValue()); + } + + public int getInt(int index) + { + return buffer.getInt(valuesOffset + (index * Integer.BYTES)); + } + + public int indexOf(int value) + { + if (!isSorted) { + throw new UnsupportedOperationException("Reverse lookup not allowed."); + } + int minIndex = 0; + int maxIndex = size - 1; + while (minIndex <= maxIndex) { + int currIndex = (minIndex + maxIndex) >>> 1; + + int currValue = getInt(currIndex); + int comparison = comparator.compare(currValue, value); + if (comparison == 0) { + return currIndex; + } + + if (comparison < 0) { + minIndex = currIndex + 1; + } else { + maxIndex = currIndex - 1; + } + } + + return -(minIndex + 1); + } + + public IntIterator intIterator() + { + final ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order()); + copy.position(valuesOffset); + copy.limit(valuesOffset + (size * Integer.BYTES)); + return new IntIterator() + { + @Override + public int nextInt() + { + return copy.getInt(); + } + + @Override + public boolean hasNext() + { + return copy.hasRemaining(); + } + }; + } + + @Override + public Iterator iterator() + { + final IntIterator iterator = intIterator(); + return new Iterator() + { + @Override + public boolean hasNext() + { + return iterator.hasNext(); + } + + @Override + public Integer next() + { + return iterator.nextInt(); + } + }; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("buffer", buffer); + inspector.visit("comparator", comparator); + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder("FixedIndexedInts["); + if (size() > 0) { + for (int i = 0; i < size(); i++) { + int value = getInt(i); + sb.append(value).append(',').append(' '); + } + sb.setLength(sb.length() - 2); + } + sb.append(']'); + return sb.toString(); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedLongWriter.java b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedLongWriter.java new file mode 100644 index 000000000000..3126dea358bc --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedLongWriter.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.data; + +import it.unimi.dsi.fastutil.longs.LongIterator; +import org.apache.druid.io.Channels; +import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.segment.serde.Serializer; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import org.apache.druid.segment.writeout.WriteOutBytes; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.WritableByteChannel; + +/** + * Specialized version of {@link FixedIndexedWriter} for writing long value types, with no support for null values, + * and no verification that data is actually sorted. The resulting data can be read into either + * {@link FixedIndexedLongs} or a {@link FixedIndexed}, since the format is identical. + * + * Callers should be certain that the data written is in fact sorted if specifying it as such. If null values need + * to be stored then the generic {@link FixedIndexedWriter} should be used instead. + */ +public class FixedIndexedLongWriter implements Serializer +{ + private static final int PAGE_SIZE = 4096; + private final SegmentWriteOutMedium segmentWriteOutMedium; + private final ByteBuffer scratch; + private int numWritten; + @Nullable + private WriteOutBytes valuesOut = null; + + private final boolean isSorted; + + public FixedIndexedLongWriter(SegmentWriteOutMedium segmentWriteOutMedium, boolean sorted) + { + this.segmentWriteOutMedium = segmentWriteOutMedium; + // this is a matter of faith, nothing checks + this.isSorted = sorted; + this.scratch = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.nativeOrder()); + } + + public void open() throws IOException + { + this.valuesOut = segmentWriteOutMedium.makeWriteOutBytes(); + } + + @Override + public long getSerializedSize() + { + return Byte.BYTES + Byte.BYTES + Integer.BYTES + valuesOut.size(); + } + + public void write(long objectToWrite) throws IOException + { + scratch.clear(); + scratch.putLong(objectToWrite); + scratch.flip(); + Channels.writeFully(valuesOut, scratch); + numWritten++; + } + + @Override + public void writeTo( + WritableByteChannel channel, + FileSmoosher smoosher + ) throws IOException + { + scratch.clear(); + // version 0 + scratch.put((byte) 0); + // no flags, this thing is never sorted + byte flags = 0x00; + if (isSorted) { + flags = (byte) (flags | FixedIndexed.IS_SORTED_MASK); + } + scratch.put(flags); + scratch.flip(); + Channels.writeFully(channel, scratch); + scratch.clear(); + scratch.putInt(numWritten); + scratch.flip(); + Channels.writeFully(channel, scratch); + valuesOut.writeTo(channel); + } + + public LongIterator getIterator() + { + final ByteBuffer iteratorBuffer = ByteBuffer.allocate(Long.BYTES * PAGE_SIZE).order(ByteOrder.nativeOrder()); + + return new LongIterator() + { + @Override + public long nextLong() + { + if (pos == 0 || iteratorBuffer.position() >= iteratorBuffer.limit()) { + readPage(); + } + final long value = iteratorBuffer.getLong(); + pos++; + return value; + } + + int pos = 0; + + @Override + public boolean hasNext() + { + return pos < numWritten; + } + + private void readPage() + { + iteratorBuffer.clear(); + try { + if (numWritten - pos < PAGE_SIZE) { + int size = (numWritten - pos) * Long.BYTES; + iteratorBuffer.limit(size); + valuesOut.readFully((long) pos * Long.BYTES, iteratorBuffer); + } else { + valuesOut.readFully((long) pos * Long.BYTES, iteratorBuffer); + } + iteratorBuffer.flip(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedLongs.java b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedLongs.java new file mode 100644 index 000000000000..1bcbb72cb676 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedLongs.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.data; + +import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.longs.LongComparator; +import it.unimi.dsi.fastutil.longs.LongComparators; +import it.unimi.dsi.fastutil.longs.LongIterator; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.monomorphicprocessing.HotLoopCallee; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Iterator; + +/** + * Specialized implementation for {@link FixedIndexed} which does not contain any null values, allowing it to + * deal in java long value types instead of {@link Long} objects, and utilize specialized {@link ByteBuffer} methods + * to more efficiently read data. + */ +public final class FixedIndexedLongs implements Indexed, HotLoopCallee +{ + public static FixedIndexedLongs read(ByteBuffer bb, ByteOrder byteOrder) + { + final ByteBuffer buffer = bb.asReadOnlyBuffer().order(byteOrder); + final byte version = buffer.get(); + Preconditions.checkState(version == 0, "Unknown version [%s]", version); + final byte flags = buffer.get(); + final boolean hasNull = (flags & NullHandling.IS_NULL_BYTE) == NullHandling.IS_NULL_BYTE ? true : false; + final boolean isSorted = (flags & FixedIndexed.IS_SORTED_MASK) == FixedIndexed.IS_SORTED_MASK ? true : false; + Preconditions.checkState(!hasNull, "Cannot use FixedIndexedInts for FixedIndex with null values"); + Preconditions.checkState(!(hasNull && !isSorted), "cannot have null values if not sorted"); + final int size = buffer.getInt() + (hasNull ? 1 : 0); + final int valuesOffset = buffer.position(); + final FixedIndexedLongs fixedIndexed = new FixedIndexedLongs( + buffer, + isSorted, + size, + valuesOffset + ); + bb.position(buffer.position() + (Double.BYTES * size)); + return fixedIndexed; + } + + private final ByteBuffer buffer; + private final int size; + private final int valuesOffset; + private final boolean isSorted; + private final LongComparator comparator; + + private FixedIndexedLongs( + ByteBuffer buffer, + boolean isSorted, + int size, + int valuesOffset + ) + { + this.buffer = buffer; + this.size = size; + this.valuesOffset = valuesOffset; + this.isSorted = isSorted; + this.comparator = LongComparators.NATURAL_COMPARATOR; + } + + @Override + public int size() + { + return size; + } + + @Nullable + @Override + public Long get(int index) + { + return getLong(index); + } + + @Override + public int indexOf(@Nullable Long value) + { + if (value == null) { + return -1; + } + return indexOf(value.longValue()); + } + + public long getLong(int index) + { + return buffer.getLong(valuesOffset + (index * Long.BYTES)); + } + + public int indexOf(long value) + { + if (!isSorted) { + throw new UnsupportedOperationException("Reverse lookup not allowed."); + } + int minIndex = 0; + int maxIndex = size - 1; + while (minIndex <= maxIndex) { + int currIndex = (minIndex + maxIndex) >>> 1; + + long currValue = getLong(currIndex); + int comparison = comparator.compare(currValue, value); + if (comparison == 0) { + return currIndex; + } + + if (comparison < 0) { + minIndex = currIndex + 1; + } else { + maxIndex = currIndex - 1; + } + } + + return -(minIndex + 1); + } + + + public LongIterator longIterator() + { + final ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order()); + copy.position(valuesOffset); + copy.limit(valuesOffset + (size * Long.BYTES)); + return new LongIterator() + { + @Override + public long nextLong() + { + return copy.getLong(); + } + + @Override + public boolean hasNext() + { + return copy.hasRemaining(); + } + }; + } + + @Override + public Iterator iterator() + { + final LongIterator iterator = longIterator(); + return new Iterator() + { + @Override + public boolean hasNext() + { + return iterator.hasNext(); + } + + @Override + public Long next() + { + return iterator.nextLong(); + } + }; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("buffer", buffer); + inspector.visit("comparator", comparator); + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder("FixedIndexedLongs["); + if (size() > 0) { + for (int i = 0; i < size(); i++) { + long value = getLong(i); + sb.append(value).append(',').append(' '); + } + sb.setLength(sb.length() - 2); + } + sb.append(']'); + return sb.toString(); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedWriter.java index 9246ca78820d..892bbe6eed64 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedWriter.java @@ -183,12 +183,13 @@ private void readPage() { iteratorBuffer.clear(); try { - if (totalCount - pos < PAGE_SIZE) { - int size = (totalCount - pos) * width; + final int adjustedPos = pos - startPos; + if (totalCount - adjustedPos < PAGE_SIZE) { + int size = (totalCount - adjustedPos) * width; iteratorBuffer.limit(size); - valuesOut.readFully((long) pos * width, iteratorBuffer); + valuesOut.readFully((long) adjustedPos * width, iteratorBuffer); } else { - valuesOut.readFully((long) pos * width, iteratorBuffer); + valuesOut.readFully((long) adjustedPos * width, iteratorBuffer); } iteratorBuffer.flip(); } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java index c5b2c5de66f2..a60c2b52da3e 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java @@ -49,7 +49,9 @@ import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier; import org.apache.druid.segment.data.CompressedVariableSizedBlobColumn; import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSupplier; -import org.apache.druid.segment.data.FixedIndexed; +import org.apache.druid.segment.data.FixedIndexedDoubles; +import org.apache.druid.segment.data.FixedIndexedInts; +import org.apache.druid.segment.data.FixedIndexedLongs; import org.apache.druid.segment.data.GenericIndexed; import org.apache.druid.segment.data.ObjectStrategy; import org.apache.druid.segment.data.ReadableOffset; @@ -86,8 +88,8 @@ public final class CompressedNestedDataComplexColumn extends NestedDataComplexCo private final NestedLiteralTypeInfo fieldInfo; private final GenericIndexed stringDictionary; - private final FixedIndexed longDictionary; - private final FixedIndexed doubleDictionary; + private final FixedIndexedLongs longDictionary; + private final FixedIndexedDoubles doubleDictionary; private final SmooshedFileMapper fileMapper; private final ConcurrentHashMap columns = new ConcurrentHashMap<>(); @@ -102,8 +104,8 @@ public CompressedNestedDataComplexColumn( GenericIndexed fields, NestedLiteralTypeInfo fieldInfo, GenericIndexed stringDictionary, - FixedIndexed longDictionary, - FixedIndexed doubleDictionary, + FixedIndexedLongs longDictionary, + FixedIndexedDoubles doubleDictionary, SmooshedFileMapper fileMapper ) { @@ -134,12 +136,12 @@ public GenericIndexed getStringDictionary() return stringDictionary; } - public FixedIndexed getLongDictionary() + public FixedIndexedLongs getLongDictionary() { return longDictionary; } - public FixedIndexed getDoubleDictionary() + public FixedIndexedDoubles getDoubleDictionary() { return doubleDictionary; } @@ -404,11 +406,9 @@ private ColumnHolder readNestedFieldColumn(String field) ) ); - final FixedIndexed localDictionary = FixedIndexed.read( + final FixedIndexedInts localDictionary = FixedIndexedInts.read( dataBuffer, - NestedDataColumnSerializer.INT_TYPE_STRATEGY, - metadata.getByteOrder(), - Integer.BYTES + metadata.getByteOrder() ); ByteBuffer bb = dataBuffer.asReadOnlyBuffer().order(metadata.getByteOrder()); int longsLength = bb.getInt(); @@ -444,7 +444,7 @@ private ColumnHolder readNestedFieldColumn(String field) longDictionary, doubleDictionary, localDictionary, - localDictionary.get(0) == 0 + localDictionary.getInt(0) == 0 ? rBitmaps.get(0) : metadata.getBitmapSerdeFactory().getBitmapFactory().makeEmptyImmutableBitmap() )); diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java index 6d113bf717e7..6df8dcb7833d 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java @@ -52,8 +52,9 @@ import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSerializer; import org.apache.druid.segment.data.CompressionFactory; import org.apache.druid.segment.data.CompressionStrategy; +import org.apache.druid.segment.data.FixedIndexedDoubleWriter; import org.apache.druid.segment.data.FixedIndexedIntWriter; -import org.apache.druid.segment.data.FixedIndexedWriter; +import org.apache.druid.segment.data.FixedIndexedLongWriter; import org.apache.druid.segment.data.GenericIndexed; import org.apache.druid.segment.data.GenericIndexedWriter; import org.apache.druid.segment.data.ObjectStrategy; @@ -115,8 +116,8 @@ public int processLiteralField(String fieldName, Object fieldValue) private GenericIndexedWriter fieldsWriter; private NestedLiteralTypeInfo.Writer fieldsInfoWriter; private GenericIndexedWriter dictionaryWriter; - private FixedIndexedWriter longDictionaryWriter; - private FixedIndexedWriter doubleDictionaryWriter; + private FixedIndexedLongWriter longDictionaryWriter; + private FixedIndexedDoubleWriter doubleDictionaryWriter; private CompressedVariableSizedBlobColumnSerializer rawWriter; private ByteBufferWriter nullBitmapWriter; private MutableBitmap nullRowsBitmap; @@ -146,19 +147,13 @@ public void open() throws IOException fieldsInfoWriter = new NestedLiteralTypeInfo.Writer(segmentWriteOutMedium); fieldsInfoWriter.open(); dictionaryWriter = createGenericIndexedWriter(GenericIndexed.STRING_STRATEGY, segmentWriteOutMedium); - longDictionaryWriter = new FixedIndexedWriter<>( + longDictionaryWriter = new FixedIndexedLongWriter( segmentWriteOutMedium, - ColumnType.LONG.getStrategy(), - ByteOrder.nativeOrder(), - Long.BYTES, true ); longDictionaryWriter.open(); - doubleDictionaryWriter = new FixedIndexedWriter<>( + doubleDictionaryWriter = new FixedIndexedDoubleWriter( segmentWriteOutMedium, - ColumnType.DOUBLE.getStrategy(), - ByteOrder.nativeOrder(), - Double.BYTES, true ); doubleDictionaryWriter.open(); diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java index bd73fd5c8891..c191ca4b1d06 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java @@ -32,6 +32,8 @@ import org.apache.druid.segment.column.ComplexColumn; import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSupplier; import org.apache.druid.segment.data.FixedIndexed; +import org.apache.druid.segment.data.FixedIndexedDoubles; +import org.apache.druid.segment.data.FixedIndexedLongs; import org.apache.druid.segment.data.GenericIndexed; import java.io.IOException; @@ -45,8 +47,8 @@ public class NestedDataColumnSupplier implements Supplier private final GenericIndexed fields; private final NestedLiteralTypeInfo fieldInfo; private final GenericIndexed dictionary; - private final FixedIndexed longDictionary; - private final FixedIndexed doubleDictionary; + private final FixedIndexedLongs longDictionary; + private final FixedIndexedDoubles doubleDictionary; private final ColumnConfig columnConfig; private final SmooshedFileMapper fileMapper; @@ -79,21 +81,17 @@ public NestedDataColumnSupplier( mapper, NestedDataColumnSerializer.LONG_DICTIONARY_FILE_NAME ); - longDictionary = FixedIndexed.read( + longDictionary = FixedIndexedLongs.read( longDictionaryBuffer, - ColumnType.LONG.getStrategy(), - metadata.getByteOrder(), - Long.BYTES + metadata.getByteOrder() ); final ByteBuffer doubleDictionaryBuffer = loadInternalFile( mapper, NestedDataColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME ); - doubleDictionary = FixedIndexed.read( + doubleDictionary = FixedIndexedDoubles.read( doubleDictionaryBuffer, - ColumnType.DOUBLE.getStrategy(), - metadata.getByteOrder(), - Double.BYTES + metadata.getByteOrder() ); final ByteBuffer rawBuffer = loadInternalFile(mapper, NestedDataColumnSerializer.RAW_FILE_NAME).asReadOnlyBuffer(); compressedRawColumnSupplier = CompressedVariableSizedBlobColumnSupplier.fromByteBuffer( diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldLiteralColumnIndexSupplier.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldLiteralColumnIndexSupplier.java index e67ac8ac528e..9faf5d3ccdd3 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldLiteralColumnIndexSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldLiteralColumnIndexSupplier.java @@ -53,7 +53,9 @@ import org.apache.druid.segment.column.SimpleImmutableBitmapIndex; import org.apache.druid.segment.column.SimpleImmutableBitmapIterableIndex; import org.apache.druid.segment.column.StringValueSetIndex; -import org.apache.druid.segment.data.FixedIndexed; +import org.apache.druid.segment.data.FixedIndexedDoubles; +import org.apache.druid.segment.data.FixedIndexedInts; +import org.apache.druid.segment.data.FixedIndexedLongs; import org.apache.druid.segment.data.GenericIndexed; import org.apache.druid.segment.data.Indexed; @@ -72,10 +74,10 @@ public class NestedFieldLiteralColumnIndexSupplier implements ColumnIndexSupplie private final ColumnType singleType; private final BitmapFactory bitmapFactory; private final GenericIndexed bitmaps; - private final FixedIndexed dictionary; + private final FixedIndexedInts dictionary; private final GenericIndexed globalDictionary; - private final FixedIndexed globalLongDictionary; - private final FixedIndexed globalDoubleDictionary; + private final FixedIndexedLongs globalLongDictionary; + private final FixedIndexedDoubles globalDoubleDictionary; private final int adjustLongId; private final int adjustDoubleId; @@ -84,10 +86,10 @@ public NestedFieldLiteralColumnIndexSupplier( NestedLiteralTypeInfo.TypeSet types, BitmapFactory bitmapFactory, GenericIndexed bitmaps, - FixedIndexed dictionary, + FixedIndexedInts dictionary, GenericIndexed globalDictionary, - FixedIndexed globalLongDictionary, - FixedIndexed globalDoubleDictionary + FixedIndexedLongs globalLongDictionary, + FixedIndexedDoubles globalDoubleDictionary ) { this.singleType = types.getSingleType(); @@ -393,7 +395,7 @@ public Iterable getBitmapIterable() private int findNext() { - while (currIndex < end && !matcher.apply(globalDictionary.get(dictionary.get(currIndex)))) { + while (currIndex < end && !matcher.apply(globalDictionary.get(dictionary.getInt(currIndex)))) { currIndex++; } @@ -444,7 +446,7 @@ public Iterable getBitmapIterable() final Predicate stringPredicate = matcherFactory.makeStringPredicate(); // in the future, this could use an int iterator - final Iterator iterator = dictionary.iterator(); + final Iterator iterator = dictionary.intIterator(); int next; int index = 0; boolean nextSet = false; @@ -502,7 +504,11 @@ public double estimateSelectivity(int totalRows) if (longValue == null) { return (double) getBitmap(dictionary.indexOf(0)).size() / totalRows; } - return (double) getBitmap(dictionary.indexOf(globalLongDictionary.indexOf(longValue) + adjustLongId)).size() / totalRows; + return (double) getBitmap( + dictionary.indexOf( + globalLongDictionary.indexOf(longValue.longValue()) + adjustLongId + ) + ).size() / totalRows; } @Override @@ -511,7 +517,9 @@ public T computeBitmapResult(BitmapResultFactory bitmapResultFactory) if (longValue == null) { return bitmapResultFactory.wrapDimensionValue(getBitmap(dictionary.indexOf(0))); } - return bitmapResultFactory.wrapDimensionValue(getBitmap(dictionary.indexOf(globalLongDictionary.indexOf(longValue) + adjustLongId))); + return bitmapResultFactory.wrapDimensionValue( + getBitmap(dictionary.indexOf(globalLongDictionary.indexOf(longValue.longValue()) + adjustLongId)) + ); } }; } @@ -622,7 +630,7 @@ public Iterable getBitmapIterable() final DruidLongPredicate longPredicate = matcherFactory.makeLongPredicate(); // in the future, this could use an int iterator - final Iterator iterator = dictionary.iterator(); + final Iterator iterator = dictionary.intIterator(); int next; int index = 0; boolean nextSet = false; @@ -657,7 +665,7 @@ private void findNext() if (nextValue == 0) { nextSet = longPredicate.applyNull(); } else { - nextSet = longPredicate.applyLong(globalLongDictionary.get(nextValue - adjustLongId)); + nextSet = longPredicate.applyLong(globalLongDictionary.getLong(nextValue - adjustLongId)); } if (nextSet) { next = index; @@ -685,7 +693,9 @@ public double estimateSelectivity(int totalRows) if (doubleValue == null) { return (double) getBitmap(dictionary.indexOf(0)).size() / totalRows; } - return (double) getBitmap(dictionary.indexOf(globalDoubleDictionary.indexOf(doubleValue) + adjustDoubleId)).size() / totalRows; + return (double) getBitmap( + dictionary.indexOf(globalDoubleDictionary.indexOf(doubleValue.doubleValue()) + adjustDoubleId) + ).size() / totalRows; } @Override @@ -694,7 +704,11 @@ public T computeBitmapResult(BitmapResultFactory bitmapResultFactory) if (doubleValue == null) { return bitmapResultFactory.wrapDimensionValue(getBitmap(dictionary.indexOf(0))); } - return bitmapResultFactory.wrapDimensionValue(getBitmap(dictionary.indexOf(globalDoubleDictionary.indexOf(doubleValue) + adjustDoubleId))); + return bitmapResultFactory.wrapDimensionValue( + getBitmap( + dictionary.indexOf(globalDoubleDictionary.indexOf(doubleValue) + adjustDoubleId) + ) + ); } }; } @@ -805,7 +819,7 @@ public Iterable getBitmapIterable() final DruidDoublePredicate doublePredicate = matcherFactory.makeDoublePredicate(); // in the future, this could use an int iterator - final Iterator iterator = dictionary.iterator(); + final Iterator iterator = dictionary.intIterator(); int next; int index = 0; boolean nextSet = false; @@ -839,7 +853,7 @@ private void findNext() if (nextValue == 0) { nextSet = doublePredicate.applyNull(); } else { - nextSet = doublePredicate.applyDouble(globalDoubleDictionary.get(nextValue - adjustDoubleId)); + nextSet = doublePredicate.applyDouble(globalDoubleDictionary.getDouble(nextValue - adjustDoubleId)); } if (nextSet) { next = index; @@ -871,7 +885,7 @@ IntList getIndexes(@Nullable String value) } Long someLong = GuavaUtils.tryParseLong(value); if (someLong != null) { - globalId = globalLongDictionary.indexOf(someLong); + globalId = globalLongDictionary.indexOf(someLong.longValue()); localId = dictionary.indexOf(globalId + adjustLongId); if (localId >= 0) { intList.add(localId); @@ -880,7 +894,7 @@ IntList getIndexes(@Nullable String value) Double someDouble = Doubles.tryParse(value); if (someDouble != null) { - globalId = globalDoubleDictionary.indexOf(someDouble); + globalId = globalDoubleDictionary.indexOf(someDouble.doubleValue()); localId = dictionary.indexOf(globalId + adjustDoubleId); if (localId >= 0) { intList.add(localId); @@ -989,7 +1003,7 @@ public Iterable getBitmapIterable() final DruidDoublePredicate doublePredicate = matcherFactory.makeDoublePredicate(); // in the future, this could use an int iterator - final Iterator iterator = dictionary.iterator(); + final Iterator iterator = dictionary.intIterator(); int next; int index; boolean nextSet = false; @@ -1021,9 +1035,9 @@ private void findNext() while (!nextSet && iterator.hasNext()) { Integer nextValue = iterator.next(); if (nextValue >= adjustDoubleId) { - nextSet = doublePredicate.applyDouble(globalDoubleDictionary.get(nextValue - adjustDoubleId)); + nextSet = doublePredicate.applyDouble(globalDoubleDictionary.getDouble(nextValue - adjustDoubleId)); } else if (nextValue >= adjustLongId) { - nextSet = longPredicate.applyLong(globalLongDictionary.get(nextValue - adjustLongId)); + nextSet = longPredicate.applyLong(globalLongDictionary.getLong(nextValue - adjustLongId)); } else { nextSet = stringPredicate.apply(globalDictionary.get(nextValue)); } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldLiteralDictionaryEncodedColumn.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldLiteralDictionaryEncodedColumn.java index b00ca96e8fff..8418e209c172 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldLiteralDictionaryEncodedColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldLiteralDictionaryEncodedColumn.java @@ -44,7 +44,9 @@ import org.apache.druid.segment.data.ColumnarDoubles; import org.apache.druid.segment.data.ColumnarInts; import org.apache.druid.segment.data.ColumnarLongs; -import org.apache.druid.segment.data.FixedIndexed; +import org.apache.druid.segment.data.FixedIndexedDoubles; +import org.apache.druid.segment.data.FixedIndexedInts; +import org.apache.druid.segment.data.FixedIndexedLongs; import org.apache.druid.segment.data.GenericIndexed; import org.apache.druid.segment.data.IndexedInts; import org.apache.druid.segment.data.ReadableOffset; @@ -76,9 +78,9 @@ public class NestedFieldLiteralDictionaryEncodedColumn implements DictionaryEnco private final ColumnarDoubles doublesColumn; private final ColumnarInts column; private final GenericIndexed globalDictionary; - private final FixedIndexed globalLongDictionary; - private final FixedIndexed globalDoubleDictionary; - private final FixedIndexed dictionary; + private final FixedIndexedLongs globalLongDictionary; + private final FixedIndexedDoubles globalDoubleDictionary; + private final FixedIndexedInts dictionary; private final ImmutableBitmap nullBitmap; private final int adjustLongId; @@ -90,9 +92,9 @@ public NestedFieldLiteralDictionaryEncodedColumn( ColumnarDoubles doublesColumn, ColumnarInts column, GenericIndexed globalDictionary, - FixedIndexed globalLongDictionary, - FixedIndexed globalDoubleDictionary, - FixedIndexed dictionary, + FixedIndexedLongs globalLongDictionary, + FixedIndexedDoubles globalDoubleDictionary, + FixedIndexedInts dictionary, ImmutableBitmap nullBitmap ) { @@ -138,14 +140,15 @@ public IndexedInts getMultiValueRow(int rowNum) @Override public String lookupName(int id) { - final int globalId = dictionary.get(id); + final int globalId = dictionary.getInt(id); if (globalId < globalDictionary.size()) { return globalDictionary.get(globalId); } else if (globalId < adjustLongId + globalLongDictionary.size()) { - return String.valueOf(globalLongDictionary.get(globalId - adjustLongId)); - } else { - return String.valueOf(globalDoubleDictionary.get(globalId - adjustDoubleId)); + return String.valueOf(globalLongDictionary.getLong(globalId - adjustLongId)); + } else if (globalId < adjustDoubleId + globalDoubleDictionary.size()){ + return String.valueOf(globalDoubleDictionary.getDouble(globalId - adjustDoubleId)); } + return null; } @Override @@ -169,9 +172,11 @@ private int getIdFromGlobalDictionary(@Nullable String val) if (singleType != null) { switch (singleType.getType()) { case LONG: - return globalLongDictionary.indexOf(GuavaUtils.tryParseLong(val)); + Long longValue = GuavaUtils.tryParseLong(val); + return longValue == null ? -1 : globalLongDictionary.indexOf(longValue.longValue()); case DOUBLE: - return globalDoubleDictionary.indexOf(Doubles.tryParse(val)); + final Double doubleValue = Doubles.tryParse(val); + return doubleValue == null ? -1 : globalDoubleDictionary.indexOf(doubleValue); default: return globalDictionary.indexOf(val); } @@ -215,7 +220,7 @@ public int getRowValue() public float getFloat() { final int localId = getRowValue(); - final int globalId = dictionary.get(localId); + final int globalId = dictionary.getInt(localId); if (globalId == 0) { // zero assert NullHandling.replaceWithDefault(); @@ -225,9 +230,9 @@ public float getFloat() Float f = Floats.tryParse(globalDictionary.get(globalId)); return f == null ? 0f : f; } else if (globalId < adjustDoubleId) { - return globalLongDictionary.get(globalId - adjustLongId).floatValue(); + return (float) globalLongDictionary.getLong(globalId - adjustLongId); } else { - return globalDoubleDictionary.get(globalId - adjustDoubleId).floatValue(); + return (float) globalDoubleDictionary.getDouble(globalId - adjustDoubleId); } } @@ -235,7 +240,7 @@ public float getFloat() public double getDouble() { final int localId = getRowValue(); - final int globalId = dictionary.get(localId); + final int globalId = dictionary.getInt(localId); if (globalId == 0) { // zero assert NullHandling.replaceWithDefault(); @@ -245,9 +250,9 @@ public double getDouble() Double d = Doubles.tryParse(globalDictionary.get(globalId)); return d == null ? 0.0 : d; } else if (globalId < adjustDoubleId) { - return globalLongDictionary.get(globalId - adjustLongId).doubleValue(); + return (double) globalLongDictionary.getLong(globalId - adjustLongId); } else { - return globalDoubleDictionary.get(globalId - adjustDoubleId); + return globalDoubleDictionary.getDouble(globalId - adjustDoubleId); } } @@ -255,7 +260,7 @@ public double getDouble() public long getLong() { final int localId = getRowValue(); - final int globalId = dictionary.get(localId); + final int globalId = dictionary.getInt(localId); if (globalId == 0) { // zero assert NullHandling.replaceWithDefault(); @@ -265,16 +270,16 @@ public long getLong() Long l = GuavaUtils.tryParseLong(globalDictionary.get(globalId)); return l == null ? 0L : l; } else if (globalId < adjustDoubleId) { - return globalLongDictionary.get(globalId - adjustLongId); + return globalLongDictionary.getLong(globalId - adjustLongId); } else { - return globalDoubleDictionary.get(globalId - adjustDoubleId).longValue(); + return (long) globalDoubleDictionary.getDouble(globalId - adjustDoubleId); } } @Override public boolean isNull() { - return dictionary.get(getRowValue()) == 0; + return dictionary.getInt(getRowValue()) == 0; } @Override diff --git a/processing/src/test/java/org/apache/druid/segment/data/FixedIndexedTest.java b/processing/src/test/java/org/apache/druid/segment/data/FixedIndexedTest.java index dcfc5c058e1c..601a99486423 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/FixedIndexedTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/FixedIndexedTest.java @@ -20,7 +20,13 @@ package org.apache.druid.segment.data; import com.google.common.collect.ImmutableList; +import it.unimi.dsi.fastutil.doubles.DoubleIterator; +import it.unimi.dsi.fastutil.ints.IntIterator; +import it.unimi.dsi.fastutil.longs.LongIterator; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.TypeStrategies; +import org.apache.druid.segment.nested.NestedDataColumnSerializer; +import org.apache.druid.segment.serde.Serializer; import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium; import org.apache.druid.testing.InitializedNullHandlingTest; import org.junit.Assert; @@ -29,6 +35,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import javax.annotation.Nonnull; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -40,7 +47,9 @@ @RunWith(Parameterized.class) public class FixedIndexedTest extends InitializedNullHandlingTest { - private static final Long[] LONGS = new Long[64]; + private static final long[] LONGS = new long[1 << 16]; + private static final int[] INTS = new int[1 << 16]; + private static final double[] DOUBLES = new double[1 << 16]; @Parameterized.Parameters(name = "{0}") public static Collection constructorFeeder() @@ -52,7 +61,9 @@ public static Collection constructorFeeder() public static void setup() { for (int i = 0; i < LONGS.length; i++) { - LONGS[i] = i * 10L; + LONGS[i] = i * 2L; + INTS[i] = i + 1; + DOUBLES[i] = i * 1.3; } } @@ -66,12 +77,12 @@ public FixedIndexedTest(ByteOrder byteOrder) @Test public void testGet() throws IOException { - ByteBuffer buffer = ByteBuffer.allocate(1 << 14); + ByteBuffer buffer = ByteBuffer.allocate(1 << 20); fillBuffer(buffer, order, false); FixedIndexed fixedIndexed = FixedIndexed.read(buffer, ColumnType.LONG.getStrategy(), order, Long.BYTES); - Assert.assertEquals(64, fixedIndexed.size()); + Assert.assertEquals(LONGS.length, fixedIndexed.size()); for (int i = 0; i < LONGS.length; i++) { - Assert.assertEquals(LONGS[i], fixedIndexed.get(i)); + Assert.assertEquals(LONGS[i], (long) fixedIndexed.get(i)); Assert.assertEquals(i, fixedIndexed.indexOf(LONGS[i])); } } @@ -79,26 +90,26 @@ public void testGet() throws IOException @Test public void testIterator() throws IOException { - ByteBuffer buffer = ByteBuffer.allocate(1 << 14); + ByteBuffer buffer = ByteBuffer.allocate(1 << 20); fillBuffer(buffer, order, false); FixedIndexed fixedIndexed = FixedIndexed.read(buffer, ColumnType.LONG.getStrategy(), order, Long.BYTES); Iterator iterator = fixedIndexed.iterator(); int i = 0; while (iterator.hasNext()) { - Assert.assertEquals(LONGS[i++], iterator.next()); + Assert.assertEquals(LONGS[i++], (long) iterator.next()); } } @Test public void testGetWithNull() throws IOException { - ByteBuffer buffer = ByteBuffer.allocate(1 << 14); + ByteBuffer buffer = ByteBuffer.allocate(1 << 20); fillBuffer(buffer, order, true); FixedIndexed fixedIndexed = FixedIndexed.read(buffer, ColumnType.LONG.getStrategy(), order, Long.BYTES); - Assert.assertEquals(65, fixedIndexed.size()); + Assert.assertEquals(LONGS.length + 1, fixedIndexed.size()); Assert.assertNull(fixedIndexed.get(0)); for (int i = 0; i < LONGS.length; i++) { - Assert.assertEquals(LONGS[i], fixedIndexed.get(i + 1)); + Assert.assertEquals(LONGS[i], (long) fixedIndexed.get(i + 1)); Assert.assertEquals(i + 1, fixedIndexed.indexOf(LONGS[i])); } } @@ -106,17 +117,95 @@ public void testGetWithNull() throws IOException @Test public void testIteratorWithNull() throws IOException { - ByteBuffer buffer = ByteBuffer.allocate(1 << 14); + ByteBuffer buffer = ByteBuffer.allocate(1 << 20); fillBuffer(buffer, order, true); FixedIndexed fixedIndexed = FixedIndexed.read(buffer, ColumnType.LONG.getStrategy(), order, Long.BYTES); Iterator iterator = fixedIndexed.iterator(); Assert.assertNull(iterator.next()); int i = 0; while (iterator.hasNext()) { - Assert.assertEquals(LONGS[i++], iterator.next()); + Assert.assertEquals(LONGS[i++], (long) iterator.next()); } } + @Test + public void testSpecializedInts() throws IOException + { + ByteBuffer buffer = ByteBuffer.allocate(1 << 20); + ByteBuffer buffer2 = ByteBuffer.allocate(1 << 20); + fillIntBuffers(buffer, buffer2, order); + FixedIndexed fixedIndexed = FixedIndexed.read(buffer, NestedDataColumnSerializer.INT_TYPE_STRATEGY, order, Integer.BYTES); + FixedIndexedInts specializedIndexed = FixedIndexedInts.read(buffer2, order); + Iterator iterator = fixedIndexed.iterator(); + IntIterator intIterator = specializedIndexed.intIterator(); + int i = 0; + while (iterator.hasNext()) { + int next = iterator.next(); + int nextInt = intIterator.nextInt(); + final String msg = "row : " + i; + Assert.assertEquals(msg, INTS[i], next); + Assert.assertEquals(msg, INTS[i], nextInt); + Assert.assertEquals(msg, next, (int) fixedIndexed.get(i)); + Assert.assertEquals(msg, nextInt, specializedIndexed.getInt(i)); + Assert.assertEquals(msg, i, fixedIndexed.indexOf(next)); + Assert.assertEquals(msg, i, specializedIndexed.indexOf(nextInt)); + i++; + } + Assert.assertFalse(intIterator.hasNext()); + } + + @Test + public void testSpecializedLongs() throws IOException + { + ByteBuffer buffer = ByteBuffer.allocate(1 << 20); + ByteBuffer buffer2 = ByteBuffer.allocate(1 << 20); + fillLongBuffers(buffer, buffer2, order); + FixedIndexed fixedIndexed = FixedIndexed.read(buffer, TypeStrategies.LONG, order, Long.BYTES); + FixedIndexedLongs specializedIndexed = FixedIndexedLongs.read(buffer2, order); + Iterator iterator = fixedIndexed.iterator(); + LongIterator intIterator = specializedIndexed.longIterator(); + int i = 0; + while (iterator.hasNext()) { + long next = iterator.next(); + long nextLong = intIterator.nextLong(); + final String msg = "row : " + i; + Assert.assertEquals(msg, LONGS[i], next); + Assert.assertEquals(msg, LONGS[i], nextLong); + Assert.assertEquals(msg, next, (long) fixedIndexed.get(i)); + Assert.assertEquals(msg, nextLong, specializedIndexed.getLong(i)); + Assert.assertEquals(msg, i, fixedIndexed.indexOf(next)); + Assert.assertEquals(msg, i, specializedIndexed.indexOf(nextLong)); + i++; + } + Assert.assertFalse(intIterator.hasNext()); + } + + @Test + public void testSpecializedDoubles() throws IOException + { + ByteBuffer buffer = ByteBuffer.allocate(1 << 20); + ByteBuffer buffer2 = ByteBuffer.allocate(1 << 20); + fillDoubleBuffers(buffer, buffer2, order); + FixedIndexed fixedIndexed = FixedIndexed.read(buffer, TypeStrategies.DOUBLE, order, Double.BYTES); + FixedIndexedDoubles specializedIndexed = FixedIndexedDoubles.read(buffer2, order); + Iterator iterator = fixedIndexed.iterator(); + DoubleIterator intIterator = specializedIndexed.doubleIterator(); + int i = 0; + while (iterator.hasNext()) { + double next = iterator.next(); + double nextDouble = intIterator.nextDouble(); + final String msg = "row : " + i; + Assert.assertEquals(msg, DOUBLES[i], next, 0.0); + Assert.assertEquals(msg, DOUBLES[i], nextDouble, 0.0); + Assert.assertEquals(msg, next, fixedIndexed.get(i), 0.0); + Assert.assertEquals(msg, nextDouble, specializedIndexed.getDouble(i), 0.0); + Assert.assertEquals(msg, i, fixedIndexed.indexOf(next)); + Assert.assertEquals(msg, i, specializedIndexed.indexOf(nextDouble)); + i++; + } + Assert.assertFalse(intIterator.hasNext()); + } + private static void fillBuffer(ByteBuffer buffer, ByteOrder order, boolean withNull) throws IOException { buffer.position(0); @@ -134,6 +223,212 @@ private static void fillBuffer(ByteBuffer buffer, ByteOrder order, boolean withN for (Long aLong : LONGS) { writer.write(aLong); } + WritableByteChannel channel = makeChannelForBuffer(buffer); + long size = writer.getSerializedSize(); + Iterator validationIterator = writer.getIterator(); + int i = 0; + boolean processFirstNull = withNull; + while (validationIterator.hasNext()) { + if (processFirstNull) { + Assert.assertNull(validationIterator.next()); + processFirstNull = false; + } else { + Assert.assertEquals(LONGS[i++], (long) validationIterator.next()); + } + } + buffer.position(0); + writer.writeTo(channel, null); + Assert.assertEquals(size, buffer.position()); + buffer.position(0); + } + + private static void fillIntBuffers(ByteBuffer buffer, ByteBuffer specialized, ByteOrder order) throws IOException + { + buffer.position(0); + Serializer genericWriter, specializedWriter; + FixedIndexedWriter writer = new FixedIndexedWriter<>( + new OnHeapMemorySegmentWriteOutMedium(), + NestedDataColumnSerializer.INT_TYPE_STRATEGY, + order, + Integer.BYTES, + true + ); + writer.open(); + genericWriter = writer; + + if (order == ByteOrder.nativeOrder()) { + FixedIndexedIntWriter intWriter = new FixedIndexedIntWriter( + new OnHeapMemorySegmentWriteOutMedium(), + true + ); + specializedWriter = intWriter; + intWriter.open(); + for (Integer val : INTS) { + writer.write(val); + intWriter.write(val); + } + + IntIterator validationIterator = intWriter.getIterator(); + int i = 0; + while (validationIterator.hasNext()) { + Assert.assertEquals("row : " + i, INTS[i++], validationIterator.nextInt()); + } + } else { + FixedIndexedWriter fallbackWriter = new FixedIndexedWriter<>( + new OnHeapMemorySegmentWriteOutMedium(), + NestedDataColumnSerializer.INT_TYPE_STRATEGY, + order, + Integer.BYTES, + true + ); + fallbackWriter.open(); + specializedWriter = fallbackWriter; + for (Integer val : INTS) { + writer.write(val); + fallbackWriter.write(val); + } + } + WritableByteChannel channel = makeChannelForBuffer(buffer); + long size = genericWriter.getSerializedSize(); + buffer.position(0); + writer.writeTo(channel, null); + Assert.assertEquals(size, buffer.position()); + buffer.position(0); + + WritableByteChannel specializedChannel = makeChannelForBuffer(specialized); + long sizeSpecialized = specializedWriter.getSerializedSize(); + Assert.assertEquals(size, sizeSpecialized); + specialized.position(0); + specializedWriter.writeTo(specializedChannel, null); + Assert.assertEquals(sizeSpecialized, specialized.position()); + specialized.position(0); + } + + private static void fillLongBuffers(ByteBuffer buffer, ByteBuffer specialized, ByteOrder order) throws IOException + { + buffer.position(0); + Serializer genericWriter, specializedWriter; + FixedIndexedWriter writer = new FixedIndexedWriter<>( + new OnHeapMemorySegmentWriteOutMedium(), + TypeStrategies.LONG, + order, + Long.BYTES, + true + ); + writer.open(); + genericWriter = writer; + + if (order == ByteOrder.nativeOrder()) { + FixedIndexedLongWriter longWriter = new FixedIndexedLongWriter( + new OnHeapMemorySegmentWriteOutMedium(), + true + ); + specializedWriter = longWriter; + longWriter.open(); + for (Long val : LONGS) { + writer.write(val); + longWriter.write(val); + } + LongIterator validationIterator = longWriter.getIterator(); + int i = 0; + while (validationIterator.hasNext()) { + Assert.assertEquals("row : " + i, LONGS[i++], validationIterator.nextLong()); + } + } else { + FixedIndexedWriter fallbackWriter = new FixedIndexedWriter<>( + new OnHeapMemorySegmentWriteOutMedium(), + TypeStrategies.LONG, + order, + Long.BYTES, + true + ); + fallbackWriter.open(); + specializedWriter = fallbackWriter; + for (Long val : LONGS) { + writer.write(val); + fallbackWriter.write(val); + } + } + WritableByteChannel channel = makeChannelForBuffer(buffer); + long size = genericWriter.getSerializedSize(); + buffer.position(0); + writer.writeTo(channel, null); + Assert.assertEquals(size, buffer.position()); + buffer.position(0); + + WritableByteChannel specializedChannel = makeChannelForBuffer(specialized); + long sizeSpecialized = specializedWriter.getSerializedSize(); + Assert.assertEquals(size, sizeSpecialized); + specialized.position(0); + specializedWriter.writeTo(specializedChannel, null); + Assert.assertEquals(sizeSpecialized, specialized.position()); + specialized.position(0); + } + + private static void fillDoubleBuffers(ByteBuffer buffer, ByteBuffer specialized, ByteOrder order) throws IOException + { + buffer.position(0); + Serializer genericWriter, specializedWriter; + FixedIndexedWriter writer = new FixedIndexedWriter<>( + new OnHeapMemorySegmentWriteOutMedium(), + TypeStrategies.DOUBLE, + order, + Double.BYTES, + true + ); + writer.open(); + genericWriter = writer; + + if (order == ByteOrder.nativeOrder()) { + FixedIndexedDoubleWriter doubleWriter = new FixedIndexedDoubleWriter( + new OnHeapMemorySegmentWriteOutMedium(), + true + ); + specializedWriter = doubleWriter; + doubleWriter.open(); + for (Double val : DOUBLES) { + writer.write(val); + doubleWriter.write(val); + } + DoubleIterator validationIterator = doubleWriter.getIterator(); + int i = 0; + while (validationIterator.hasNext()) { + Assert.assertEquals("row : " + i, DOUBLES[i++], validationIterator.nextDouble(), 0.0); + } + } else { + FixedIndexedWriter fallbackWriter = new FixedIndexedWriter<>( + new OnHeapMemorySegmentWriteOutMedium(), + TypeStrategies.DOUBLE, + order, + Double.BYTES, + true + ); + fallbackWriter.open(); + specializedWriter = fallbackWriter; + for (Double val : DOUBLES) { + writer.write(val); + fallbackWriter.write(val); + } + } + WritableByteChannel channel = makeChannelForBuffer(buffer); + long size = genericWriter.getSerializedSize(); + buffer.position(0); + writer.writeTo(channel, null); + Assert.assertEquals(size, buffer.position()); + buffer.position(0); + + WritableByteChannel specializedChannel = makeChannelForBuffer(specialized); + long sizeSpecialized = specializedWriter.getSerializedSize(); + Assert.assertEquals(size, sizeSpecialized); + specialized.position(0); + specializedWriter.writeTo(specializedChannel, null); + Assert.assertEquals(sizeSpecialized, specialized.position()); + specialized.position(0); + } + + @Nonnull + private static WritableByteChannel makeChannelForBuffer(ByteBuffer buffer) + { WritableByteChannel channel = new WritableByteChannel() { @Override @@ -155,10 +450,6 @@ public void close() { } }; - long size = writer.getSerializedSize(); - buffer.position(0); - writer.writeTo(channel, null); - Assert.assertEquals(size, buffer.position()); - buffer.position(0); + return channel; } } diff --git a/processing/src/test/java/org/apache/druid/segment/nested/NestedFieldLiteralColumnIndexSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/NestedFieldLiteralColumnIndexSupplierTest.java index 13a451428dac..567e0226ea5e 100644 --- a/processing/src/test/java/org/apache/druid/segment/nested/NestedFieldLiteralColumnIndexSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/nested/NestedFieldLiteralColumnIndexSupplierTest.java @@ -33,10 +33,13 @@ import org.apache.druid.segment.column.NullValueIndex; import org.apache.druid.segment.column.NumericRangeIndex; import org.apache.druid.segment.column.StringValueSetIndex; -import org.apache.druid.segment.column.TypeStrategies; import org.apache.druid.segment.data.BitmapSerdeFactory; -import org.apache.druid.segment.data.FixedIndexed; -import org.apache.druid.segment.data.FixedIndexedWriter; +import org.apache.druid.segment.data.FixedIndexedDoubleWriter; +import org.apache.druid.segment.data.FixedIndexedDoubles; +import org.apache.druid.segment.data.FixedIndexedIntWriter; +import org.apache.druid.segment.data.FixedIndexedInts; +import org.apache.druid.segment.data.FixedIndexedLongWriter; +import org.apache.druid.segment.data.FixedIndexedLongs; import org.apache.druid.segment.data.GenericIndexed; import org.apache.druid.segment.data.GenericIndexedWriter; import org.apache.druid.segment.data.RoaringBitmapSerdeFactory; @@ -61,8 +64,8 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa roaringFactory.getBitmapFactory() ); GenericIndexed globalStrings; - FixedIndexed globalLongs; - FixedIndexed globalDoubles; + FixedIndexedLongs globalLongs; + FixedIndexedDoubles globalDoubles; @Before public void setup() throws IOException @@ -86,11 +89,8 @@ public void setup() throws IOException stringWriter.write("z"); writeToBuffer(stringBuffer, stringWriter); - FixedIndexedWriter longWriter = new FixedIndexedWriter<>( + FixedIndexedLongWriter longWriter = new FixedIndexedLongWriter( new OnHeapMemorySegmentWriteOutMedium(), - TypeStrategies.LONG, - ByteOrder.nativeOrder(), - Long.BYTES, true ); longWriter.open(); @@ -103,11 +103,8 @@ public void setup() throws IOException longWriter.write(9000L); writeToBuffer(longBuffer, longWriter); - FixedIndexedWriter doubleWriter = new FixedIndexedWriter<>( + FixedIndexedDoubleWriter doubleWriter = new FixedIndexedDoubleWriter( new OnHeapMemorySegmentWriteOutMedium(), - TypeStrategies.DOUBLE, - ByteOrder.nativeOrder(), - Double.BYTES, true ); doubleWriter.open(); @@ -122,8 +119,8 @@ public void setup() throws IOException writeToBuffer(doubleBuffer, doubleWriter); globalStrings = GenericIndexed.read(stringBuffer, GenericIndexed.STRING_STRATEGY); - globalLongs = FixedIndexed.read(longBuffer, TypeStrategies.LONG, ByteOrder.nativeOrder(), Long.BYTES); - globalDoubles = FixedIndexed.read(doubleBuffer, TypeStrategies.DOUBLE, ByteOrder.nativeOrder(), Double.BYTES); + globalLongs = FixedIndexedLongs.read(longBuffer, ByteOrder.nativeOrder()); + globalDoubles = FixedIndexedDoubles.read(doubleBuffer, ByteOrder.nativeOrder()); } @@ -934,11 +931,8 @@ private NestedFieldLiteralColumnIndexSupplier makeSingleTypeStringSupplier() thr ByteBuffer localDictionaryBuffer = ByteBuffer.allocate(1 << 12).order(ByteOrder.nativeOrder()); ByteBuffer bitmapsBuffer = ByteBuffer.allocate(1 << 12); - FixedIndexedWriter localDictionaryWriter = new FixedIndexedWriter<>( + FixedIndexedIntWriter localDictionaryWriter = new FixedIndexedIntWriter( new OnHeapMemorySegmentWriteOutMedium(), - NestedDataColumnSerializer.INT_TYPE_STRATEGY, - ByteOrder.nativeOrder(), - Integer.BYTES, true ); localDictionaryWriter.open(); @@ -978,11 +972,9 @@ private NestedFieldLiteralColumnIndexSupplier makeSingleTypeStringSupplier() thr writeToBuffer(localDictionaryBuffer, localDictionaryWriter); writeToBuffer(bitmapsBuffer, bitmapWriter); - FixedIndexed dictionary = FixedIndexed.read( + FixedIndexedInts dictionary = FixedIndexedInts.read( localDictionaryBuffer, - NestedDataColumnSerializer.INT_TYPE_STRATEGY, - ByteOrder.nativeOrder(), - Integer.BYTES + ByteOrder.nativeOrder() ); GenericIndexed bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy()); @@ -1005,11 +997,8 @@ private NestedFieldLiteralColumnIndexSupplier makeSingleTypeStringWithNullsSuppl ByteBuffer localDictionaryBuffer = ByteBuffer.allocate(1 << 12).order(ByteOrder.nativeOrder()); ByteBuffer bitmapsBuffer = ByteBuffer.allocate(1 << 12); - FixedIndexedWriter localDictionaryWriter = new FixedIndexedWriter<>( + FixedIndexedIntWriter localDictionaryWriter = new FixedIndexedIntWriter( new OnHeapMemorySegmentWriteOutMedium(), - NestedDataColumnSerializer.INT_TYPE_STRATEGY, - ByteOrder.nativeOrder(), - Integer.BYTES, true ); localDictionaryWriter.open(); @@ -1052,11 +1041,9 @@ private NestedFieldLiteralColumnIndexSupplier makeSingleTypeStringWithNullsSuppl writeToBuffer(localDictionaryBuffer, localDictionaryWriter); writeToBuffer(bitmapsBuffer, bitmapWriter); - FixedIndexed dictionary = FixedIndexed.read( + FixedIndexedInts dictionary = FixedIndexedInts.read( localDictionaryBuffer, - NestedDataColumnSerializer.INT_TYPE_STRATEGY, - ByteOrder.nativeOrder(), - Integer.BYTES + ByteOrder.nativeOrder() ); GenericIndexed bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy()); @@ -1079,11 +1066,8 @@ private NestedFieldLiteralColumnIndexSupplier makeSingleTypeLongSupplier() throw ByteBuffer localDictionaryBuffer = ByteBuffer.allocate(1 << 12).order(ByteOrder.nativeOrder()); ByteBuffer bitmapsBuffer = ByteBuffer.allocate(1 << 12); - FixedIndexedWriter localDictionaryWriter = new FixedIndexedWriter<>( + FixedIndexedIntWriter localDictionaryWriter = new FixedIndexedIntWriter( new OnHeapMemorySegmentWriteOutMedium(), - NestedDataColumnSerializer.INT_TYPE_STRATEGY, - ByteOrder.nativeOrder(), - Integer.BYTES, true ); localDictionaryWriter.open(); @@ -1123,11 +1107,9 @@ private NestedFieldLiteralColumnIndexSupplier makeSingleTypeLongSupplier() throw writeToBuffer(localDictionaryBuffer, localDictionaryWriter); writeToBuffer(bitmapsBuffer, bitmapWriter); - FixedIndexed dictionary = FixedIndexed.read( + FixedIndexedInts dictionary = FixedIndexedInts.read( localDictionaryBuffer, - NestedDataColumnSerializer.INT_TYPE_STRATEGY, - ByteOrder.nativeOrder(), - Integer.BYTES + ByteOrder.nativeOrder() ); GenericIndexed bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy()); @@ -1150,11 +1132,8 @@ private NestedFieldLiteralColumnIndexSupplier makeSingleTypeLongSupplierWithNull ByteBuffer localDictionaryBuffer = ByteBuffer.allocate(1 << 12).order(ByteOrder.nativeOrder()); ByteBuffer bitmapsBuffer = ByteBuffer.allocate(1 << 12); - FixedIndexedWriter localDictionaryWriter = new FixedIndexedWriter<>( + FixedIndexedIntWriter localDictionaryWriter = new FixedIndexedIntWriter( new OnHeapMemorySegmentWriteOutMedium(), - NestedDataColumnSerializer.INT_TYPE_STRATEGY, - ByteOrder.nativeOrder(), - Integer.BYTES, true ); localDictionaryWriter.open(); @@ -1198,11 +1177,9 @@ private NestedFieldLiteralColumnIndexSupplier makeSingleTypeLongSupplierWithNull writeToBuffer(localDictionaryBuffer, localDictionaryWriter); writeToBuffer(bitmapsBuffer, bitmapWriter); - FixedIndexed dictionary = FixedIndexed.read( + FixedIndexedInts dictionary = FixedIndexedInts.read( localDictionaryBuffer, - NestedDataColumnSerializer.INT_TYPE_STRATEGY, - ByteOrder.nativeOrder(), - Integer.BYTES + ByteOrder.nativeOrder() ); GenericIndexed bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy()); @@ -1225,11 +1202,8 @@ private NestedFieldLiteralColumnIndexSupplier makeSingleTypeDoubleSupplier() thr ByteBuffer localDictionaryBuffer = ByteBuffer.allocate(1 << 12).order(ByteOrder.nativeOrder()); ByteBuffer bitmapsBuffer = ByteBuffer.allocate(1 << 12); - FixedIndexedWriter localDictionaryWriter = new FixedIndexedWriter<>( + FixedIndexedIntWriter localDictionaryWriter = new FixedIndexedIntWriter( new OnHeapMemorySegmentWriteOutMedium(), - NestedDataColumnSerializer.INT_TYPE_STRATEGY, - ByteOrder.nativeOrder(), - Integer.BYTES, true ); localDictionaryWriter.open(); @@ -1269,11 +1243,9 @@ private NestedFieldLiteralColumnIndexSupplier makeSingleTypeDoubleSupplier() thr writeToBuffer(localDictionaryBuffer, localDictionaryWriter); writeToBuffer(bitmapsBuffer, bitmapWriter); - FixedIndexed dictionary = FixedIndexed.read( + FixedIndexedInts dictionary = FixedIndexedInts.read( localDictionaryBuffer, - NestedDataColumnSerializer.INT_TYPE_STRATEGY, - ByteOrder.nativeOrder(), - Integer.BYTES + ByteOrder.nativeOrder() ); GenericIndexed bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy()); @@ -1296,11 +1268,8 @@ private NestedFieldLiteralColumnIndexSupplier makeSingleTypeDoubleSupplierWithNu ByteBuffer localDictionaryBuffer = ByteBuffer.allocate(1 << 12).order(ByteOrder.nativeOrder()); ByteBuffer bitmapsBuffer = ByteBuffer.allocate(1 << 12); - FixedIndexedWriter localDictionaryWriter = new FixedIndexedWriter<>( + FixedIndexedIntWriter localDictionaryWriter = new FixedIndexedIntWriter( new OnHeapMemorySegmentWriteOutMedium(), - NestedDataColumnSerializer.INT_TYPE_STRATEGY, - ByteOrder.nativeOrder(), - Integer.BYTES, true ); localDictionaryWriter.open(); @@ -1344,11 +1313,9 @@ private NestedFieldLiteralColumnIndexSupplier makeSingleTypeDoubleSupplierWithNu writeToBuffer(localDictionaryBuffer, localDictionaryWriter); writeToBuffer(bitmapsBuffer, bitmapWriter); - FixedIndexed dictionary = FixedIndexed.read( + FixedIndexedInts dictionary = FixedIndexedInts.read( localDictionaryBuffer, - NestedDataColumnSerializer.INT_TYPE_STRATEGY, - ByteOrder.nativeOrder(), - Integer.BYTES + ByteOrder.nativeOrder() ); GenericIndexed bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy()); @@ -1371,11 +1338,8 @@ private NestedFieldLiteralColumnIndexSupplier makeVariantSupplierWithNull() thro ByteBuffer localDictionaryBuffer = ByteBuffer.allocate(1 << 12).order(ByteOrder.nativeOrder()); ByteBuffer bitmapsBuffer = ByteBuffer.allocate(1 << 12); - FixedIndexedWriter localDictionaryWriter = new FixedIndexedWriter<>( + FixedIndexedIntWriter localDictionaryWriter = new FixedIndexedIntWriter( new OnHeapMemorySegmentWriteOutMedium(), - NestedDataColumnSerializer.INT_TYPE_STRATEGY, - ByteOrder.nativeOrder(), - Integer.BYTES, true ); localDictionaryWriter.open(); @@ -1427,11 +1391,9 @@ private NestedFieldLiteralColumnIndexSupplier makeVariantSupplierWithNull() thro writeToBuffer(localDictionaryBuffer, localDictionaryWriter); writeToBuffer(bitmapsBuffer, bitmapWriter); - FixedIndexed dictionary = FixedIndexed.read( + FixedIndexedInts dictionary = FixedIndexedInts.read( localDictionaryBuffer, - NestedDataColumnSerializer.INT_TYPE_STRATEGY, - ByteOrder.nativeOrder(), - Integer.BYTES + ByteOrder.nativeOrder() ); GenericIndexed bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy());