Push-Based Parquet Reader #1605
All in all I like this proposal. Thank you for writing it down. I know it is not based on @jorgecarleitao's https://github.com/jorgecarleitao/parquet2, but it is not a dissimilar API, where the metadata reading / management is handled outside the main parquet decoding logic -- see the example. I see this similarity as a good sign. 👍

I think it is important to sketch out what the interface would look like for existing users of the current API. Maybe we can provide functions like the following for basic use, both to ease migration and to demonstrate how to use this API:

```rust
fn scan_file(file: impl ChunkReader) -> Result<ParquetRecordBatchReader> {
}

async fn async_scan_file(file: impl AsyncRead) -> Result<ParquetRecordBatchReader> {
    // buffers / fetches whatever is needed locally to memory
}
```
What about offering a function that returns an iterator of SerializedRowGroupReader (https://docs.rs/parquet/12.0.0/parquet/file/serialized_reader/struct.SerializedRowGroupReader.html)? Something like:

```rust
/// Perform the scan returning a [`ParquetRecordBatchReader`]
pub fn execute_serialized<R: ChunkReader>(self, reader: R) -> Result<Iterator<Item=SerializedRowGroupReader>> {}
```
Is the idea that the calling code would create a `Scan` per row group?
Adding some free functions to assist migration makes sense to me. It should be pretty much a drop-in replacement 👍

I am somewhat apprehensive about providing an async version, as the whole intent is to let users handle what level of buffering/pre-fetch makes sense for their use case, but I guess with sufficient disclaimers...

As for SerializedRowGroupReader, the challenge is that the RowGroupReader trait exposes APIs for column projection, reading individual pages, etc., which gets rather confusing if you have projection or predicate pushdown concepts on the Scan. I'd rather have a clean break than try to shoehorn an existing API.

W.r.t. users constructing a Scan per row group: that is entirely up to their use-case. If they want to they can, and if they don't want to, they don't need to. It was more me trying to demonstrate the flexibility afforded than prescribing anything.
👍 makes sense
Yeah, I am thinking about the "I want to try out reading parquet from S3 just to see if it works" case, rather than "I am set on using the parquet library and am now trying to squeeze out the most performance".
Right -- I agree -- I was just confirming my understanding of what would be possible with the API.
The desired IO decoupling has been achieved through `AsyncFileReader` and its integration with `ParquetRecordBatchStream`.
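For readers landing here later, the shape of that decoupling is roughly the following trait, paraphrased from later `parquet` releases; the exact signatures vary by version and should be checked against the current docs rather than taken from this sketch:

```rust
use std::{ops::Range, sync::Arc};

use bytes::Bytes;
use futures::future::BoxFuture;
use parquet::{errors::Result, file::metadata::ParquetMetaData};

/// Sketch of the IO abstraction: the parquet decoder asks for byte ranges and
/// metadata, and the implementor decides how to fetch, buffer or coalesce them.
pub trait AsyncFileReader: Send {
    /// Fetch a single byte range (e.g. one object store range request)
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;

    /// Fetch the parsed footer metadata, however the implementor sees fit
    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
}
```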
TLDR
This proposal will allow reading data from parquet files in a "streaming" fashion that reduces the IO required, reduces the resources needed to locally buffer parquet files, and potentially reduces the latency to first data. It will help the use case of reading parquet files from remote object stores such as AWS S3.
Problem
`SerializedFileReader` is currently created with a `ChunkReader`, which looks like the trait sketched below.
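(The issue originally quoted the trait definition here; the following is a best-effort reconstruction of the `ChunkReader` shape from the `parquet` crate of that era, and may differ slightly from the exact version referenced.)

```rust
use std::io::Read;

use parquet::errors::Result;

/// Sketch: a file length plus the ability to obtain a `Read`
/// positioned at an arbitrary byte offset within the file.
pub trait Length {
    fn len(&self) -> u64;
}

pub trait ChunkReader: Length + Send + Sync {
    type T: Read + Send;

    /// Get a reader over `length` bytes starting at `start`
    fn get_read(&self, start: u64, length: usize) -> Result<Self::T>;
}
```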
The process for reading a file is then:

1. `SerializedFileReader::new` will call `footer::parse_metadata`
2. `parse_metadata` will:
   - call `ChunkReader::get_read` with the final 64 KB byte range, and read this to a buffer
   - call `ChunkReader::get_read` to read the remainder of the footer, and read this to a buffer
3. `SerializedFileReader::get_row_iter` will return a `RowIter` which, for each row group, will call `SerializedRowGroupReader::new`, which will:
   - call `ChunkReader::get_read` with the byte range of each column chunk

There are two major options to apply this to files in object storage:

1. Buffer the entire file locally (in memory or on disk) and read it with `SerializedFileReader`
2. Map `ChunkReader::get_read` to a range request to object storage

The first option is problematic as it cannot use pruning logic to reduce the amount of data fetched from object storage.
The second option runs into two problems:
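For concreteness, a minimal sketch of what option 2 amounts to: a hypothetical `ChunkReader` implementation over an object store, where every `get_read` becomes a blocking range request. The `RangeFetch` trait and `ObjectStoreChunkReader` type are illustrative inventions, and the `ChunkReader` signature matches the parquet version discussed in this issue (it has since changed):

```rust
use std::io::Cursor;

use parquet::errors::Result;
use parquet::file::reader::{ChunkReader, Length};

/// Hypothetical blocking client for an object store (illustrative only).
pub trait RangeFetch: Send + Sync {
    /// Issue a GET with a `Range: bytes=start..start+length-1` header
    fn fetch_range(&self, start: u64, length: usize) -> Result<Vec<u8>>;
    fn object_size(&self) -> u64;
}

/// A ChunkReader where every `get_read` is a separate network round trip.
pub struct ObjectStoreChunkReader<C: RangeFetch> {
    client: C,
}

impl<C: RangeFetch> Length for ObjectStoreChunkReader<C> {
    fn len(&self) -> u64 {
        self.client.object_size()
    }
}

impl<C: RangeFetch> ChunkReader for ObjectStoreChunkReader<C> {
    type T = Cursor<Vec<u8>>;

    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
        // Blocking network request per call: fine for a local file, but each
        // footer read and each column chunk becomes a synchronous round trip.
        Ok(Cursor::new(self.client.fetch_range(start, length)?))
    }
}
```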
Proposal
I would like to decouple the parquet reader entirely from IO concerns, allowing downstreams complete freedom to decide how they want to handle this. This will allow the reader to support a wide variety of potential data access patterns.
Footer Decode
Introduce functions to assist with parsing the parquet metadata, along the lines of the sketch below.
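(The concrete signatures were in a code block that did not survive extraction; the following is an illustrative sketch, loosely modelled on the free functions that later landed in `parquet::file::footer`, and should not be read as the issue's exact proposal.)

```rust
use parquet::errors::Result;
use parquet::file::metadata::ParquetMetaData;

/// Decode the final 8 bytes of a parquet file (4-byte little-endian metadata
/// length followed by the "PAR1" magic), returning the length in bytes of the
/// serialized metadata that precedes the footer.
pub fn decode_footer(footer: &[u8; 8]) -> Result<usize> {
    // Sketch only: validate the magic and read the little-endian length.
    unimplemented!()
}

/// Decode thrift-encoded metadata bytes into `ParquetMetaData`,
/// regardless of where or how the caller fetched them.
pub fn decode_metadata(metadata: &[u8]) -> Result<ParquetMetaData> {
    unimplemented!()
}
```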
This will allow callers to obtain `ParquetMetaData` regardless of how they choose to fetch the corresponding bytes.

ScanBuilder / Scan
Next introduce a `ScanBuilder` and an accompanying `Scan`, sketched below.
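(The issue included an API sketch here that was lost in extraction; the following is a hypothetical reconstruction of what such a builder pair could look like. Method names such as `with_projection`, `with_row_groups`, and `execute` are illustrative assumptions, not the issue's verbatim text.)

```rust
use std::sync::Arc;

use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::errors::Result;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::reader::ChunkReader;

/// Builder describing *what* to read: which row groups and which columns.
/// It is constructed purely from metadata, so no IO happens here.
pub struct ScanBuilder {
    metadata: Arc<ParquetMetaData>,
    projection: Option<Vec<usize>>,
    row_groups: Option<Vec<usize>>,
}

impl ScanBuilder {
    pub fn new(metadata: Arc<ParquetMetaData>) -> Self {
        Self { metadata, projection: None, row_groups: None }
    }

    /// Only decode the listed leaf columns
    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
        self.projection = Some(projection);
        self
    }

    /// Only read the listed row groups
    pub fn with_row_groups(mut self, row_groups: Vec<usize>) -> Self {
        self.row_groups = Some(row_groups);
        self
    }

    pub fn build(self) -> Scan {
        Scan { builder: self }
    }
}

/// A fully-specified scan; the caller supplies the bytes (a `ChunkReader`)
/// and gets back an arrow RecordBatch iterator.
pub struct Scan {
    builder: ScanBuilder,
}

impl Scan {
    /// Perform the scan against the provided reader
    pub fn execute<R: ChunkReader>(self, _reader: R) -> Result<ParquetRecordBatchReader> {
        // Sketch only: would drive the existing decoding machinery using the
        // byte ranges implied by `self.builder`.
        unimplemented!()
    }
}
```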
Where `ParquetRecordBatchReader` is the same type returned by the current `ParquetFileArrowReader::get_record_reader`, and is just an `Iterator<Item = ArrowResult<RecordBatch>>` with a `Schema`.

This design will only support the arrow use-case, but I couldn't see an easy way to add this at a lower level without introducing API inconsistencies when not scanning the entire file.
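To make the intended flow concrete, here is a hedged end-to-end sketch of how a caller might combine the pieces above: fetch the footer bytes however they like, decode the metadata, then build and execute a scan. It uses the hypothetical `decode_footer` / `decode_metadata` / `ScanBuilder` sketches from the earlier sections, and a made-up `fetch_suffix` closure standing in for whatever IO the application performs.

```rust
use std::sync::Arc;

// Hypothetical imports: these refer to the sketches above, not to a published API.
use crate::{decode_footer, decode_metadata, ScanBuilder};
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::errors::Result;
use parquet::file::reader::ChunkReader;

/// `fetch_suffix(n)` is a stand-in for application IO, e.g. a ranged GET
/// against S3 returning the last `n` bytes of the object.
fn scan_remote_object<R: ChunkReader>(
    fetch_suffix: impl Fn(usize) -> Result<Vec<u8>>,
    reader: R,
) -> Result<ParquetRecordBatchReader> {
    // 1. Fetch and decode the 8-byte footer to learn the metadata length
    let suffix = fetch_suffix(8)?;
    let footer: [u8; 8] = suffix.as_slice().try_into().expect("footer is 8 bytes");
    let metadata_len = decode_footer(&footer)?;

    // 2. Fetch the metadata bytes (in practice one would over-fetch up front
    //    to avoid a second round trip) and decode them
    let metadata_bytes = fetch_suffix(metadata_len + 8)?;
    let metadata = decode_metadata(&metadata_bytes[..metadata_len])?;

    // 3. Build a scan (optionally pruning row groups / columns) and execute it
    ScanBuilder::new(Arc::new(metadata))
        .with_row_groups(vec![0]) // e.g. only the first row group
        .build()
        .execute(reader)
}
```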
Deprecate Async
Once implemented, I would propose deprecating and then removing the async API added by #1154
Alternatives Considered
#1154 added an async reader that uses the `AsyncRead` and `AsyncSeek` traits to read individual column chunks into memory from an async source. This is the approach taken by arrow2, with its range_reader abstraction. This was not found to perform particularly well (#1473).

#1473 proposed an async reader with prefetch functionality, and was also suggested by @alamb in apache/datafusion#2205 (comment). This is similar to the new FSDataInputStream vectored IO API in the Hadoop ecosystem. This was implemented in #1509 and found to perform better, but still represented a non-trivial performance regression on local files.
Additional Context
The motivating discussion for this issue can be found in apache/datafusion#2205.

@mateuszkj clearly documented the limitations of the current API in datafusion-contrib/datafusion-objectstore-s3#53.