diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
index c918e778eead6..38949c6311df8 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
@@ -85,7 +85,7 @@ public abstract class BloomFilter {
    * @param other The bloom filter to combine this bloom filter with. It is not mutated.
-   * @throws IllegalArgumentException if {@code isCompatible(that) == false}
+   * @throws IncompatibleMergeException if {@code isCompatible(other) == false}
    */
-  public abstract BloomFilter mergeInPlace(BloomFilter other);
+  public abstract BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeException;
 
   /**
    * Returns {@code true} if the element might have been put in this Bloom filter,
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
index 980a0c5effbbc..bbd6cf719dc0e 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
@@ -135,12 +135,31 @@ public boolean isCompatible(BloomFilter other) {
   }
 
   @Override
-  public BloomFilter mergeInPlace(BloomFilter other) {
-    if (!isCompatible(other)) {
-      throw new IllegalArgumentException("Can't merge incompatible bloom filter");
+  public BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeException {
+    // Duplicates the logic of `isCompatible` here to provide better error message.
+    if (other == null) {
+      throw new IncompatibleMergeException("Cannot merge null bloom filter");
+    }
+
+    // `other` is statically a BloomFilter, so test for the concrete implementation: the cast
+    // below would otherwise fail with a ClassCastException for any other subclass.
+    if (!(other instanceof BloomFilterImpl)) {
+      throw new IncompatibleMergeException(
+        "Cannot merge bloom filter of class " + other.getClass().getName()
+      );
     }
 
     BloomFilterImpl that = (BloomFilterImpl) other;
+
+    if (this.bitSize() != that.bitSize()) {
+      throw new IncompatibleMergeException("Cannot merge bloom filters with different bit size");
+    }
+
+    if (this.numHashFunctions != that.numHashFunctions) {
+      throw new IncompatibleMergeException(
+        "Cannot merge bloom filters with different number of hash functions");
+    }
+
     this.bits.putAll(that.bits);
     return this;
   }
diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala
index 185d49be72df6..d2de509f19517 100644
--- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala
+++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala
@@ -93,4 +93,22 @@ class BloomFilterSuite extends FunSuite { // scalastyle:ignore funsuite
   testItemType[Long]("Long", 100000) { _.nextLong() }
 
   testItemType[String]("String", 100000) { r => r.nextString(r.nextInt(512)) }
+
+  test("incompatible merge") {
+    intercept[IncompatibleMergeException] {
+      BloomFilter.create(1000).mergeInPlace(null)
+    }
+
+    intercept[IncompatibleMergeException] {
+      val filter1 = BloomFilter.create(1000, 6400)
+      val filter2 = BloomFilter.create(1000, 3200)
+      filter1.mergeInPlace(filter2)
+    }
+
+    intercept[IncompatibleMergeException] {
+      val filter1 = BloomFilter.create(1000, 6400)
+      val filter2 = BloomFilter.create(2000, 6400)
+      filter1.mergeInPlace(filter2)
+    }
+  }
 }