Spark: Read DVs when reading from .position_deletes table #11657
Conversation
Force-pushed from b347427 to cd35ea5.
Force-pushed from b79a7da to 2512b5f.
```java
@Override
public CloseableIterator<InternalRow> iterator() {
  PuffinReader reader = builder.build();
```
[optional] This might be too much, but can we have one reader per DV file? Specifically for this use case, we will have to read all the blobs in the DV file eventually.
Do you have a particular use case in mind? This isn't something that is currently needed when reading the PositionDeletesTable.
I'd argue we rarely need to read the entire DV file, as not all DVs may still be valid.
I agree, and it makes sense not to go this route. I was mostly coming from the case where we need to read more than one blob in a Puffin DV file; in that case it might be better to reuse the reader.
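For reference, the pattern being discussed (one reader per DV file, reused across all of its blobs) might look roughly like the sketch below. It is illustrative only, not runnable on its own: it assumes an `InputFile` named `dvInputFile` for the DV file, uses Iceberg's Puffin reader API, and omits error handling.

```java
// open the Puffin file once and stream every blob through the same reader,
// instead of rebuilding a PuffinReader per blob
try (PuffinReader reader = Puffin.read(dvInputFile).build()) {
  List<BlobMetadata> blobs = reader.fileMetadata().blobs();
  for (Pair<BlobMetadata, ByteBuffer> blob : reader.readAll(blobs)) {
    // decode each deletion vector from blob.second()
  }
}
```

As the thread concludes, this only pays off when more than one blob from the same Puffin file actually needs to be read.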
Force-pushed from 9a47998 to 3e1bafe.
```java
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(ParameterizedTestExtension.class)
public class TestPositionDeletesReader extends TestBase {
```
Can we also test reading DVs end-to-end by querying the position_deletes metadata table in Spark? I think we can commit DVs from Java and then read them back from Spark, as we don't have DV support in Spark right now.
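A minimal end-to-end check along these lines could commit a DV through the core Java API and read it back via Spark SQL. This is a sketch with illustrative names (`table`, `spark`, `db.tbl`, `dv`), not a complete test:

```java
// commit a deletion vector via the Java API ...
table.newRowDelta().addDeletes(dv).commit();

// ... then read it back through the metadata table in Spark
List<Row> deleted =
    spark.sql("SELECT file_path, pos FROM db.tbl.position_deletes").collectAsList();
```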
Force-pushed from 710febe to de6e0da.
Force-pushed from 9774d18 to d633591.
```java
  }

  this.row = new GenericInternalRow(rowValues.toArray());
} else if (null != deletedPositionIndex) {
```
Question: Do we actually need the `null != deletedPositionIndex` check? I think either it is the first invocation and we need to initialize the row, or we need to update the position. Shouldn't this fail if the index is still null, to indicate something went wrong?
Having a case where `deletedPositionIndex` is null is still valid IMO. This would be true if a user doesn't project the `pos` column.
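The branch under discussion can be simulated with a small self-contained sketch in plain Java. All names here are hypothetical, and an `Object[]` stands in for `GenericInternalRow`: the first call materializes the full projected row, later calls only overwrite the slot at the deleted-position index, and a null index (no `pos` projection) is valid and simply leaves the cached row untouched.

```java
// Hypothetical simulation of the iterator's row-reuse logic discussed above.
class DvRowSketch {
    private Object[] row;            // stands in for GenericInternalRow
    private final Integer posIndex;  // null when "pos" is not projected

    DvRowSketch(Integer posIndex) {
        this.posIndex = posIndex;
    }

    Object[] next(String filePath, long position) {
        if (row == null) {
            // first invocation: build the whole projected row
            row = (posIndex != null)
                ? new Object[] {filePath, position}
                : new Object[] {filePath};
        } else if (posIndex != null) {
            // later invocations: only the deleted position changes
            row[posIndex] = position;
        }
        return row;
    }
}
```

This mirrors why the null check is not an error path: with `posIndex == null` the row never needs per-position updates.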
Force-pushed from d633591 to 25eb30a.
This is part of #11122 and has been extracted from #11545.