Skip to content

Commit

Permalink
Interning for OnHeapByteDictionary (#12342)
Browse files Browse the repository at this point in the history
* Interning - OnHeapByteDictionary

* Address review comments from PR 12223
  • Loading branch information
vvivekiyer authored Jun 6, 2024
1 parent f4a4bf3 commit 6290bc0
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public void testInterningByteBuffers() {

Interner<String> exactInterner = Interners.newStrongInterner();
Interner<String> falfInterner = new FALFInterner(nUniqueObjs);
Interner<String> falfInternerCustomHash = new FALFInterner(nUniqueObjs, s -> hashCode((String) s), Objects::equals);
Interner<String> falfInternerCustomHash =
new FALFInterner(nUniqueObjs, s -> FALFInterner.hashCode((String) s), Objects::equals);

// Go over all objects and intern them using both exact and FALF interners
int nHits1 = runInterning(allObjs, exactInterner, true);
Expand All @@ -66,36 +67,57 @@ public void testInterningByteBuffers() {

// With the better hash function, FALF interner should have more hits
Assert.assertTrue(nHits3 > (nTotalObjs - nUniqueObjs) * 0.6);
}

/**
* Ad hoc benchmarking code. In one run the MacBook laptop, FALFInterner below performs nearly twice faster (1217 ms
* vs 2230 ms) With custom hash function, FALFInterner's speed is about the same as the Guava interner.
*/
@Test
public void benchmarkingTest() {
Random random = new Random(1);

// Ad hoc benchmarking code. Disabled to avoid test slowdown.
// In one run the MacBook laptop, FALFInterner below performs nearly twice faster
// (1217 ms vs 2230 ms) With custom hash function, FALFInterner's speed is about the
// same as the Guava interner.
// for (int j = 0; j < 3; j++) {
// long time0 = System.currentTimeMillis();
// long totNHits = 0;
// for (int i = 0; i < 10000; i++) {
// totNHits += runInterning(allObjs, exactInterner, false);
// }
// long time1 = System.currentTimeMillis();
// System.out.println("Guava interner. totNHits = " + totNHits + ", time = " + (time1 - time0));
//
// time0 = System.currentTimeMillis();
// totNHits = 0;
// for (int i = 0; i < 10000; i++) {
// totNHits += runInterning(allObjs, falfInterner, false);
// }
// time1 = System.currentTimeMillis();
// System.out.println("FALF interner. totNHits = " + totNHits + ", time = " + (time1 - time0));
//
// time0 = System.currentTimeMillis();
// totNHits = 0;
// for (int i = 0; i < 10000; i++) {
// totNHits += runInterning(allObjs, falfInternerCustomHash, false);
// }
// time1 = System.currentTimeMillis();
// System.out.println("FALF interner Custom Hash. totNHits = " + totNHits + ", time = " + (time1 - time0));
// }
int nUniqueObjs = 1024;
int nTotalObjs = 8 * nUniqueObjs;

String[] allObjs = new String[nTotalObjs];

Interner<String> exactInterner = Interners.newStrongInterner();
Interner<String> falfInterner = new FALFInterner(nUniqueObjs);
Interner<String> falfInternerCustomHash =
new FALFInterner(nUniqueObjs, s -> FALFInterner.hashCode((String) s), Objects::equals);

// Create an array of objects where each object should have ~8 copies
for (int i = 0; i < nTotalObjs; i++) {
int next = random.nextInt(nUniqueObjs);
allObjs[i] = Integer.toString(next);
}

for (int j = 0; j < 3; j++) {
long time0 = System.currentTimeMillis();
long totNHits = 0;
for (int i = 0; i < 10000; i++) {
totNHits += runInterning(allObjs, exactInterner, false);
}
long time1 = System.currentTimeMillis();
System.out.println("Guava interner. totNHits = " + totNHits + ", time = " + (time1 - time0));

time0 = System.currentTimeMillis();
totNHits = 0;
for (int i = 0; i < 10000; i++) {
totNHits += runInterning(allObjs, falfInterner, false);
}
time1 = System.currentTimeMillis();
System.out.println("FALF interner. totNHits = " + totNHits + ", time = " + (time1 - time0));

time0 = System.currentTimeMillis();
totNHits = 0;
for (int i = 0; i < 10000; i++) {
totNHits += runInterning(allObjs, falfInternerCustomHash, false);
}
time1 = System.currentTimeMillis();
System.out.println("FALF interner Custom Hash. totNHits = " + totNHits + ", time = " + (time1 - time0));
}
}

private int runInterning(String[] objs, Interner<String> interner, boolean performAssert) {
Expand All @@ -111,58 +133,4 @@ private int runInterning(String[] objs, Interner<String> interner, boolean perfo
}
return nHits;
}

// Custom hash code implementation, that gives better distribution than standard hashCode()

private static final int C1 = 0xcc9e2d51;
private static final int C2 = 0x1b873593;

public static int hashCode(String s) {
int h1 = 0;

// step through value 2 chars at a time
for (int i = 1; i < s.length(); i += 2) {
int k1 = s.charAt(i - 1) | (s.charAt(i) << 16);
h1 = nextHashCode(k1, h1);
}

// deal with any remaining characters
if ((s.length() & 1) == 1) {
int k1 = s.charAt(s.length() - 1);
k1 = mixK1(k1);
h1 ^= k1;
}

return fmix(h1, s.length() * 2);
}

private static int nextHashCode(int value, int prevHashCode) {
int k1 = mixK1(value);
return mixH1(prevHashCode, k1);
}

private static int mixK1(int k1) {
k1 *= C1;
k1 = Integer.rotateLeft(k1, 15);
k1 *= C2;
return k1;
}

private static int mixH1(int h1, int k1) {
h1 ^= k1;
h1 = Integer.rotateLeft(h1, 13);
h1 = h1 * 5 + 0xe6546b64;
return h1;
}

private static int fmix(int h1, int len) {
// Force all bits to avalanche
h1 ^= len;
h1 ^= h1 >>> 16;
h1 *= 0x85ebca6b;
h1 ^= h1 >>> 13;
h1 *= 0xc2b2ae35;
h1 ^= h1 >>> 16;
return h1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.pinot.segment.local.segment.index.dictionary;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
Expand Down Expand Up @@ -301,8 +300,19 @@ public static Dictionary read(PinotDataBuffer dataBuffer, ColumnMetadata metadat
FieldSpec.DataType dataType = metadata.getDataType();
boolean loadOnHeap = indexConfig.isOnHeap();
String columnName = metadata.getColumnName();

// If interning is enabled, get the required interners.
FALFInterner<String> strInterner = null;
FALFInterner<byte[]> byteInterner = null;
Intern internConfig = indexConfig.getIntern();
if (loadOnHeap) {
LOGGER.info("Loading on-heap dictionary for column: {}, intern={}", columnName, internIdentifierStr != null);
LOGGER.info("Loading on-heap dictionary for column: {}", columnName);
if (internConfig != null && !internConfig.isDisabled()) {
DictionaryInternerHolder internerHolder = DictionaryInternerHolder.getInstance();
strInterner = internerHolder.getStrInterner(internIdentifierStr, internConfig.getCapacity());
byteInterner = internerHolder.getByteInterner(internIdentifierStr, internConfig.getCapacity());
LOGGER.info("Enabling interning for dictionary column: {}", columnName);
}
}

int length = metadata.getCardinality();
Expand All @@ -325,24 +335,11 @@ public static Dictionary read(PinotDataBuffer dataBuffer, ColumnMetadata metadat
: new BigDecimalDictionary(dataBuffer, length, numBytesPerValue);
case STRING:
numBytesPerValue = metadata.getColumnMaxLength();

// If interning is enabled, get the required interners.
FALFInterner<String> strInterner = null;
FALFInterner<byte[]> byteInterner = null;
Intern internConfig = indexConfig.getIntern();
if (internConfig != null && !internConfig.isDisabled()) {
Preconditions.checkState(loadOnHeap, "Interning is only supported for on-heap dictionaries.");
DictionaryInternerHolder internerHolder = DictionaryInternerHolder.getInstance();
strInterner = internerHolder.getStrInterner(internIdentifierStr, internConfig.getCapacity());
byteInterner = internerHolder.getByteInterner(internIdentifierStr, internConfig.getCapacity());
LOGGER.info("Enabling interning for dictionary column: {}", columnName);
}

return loadOnHeap ? new OnHeapStringDictionary(dataBuffer, length, numBytesPerValue, strInterner, byteInterner)
: new StringDictionary(dataBuffer, length, numBytesPerValue);
case BYTES:
numBytesPerValue = metadata.getColumnMaxLength();
return loadOnHeap ? new OnHeapBytesDictionary(dataBuffer, length, numBytesPerValue)
return loadOnHeap ? new OnHeapBytesDictionary(dataBuffer, length, numBytesPerValue, byteInterner)
: new BytesDictionary(dataBuffer, length, numBytesPerValue);
default:
throw new IllegalStateException("Unsupported data type for dictionary: " + dataType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.math.BigDecimal;
import java.util.Arrays;
import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.FALFInterner;


/**
Expand All @@ -41,15 +43,16 @@ public class OnHeapBytesDictionary extends BaseImmutableDictionary {
private final Object2IntOpenHashMap<ByteArray> _valToDictId;
private final ByteArray[] _dictIdToVal;

public OnHeapBytesDictionary(PinotDataBuffer dataBuffer, int length, int numBytesPerValue) {
public OnHeapBytesDictionary(PinotDataBuffer dataBuffer, int length, int numBytesPerValue,
@Nullable FALFInterner<byte[]> byteInterner) {
super(dataBuffer, length, numBytesPerValue);

_valToDictId = new Object2IntOpenHashMap<>(length);
_valToDictId.defaultReturnValue(Dictionary.NULL_VALUE_INDEX);
_dictIdToVal = new ByteArray[length];

for (int dictId = 0; dictId < length; dictId++) {
ByteArray value = new ByteArray(getBytes(dictId));
ByteArray value = new ByteArray(getBytes(dictId), byteInterner);
_dictIdToVal[dictId] = value;
_valToDictId.put(value, dictId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.FALFInterner;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand All @@ -49,6 +50,8 @@


public class ImmutableDictionaryTest {
private static final FALFInterner<String> STRING_INTERNER = new FALFInterner<>(500);
private static final FALFInterner<byte[]> BYTE_INTERNER = new FALFInterner<>(500, Arrays::hashCode);
private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "ImmutableDictionaryTest");
private static final Random RANDOM = new Random();
private static final String INT_COLUMN_NAME = "intColumn";
Expand Down Expand Up @@ -377,6 +380,17 @@ public void testOnHeapStringDictionary()
}
}

@Test
public void testOnHeapStringDictionaryWithInterning()
throws Exception {
try (OnHeapStringDictionary onHeapStringDictionary = new OnHeapStringDictionary(
PinotDataBuffer.mapReadOnlyBigEndianFile(
new File(TEMP_DIR, STRING_COLUMN_NAME + V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES,
_numBytesPerStringValue, STRING_INTERNER, BYTE_INTERNER)) {
testStringDictionary(onHeapStringDictionary);
}
}

private void testStringDictionary(BaseImmutableDictionary stringDictionary) {
for (int i = 0; i < NUM_VALUES; i++) {
assertEquals(stringDictionary.get(i), _stringValues[i]);
Expand Down Expand Up @@ -404,7 +418,19 @@ public void testOnHeapBytesDictionary()
throws Exception {
try (OnHeapBytesDictionary onHeapBytesDictionary = new OnHeapBytesDictionary(
PinotDataBuffer.mapReadOnlyBigEndianFile(
new File(TEMP_DIR, BYTES_COLUMN_NAME + V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, BYTES_LENGTH)) {
new File(TEMP_DIR, BYTES_COLUMN_NAME + V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, BYTES_LENGTH, null)) {
testBytesDictionary(onHeapBytesDictionary);
}
}

@Test
public void testOnHeapBytesDictionaryWithInterning()
throws Exception {

try (OnHeapBytesDictionary onHeapBytesDictionary = new OnHeapBytesDictionary(
PinotDataBuffer.mapReadOnlyBigEndianFile(
new File(TEMP_DIR, BYTES_COLUMN_NAME + V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, BYTES_LENGTH,
BYTE_INTERNER)) {
testBytesDictionary(onHeapBytesDictionary);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@


public class ImmutableDictionaryTypeConversionTest {
private static final FALFInterner<String> STRING_INTERNER = new FALFInterner<>(128);
private static final FALFInterner<byte[]> BYTE_INTERNER = new FALFInterner<>(128, Arrays::hashCode);

private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "ImmutableDictionaryTypeConversionTest");
private static final Random RANDOM = new Random();
private static final String INT_COLUMN_NAME = "intColumn";
Expand Down Expand Up @@ -332,20 +335,17 @@ public void testOnHeapStringDictionary()
@Test
public void testOnHeapStringDictionaryWithInterner()
throws Exception {
FALFInterner<String> strInterner = new FALFInterner<>(128);
FALFInterner<byte[]> byteInterner = new FALFInterner<>(128, Arrays::hashCode);

try (OnHeapStringDictionary onHeapStringDictionary = new OnHeapStringDictionary(
PinotDataBuffer.mapReadOnlyBigEndianFile(
new File(TEMP_DIR, STRING_COLUMN_NAME + V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, STRING_LENGTH,
strInterner, byteInterner)) {
STRING_INTERNER, BYTE_INTERNER)) {
testStringDictionary(onHeapStringDictionary);
}

try (OnHeapStringDictionary onHeapStringDictionary = new OnHeapStringDictionary(
PinotDataBuffer.mapReadOnlyBigEndianFile(
new File(TEMP_DIR, STRING_COLUMN_NAME + V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, STRING_LENGTH,
strInterner, byteInterner)) {
STRING_INTERNER, BYTE_INTERNER)) {
testStringDictionary(onHeapStringDictionary);
}
}
Expand Down Expand Up @@ -390,7 +390,18 @@ public void testOnHeapBytesDictionary()
throws Exception {
try (OnHeapBytesDictionary onHeapBytesDictionary = new OnHeapBytesDictionary(
PinotDataBuffer.mapReadOnlyBigEndianFile(
new File(TEMP_DIR, BYTES_COLUMN_NAME + V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, BYTES_LENGTH)) {
new File(TEMP_DIR, BYTES_COLUMN_NAME + V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, BYTES_LENGTH, null)) {
testBytesDictionary(onHeapBytesDictionary);
}
}

@Test
public void testOnHeapBytesDictionaryWithInterning()
throws Exception {
try (OnHeapBytesDictionary onHeapBytesDictionary = new OnHeapBytesDictionary(
PinotDataBuffer.mapReadOnlyBigEndianFile(
new File(TEMP_DIR, BYTES_COLUMN_NAME + V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, BYTES_LENGTH,
BYTE_INTERNER)) {
testBytesDictionary(onHeapBytesDictionary);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.Arrays;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -60,6 +61,14 @@ public ByteArray(byte[] bytes) {
_bytes = bytes;
}

public ByteArray(byte[] bytes, @Nullable FALFInterner<byte[]> byteInterner) {
if (byteInterner == null) {
_bytes = bytes;
} else {
_bytes = byteInterner.intern(bytes);
}
}

public byte[] getBytes() {
return _bytes;
}
Expand Down
Loading

0 comments on commit 6290bc0

Please sign in to comment.