diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java index 7e1d774bf7c5b..4aa403ec4c1ab 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java @@ -242,6 +242,7 @@ public void printPerfMetrics() { throw new IllegalStateException("Perf metrics not enabled"); } System.out.println("Average probes per lookup: " + map.getAverageProbesPerLookup()); + System.out.println("Number of hash collisions: " + map.getNumHashCollisions()); System.out.println("Time spent resizing (ms): " + map.getTimeSpentResizingMs()); System.out.println("Total memory consumption (bytes): " + map.getTotalMemoryConsumption()); } diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java b/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java index 086926d2f98ec..983edfff7a2be 100644 --- a/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java +++ b/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java @@ -48,14 +48,12 @@ public int hashUnsafeWords(Object baseObject, long baseOffset, int lengthInBytes // See https://code.google.com/p/guava-libraries/source/browse/guava/src/com/google/common/hash/Murmur3_32HashFunction.java#167 // TODO(josh) veryify that this was implemented correctly assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)"; - int k1 = 0; int h1 = seed; for (int offset = 0; offset < lengthInBytes; offset += 4) { int halfWord = PlatformDependent.UNSAFE.getInt(baseObject, baseOffset + offset); - - k1 ^= halfWord << offset; + int k1 = mixK1(halfWord); + h1 = mixH1(h1, k1); } - h1 ^= mixK1(k1); return fmix(h1, lengthInBytes); } diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 63afbea6e9060..66d5c3ab30634 100644 --- a/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -149,6 +149,8 @@ public final class BytesToBytesMap { private long numKeyLookups = 0; + private long numHashCollisions = 0; + public BytesToBytesMap( MemoryAllocator allocator, int initialCapacity, @@ -257,6 +259,10 @@ public Location lookup( ); if (areEqual) { return loc; + } else { + if (enablePerfMetrics) { + numHashCollisions++; + } } } } @@ -532,6 +538,13 @@ public double getAverageProbesPerLookup() { return (1.0 * numProbes) / numKeyLookups; } + public long getNumHashCollisions() { + if (!enablePerfMetrics) { + throw new IllegalStateException(); + } + return numHashCollisions; + } + /** * Grows the size of the hash table and re-hash everything. */ diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/hash/TestMurmur3_x86_32.java b/unsafe/src/test/java/org/apache/spark/unsafe/hash/TestMurmur3_x86_32.java index fc885b6fb46d1..5dbe47d47bdab 100644 --- a/unsafe/src/test/java/org/apache/spark/unsafe/hash/TestMurmur3_x86_32.java +++ b/unsafe/src/test/java/org/apache/spark/unsafe/hash/TestMurmur3_x86_32.java @@ -81,16 +81,36 @@ public void randomizedStressTestBytes() { int byteArrSize = rand.nextInt(100) * 8; byte[] bytes = new byte[byteArrSize]; rand.nextBytes(bytes); - long memoryAddr = PlatformDependent.UNSAFE.allocateMemory(byteArrSize); - PlatformDependent.copyMemory( - bytes, PlatformDependent.BYTE_ARRAY_OFFSET, null, memoryAddr, byteArrSize); Assert.assertEquals( - hasher.hashUnsafeWords(null, memoryAddr, byteArrSize), - hasher.hashUnsafeWords(null, memoryAddr, byteArrSize)); + hasher.hashUnsafeWords(bytes, PlatformDependent.BYTE_ARRAY_OFFSET, byteArrSize), + hasher.hashUnsafeWords(bytes, PlatformDependent.BYTE_ARRAY_OFFSET, byteArrSize)); - hashcodes.add(hasher.hashUnsafeWords(null, memoryAddr, byteArrSize)); - PlatformDependent.UNSAFE.freeMemory(memoryAddr); + hashcodes.add(hasher.hashUnsafeWords( + bytes, PlatformDependent.BYTE_ARRAY_OFFSET, byteArrSize)); + } + + // A very loose bound. + Assert.assertTrue(hashcodes.size() > size * 0.95); + } + + @Test + public void randomizedStressTestPaddedStrings() { + int size = 64000; + // A set used to track collision rate. + Set hashcodes = new HashSet(); + for (int i = 0; i < size; i++) { + int byteArrSize = 8; + byte[] strBytes = ("" + i).getBytes(); + byte[] paddedBytes = new byte[byteArrSize]; + System.arraycopy(strBytes, 0, paddedBytes, 0, strBytes.length); + + Assert.assertEquals( + hasher.hashUnsafeWords(paddedBytes, PlatformDependent.BYTE_ARRAY_OFFSET, byteArrSize), + hasher.hashUnsafeWords(paddedBytes, PlatformDependent.BYTE_ARRAY_OFFSET, byteArrSize)); + + hashcodes.add(hasher.hashUnsafeWords( + paddedBytes, PlatformDependent.BYTE_ARRAY_OFFSET, byteArrSize)); } // A very loose bound.