Spark: Read DVs when reading from .position_deletes table
nastra committed Nov 27, 2024
1 parent 38d054e commit b347427
Showing 3 changed files with 418 additions and 3 deletions.
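For context: deletion vectors (DVs) store position deletes as serialized bitmaps in Puffin files rather than as Parquet/ORC/Avro position-delete files, and with this change they are also surfaced when querying the .position_deletes metadata table from Spark. A minimal sketch of such a query; the catalog and table names (my_catalog.db.tbl) are placeholders, not taken from this commit:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PositionDeletesQuery {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("position-deletes-example").getOrCreate();

    // one row per deleted position; after this change, positions backed by DVs show up as well
    Dataset<Row> deletes =
        spark.sql("SELECT file_path, pos, delete_file_path FROM my_catalog.db.tbl.position_deletes");
    deletes.show(20, false);
  }
}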
DVIterable.java (new file)
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.source;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.puffin.BlobMetadata;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinReader;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Pair;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.unsafe.types.UTF8String;

class DVIterable extends CloseableGroup implements CloseableIterable<InternalRow> {
  private final Puffin.ReadBuilder builder;
  private final PartitionSpec spec;
  private final DeleteFile deleteFile;
  private final Schema projection;

  DVIterable(InputFile inputFile, DeleteFile deleteFile, PartitionSpec spec, Schema projection) {
    this.deleteFile = deleteFile;
    this.builder = Puffin.read(inputFile);
    this.spec = spec;
    this.projection = projection;
  }

  @Override
  public CloseableIterator<InternalRow> iterator() {
    PuffinReader reader = builder.build();
    addCloseable(reader);
    return new DVIterator(reader);
  }

  private class DVIterator implements CloseableIterator<InternalRow> {
    private final PuffinReader reader;
    private Iterator<Long> positions = Collections.emptyIterator();
    private InternalRow currentRow;
    private int currentRowPositionIndex = -1;

    DVIterator(PuffinReader reader) {
      this.reader = reader;
      try {
        reader.fileMetadata().blobs().stream()
            .filter(
                blob ->
                    // read the correct blob for the referenced data file
                    Objects.equals(
                        deleteFile.referencedDataFile(),
                        blob.properties().get("referenced-data-file")))
            .findFirst()
            .ifPresent(
                blob -> {
                  // there should only be a single element
                  Pair<BlobMetadata, ByteBuffer> current =
                      Iterables.getOnlyElement(reader.readAll(ImmutableList.of(blob)));
                  List<Long> pos = Lists.newArrayList();
                  PositionDeleteIndex.deserialize(current.second().array(), deleteFile)
                      .forEach(pos::add);
                  this.positions = pos.iterator();
                });
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }

    @Override
    public boolean hasNext() {
      return positions.hasNext();
    }

    @Override
    public InternalRow next() {
      return row(positions.next());
    }

    private InternalRow row(long deletedPosition) {
      if (null == currentRow) {
        List<Object> values = Lists.newArrayList();
        if (null != projection.findField(MetadataColumns.DELETE_FILE_PATH.fieldId())) {
          values.add(UTF8String.fromString(deleteFile.referencedDataFile()));
        }

        if (null != projection.findField(MetadataColumns.DELETE_FILE_POS.fieldId())) {
          values.add(deletedPosition);
          // remember the index within InternalRow where the deleted position needs to be set
          this.currentRowPositionIndex = values.size() - 1;
        }

        if (null != projection.findField(MetadataColumns.DELETE_FILE_ROW_FIELD_ID)) {
          // there's no info about deleted rows with DVs, so always return null
          values.add(null);
        }

        if (null != projection.findField(MetadataColumns.PARTITION_COLUMN_ID)) {
          StructInternalRow partition = new StructInternalRow(spec.partitionType());
          partition.setStruct(deleteFile.partition());
          values.add(partition);
        }

        if (null != projection.findField(MetadataColumns.SPEC_ID_COLUMN_ID)) {
          values.add(deleteFile.specId());
        }

        if (null != projection.findField(MetadataColumns.FILE_PATH_COLUMN_ID)) {
          values.add(UTF8String.fromString(deleteFile.location()));
        }

        this.currentRow = new GenericInternalRow(values.toArray());
      } else if (currentRowPositionIndex >= 0) {
        // only update the deleted position if necessary, everything else stays the same
        this.currentRow.setLong(currentRowPositionIndex, deletedPosition);
      }

      return currentRow;
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException("Remove is not supported");
    }

    @Override
    public void close() throws IOException {
      if (null != reader) {
        reader.close();
      }
    }
  }
}
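How the reader wires this class in is shown in the PositionDeletesRowReader change below. As a rough usage sketch, here is a hypothetical helper (not part of this commit) that would have to live in org.apache.iceberg.spark.source, since DVIterable is package-private; table, dvFile and projection are placeholder inputs:

static long countDvDeletes(Table table, DeleteFile dvFile, Schema projection) throws IOException {
  InputFile puffinFile = table.io().newInputFile(dvFile.location());
  PartitionSpec spec = table.specs().get(dvFile.specId());
  long count = 0;
  try (DVIterable rows = new DVIterable(puffinFile, dvFile, spec, projection)) {
    for (InternalRow ignored : rows) {
      // DVIterable reuses a single InternalRow and only rewrites the position field between
      // next() calls, so each row must be consumed or copied before advancing
      count++;
    }
  }
  return count;
}

Note that iterator() registers the PuffinReader with the enclosing CloseableGroup, so closing the DVIterable (or a group it has been added to, as the reader below does) releases the underlying Puffin file.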
PositionDeletesRowReader.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.source;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -29,10 +30,13 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionUtil;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -44,6 +48,7 @@ class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
    implements PartitionReader<InternalRow> {

  private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
  private final CloseableGroup closeableGroup = new CloseableGroup();

  PositionDeletesRowReader(SparkInputPartition partition) {
    this(
@@ -90,15 +95,23 @@ protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
        ExpressionUtil.extractByIdInclusive(
            task.residual(), expectedSchema(), caseSensitive(), Ints.toArray(nonConstantFieldIds));

    return newIterable(
    if (ContentFileUtil.isDV(task.file())) {
      DVIterable dvIterable = new DVIterable(inputFile, task.file(), task.spec(), expectedSchema());
      closeableGroup.addCloseable(dvIterable);
      return dvIterable.iterator();
    }

    CloseableIterable<InternalRow> iterable =
        newIterable(
            inputFile,
            task.file().format(),
            task.start(),
            task.length(),
            residualWithoutConstants,
            expectedSchema(),
            idToConstant)
        .iterator();
            idToConstant);
    closeableGroup.addCloseable(iterable);
    return iterable.iterator();
  }

  private Set<Integer> nonConstantFieldIds(Map<Integer, ?> idToConstant) {
@@ -108,4 +121,9 @@ private Set<Integer> nonConstantFieldIds(Map<Integer, ?> idToConstant) {
        .filter(id -> !idToConstant.containsKey(id))
        .collect(Collectors.toSet());
  }

  @Override
  public void close() throws IOException {
    closeableGroup.close();
  }
}
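The new branch relies on ContentFileUtil.isDV(...) to tell DV blobs apart from classic position-delete files before choosing an iterator. A hedged sketch of what such a check amounts to, assuming DVs are identified by their Puffin file format; the real ContentFileUtil implementation may include additional validation:

// rough equivalent of the DV check used above, not the actual ContentFileUtil code
static boolean isDV(DeleteFile deleteFile) {
  return deleteFile.format() == FileFormat.PUFFIN;
}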
(The third changed file did not load in this view.)