diff --git a/LICENSE b/LICENSE
index efb46dab44da..76f6113d9811 100644
--- a/LICENSE
+++ b/LICENSE
@@ -298,6 +298,7 @@ License: https://www.apache.org/licenses/LICENSE-2.0
This product includes code from Delta Lake.
* AssignmentAlignmentSupport is an independent development but UpdateExpressionsSupport in Delta was used as a reference.
+* RoaringPositionBitmap is a Java implementation of RoaringBitmapArray in Delta.
Copyright: 2020 The Delta Lake Project Authors.
Home page: https://delta.io/
diff --git a/core/src/jmh/java/org/apache/iceberg/deletes/RoaringPositionBitmapBenchmark.java b/core/src/jmh/java/org/apache/iceberg/deletes/RoaringPositionBitmapBenchmark.java
new file mode 100644
index 000000000000..1cbc39583fbc
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/deletes/RoaringPositionBitmapBenchmark.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+/**
+ * A benchmark that evaluates the performance of {@link RoaringPositionBitmap}.
+ *
+ * <p>To run this benchmark: <code>
+ *   ./gradlew :iceberg-core:jmh
+ *       -PjmhIncludeRegex=RoaringPositionBitmapBenchmark
+ *       -PjmhOutputPath=benchmark/roaring-position-bitmap-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+@Timeout(time = 5, timeUnit = TimeUnit.MINUTES)
+public class RoaringPositionBitmapBenchmark {
+
+  private static final Random RANDOM = new Random();
+  private static final int TOTAL_POSITIONS = 5_000_000;
+  // distance between two consecutive generated positions
+  private static final long STEP = 5L;
+
+  private long[] orderedPositions;
+  private long[] shuffledPositions;
+
+  /** Generates the ordered and the shuffled position arrays used by the benchmarks. */
+  @Setup
+  public void setupBenchmark() {
+    this.orderedPositions = generateOrderedPositions();
+    this.shuffledPositions = generateShuffledPositions();
+  }
+
+  /** Adds positions in ascending order to {@link RoaringPositionBitmap}. */
+  @Benchmark
+  @Threads(1)
+  public void addOrderedPositionsIcebergBitmap(Blackhole blackhole) {
+    RoaringPositionBitmap bitmap = new RoaringPositionBitmap();
+    for (long position : orderedPositions) {
+      bitmap.set(position);
+    }
+    blackhole.consume(bitmap);
+  }
+
+  /** Adds positions in ascending order to the library {@link Roaring64Bitmap}. */
+  @Benchmark
+  @Threads(1)
+  public void addOrderedPositionsLibraryBitmap(Blackhole blackhole) {
+    Roaring64Bitmap bitmap = new Roaring64Bitmap();
+    for (long position : orderedPositions) {
+      bitmap.add(position);
+    }
+    blackhole.consume(bitmap);
+  }
+
+  /** Adds positions in random order to {@link RoaringPositionBitmap}. */
+  @Benchmark
+  @Threads(1)
+  public void addShuffledPositionsIcebergBitmap(Blackhole blackhole) {
+    RoaringPositionBitmap bitmap = new RoaringPositionBitmap();
+    for (long position : shuffledPositions) {
+      bitmap.set(position);
+    }
+    blackhole.consume(bitmap);
+  }
+
+  /** Adds positions in random order to the library {@link Roaring64Bitmap}. */
+  @Benchmark
+  @Threads(1)
+  public void addShuffledPositionsLibraryBitmap(Blackhole blackhole) {
+    Roaring64Bitmap bitmap = new Roaring64Bitmap();
+    for (long position : shuffledPositions) {
+      bitmap.add(position);
+    }
+    blackhole.consume(bitmap);
+  }
+
+  /**
+   * Adds positions in random order to {@link RoaringPositionBitmap} and then looks up every
+   * position in the generated range, including the ones that were never added.
+   */
+  @Benchmark
+  @Threads(1)
+  public void addAndCheckPositionsIcebergBitmap(Blackhole blackhole) {
+    RoaringPositionBitmap bitmap = new RoaringPositionBitmap();
+
+    for (long position : shuffledPositions) {
+      bitmap.set(position);
+    }
+
+    for (long position = 0; position <= TOTAL_POSITIONS * STEP; position++) {
+      // consume each lookup result so the lookups cannot be eliminated as dead code
+      blackhole.consume(bitmap.contains(position));
+    }
+
+    blackhole.consume(bitmap);
+  }
+
+  /**
+   * Adds positions in random order to the library {@link Roaring64Bitmap} and then looks up every
+   * position in the generated range, including the ones that were never added.
+   */
+  @Benchmark
+  @Threads(1)
+  public void addAndCheckPositionsLibraryBitmap(Blackhole blackhole) {
+    Roaring64Bitmap bitmap = new Roaring64Bitmap();
+
+    for (long position : shuffledPositions) {
+      bitmap.add(position);
+    }
+
+    for (long position = 0; position <= TOTAL_POSITIONS * STEP; position++) {
+      // consume each lookup result so the lookups cannot be eliminated as dead code
+      blackhole.consume(bitmap.contains(position));
+    }
+
+    blackhole.consume(bitmap);
+  }
+
+  // produces TOTAL_POSITIONS ascending positions: 0, STEP, 2 * STEP, ...
+  private static long[] generateOrderedPositions() {
+    long[] positions = new long[TOTAL_POSITIONS];
+    for (int index = 0; index < TOTAL_POSITIONS; index++) {
+      positions[index] = index * STEP;
+    }
+    return positions;
+  }
+
+  // produces the same positions as generateOrderedPositions() but in random order
+  private static long[] generateShuffledPositions() {
+    long[] positions = generateOrderedPositions();
+    shuffle(positions);
+    return positions;
+  }
+
+  // shuffles the array in place, walking from the last element down to the second one
+  private static void shuffle(long[] array) {
+    for (int index = array.length - 1; index > 0; index--) {
+      // swap with an element at a random index between 0 and index
+      int thatIndex = RANDOM.nextInt(index + 1);
+      long temp = array[index];
+      array[index] = array[thatIndex];
+      array[thatIndex] = temp;
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/deletes/RoaringPositionBitmap.java b/core/src/main/java/org/apache/iceberg/deletes/RoaringPositionBitmap.java
new file mode 100644
index 000000000000..eec130743d85
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/deletes/RoaringPositionBitmap.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import java.util.function.LongConsumer;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * A bitmap that supports positive 64-bit positions (the most significant bit must be 0), but is
+ * optimized for cases where most positions fit in 32 bits by using an array of 32-bit Roaring
+ * bitmaps. The internal bitmap array is grown as needed to accommodate the largest position.
+ *
+ * <p>Incoming 64-bit positions are divided into a 32-bit "key" using the most significant 4 bytes
+ * and a 32-bit position using the least significant 4 bytes. For each key in the set of positions,
+ * a 32-bit Roaring bitmap is maintained to store a set of 32-bit positions for that key.
+ *
+ * <p>To test whether a certain position is set, its most significant 4 bytes (the key) are used to
+ * find a 32-bit bitmap and the least significant 4 bytes are tested for inclusion in the bitmap. If
+ * a bitmap is not found for the key, then the position is not set.
+ *
+ * <p>Positions must range from 0 (inclusive) to {@link #MAX_POSITION} (inclusive). This class
+ * cannot handle positions with the key equal to Integer.MAX_VALUE because the length of the
+ * internal bitmap array is a signed 32-bit integer, which must be greater than or equal to 0.
+ * Supporting Integer.MAX_VALUE as a key would require allocating a bitmap array with size
+ * Integer.MAX_VALUE + 1, triggering an integer overflow.
+ */
+class RoaringPositionBitmap {
+
+  // upper bound enforced by validatePosition: key Integer.MAX_VALUE - 1, low 32 bits 0x80000000
+  static final long MAX_POSITION = toPosition(Integer.MAX_VALUE - 1, Integer.MIN_VALUE);
+  private static final RoaringBitmap[] EMPTY_BITMAP_ARRAY = new RoaringBitmap[0];
+  // the bitmap count is serialized with putLong and each key with putInt
+  private static final long BITMAP_COUNT_SIZE_BYTES = 8L;
+  private static final long BITMAP_KEY_SIZE_BYTES = 4L;
+
+  // indexed by key (the high 32 bits of a position); slots are always filled with a bitmap
+  private RoaringBitmap[] bitmaps;
+
+  RoaringPositionBitmap() {
+    this.bitmaps = EMPTY_BITMAP_ARRAY;
+  }
+
+  private RoaringPositionBitmap(RoaringBitmap[] bitmaps) {
+    this.bitmaps = bitmaps;
+  }
+
+  /**
+   * Sets a position in the bitmap.
+   *
+   * @param pos the position
+   * @throws IllegalArgumentException if the position is negative or exceeds {@link #MAX_POSITION}
+   */
+  public void set(long pos) {
+    validatePosition(pos);
+    int key = key(pos);
+    int pos32Bits = pos32Bits(pos);
+    allocateBitmapsIfNeeded(key + 1 /* required bitmap array length */);
+    bitmaps[key].add(pos32Bits);
+  }
+
+  /**
+   * Sets a range of positions in the bitmap. Positions are set one by one, so an empty or inverted
+   * range leaves the bitmap unchanged.
+   *
+   * @param posStartInclusive the start position of the range (inclusive)
+   * @param posEndExclusive the end position of the range (exclusive)
+   * @throws IllegalArgumentException if any position in the range is negative or exceeds {@link
+   *     #MAX_POSITION}
+   */
+  public void setRange(long posStartInclusive, long posEndExclusive) {
+    for (long pos = posStartInclusive; pos < posEndExclusive; pos++) {
+      set(pos);
+    }
+  }
+
+  /**
+   * Sets all positions from the other bitmap in this bitmap, modifying this bitmap in place.
+   *
+   * @param that the other bitmap
+   */
+  public void setAll(RoaringPositionBitmap that) {
+    // grow this array first so that every key of the other bitmap has a counterpart here
+    allocateBitmapsIfNeeded(that.bitmaps.length);
+    for (int key = 0; key < that.bitmaps.length; key++) {
+      bitmaps[key].or(that.bitmaps[key]);
+    }
+  }
+
+  /**
+   * Checks if a position is set in the bitmap.
+   *
+   * @param pos the position
+   * @return true if the position is set in this bitmap, false otherwise
+   * @throws IllegalArgumentException if the position is negative or exceeds {@link #MAX_POSITION}
+   */
+  public boolean contains(long pos) {
+    validatePosition(pos);
+    int key = key(pos);
+    int pos32Bits = pos32Bits(pos);
+    // a key beyond the allocated array means no position with that key was ever set
+    return key < bitmaps.length && bitmaps[key].contains(pos32Bits);
+  }
+
+  /**
+   * Indicates whether the bitmap has any positions set.
+   *
+   * @return true if the bitmap is empty, false otherwise
+   */
+  public boolean isEmpty() {
+    // stop at the first non-empty bitmap instead of computing the cardinality of all bitmaps
+    for (RoaringBitmap bitmap : bitmaps) {
+      if (!bitmap.isEmpty()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns the number of set positions in the bitmap.
+   *
+   * @return the number of set positions
+   */
+  public long cardinality() {
+    long cardinality = 0L;
+    for (RoaringBitmap bitmap : bitmaps) {
+      cardinality += bitmap.getLongCardinality();
+    }
+    return cardinality;
+  }
+
+  /**
+   * Applies run-length encoding wherever it is more space efficient.
+   *
+   * @return whether the bitmap was changed
+   */
+  public boolean runLengthEncode() {
+    boolean changed = false;
+    for (RoaringBitmap bitmap : bitmaps) {
+      // non-short-circuit OR: every bitmap must be optimized even after one reported a change
+      changed |= bitmap.runOptimize();
+    }
+    return changed;
+  }
+
+  /**
+   * Iterates over all positions in the bitmap.
+   *
+   * @param consumer a consumer for positions
+   */
+  public void forEach(LongConsumer consumer) {
+    for (int key = 0; key < bitmaps.length; key++) {
+      forEach(key, bitmaps[key], consumer);
+    }
+  }
+
+  @VisibleForTesting
+  int allocatedBitmapCount() {
+    return bitmaps.length;
+  }
+
+  // grows the bitmap array to the required length, filling all new slots with empty bitmaps
+  private void allocateBitmapsIfNeeded(int requiredLength) {
+    if (bitmaps.length < requiredLength) {
+      if (bitmaps.length == 0 && requiredLength == 1) {
+        // first bitmap for key 0: nothing to copy from the empty array
+        this.bitmaps = new RoaringBitmap[] {new RoaringBitmap()};
+      } else {
+        RoaringBitmap[] newBitmaps = new RoaringBitmap[requiredLength];
+        System.arraycopy(bitmaps, 0, newBitmaps, 0, bitmaps.length);
+        for (int key = bitmaps.length; key < requiredLength; key++) {
+          newBitmaps[key] = new RoaringBitmap();
+        }
+        this.bitmaps = newBitmaps;
+      }
+    }
+  }
+
+  /**
+   * Returns the number of bytes required to serialize the bitmap.
+   *
+   * @return the serialized size in bytes
+   */
+  public long serializedSizeInBytes() {
+    long size = BITMAP_COUNT_SIZE_BYTES;
+    for (RoaringBitmap bitmap : bitmaps) {
+      size += BITMAP_KEY_SIZE_BYTES + bitmap.serializedSizeInBytes();
+    }
+    return size;
+  }
+
+  /**
+   * Serializes the bitmap using the portable serialization format described below.
+   *
+   * <ul>
+   *   <li>The number of 32-bit Roaring bitmaps, serialized as 8 bytes
+   *   <li>For each 32-bit Roaring bitmap, ordered by unsigned comparison of the 32-bit keys:
+   *       <ul>
+   *         <li>The key stored as 4 bytes
+   *         <li>Serialized 32-bit Roaring bitmap using the standard format
+   *       </ul>
+   * </ul>
+   *
+   * <p>Note the byte order of the buffer must be little-endian.
+   *
+   * @param buffer the buffer to write to
+   * @throws IllegalArgumentException if the byte order of the buffer is not little-endian
+   * @see <a href="https://github.com/RoaringBitmap/RoaringFormatSpec">Roaring bitmap spec</a>
+   */
+  public void serialize(ByteBuffer buffer) {
+    validateByteOrder(buffer);
+    buffer.putLong(bitmaps.length);
+    for (int key = 0; key < bitmaps.length; key++) {
+      buffer.putInt(key);
+      bitmaps[key].serialize(buffer);
+    }
+  }
+
+  /**
+   * Deserializes a bitmap from a buffer, assuming the portable serialization format.
+   *
+   * @param buffer the buffer to read from
+   * @return a new bitmap instance with the deserialized data
+   * @throws IllegalArgumentException if the byte order of the buffer is not little-endian, the
+   *     bitmap count is invalid, or the keys are invalid or not in ascending order
+   */
+  public static RoaringPositionBitmap deserialize(ByteBuffer buffer) {
+    validateByteOrder(buffer);
+
+    // the bitmap array may be sparse with more elements than the number of read bitmaps
+    int remainingBitmapCount = readBitmapCount(buffer);
+    List<RoaringBitmap> bitmaps = Lists.newArrayListWithExpectedSize(remainingBitmapCount);
+    int lastKey = -1;
+
+    while (remainingBitmapCount > 0) {
+      int key = readKey(buffer, lastKey);
+
+      // fill gaps as the bitmap array may be sparse
+      while (lastKey < key - 1) {
+        bitmaps.add(new RoaringBitmap());
+        lastKey++;
+      }
+
+      RoaringBitmap bitmap = readBitmap(buffer);
+      bitmaps.add(bitmap);
+
+      lastKey = key;
+      remainingBitmapCount--;
+    }
+
+    return new RoaringPositionBitmap(bitmaps.toArray(EMPTY_BITMAP_ARRAY));
+  }
+
+  private static void validateByteOrder(ByteBuffer buffer) {
+    Preconditions.checkArgument(
+        buffer.order() == ByteOrder.LITTLE_ENDIAN,
+        "Roaring bitmap serialization requires little-endian byte order");
+  }
+
+  // reads the 8-byte bitmap count and checks that it fits into a non-negative int
+  private static int readBitmapCount(ByteBuffer buffer) {
+    long bitmapCount = buffer.getLong();
+    Preconditions.checkArgument(
+        bitmapCount >= 0 && bitmapCount <= Integer.MAX_VALUE,
+        "Invalid bitmap count: %s",
+        bitmapCount);
+    return (int) bitmapCount;
+  }
+
+  // reads the 4-byte key and checks that it is supported and greater than the previous key
+  private static int readKey(ByteBuffer buffer, int lastKey) {
+    int key = buffer.getInt();
+    Preconditions.checkArgument(key >= 0, "Invalid unsigned key: %s", key);
+    Preconditions.checkArgument(key <= Integer.MAX_VALUE - 1, "Key is too large: %s", key);
+    Preconditions.checkArgument(key > lastKey, "Keys must be sorted in ascending order");
+    return key;
+  }
+
+  // reads one 32-bit Roaring bitmap and moves the buffer position past its serialized bytes
+  private static RoaringBitmap readBitmap(ByteBuffer buffer) {
+    try {
+      RoaringBitmap bitmap = new RoaringBitmap();
+      bitmap.deserialize(buffer);
+      // the position is advanced explicitly by the size of the bitmap that was just read
+      buffer.position(buffer.position() + bitmap.serializedSizeInBytes());
+      return bitmap;
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  // extracts high 32 bits from a 64-bit position (i.e. key)
+  private static int key(long pos) {
+    return (int) (pos >> 32);
+  }
+
+  // extracts low 32 bits from a 64-bit position (i.e. 32-bit position)
+  private static int pos32Bits(long pos) {
+    return (int) pos;
+  }
+
+  // combines high and low 32 bits into a 64-bit position
+  // the low 32 bits must be bit-masked to avoid sign extension
+  private static long toPosition(int key, int pos32Bits) {
+    return (((long) key) << 32) | (((long) pos32Bits) & 0xFFFFFFFFL);
+  }
+
+  // iterates over 64-bit positions, reconstructing them from keys and 32-bit positions
+  private static void forEach(int key, RoaringBitmap bitmap, LongConsumer consumer) {
+    bitmap.forEach((int pos32Bits) -> consumer.accept(toPosition(key, pos32Bits)));
+  }
+
+  private static void validatePosition(long pos) {
+    Preconditions.checkArgument(
+        pos >= 0 && pos <= MAX_POSITION,
+        "Bitmap supports positions that are >= 0 and <= %s: %s",
+        MAX_POSITION,
+        pos);
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/deletes/TestRoaringPositionBitmap.java b/core/src/test/java/org/apache/iceberg/deletes/TestRoaringPositionBitmap.java
new file mode 100644
index 000000000000..2daf0382973b
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/deletes/TestRoaringPositionBitmap.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.io.Resources;
+import org.apache.iceberg.util.Pair;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestRoaringPositionBitmap {
+
+ private static final long BITMAP_SIZE = 0xFFFFFFFFL;
+ private static final long BITMAP_OFFSET = BITMAP_SIZE + 1L;
+ private static final long CONTAINER_SIZE = Character.MAX_VALUE;
+ private static final long CONTAINER_OFFSET = CONTAINER_SIZE + 1L;
+ private static final int VALIDATION_LOOKUP_COUNT = 20_000;
+ private static final Set SUPPORTED_OFFICIAL_EXAMPLE_FILES =
+ ImmutableSet.of("64map32bitvals.bin", "64mapempty.bin", "64mapspreadvals.bin");
+
+ @Parameters(name = "seed = {0}, validationSeed = {1}")
+ protected static List