diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java
index ab0d652321d3..edaaaeda2515 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java
@@ -18,78 +18,138 @@
  */
 package org.apache.iceberg.spark.data.vectorized;
 
-import org.apache.iceberg.arrow.vectorized.VectorHolder;
 import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarMap;
 import org.apache.spark.unsafe.types.UTF8String;
 
-public class ColumnVectorWithFilter extends IcebergArrowColumnVector {
+/**
+ * A column vector implementation that applies row-level filtering.
+ *
+ * <p>This class wraps an existing column vector and uses a row ID mapping array to remap row
+ * indices during data access. Each method that retrieves data for a specific row translates the
+ * provided row index using the mapping array, effectively filtering the original data to only
+ * expose the live subset of rows. This approach allows efficient row-level filtering without
+ * modifying the underlying data.
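+ *
+ * <p>For example, with a mapping of {0, 2, 5}, a call such as getInt(1) reads position 2 of the
+ * wrapped vector, so rows filtered out at the skipped positions are never observed by callers.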
+ */
+public class ColumnVectorWithFilter extends ColumnVector {
+  private final ColumnVector delegate;
   private final int[] rowIdMapping;
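+  // struct field wrappers, created lazily in getChild(); volatile so the
+  // double-checked locking there publishes the array safely across threads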
+  private volatile ColumnVectorWithFilter[] children = null;
 
-  public ColumnVectorWithFilter(VectorHolder holder, int[] rowIdMapping) {
-    super(holder);
+  public ColumnVectorWithFilter(ColumnVector delegate, int[] rowIdMapping) {
+    super(delegate.dataType());
+    this.delegate = delegate;
     this.rowIdMapping = rowIdMapping;
   }
+  @Override
+  public void close() {
+    delegate.close();
+  }
+
+  @Override
+  public void closeIfFreeable() {
+    delegate.closeIfFreeable();
+  }
+
+  @Override
+  public boolean hasNull() {
+    return delegate.hasNull();
+  }
+
+  @Override
+  public int numNulls() {
+    // computing the actual number of nulls with rowIdMapping is expensive;
+    // it is OK to overestimate and return the number of nulls in the original vector
+    return delegate.numNulls();
+  }
+
   @Override
   public boolean isNullAt(int rowId) {
-    return nullabilityHolder().isNullAt(rowIdMapping[rowId]) == 1;
+    return delegate.isNullAt(rowIdMapping[rowId]);
   }
 
   @Override
   public boolean getBoolean(int rowId) {
-    return accessor().getBoolean(rowIdMapping[rowId]);
+    return delegate.getBoolean(rowIdMapping[rowId]);
+  }
+
+  @Override
+  public byte getByte(int rowId) {
+    return delegate.getByte(rowIdMapping[rowId]);
+  }
+
+  @Override
+  public short getShort(int rowId) {
+    return delegate.getShort(rowIdMapping[rowId]);
   }
 
   @Override
   public int getInt(int rowId) {
-    return accessor().getInt(rowIdMapping[rowId]);
+    return delegate.getInt(rowIdMapping[rowId]);
   }
 
   @Override
   public long getLong(int rowId) {
-    return accessor().getLong(rowIdMapping[rowId]);
+    return delegate.getLong(rowIdMapping[rowId]);
   }
 
   @Override
   public float getFloat(int rowId) {
-    return accessor().getFloat(rowIdMapping[rowId]);
+    return delegate.getFloat(rowIdMapping[rowId]);
   }
 
   @Override
   public double getDouble(int rowId) {
-    return accessor().getDouble(rowIdMapping[rowId]);
+    return delegate.getDouble(rowIdMapping[rowId]);
   }
 
   @Override
   public ColumnarArray getArray(int rowId) {
-    if (isNullAt(rowId)) {
-      return null;
-    }
-    return accessor().getArray(rowIdMapping[rowId]);
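+    // the explicit null checks are gone: null handling for complex types is left
+    // to the wrapped vector, which is expected to return null for null slots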
+    return delegate.getArray(rowIdMapping[rowId]);
+  }
+
+  @Override
+  public ColumnarMap getMap(int rowId) {
+    return delegate.getMap(rowIdMapping[rowId]);
   }
 
   @Override
   public Decimal getDecimal(int rowId, int precision, int scale) {
-    if (isNullAt(rowId)) {
-      return null;
-    }
-    return accessor().getDecimal(rowIdMapping[rowId], precision, scale);
+    return delegate.getDecimal(rowIdMapping[rowId], precision, scale);
   }
 
   @Override
   public UTF8String getUTF8String(int rowId) {
-    if (isNullAt(rowId)) {
-      return null;
-    }
-    return accessor().getUTF8String(rowIdMapping[rowId]);
+    return delegate.getUTF8String(rowIdMapping[rowId]);
   }
 
   @Override
   public byte[] getBinary(int rowId) {
-    if (isNullAt(rowId)) {
-      return null;
+    return delegate.getBinary(rowIdMapping[rowId]);
+  }
+
+  @Override
+  public ColumnVector getChild(int ordinal) {
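+    // lazily build child wrappers that share this vector's row ID mapping;
+    // only struct children are supported, and double-checked locking on the
+    // volatile field avoids repeated work across concurrent readers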
+    if (children == null) {
+      synchronized (this) {
+        if (children == null) {
+          if (dataType() instanceof StructType) {
+            StructType structType = (StructType) dataType();
+            this.children = new ColumnVectorWithFilter[structType.length()];
+            for (int index = 0; index < structType.length(); index++) {
+              children[index] = new ColumnVectorWithFilter(delegate.getChild(index), rowIdMapping);
+            }
+          } else {
+            throw new UnsupportedOperationException("Unsupported nested type: " + dataType());
+          }
+        }
+      }
     }
-    return accessor().getBinary(rowIdMapping[rowId]);
+
+    return children[ordinal];
   }
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
index c65c24d02f59..2123939399cb 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
@@ -94,43 +94,42 @@ private class ColumnBatchLoader {
     }
 
     ColumnarBatch loadDataToColumnBatch() {
-      ColumnVector[] arrowColumnVectors = readDataToColumnVectors();
+      ColumnVector[] vectors = readDataToColumnVectors();
       int numLiveRows = batchSize;
+
       if (hasIsDeletedColumn) {
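+        // the _deleted metadata column is projected: keep every row live and expose
+        // the delete flags through DeletedColumnVector instead of filtering rows out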
-        boolean[] isDeleted =
-            ColumnarBatchUtil.buildIsDeleted(
-                arrowColumnVectors, deletes, rowStartPosInBatch, batchSize);
-        for (int i = 0; i < arrowColumnVectors.length; i++) {
-          ColumnVector vector = arrowColumnVectors[i];
+        boolean[] isDeleted = buildIsDeleted(vectors);
+        for (ColumnVector vector : vectors) {
           if (vector instanceof DeletedColumnVector) {
             ((DeletedColumnVector) vector).setValue(isDeleted);
           }
         }
       } else {
- Pair