From 161722664aad96a205f3001e2a60e28a2a338565 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sat, 23 Jan 2016 14:18:22 -0800 Subject: [PATCH 1/6] Initial bloom filter implementation --- .../apache/spark/util/sketch/BloomFilter.java | 142 +++++++++++ .../spark/util/sketch/DefaultBloomFilter.java | 220 ++++++++++++++++++ .../spark/util/sketch/BloomFilterSuite.scala | 106 +++++++++ 3 files changed, 468 insertions(+) create mode 100644 common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java create mode 100644 common/sketch/src/main/java/org/apache/spark/util/sketch/DefaultBloomFilter.java create mode 100644 common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java new file mode 100644 index 0000000000000..88b3841d1fd22 --- /dev/null +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.sketch; + +/** + * A Bloom filter is a space-efficient probabilistic data structure, that is used to test whether + * an element is a member of a set. It returns false when the element is definitely not in the + * set, returns true when the element is probably in the set. + * + * Internally a Bloom filter is initialized with 2 information: how many space to use(number of + * bits) and how many hash values to calculate for each record. To get as lower false positive + * probability as possible, user should call {@link BloomFilter#create} to automatically pick a + * best combination of these 2 parameters. + * + * Currently the following data types are supported: + * + * + * The implementation is largely based on the {@code BloomFilter} class from guava. + */ +public abstract class BloomFilter { + /** + * Returns the probability that {@linkplain #mightContain(Object)} will erroneously return + * {@code true} for an object that has not actually been put in the {@code BloomFilter}. + * + *

Ideally, this number should be close to the {@code fpp} parameter + * passed in to create this bloom filter, or smaller. If it is + * significantly higher, it is usually the case that too many elements (more than + * expected) have been put in the {@code BloomFilter}, degenerating it. + */ + public abstract double expectedFpp(); + + /** + * Returns the number of bits in the underlying bit array. + */ + public abstract long bitSize(); + + /** + * Puts an element into this {@code BloomFilter}. Ensures that subsequent invocations of + * {@link #mightContain(Object)} with the same element will always return {@code true}. + * + * @return true if the bloom filter's bits changed as a result of this operation. If the bits + * changed, this is definitely the first time {@code object} has been added to the + * filter. If the bits haven't changed, this might be the first time {@code object} + * has been added to the filter. Note that {@code put(t)} always returns the + * opposite result to what {@code mightContain(t)} would have returned at the time + * it is called." + */ + public abstract boolean put(Object item); + + /** + * Determines whether a given bloom filter is compatible with this bloom filter. For two + * bloom filters to be compatible, they must have the same bit size. + * + * @param other The bloom filter to check for compatibility. + */ + public abstract boolean isCompatible(BloomFilter other); + + /** + * Combines this bloom filter with another bloom filter by performing a bitwise OR of the + * underlying data. The mutations happen to this instance. Callers must ensure the + * bloom filters are appropriately sized to avoid saturating them. + * + * @param other The bloom filter to combine this bloom filter with. It is not mutated. 
+ * @throws IllegalArgumentException if {@code isCompatible(that) == false} + */ + public abstract BloomFilter mergeInPlace(BloomFilter other); + + /** + * Returns {@code true} if the element might have been put in this Bloom filter, + * {@code false} if this is definitely not the case. + */ + public abstract boolean mightContain(Object item); + + /** + * Computes the optimal k (number of hashes per element inserted in Bloom filter), given the + * expected insertions and total number of bits in the Bloom filter. + * + * See http://en.wikipedia.org/wiki/File:Bloom_filter_fp_probability.svg for the formula. + * + * @param n expected insertions (must be positive) + * @param m total number of bits in Bloom filter (must be positive) + */ + private static int optimalNumOfHashFunctions(long n, long m) { + // (m / n) * log(2), but avoid truncation due to division! + return Math.max(1, (int) Math.round((double) m / n * Math.log(2))); + } + + /** + * Computes m (total bits of Bloom filter) which is expected to achieve, for the specified + * expected insertions, the required false positive probability. + * + * See http://en.wikipedia.org/wiki/Bloom_filter#Probability_of_false_positives for the formula. 
+ * + * @param n expected insertions (must be positive) + * @param p false positive rate (must be 0 < p < 1) + */ + private static long optimalNumOfBits(long n, double p) { + if (p == 0) { + p = Double.MIN_VALUE; + } + return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2))); + } + + public static BloomFilter create(long expectedInsertions) { + return create(expectedInsertions, 0.03); + } + + public static BloomFilter create(long expectedInsertions, double fpp) { + assert fpp > 0.0 : "False positive probability must be > 0.0"; + assert fpp < 1.0 : "False positive probability must be < 1.0"; + long numBits = optimalNumOfBits(expectedInsertions, fpp); + return create(expectedInsertions, numBits); + } + + public static BloomFilter create(long expectedInsertions, long numBits) { + assert expectedInsertions > 0 : "Expected insertions must be > 0"; + assert numBits > 0 : "number of bits must be > 0"; + int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits); + return new DefaultBloomFilter(numHashFunctions, numBits); + } +} diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/DefaultBloomFilter.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/DefaultBloomFilter.java new file mode 100644 index 0000000000000..35fefa3c48f8c --- /dev/null +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/DefaultBloomFilter.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.sketch; + +import java.io.UnsupportedEncodingException; +import java.util.Arrays; + +public class DefaultBloomFilter extends BloomFilter { + + private final int numHashFunctions; + private final BitArray bits; + + public DefaultBloomFilter(int numHashFunctions, long numBits) { + this.numHashFunctions = numHashFunctions; + this.bits = new BitArray(numBits); + } + + @Override + public double expectedFpp() { + return Math.pow((double) bits.bitCount() / bits.bitSize(), numHashFunctions); + } + + @Override + public long bitSize() { + return bits.bitSize(); + } + + private static long hashObjectToLong(Object item) { + if (item instanceof String) { + try { + byte[] bytes = ((String) item).getBytes("utf-8"); + return hashBytesToLong(bytes); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("Only support utf-8 string", e); + } + } else { + long longValue; + + if (item instanceof Long) { + longValue = (Long) item; + } else if (item instanceof Integer) { + longValue = ((Integer) item).longValue(); + } else if (item instanceof Short) { + longValue = ((Short) item).longValue(); + } else if (item instanceof Byte) { + longValue = ((Byte) item).longValue(); + } else { + throw new IllegalArgumentException( + "Support for " + item.getClass().getName() + " not implemented" + ); + } + + int h1 = Murmur3_x86_32.hashLong(longValue, 0); + int h2 = Murmur3_x86_32.hashLong(longValue, h1); + return (((long) h1) << 32) | (h2 & 0xFFFFFFFFL); + } + } + + private static long hashBytesToLong(byte[] bytes) { + int h1 = 
Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, 0); + int h2 = Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, h1); + return (((long) h1) << 32) | (h2 & 0xFFFFFFFFL); + } + + @Override + public boolean put(Object item) { + long bitSize = bits.bitSize(); + long hash64 = hashObjectToLong(item); + int h1 = (int) (hash64 >> 32); + int h2 = (int) hash64; + + boolean bitsChanged = false; + for (int i = 1; i <= numHashFunctions; i++) { + int combinedHash = h1 + (i * h2); + // Flip all the bits if it's negative (guaranteed positive number) + if (combinedHash < 0) { + combinedHash = ~combinedHash; + } + bitsChanged |= bits.set(combinedHash % bitSize); + } + return bitsChanged; + } + + @Override + public boolean mightContain(Object item) { + long bitSize = bits.bitSize(); + long hash64 = hashObjectToLong(item); + int h1 = (int) (hash64 >> 32); + int h2 = (int) hash64; + + for (int i = 1; i <= numHashFunctions; i++) { + int combinedHash = h1 + (i * h2); + // Flip all the bits if it's negative (guaranteed positive number) + if (combinedHash < 0) { + combinedHash = ~combinedHash; + } + if (!bits.get(combinedHash % bitSize)) { + return false; + } + } + return true; + } + + @Override + public boolean isCompatible(BloomFilter other) { + if (other == null) { + return false; + } + + if (!(other instanceof DefaultBloomFilter)) { + return false; + } + + DefaultBloomFilter that = (DefaultBloomFilter) other; + return this.bitSize() == that.bitSize() && this.numHashFunctions == that.numHashFunctions; + } + + @Override + public BloomFilter mergeInPlace(BloomFilter other) { + if (!isCompatible(other)) { + throw new IllegalArgumentException("Can't merge incompatible bloom filter"); + } + + DefaultBloomFilter that = (DefaultBloomFilter) other; + this.bits.putAll(that.bits); + return this; + } + + static final class BitArray { + final long[] data; + long bitCount; + + static int safeCast(long numBits) { + long numWords = (long) 
Math.ceil(numBits / 64.0); + assert numWords <= Integer.MAX_VALUE : "Can't allocate enough space for " + numBits + " bits"; + return (int) numWords; + } + + BitArray(long numBits) { + this(new long[safeCast(numBits)]); + } + + BitArray(long[] data) { + assert data.length > 0 : "data length is zero!"; + this.data = data; + long bitCount = 0; + for (long value : data) { + bitCount += Long.bitCount(value); + } + this.bitCount = bitCount; + } + + /** Returns true if the bit changed value. */ + boolean set(long index) { + if (!get(index)) { + data[(int) (index >>> 6)] |= (1L << index); + bitCount++; + return true; + } + return false; + } + + boolean get(long index) { + return (data[(int) (index >>> 6)] & (1L << index)) != 0; + } + + /** Number of bits */ + long bitSize() { + return (long) data.length * Long.SIZE; + } + + /** Number of set bits (1s) */ + long bitCount() { + return bitCount; + } + + BitArray copy() { + return new BitArray(data.clone()); + } + + /** Combines the two BitArrays using bitwise OR. 
*/ + void putAll(BitArray array) { + assert data.length == array.data.length : "BitArrays must be of equal length when merging"; + bitCount = 0; + for (int i = 0; i < data.length; i++) { + data[i] |= array.data[i]; + bitCount += Long.bitCount(data[i]); + } + } + + @Override + public boolean equals(Object o) { + if (o instanceof BitArray) { + BitArray bitArray = (BitArray) o; + return Arrays.equals(data, bitArray.data); + } + return false; + } + + @Override + public int hashCode() { + return Arrays.hashCode(data); + } + } +} diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala new file mode 100644 index 0000000000000..639a002be7a95 --- /dev/null +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.sketch + +import scala.reflect.ClassTag +import scala.util.Random + +import org.scalatest.FunSuite // scalastyle:ignore funsuite + +class BloomFilterSuite extends FunSuite { // scalastyle:ignore funsuite + + def testAccuracy[T: ClassTag]( + typeName: String, numItems: Int)(itemGenerator: Random => T): Unit = { + test(s"accuracy - $typeName") { + // use a fixed seed to make the test predictable. + val r = new Random(37) + val fpp = 0.05 + val numInsertion = numItems / 10 + + val allItems = Array.fill(numItems)(itemGenerator(r)) + + val filter = BloomFilter.create(numInsertion, fpp) + + // insert first `numInsertion` items. + var i = 0 + while (i < numInsertion) { + filter.put(allItems(i)) + i += 1 + } + + i = 0 + while (i < numInsertion) { + // false negative is not allowed. + assert(filter.mightContain(allItems(i))) + i += 1 + } + + // The number of inserted items doesn't exceed `expectedInsertions`, so the `expectedFpp` + // should not be significantly higher than the one we passed in to create this bloom filter. + assert(filter.expectedFpp() - fpp < 0.001) + + var errorCount = 0 + while (i < numItems) { + if (filter.mightContain(allItems(i))) errorCount += 1 + i += 1 + } + + // Also check the actual fpp is not significantly higher than we expected. + val actualFpp = errorCount.toDouble / (numItems - numInsertion) + // Skip error count that is too small. + assert(errorCount < 50 || actualFpp - fpp < 0.001) + } + } + + def testMergeInPlace[T: ClassTag](typeName: String)(itemGenerator: Random => T): Unit = { + test(s"mergeInPlace - $typeName") { + // use a fixed seed to make the test predictable. 
+ val r = new Random(37) + + val items1 = Array.fill(1000)(itemGenerator(r)) + val items2 = Array.fill(1000)(itemGenerator(r)) + + val filter1 = BloomFilter.create(1000) + items1.foreach(filter1.put) + + val filter2 = BloomFilter.create(1000) + items2.foreach(filter2.put) + + filter1.mergeInPlace(filter2) + + items1.foreach(i => assert(filter1.mightContain(i))) + items2.foreach(i => assert(filter1.mightContain(i))) + } + } + + def testItemType[T: ClassTag]( + typeName: String, numItems: Int)(itemGenerator: Random => T): Unit = { + testAccuracy[T](typeName, numItems)(itemGenerator) + testMergeInPlace[T](typeName)(itemGenerator) + } + + testItemType[Byte]("Byte", 200) { _.nextInt().toByte } + + testItemType[Short]("Short", 1000) { _.nextInt().toShort } + + testItemType[Int]("Int", 100000) { _.nextInt() } + + testItemType[Long]("Long", 100000) { _.nextLong() } + + testItemType[String]("String", 100000) { r => r.nextString(r.nextInt(512)) } +} From 920f2928bdef741bd5b13b3f51c188955e50a24f Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 25 Jan 2016 10:50:38 -0800 Subject: [PATCH 2/6] address comments --- .../apache/spark/util/sketch/BitArray.java | 94 +++++++++++++++++++ .../apache/spark/util/sketch/BloomFilter.java | 39 +++++--- ...tBloomFilter.java => BloomFilterImpl.java} | 90 ++---------------- .../spark/util/sketch/BitArraySuite.scala | 76 +++++++++++++++ .../spark/util/sketch/BloomFilterSuite.scala | 29 +++--- 5 files changed, 216 insertions(+), 112 deletions(-) create mode 100644 common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java rename common/sketch/src/main/java/org/apache/spark/util/sketch/{DefaultBloomFilter.java => BloomFilterImpl.java} (64%) create mode 100644 common/sketch/src/test/scala/org/apache/spark/util/sketch/BitArraySuite.scala diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java new file mode 100644 index 
0000000000000..1bc665ad54b72 --- /dev/null +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.sketch; + +import java.util.Arrays; + +public final class BitArray { + private final long[] data; + private long bitCount; + + static int numWords(long numBits) { + long numWords = (long) Math.ceil(numBits / 64.0); + if (numWords > Integer.MAX_VALUE) { + throw new IllegalArgumentException("Can't allocate enough space for " + numBits + " bits"); + } + return (int) numWords; + } + + BitArray(long numBits) { + if (numBits <= 0) { + throw new IllegalArgumentException("numBits must be positive"); + } + this.data = new long[numWords(numBits)]; + long bitCount = 0; + for (long value : data) { + bitCount += Long.bitCount(value); + } + this.bitCount = bitCount; + } + + /** Returns true if the bit changed value. 
*/ + boolean set(long index) { + if (!get(index)) { + data[(int) (index >>> 6)] |= (1L << index); + bitCount++; + return true; + } + return false; + } + + boolean get(long index) { + return (data[(int) (index >>> 6)] & (1L << index)) != 0; + } + + /** Number of bits */ + long bitSize() { + return (long) data.length * Long.SIZE; + } + + /** Number of set bits (1s) */ + long cardinality() { + return bitCount; + } + + /** Combines the two BitArrays using bitwise OR. */ + void putAll(BitArray array) { + assert data.length == array.data.length : "BitArrays must be of equal length when merging"; + long bitCount = 0; + for (int i = 0; i < data.length; i++) { + data[i] |= array.data[i]; + bitCount += Long.bitCount(data[i]); + } + this.bitCount = bitCount; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || !(o instanceof BitArray)) return false; + + BitArray bitArray = (BitArray) o; + return Arrays.equals(data, bitArray.data); + } + + @Override + public int hashCode() { + return Arrays.hashCode(data); + } +} diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java index 88b3841d1fd22..b783882532e18 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java @@ -40,8 +40,9 @@ */ public abstract class BloomFilter { /** - * Returns the probability that {@linkplain #mightContain(Object)} will erroneously return - * {@code true} for an object that has not actually been put in the {@code BloomFilter}. + * Returns the false positive probability, i.e. the probability that + * {@linkplain #mightContain(Object)} will erroneously return {@code true} for an object that + * has not actually been put in the {@code BloomFilter}. * *

Ideally, this number should be close to the {@code fpp} parameter * passed in to create this bloom filter, or smaller. If it is @@ -64,7 +65,7 @@ public abstract class BloomFilter { * filter. If the bits haven't changed, this might be the first time {@code object} * has been added to the filter. Note that {@code put(t)} always returns the * opposite result to what {@code mightContain(t)} would have returned at the time - * it is called." + * it is called. */ public abstract boolean put(Object item); @@ -116,27 +117,35 @@ private static int optimalNumOfHashFunctions(long n, long m) { * @param p false positive rate (must be 0 < p < 1) */ private static long optimalNumOfBits(long n, double p) { - if (p == 0) { - p = Double.MIN_VALUE; - } return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2))); } - public static BloomFilter create(long expectedInsertions) { - return create(expectedInsertions, 0.03); + /** + * Creates a {@link BloomFilter} with given {@code expectedNumItems} and a default 3% {@code fpp}. + */ + public static BloomFilter create(long expectedNumItems) { + return create(expectedNumItems, 0.03); } - public static BloomFilter create(long expectedInsertions, double fpp) { + /** + * Creates a {@link BloomFilter} with given {@code expectedNumItems} and {@code fpp}, it will pick + * an optimal {@code numBits} and {@code numHashFunctions} for the bloom filter. 
+ */ + public static BloomFilter create(long expectedNumItems, double fpp) { assert fpp > 0.0 : "False positive probability must be > 0.0"; assert fpp < 1.0 : "False positive probability must be < 1.0"; - long numBits = optimalNumOfBits(expectedInsertions, fpp); - return create(expectedInsertions, numBits); + long numBits = optimalNumOfBits(expectedNumItems, fpp); + return create(expectedNumItems, numBits); } - public static BloomFilter create(long expectedInsertions, long numBits) { - assert expectedInsertions > 0 : "Expected insertions must be > 0"; + /** + * Creates a {@link BloomFilter} with given {@code expectedNumItems} and {@code numBits}, it will + * pick an optimal {@code numHashFunctions} which can minimize {@code fpp} for the bloom filter. + */ + public static BloomFilter create(long expectedNumItems, long numBits) { + assert expectedNumItems > 0 : "Expected insertions must be > 0"; assert numBits > 0 : "number of bits must be > 0"; - int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits); - return new DefaultBloomFilter(numHashFunctions, numBits); + int numHashFunctions = optimalNumOfHashFunctions(expectedNumItems, numBits); + return new BloomFilterImpl(numHashFunctions, numBits); } } diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/DefaultBloomFilter.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java similarity index 64% rename from common/sketch/src/main/java/org/apache/spark/util/sketch/DefaultBloomFilter.java rename to common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java index 35fefa3c48f8c..f41946294efc6 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/DefaultBloomFilter.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java @@ -18,21 +18,20 @@ package org.apache.spark.util.sketch; import java.io.UnsupportedEncodingException; -import java.util.Arrays; -public class DefaultBloomFilter extends 
BloomFilter { +public class BloomFilterImpl extends BloomFilter { private final int numHashFunctions; private final BitArray bits; - public DefaultBloomFilter(int numHashFunctions, long numBits) { + BloomFilterImpl(int numHashFunctions, long numBits) { this.numHashFunctions = numHashFunctions; this.bits = new BitArray(numBits); } @Override public double expectedFpp() { - return Math.pow((double) bits.bitCount() / bits.bitSize(), numHashFunctions); + return Math.pow((double) bits.cardinality() / bits.bitSize(), numHashFunctions); } @Override @@ -122,11 +121,11 @@ public boolean isCompatible(BloomFilter other) { return false; } - if (!(other instanceof DefaultBloomFilter)) { + if (!(other instanceof BloomFilterImpl)) { return false; } - DefaultBloomFilter that = (DefaultBloomFilter) other; + BloomFilterImpl that = (BloomFilterImpl) other; return this.bitSize() == that.bitSize() && this.numHashFunctions == that.numHashFunctions; } @@ -136,85 +135,8 @@ public BloomFilter mergeInPlace(BloomFilter other) { throw new IllegalArgumentException("Can't merge incompatible bloom filter"); } - DefaultBloomFilter that = (DefaultBloomFilter) other; + BloomFilterImpl that = (BloomFilterImpl) other; this.bits.putAll(that.bits); return this; } - - static final class BitArray { - final long[] data; - long bitCount; - - static int safeCast(long numBits) { - long numWords = (long) Math.ceil(numBits / 64.0); - assert numWords <= Integer.MAX_VALUE : "Can't allocate enough space for " + numBits + " bits"; - return (int) numWords; - } - - BitArray(long numBits) { - this(new long[safeCast(numBits)]); - } - - BitArray(long[] data) { - assert data.length > 0 : "data length is zero!"; - this.data = data; - long bitCount = 0; - for (long value : data) { - bitCount += Long.bitCount(value); - } - this.bitCount = bitCount; - } - - /** Returns true if the bit changed value. 
*/ - boolean set(long index) { - if (!get(index)) { - data[(int) (index >>> 6)] |= (1L << index); - bitCount++; - return true; - } - return false; - } - - boolean get(long index) { - return (data[(int) (index >>> 6)] & (1L << index)) != 0; - } - - /** Number of bits */ - long bitSize() { - return (long) data.length * Long.SIZE; - } - - /** Number of set bits (1s) */ - long bitCount() { - return bitCount; - } - - BitArray copy() { - return new BitArray(data.clone()); - } - - /** Combines the two BitArrays using bitwise OR. */ - void putAll(BitArray array) { - assert data.length == array.data.length : "BitArrays must be of equal length when merging"; - bitCount = 0; - for (int i = 0; i < data.length; i++) { - data[i] |= array.data[i]; - bitCount += Long.bitCount(data[i]); - } - } - - @Override - public boolean equals(Object o) { - if (o instanceof BitArray) { - BitArray bitArray = (BitArray) o; - return Arrays.equals(data, bitArray.data); - } - return false; - } - - @Override - public int hashCode() { - return Arrays.hashCode(data); - } - } } diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BitArraySuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BitArraySuite.scala new file mode 100644 index 0000000000000..7db88778cff1a --- /dev/null +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BitArraySuite.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.sketch + +import scala.util.Random +import org.scalatest.FunSuite // scalastyle:ignore funsuite + +class BitArraySuite extends FunSuite { // scalastyle:ignore funsuite + + test("error case when create BitArray") { + intercept[IllegalArgumentException](new BitArray(0)) + intercept[IllegalArgumentException](new BitArray(64l * Integer.MAX_VALUE + 1)) + } + + test("bitSize") { + assert(new BitArray(64).bitSize() == 64) + // BitArray is word-aligned, so 65~128 bits need 2 long to store, which is 128 bits. + assert(new BitArray(65).bitSize() == 128) + assert(new BitArray(127).bitSize() == 128) + assert(new BitArray(128).bitSize() == 128) + } + + test("set") { + val bitArray = new BitArray(64) + assert(bitArray.set(1)) + // Only returns true if the bit changed. + assert(!bitArray.set(1)) + assert(bitArray.set(2)) + } + + test("normal operation") { + // use a fixed seed to make the test predictable. + val r = new Random(37) + + val bitArray = new BitArray(320) + val indexes = (1 to 100).map(_ => r.nextInt(320).toLong).distinct + + indexes.map(bitArray.set) + indexes.foreach(i => assert(bitArray.get(i))) + assert(bitArray.cardinality() == indexes.length) + } + + test("merge") { + // use a fixed seed to make the test predictable. 
+ val r = new Random(37) + + val bitArray1 = new BitArray(64 * 6) + val bitArray2 = new BitArray(64 * 6) + + val indexes1 = (1 to 100).map(_ => r.nextInt(64 * 6).toLong).distinct + val indexes2 = (1 to 100).map(_ => r.nextInt(64 * 6).toLong).distinct + + indexes1.map(bitArray1.set) + indexes2.map(bitArray2.set) + + bitArray1.putAll(bitArray2) + indexes1.foreach(i => assert(bitArray1.get(i))) + indexes2.foreach(i => assert(bitArray1.get(i))) + assert(bitArray1.cardinality() == (indexes1 ++ indexes2).distinct.length) + } +} diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala index 639a002be7a95..80f1a18326844 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala @@ -24,15 +24,14 @@ import org.scalatest.FunSuite // scalastyle:ignore funsuite class BloomFilterSuite extends FunSuite { // scalastyle:ignore funsuite - def testAccuracy[T: ClassTag]( - typeName: String, numItems: Int)(itemGenerator: Random => T): Unit = { + def testAccuracy[T: ClassTag](typeName: String, numItems: Int)(itemGen: Random => T): Unit = { test(s"accuracy - $typeName") { // use a fixed seed to make the test predictable. val r = new Random(37) val fpp = 0.05 val numInsertion = numItems / 10 - val allItems = Array.fill(numItems)(itemGenerator(r)) + val allItems = Array.fill(numItems)(itemGen(r)) val filter = BloomFilter.create(numInsertion, fpp) @@ -50,7 +49,7 @@ class BloomFilterSuite extends FunSuite { // scalastyle:ignore funsuite i += 1 } - // The number of inserted items doesn't exceed `expectedInsertions`, so the `expectedFpp` + // The number of inserted items doesn't exceed `expectedNumItems`, so the `expectedFpp` // should not be significantly higher than the one we passed in to create this bloom filter. 
assert(filter.expectedFpp() - fpp < 0.001) @@ -67,31 +66,35 @@ class BloomFilterSuite extends FunSuite { // scalastyle:ignore funsuite } } - def testMergeInPlace[T: ClassTag](typeName: String)(itemGenerator: Random => T): Unit = { + def testMergeInPlace[T: ClassTag](typeName: String, numItems: Int)(itemGen: Random => T): Unit = { test(s"mergeInPlace - $typeName") { // use a fixed seed to make the test predictable. val r = new Random(37) - val items1 = Array.fill(1000)(itemGenerator(r)) - val items2 = Array.fill(1000)(itemGenerator(r)) + val items1 = Array.fill(numItems / 2)(itemGen(r)) + val items2 = Array.fill(numItems / 2)(itemGen(r)) - val filter1 = BloomFilter.create(1000) + val filter1 = BloomFilter.create(numItems) items1.foreach(filter1.put) - val filter2 = BloomFilter.create(1000) + val filter2 = BloomFilter.create(numItems) items2.foreach(filter2.put) filter1.mergeInPlace(filter2) + // After merge, `filter1` has `numItems` items which doesn't exceed `expectedNumItems`, so the + // `expectedFpp` should not be significantly higher than the default one: 3% + // Skip byte type as it has too little distinct values. 
+ assert(typeName == "Byte" || 0.03 - filter1.expectedFpp() < 0.001) + items1.foreach(i => assert(filter1.mightContain(i))) items2.foreach(i => assert(filter1.mightContain(i))) } } - def testItemType[T: ClassTag]( - typeName: String, numItems: Int)(itemGenerator: Random => T): Unit = { - testAccuracy[T](typeName, numItems)(itemGenerator) - testMergeInPlace[T](typeName)(itemGenerator) + def testItemType[T: ClassTag](typeName: String, numItems: Int)(itemGen: Random => T): Unit = { + testAccuracy[T](typeName, numItems)(itemGen) + testMergeInPlace[T](typeName, numItems)(itemGen) } testItemType[Byte]("Byte", 200) { _.nextInt().toByte } From 3633952cc88ef6745bcface7d4073912cf02b14d Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 25 Jan 2016 11:55:44 -0800 Subject: [PATCH 3/6] fix style --- .../scala/org/apache/spark/util/sketch/BitArraySuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BitArraySuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BitArraySuite.scala index 7db88778cff1a..0cb3dbdc5d626 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BitArraySuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BitArraySuite.scala @@ -18,13 +18,14 @@ package org.apache.spark.util.sketch import scala.util.Random + import org.scalatest.FunSuite // scalastyle:ignore funsuite class BitArraySuite extends FunSuite { // scalastyle:ignore funsuite test("error case when create BitArray") { intercept[IllegalArgumentException](new BitArray(0)) - intercept[IllegalArgumentException](new BitArray(64l * Integer.MAX_VALUE + 1)) + intercept[IllegalArgumentException](new BitArray(64L * Integer.MAX_VALUE + 1)) } test("bitSize") { From 4fce26ed018c2336f7ef67c31778b442adb31a91 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 25 Jan 2016 13:08:51 -0800 Subject: [PATCH 4/6] add comment --- 
.../java/org/apache/spark/util/sketch/BloomFilterImpl.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java index f41946294efc6..980a0c5effbbc 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java @@ -79,6 +79,11 @@ private static long hashBytesToLong(byte[] bytes) { @Override public boolean put(Object item) { long bitSize = bits.bitSize(); + + // Here we first hash the input element into 2 int hash values, h1 and h2, then produce n hash + // values by `h1 + i * h2` with 1 <= i <= numHashFunctions. + // Note that `CountMinSketch` uses a different strategy for long type, it hashes the input long + // element with every i to produce n hash values. long hash64 = hashObjectToLong(item); int h1 = (int) (hash64 >> 32); int h2 = (int) hash64; From b850bfd16753d7e820532c0fb888ceb220fc78f1 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 25 Jan 2016 15:09:05 -0800 Subject: [PATCH 5/6] address comments --- .../apache/spark/util/sketch/BloomFilter.java | 6 ++-- .../spark/util/sketch/BitArraySuite.scala | 6 ++-- .../spark/util/sketch/BloomFilterSuite.scala | 35 ++++++------------- 3 files changed, 18 insertions(+), 29 deletions(-) diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java index b783882532e18..c918e778eead6 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java @@ -120,11 +120,13 @@ private static long optimalNumOfBits(long n, double p) { return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2))); } + static final double DEFAULT_FPP = 0.03; + /** - * Creates a 
{@link BloomFilter} with given {@code expectedNumItems} and a default 3% {@code fpp}. + * Creates a {@link BloomFilter} with given {@code expectedNumItems} and the default {@code fpp}. */ public static BloomFilter create(long expectedNumItems) { - return create(expectedNumItems, 0.03); + return create(expectedNumItems, DEFAULT_FPP); } /** diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BitArraySuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BitArraySuite.scala index 0cb3dbdc5d626..ff728f0ebcb85 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BitArraySuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BitArraySuite.scala @@ -51,7 +51,7 @@ class BitArraySuite extends FunSuite { // scalastyle:ignore funsuite val bitArray = new BitArray(320) val indexes = (1 to 100).map(_ => r.nextInt(320).toLong).distinct - indexes.map(bitArray.set) + indexes.foreach(bitArray.set) indexes.foreach(i => assert(bitArray.get(i))) assert(bitArray.cardinality() == indexes.length) } @@ -66,8 +66,8 @@ class BitArraySuite extends FunSuite { // scalastyle:ignore funsuite val indexes1 = (1 to 100).map(_ => r.nextInt(64 * 6).toLong).distinct val indexes2 = (1 to 100).map(_ => r.nextInt(64 * 6).toLong).distinct - indexes1.map(bitArray1.set) - indexes2.map(bitArray2.set) + indexes1.foreach(bitArray1.set) + indexes2.foreach(bitArray2.set) bitArray1.putAll(bitArray2) indexes1.foreach(i => assert(bitArray1.get(i))) diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala index 80f1a18326844..185d49be72df6 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala @@ -23,6 +23,7 @@ import scala.util.Random import org.scalatest.FunSuite // scalastyle:ignore funsuite class 
BloomFilterSuite extends FunSuite { // scalastyle:ignore funsuite + private final val EPSILON = 0.01 def testAccuracy[T: ClassTag](typeName: String, numItems: Int)(itemGen: Random => T): Unit = { test(s"accuracy - $typeName") { @@ -36,33 +37,20 @@ class BloomFilterSuite extends FunSuite { // scalastyle:ignore funsuite val filter = BloomFilter.create(numInsertion, fpp) // insert first `numInsertion` items. - var i = 0 - while (i < numInsertion) { - filter.put(allItems(i)) - i += 1 - } - - i = 0 - while (i < numInsertion) { - // false negative is not allowed. - assert(filter.mightContain(allItems(i))) - i += 1 - } + allItems.take(numInsertion).foreach(filter.put) + + // false negative is not allowed. + assert(allItems.take(numInsertion).forall(filter.mightContain)) // The number of inserted items doesn't exceed `expectedNumItems`, so the `expectedFpp` // should not be significantly higher than the one we passed in to create this bloom filter. - assert(filter.expectedFpp() - fpp < 0.001) + assert(filter.expectedFpp() - fpp < EPSILON) - var errorCount = 0 - while (i < numItems) { - if (filter.mightContain(allItems(i))) errorCount += 1 - i += 1 - } + val errorCount = allItems.drop(numInsertion).count(filter.mightContain) // Also check the actual fpp is not significantly higher than we expected. val actualFpp = errorCount.toDouble / (numItems - numInsertion) - // Skip error count that is too small. - assert(errorCount < 50 || actualFpp - fpp < 0.001) + assert(actualFpp - fpp < EPSILON) } } @@ -83,9 +71,8 @@ class BloomFilterSuite extends FunSuite { // scalastyle:ignore funsuite filter1.mergeInPlace(filter2) // After merge, `filter1` has `numItems` items which doesn't exceed `expectedNumItems`, so the - // `expectedFpp` should not be significantly higher than the default one: 3% - // Skip byte type as it has too little distinct values. 
- assert(typeName == "Byte" || 0.03 - filter1.expectedFpp() < 0.001) + // `expectedFpp` should not be significantly higher than the default one. + assert(filter1.expectedFpp() - BloomFilter.DEFAULT_FPP < EPSILON) items1.foreach(i => assert(filter1.mightContain(i))) items2.foreach(i => assert(filter1.mightContain(i))) @@ -97,7 +84,7 @@ class BloomFilterSuite extends FunSuite { // scalastyle:ignore funsuite testMergeInPlace[T](typeName, numItems)(itemGen) } - testItemType[Byte]("Byte", 200) { _.nextInt().toByte } + testItemType[Byte]("Byte", 160) { _.nextInt().toByte } testItemType[Short]("Short", 1000) { _.nextInt().toShort } From a9a6e834834a03f36084d51041235dc7c7621ff0 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 25 Jan 2016 15:27:45 -0800 Subject: [PATCH 6/6] add error merge test --- .../apache/spark/util/sketch/BloomFilter.java | 2 +- .../spark/util/sketch/BloomFilterImpl.java | 23 ++++++++++++++++--- .../spark/util/sketch/BloomFilterSuite.scala | 18 +++++++++++++++ 3 files changed, 39 insertions(+), 4 deletions(-) diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java index c918e778eead6..38949c6311df8 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java @@ -85,7 +85,7 @@ public abstract class BloomFilter { * @param other The bloom filter to combine this bloom filter with. It is not mutated. 
* @throws IllegalArgumentException if {@code isCompatible(that) == false} */ - public abstract BloomFilter mergeInPlace(BloomFilter other); + public abstract BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeException; /** * Returns {@code true} if the element might have been put in this Bloom filter, diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java index 980a0c5effbbc..bbd6cf719dc0e 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java @@ -135,12 +135,29 @@ public boolean isCompatible(BloomFilter other) { } @Override - public BloomFilter mergeInPlace(BloomFilter other) { - if (!isCompatible(other)) { - throw new IllegalArgumentException("Can't merge incompatible bloom filter"); + public BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeException { + // Duplicates the logic of `isCompatible` here to provide better error message. 
+ if (other == null) { + throw new IncompatibleMergeException("Cannot merge null bloom filter"); + } + + if (!(other instanceof BloomFilterImpl)) { + throw new IncompatibleMergeException( + "Cannot merge bloom filter of class " + other.getClass().getName() + ); } BloomFilterImpl that = (BloomFilterImpl) other; + + if (this.bitSize() != that.bitSize()) { + throw new IncompatibleMergeException("Cannot merge bloom filters with different bit size"); + } + + if (this.numHashFunctions != that.numHashFunctions) { + throw new IncompatibleMergeException( + "Cannot merge bloom filters with different number of hash functions"); + } + this.bits.putAll(that.bits); return this; } diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala index 185d49be72df6..d2de509f19517 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala @@ -93,4 +93,22 @@ class BloomFilterSuite extends FunSuite { // scalastyle:ignore funsuite testItemType[Long]("Long", 100000) { _.nextLong() } testItemType[String]("String", 100000) { r => r.nextString(r.nextInt(512)) } + + test("incompatible merge") { + intercept[IncompatibleMergeException] { + BloomFilter.create(1000).mergeInPlace(null) + } + + intercept[IncompatibleMergeException] { + val filter1 = BloomFilter.create(1000, 6400) + val filter2 = BloomFilter.create(1000, 3200) + filter1.mergeInPlace(filter2) + } + + intercept[IncompatibleMergeException] { + val filter1 = BloomFilter.create(1000, 6400) + val filter2 = BloomFilter.create(2000, 6400) + filter1.mergeInPlace(filter2) + } + } }