Skip to content

Commit

Permalink
apache#3147 Add option for not compressing dimension without affectin…
Browse files Browse the repository at this point in the history
…g backward compatibility
  • Loading branch information
navis committed Mar 24, 2020
1 parent 887b685 commit 138c294
Show file tree
Hide file tree
Showing 30 changed files with 270 additions and 268 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

import io.druid.segment.data.BitSlicedBitmap;
import io.druid.segment.data.ObjectStrategy;
import io.druid.segment.data.VSizeIndexed;
import io.druid.segment.data.VSizeIndexedInts;
import io.druid.segment.data.VSizedIndexedInt;
import io.druid.segment.data.VSizedInt;

import java.nio.ByteBuffer;

Expand All @@ -31,18 +31,18 @@
public class ColumnPartProviders
{
@SuppressWarnings("unchecked")
public static <T> ColumnPartProvider<T> ofInstance(final VSizeIndexedInts instance)
public static <T> ColumnPartProvider<T> with(final VSizedInt instance)
{
return ofInstance((T) instance, instance.getSerializedSize(), instance.size());
return with((T) instance, instance.getSerializedSize(), instance.size());
}

@SuppressWarnings("unchecked")
public static <T> ColumnPartProvider<T> ofInstance(final VSizeIndexed instance)
public static <T> ColumnPartProvider<T> with(final VSizedIndexedInt instance)
{
return ofInstance((T) instance, instance.getSerializedSize(), instance.size());
return with((T) instance, instance.getSerializedSize(), instance.size());
}

public static <T> ColumnPartProvider<T> ofInstance(final T instance, final long length, final int count)
public static <T> ColumnPartProvider<T> with(final T instance, final long length, final int count)
{
return new ColumnPartProvider<T>()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import io.druid.java.util.common.IAE;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier;
import io.druid.segment.data.CompressedVSizedIntSupplier;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedIterable;
import io.druid.segment.data.IndexedMultivalue;
Expand All @@ -43,19 +43,19 @@
 * values - indexed integers representing values in each row
*/

public class CompressedVSizeIndexedSupplier implements WritableSupplier<IndexedMultivalue<IndexedInts>>
public class CompressedVSizedIndexedIntSupplier implements WritableSupplier<IndexedMultivalue<IndexedInts>>
{
private static final byte version = 0x2;
//offsets - indexed integers of length num of rows + 1 representing offsets of starting index of first element of each row in values index
// last element represents the length of values column
private final CompressedVSizeIntsIndexedSupplier offsetSupplier;
private final CompressedVSizedIntSupplier offsetSupplier;

//values - indexed integers representing actual values in each row
private final CompressedVSizeIntsIndexedSupplier valueSupplier;
private final CompressedVSizedIntSupplier valueSupplier;

CompressedVSizeIndexedSupplier(
CompressedVSizeIntsIndexedSupplier offsetSupplier,
CompressedVSizeIntsIndexedSupplier valueSupplier
CompressedVSizedIndexedIntSupplier(
CompressedVSizedIntSupplier offsetSupplier,
CompressedVSizedIntSupplier valueSupplier
)
{
this.offsetSupplier = offsetSupplier;
Expand All @@ -81,25 +81,25 @@ public void writeToChannel(WritableByteChannel channel) throws IOException
valueSupplier.writeToChannel(channel);
}

public static CompressedVSizeIndexedSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order)
public static CompressedVSizedIndexedIntSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order)
{
byte versionFromBuffer = buffer.get();

if (versionFromBuffer == version) {
CompressedVSizeIntsIndexedSupplier offsetSupplier = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(
CompressedVSizedIntSupplier offsetSupplier = CompressedVSizedIntSupplier.fromByteBuffer(
buffer,
order
);
CompressedVSizeIntsIndexedSupplier valueSupplier = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(
CompressedVSizedIntSupplier valueSupplier = CompressedVSizedIntSupplier.fromByteBuffer(
buffer,
order
);
return new CompressedVSizeIndexedSupplier(offsetSupplier, valueSupplier);
return new CompressedVSizedIndexedIntSupplier(offsetSupplier, valueSupplier);
}
throw new IAE("Unknown version[%s]", versionFromBuffer);
}

public static CompressedVSizeIndexedSupplier fromIterable(
public static CompressedVSizedIndexedIntSupplier fromIterable(
Iterable<IndexedInts> objectsIterable,
int maxValue,
final ByteOrder byteOrder,
Expand All @@ -121,21 +121,21 @@ public static CompressedVSizeIndexedSupplier fromIterable(
}
offsetList.add(offset);
int offsetMax = offset;
CompressedVSizeIntsIndexedSupplier headerSupplier = CompressedVSizeIntsIndexedSupplier.fromList(
CompressedVSizedIntSupplier headerSupplier = CompressedVSizedIntSupplier.fromList(
offsetList,
offsetMax,
CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(offsetMax),
CompressedVSizedIntSupplier.maxIntsInBufferForValue(offsetMax),
byteOrder,
compression
);
CompressedVSizeIntsIndexedSupplier valuesSupplier = CompressedVSizeIntsIndexedSupplier.fromList(
CompressedVSizedIntSupplier valuesSupplier = CompressedVSizedIntSupplier.fromList(
values,
maxValue,
CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue),
CompressedVSizedIntSupplier.maxIntsInBufferForValue(maxValue),
byteOrder,
compression
);
return new CompressedVSizeIndexedSupplier(headerSupplier, valuesSupplier);
return new CompressedVSizedIndexedIntSupplier(headerSupplier, valuesSupplier);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.druid.java.util.common.IAE;
import io.druid.segment.data.CompressedIntsIndexedSupplier;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier;
import io.druid.segment.data.CompressedVSizedIntSupplier;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedMultivalue;
import io.druid.segment.data.WritableSupplier;
Expand All @@ -42,23 +42,23 @@
 * If we want to stream VSizeInts, we must know the max value in the value sets. It's easy to know the max id of
 * values (like dimension cardinality while encoding dimension), but difficult to know the max id of offsets.
*/
public class CompressedVSizeIndexedV3Supplier implements WritableSupplier<IndexedMultivalue<IndexedInts>>
public class CompressedVSizedIndexedIntV3Supplier implements WritableSupplier<IndexedMultivalue<IndexedInts>>
{
public static final byte VERSION = 0x3;

private final CompressedIntsIndexedSupplier offsetSupplier;
private final CompressedVSizeIntsIndexedSupplier valueSupplier;
private final CompressedVSizedIntSupplier valueSupplier;

CompressedVSizeIndexedV3Supplier(
CompressedVSizedIndexedIntV3Supplier(
CompressedIntsIndexedSupplier offsetSupplier,
CompressedVSizeIntsIndexedSupplier valueSupplier
CompressedVSizedIntSupplier valueSupplier
)
{
this.offsetSupplier = offsetSupplier;
this.valueSupplier = valueSupplier;
}

public static CompressedVSizeIndexedV3Supplier fromByteBuffer(ByteBuffer buffer, ByteOrder order)
public static CompressedVSizedIndexedIntV3Supplier fromByteBuffer(ByteBuffer buffer, ByteOrder order)
{
byte versionFromBuffer = buffer.get();

Expand All @@ -67,17 +67,17 @@ public static CompressedVSizeIndexedV3Supplier fromByteBuffer(ByteBuffer buffer,
buffer,
order
);
CompressedVSizeIntsIndexedSupplier valueSupplier = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(
CompressedVSizedIntSupplier valueSupplier = CompressedVSizedIntSupplier.fromByteBuffer(
buffer,
order
);
return new CompressedVSizeIndexedV3Supplier(offsetSupplier, valueSupplier);
return new CompressedVSizedIndexedIntV3Supplier(offsetSupplier, valueSupplier);
}
throw new IAE("Unknown version[%s]", versionFromBuffer);
}

// for test
public static CompressedVSizeIndexedV3Supplier fromIterable(
public static CompressedVSizedIndexedIntV3Supplier fromIterable(
Iterable<IndexedInts> objectsIterable,
int offsetChunkFactor,
int maxValue,
Expand Down Expand Up @@ -105,14 +105,14 @@ public static CompressedVSizeIndexedV3Supplier fromIterable(
byteOrder,
compression
);
CompressedVSizeIntsIndexedSupplier valuesSupplier = CompressedVSizeIntsIndexedSupplier.fromList(
CompressedVSizedIntSupplier valuesSupplier = CompressedVSizedIntSupplier.fromList(
values,
maxValue,
CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue),
CompressedVSizedIntSupplier.maxIntsInBufferForValue(maxValue),
byteOrder,
compression
);
return new CompressedVSizeIndexedV3Supplier(headerSupplier, valuesSupplier);
return new CompressedVSizedIndexedIntV3Supplier(headerSupplier, valuesSupplier);
}

@Override
Expand All @@ -138,7 +138,7 @@ public void writeToChannel(WritableByteChannel channel) throws IOException
@Override
public IndexedMultivalue<IndexedInts> get()
{
return new CompressedVSizeIndexedSupplier.CompressedVSizeIndexed(offsetSupplier.get(), valueSupplier.get());
return new CompressedVSizedIndexedIntSupplier.CompressedVSizeIndexed(offsetSupplier.get(), valueSupplier.get());
}

}
30 changes: 15 additions & 15 deletions processing/src/main/java/io/druid/segment/IndexIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
import io.druid.segment.data.ByteBufferSerializer;
import io.druid.segment.data.CompressedLongsIndexedSupplier;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier;
import io.druid.segment.data.CompressedVSizedIntSupplier;
import io.druid.segment.data.Dictionary;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.Indexed;
Expand All @@ -75,8 +75,8 @@
import io.druid.segment.data.IndexedRTree;
import io.druid.segment.data.ListIndexed;
import io.druid.segment.data.ObjectStrategy;
import io.druid.segment.data.VSizeIndexed;
import io.druid.segment.data.VSizeIndexedInts;
import io.druid.segment.data.VSizedIndexedInt;
import io.druid.segment.data.VSizedInt;
import io.druid.segment.serde.BitmapIndexColumnPartSupplier;
import io.druid.segment.serde.ColumnPartSerde;
import io.druid.segment.serde.ComplexColumnPartSerde;
Expand Down Expand Up @@ -491,7 +491,7 @@ public MMappedIndex mapDir(File inDir) throws IOException
}

Map<String, GenericIndexed<String>> dimValueLookups = Maps.newHashMap();
Map<String, VSizeIndexed> dimColumns = Maps.newHashMap();
Map<String, VSizedIndexedInt> dimColumns = Maps.newHashMap();
Map<String, GenericIndexed<ImmutableBitmap>> bitmaps = Maps.newHashMap();

for (String dimension : IndexedIterable.create(availableDimensions)) {
Expand All @@ -505,7 +505,7 @@ public MMappedIndex mapDir(File inDir) throws IOException
);

dimValueLookups.put(dimension, GenericIndexed.read(dimBuffer, ObjectStrategy.STRING_STRATEGY));
dimColumns.put(dimension, VSizeIndexed.readFromByteBuffer(dimBuffer));
dimColumns.put(dimension, VSizedIndexedInt.readFromByteBuffer(dimBuffer));
}

ByteBuffer invertedBuffer = smooshedFiles.mapFile("inverted.drd");
Expand Down Expand Up @@ -626,15 +626,15 @@ public void convertV8toV9(File v8Dir, File v9Dir, IndexSpec indexSpec)

int emptyStrIdx = dictionary.indexOf("");
List<Integer> singleValCol = null;
VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer());
VSizedIndexedInt multiValCol = VSizedIndexedInt.readFromByteBuffer(dimBuffer.asReadOnlyBuffer());
GenericIndexed<ImmutableBitmap> bitmaps = bitmapIndexes.get(dimension);
ImmutableRTree spatialIndex = spatialIndexes.get(dimension);

final BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory();
boolean onlyOneValue = true;
MutableBitmap nullsSet = null;
for (int i = 0; i < multiValCol.size(); ++i) {
VSizeIndexedInts rowValue = multiValCol.get(i);
VSizedInt rowValue = multiValCol.get(i);
if (!onlyOneValue) {
break;
}
Expand Down Expand Up @@ -687,13 +687,13 @@ public void convertV8toV9(File v8Dir, File v9Dir, IndexSpec indexSpec)
bumpedDictionary = false;
}

final VSizeIndexed finalMultiValCol = multiValCol;
final VSizedIndexedInt finalMultiValCol = multiValCol;
singleValCol = new AbstractList<Integer>()
{
@Override
public Integer get(int index)
{
final VSizeIndexedInts ints = finalMultiValCol.get(index);
final VSizedInt ints = finalMultiValCol.get(index);
return ints.size() == 0 ? 0 : ints.get(0) + (bumpedDictionary ? 1 : 0);
}

Expand Down Expand Up @@ -722,20 +722,20 @@ public int size()
if (singleValCol != null) {
if (compressionStrategy != null) {
columnPartBuilder.withSingleValuedColumn(
CompressedVSizeIntsIndexedSupplier.fromList(
CompressedVSizedIntSupplier.fromList(
singleValCol,
dictionary.size(),
CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(dictionary.size()),
CompressedVSizedIntSupplier.maxIntsInBufferForValue(dictionary.size()),
BYTE_ORDER,
compressionStrategy
)
);
} else {
columnPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size()));
columnPartBuilder.withSingleValuedColumn(VSizedInt.fromList(singleValCol, dictionary.size()));
}
} else if (compressionStrategy != null) {
columnPartBuilder.withMultiValuedColumn(
CompressedVSizeIndexedSupplier.fromIterable(
CompressedVSizedIndexedIntSupplier.fromIterable(
multiValCol,
dictionary.size(),
BYTE_ORDER,
Expand Down Expand Up @@ -906,7 +906,7 @@ public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException
Map<String, Column> columns = Maps.newHashMap();

for (String dimension : index.getAvailableDimensions()) {
VSizeIndexed column = index.getDimColumn(dimension);
VSizedIndexedInt column = index.getDimColumn(dimension);
ColumnPartProvider<Dictionary<String>> dictionary = index.getDimValueLookup(dimension).asColumnPartProvider();
ColumnBuilder builder = new ColumnBuilder()
.setType(ValueDesc.STRING)
Expand All @@ -915,7 +915,7 @@ public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException
new DictionaryEncodedColumnSupplier(
dictionary,
null,
ColumnPartProviders.<IndexedMultivalue<IndexedInts>>ofInstance(
ColumnPartProviders.<IndexedMultivalue<IndexedInts>>with(
column, column.getSerializedSize(), column.size()
),
null
Expand Down
10 changes: 6 additions & 4 deletions processing/src/main/java/io/druid/segment/IndexMergerV9.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void close() throws IOException

/************ Create Inverted Indexes *************/
final ArrayList<ColumnPartWriter<ImmutableBitmap>> bitmapIndexWriters = setupBitmapIndexWriters(
ioPeon, mergedDimensions, dimCardinalities, indexSpec.getBitmapSerdeFactory()
ioPeon, mergedDimensions, dimCardinalities, indexSpec
);
final ArrayList<ColumnPartWriter<ImmutableRTree>> spatialIndexWriters = setupSpatialIndexWriters(
ioPeon, mergedDimensions, indexSpec, dimCapabilities
Expand Down Expand Up @@ -351,7 +351,7 @@ public void close() throws IOException
}

ArrayList<ColumnPartWriter<ImmutableBitmap>> cubeBitmapWriters = setupBitmapIndexWriters(
ioPeon, cubeDims, ImmutableMap.of(), indexer.getBitmapSerdeFactory()
ioPeon, cubeDims, ImmutableMap.of(), indexer
);
MutableBitmap[][] bitmaps = new MutableBitmap[cubeDims.size()][];
for (int dimId = 0; dimId < bitmaps.length; dimId++) {
Expand Down Expand Up @@ -703,15 +703,17 @@ private void makeInvertedIndexes(
progress.stopSection(section);
}

private static final int MAX_GROUP = 32;
private static final double CUMULATIVE_THRESHOLD = 16384;

private ArrayList<ColumnPartWriter<ImmutableBitmap>> setupBitmapIndexWriters(
final IOPeon ioPeon,
final List<String> mergedDimensions,
final Map<String, Integer> dimCardinalities,
final BitmapSerdeFactory serdeFactory
final IndexSpec indexSpec
) throws IOException
{
BitmapSerdeFactory serdeFactory = indexSpec.getBitmapSerdeFactory();
ArrayList<ColumnPartWriter<ImmutableBitmap>> writers = Lists.newArrayListWithCapacity(mergedDimensions.size());
BitmapFactory bitmapFactory = serdeFactory.getBitmapFactory();
for (String dimension : mergedDimensions) {
Expand All @@ -720,7 +722,7 @@ private ArrayList<ColumnPartWriter<ImmutableBitmap>> setupBitmapIndexWriters(
);
Integer cardinality = dimCardinalities.get(dimension);
if (cardinality != null && cardinality > CUMULATIVE_THRESHOLD) {
int group = Math.min(32, (int) Math.ceil(cardinality / CUMULATIVE_THRESHOLD));
int group = Math.min(MAX_GROUP, (int) Math.ceil(cardinality / CUMULATIVE_THRESHOLD));
int threshold = cardinality / group + group;
writer = new CumulativeBitmapWriter(
ioPeon, String.format("%s.inverted.cumulative", dimension), writer, serdeFactory, threshold
Expand Down
Loading

0 comments on commit 138c294

Please sign in to comment.