Push-Based Parquet Reader #1605

Closed
tustvold opened this issue Apr 22, 2022 · 5 comments
Labels
enhancement Any new improvement worthy of a entry in the changelog parquet Changes to the parquet crate


@tustvold
Contributor

tustvold commented Apr 22, 2022

TLDR

This proposal will allow reading data from parquet files in a "streaming" fashion that reduces the IO required and the resources needed to locally buffer parquet files, and potentially reduces the latency to first data. It will help the use case of reading parquet files from remote object stores such as AWS S3.

Problem

SerializedFileReader is currently created with a ChunkReader which looks like

pub trait ChunkReader: Length + Send + Sync {
    type T: Read + Send;
    /// get a serially readable slice of the current reader
    /// This should fail if the slice exceeds the current bounds
    fn get_read(&self, start: u64, length: usize) -> Result<Self::T>;
}

The process for reading a file is then

  • SerializedFileReader::new will call footer::parse_metadata
    • parse_metadata will
      • Call ChunkReader::get_read with the byte range covering the final 64 KB of the file, and read this to a buffer
      • Determine the footer length
      • Potentially call ChunkReader::get_read to read the remainder of the footer, and read this to a buffer
  • SerializedFileReader::get_row_iter will return a RowIter which, for each row group, will
    • Call SerializedRowGroupReader::new which will
      • Call ChunkReader::get_read with the byte range of each column chunk

There are two major options to apply this to files in object storage

  1. Fetch the entire file to local disk or memory and pass it to SerializedFileReader
  2. Convert ChunkReader::get_read to a range request to object storage

The first option is problematic as it cannot use pruning logic to reduce the amount of data fetched from object storage.

The second option runs into two problems:

  1. The interface is not async, and blocking a thread on network IO is not ideal
  2. Lots of small range requests per file add cost and latency
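
To make the second option concrete, here is a minimal sketch of how a `get_read(start, length)` call could map onto an HTTP range request. `range_header` is a hypothetical helper for illustration and is not part of the parquet crate; the only fact relied on is that HTTP byte ranges are inclusive at both ends.

```rust
/// Hypothetical helper: formats the HTTP `Range` header value covering
/// `length` bytes starting at `start`. Returns `None` for an empty read,
/// since an inclusive byte range cannot express zero bytes, and `None`
/// if the end offset would overflow.
fn range_header(start: u64, length: usize) -> Option<String> {
    if length == 0 {
        return None;
    }
    // HTTP byte ranges are inclusive, so the last byte is start + length - 1.
    let end = start.checked_add(length as u64 - 1)?;
    Some(format!("bytes={}-{}", start, end))
}
```

Every column chunk read turns into one such request, which is where the per-file request count (and the cost and latency that come with it) adds up.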

Proposal

I would like to decouple the parquet reader entirely from IO concerns, allowing downstreams complete freedom to decide how they want to handle this. This will allow the reader to support a wide variety of potential data access patterns:

  • Sync/Async Disk IO
  • Sync/Async Network IO
  • In-memory/mmapped parquet files
  • Interleaved row group decode with fetching the next row group
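
As a rough illustration of the last pattern, the sketch below overlaps fetching with decoding using a plain thread and a bounded channel. Everything here is an assumption made for the example: `interleave`, the `fetch` and `decode` closures, and the use of threads at all. The proposal itself does not prescribe any of this, and an async runtime would serve equally well.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Runs `fetch` for each row group index on a background thread while the
/// calling thread runs `decode` on bytes that have already arrived.
/// Results are returned in the order the row groups were listed.
fn interleave<F, D, T>(row_groups: Vec<usize>, fetch: F, mut decode: D) -> Vec<T>
where
    F: Fn(usize) -> Vec<u8> + Send + 'static,
    D: FnMut(usize, Vec<u8>) -> T,
{
    // The channel buffers at most one fetched row group, so the fetcher
    // stays only slightly ahead of the decoder.
    let (tx, rx) = sync_channel::<(usize, Vec<u8>)>(1);
    let fetcher = thread::spawn(move || {
        for idx in row_groups {
            // Stop early if the decoding side has hung up.
            if tx.send((idx, fetch(idx))).is_err() {
                break;
            }
        }
    });

    let mut out = Vec::new();
    // The loop ends once the fetcher finishes and drops its sender.
    for (idx, bytes) in rx {
        out.push(decode(idx, bytes));
    }
    fetcher.join().expect("fetch thread panicked");
    out
}
```

The point of the sketch is only that the caller, not the reader, owns the scheduling decision.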

Footer Decode

Introduce functions to assist parsing the parquet metadata

/// Parses the 8-byte parquet footer and returns the length of the metadata section
pub fn parse_footer(footer: [u8; 8]) -> Result<usize> {}

/// Parse metadata payload
pub fn parse_metadata(metadata: &[u8]) -> Result<ParquetMetaData> {}

This will allow callers to obtain ParquetMetaData regardless of how they choose to fetch the corresponding bytes
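
For illustration, `parse_footer` could look roughly like the sketch below. It relies on the parquet file layout, in which the final 8 bytes are a 4-byte little-endian metadata length followed by the magic bytes `PAR1`. The `String` error type is a stand-in for the crate's own error type, and `metadata_range` is a hypothetical companion helper that is not part of the proposal.

```rust
use std::ops::Range;

/// Magic bytes that terminate a parquet file.
const PARQUET_MAGIC: [u8; 4] = *b"PAR1";

/// Sketch of `parse_footer`: validates the magic and returns the length
/// of the metadata section in bytes.
fn parse_footer(footer: [u8; 8]) -> Result<usize, String> {
    if footer[4..] != PARQUET_MAGIC[..] {
        return Err("invalid parquet file: corrupt footer".to_string());
    }
    let len = u32::from_le_bytes([footer[0], footer[1], footer[2], footer[3]]);
    Ok(len as usize)
}

/// Hypothetical helper: the byte range of the metadata payload, given the
/// file length and the metadata length returned by `parse_footer`.
fn metadata_range(file_len: usize, metadata_len: usize) -> Result<Range<usize>, String> {
    // The metadata sits immediately before the 8-byte footer.
    let footer_start = file_len.checked_sub(8).ok_or("file too small")?;
    let start = footer_start
        .checked_sub(metadata_len)
        .ok_or("metadata length exceeds file size")?;
    Ok(start..footer_start)
}
```

A caller would fetch the last 8 bytes, call `parse_footer`, fetch the range from `metadata_range` by whatever means it prefers, and hand those bytes to `parse_metadata`.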

ScanBuilder / Scan

Next introduce a ScanBuilder and accompanying Scan.

/// Build a [`Scan`]
///
/// Eventually this will support predicate pushdown (#1191)
pub struct ScanBuilder {}

impl ScanBuilder {
  pub fn new(metadata: Arc<ParquetMetaData>) -> Self {}
  
  pub fn with_projection(self, projection: Vec<usize>) -> Self {}
  
  pub fn with_row_groups(self, groups: Vec<usize>) -> Self {}
  
  pub fn with_range(self, range: Range<usize>) -> Self {}
  
  pub fn build(self) -> Scan {}
}

/// Identifies a portion of a file to read
pub struct Scan {}

impl Scan {
  /// Returns a list of byte ranges needed
  pub fn ranges(&self) -> &[Range<usize>] {}
  
  /// Perform the scan returning a [`ParquetRecordBatchReader`] 
  pub fn execute<R: ChunkReader>(self, reader: R) -> Result<ParquetRecordBatchReader> {}
}

Where ParquetRecordBatchReader is the same type returned by the current ParquetFileArrowReader::get_record_reader, and is just an Iterator<Item=ArrowResult<RecordBatch>> with a Schema.

This design will only support the arrow use-case, but I couldn't see an easy way to add this at a lower level without introducing API inconsistencies when not scanning the entire file
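
One way a caller might serve the ranges reported by `Scan::ranges` is to fetch them up front and answer reads from memory. The sketch below is self-contained and does not use the parquet crate: `SparseBuffer`, `fetch_all`, and `get` are hypothetical names, and a real version would presumably implement `ChunkReader` on top of something like `get`.

```rust
use std::ops::Range;

/// Hypothetical in-memory store for the byte ranges a `Scan` asks for.
/// Each entry pairs the file offset of a fetched range with its bytes.
struct SparseBuffer {
    chunks: Vec<(usize, Vec<u8>)>,
}

impl SparseBuffer {
    /// Fetches every requested range with the caller-supplied `fetch`
    /// function (local IO, an object store client, mmap, ...).
    fn fetch_all<F>(ranges: &[Range<usize>], mut fetch: F) -> Self
    where
        F: FnMut(Range<usize>) -> Vec<u8>,
    {
        let chunks = ranges
            .iter()
            .map(|r| (r.start, fetch(r.clone())))
            .collect();
        SparseBuffer { chunks }
    }

    /// Returns `length` bytes starting at file offset `start`, or `None`
    /// if that span is not fully covered by a single fetched range.
    fn get(&self, start: usize, length: usize) -> Option<&[u8]> {
        let end = start.checked_add(length)?;
        self.chunks.iter().find_map(|(offset, data)| {
            if start >= *offset && end <= *offset + data.len() {
                Some(&data[start - *offset..end - *offset])
            } else {
                None
            }
        })
    }
}
```

Because the reader only ever asks for bytes inside the ranges it reported, a store like this never needs to hold the whole file.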

Deprecate Async

Once implemented, I would propose deprecating and then removing the async API added by #1154

Alternatives Considered

#1154 added an async reader that uses the AsyncRead and AsyncSeek traits to read individual column chunks into memory from an async source. This is the approach taken by arrow2, with its range_reader abstraction. This was not found to perform particularly well (#1473).

#1473 proposed an async reader with prefetch functionality, and was also suggested by @alamb in apache/datafusion#2205 (comment). This is similar to the new FSDataInputStream vectored IO API in the Hadoop ecosystem. This was implemented in #1509 and found to perform better, but still represented a non-trivial performance regression on local files.

Additional Context

The motivating discussion for this issue can be found in apache/datafusion#2205

@mateuszkj clearly documented the limitations of the current API in datafusion-contrib/datafusion-objectstore-s3#53

@tustvold tustvold added the enhancement Any new improvement worthy of a entry in the changelog label Apr 22, 2022
@alamb
Contributor

alamb commented Apr 24, 2022

All in all I like this proposal. Thank you for writing it down. I know it is not based on @jorgecarleitao's https://github.com/jorgecarleitao/parquet2, but it is not a dissimilar API where the metadata reading / management is handled outside the main parquet decoding logic -- see the example. I see this similarity as a good sign. 👍

I think it is important to sketch out what the interface for existing users of the ParquetRecordBatchReader would be. Not just for helping migration, but to ensure that all use cases are satisfied (I am happy to help with this).

Maybe we can provide functions like the following for basic use to both ease migration and to demonstrate how to use this API:

fn scan_file(file: impl ChunkReader) -> Result<ParquetRecordBatchReader> {

}
async fn async_scan_file(file: impl AsyncRead) -> Result<ParquetRecordBatchReader> {
  // buffers / fetches whatever is needed locally to memory

}

> This design will only support the arrow use-case, but I couldn't see an easy way to add this at a lower level without introducing API inconsistencies when not scanning the entire file

What about offering a function on Scan that returns SerializedRowGroupReader?

https://docs.rs/parquet/12.0.0/parquet/file/serialized_reader/struct.SerializedRowGroupReader.html

  /// Perform the scan returning an iterator of [`SerializedRowGroupReader`]
  pub fn execute_serialized<R: ChunkReader>(self, reader: R) -> Result<impl Iterator<Item=SerializedRowGroupReader>> {}

@alamb
Contributor

alamb commented Apr 24, 2022

> Interleaved row group decode with fetching the next row group

Is the idea that the calling code would create a Scan for each row group and then execute those Scans in whatever order was desired?

@tustvold
Contributor Author

tustvold commented Apr 24, 2022

Adding some free functions to assist migration makes sense to me. It should be pretty much a drop-in replacement 👍

I am somewhat apprehensive about providing an async version, as the whole intent is to let users handle what level of buffering/pre-fetch makes sense for their use case, but I guess with sufficient disclaimers...

As for SerializedRowGroupReader the challenge is the RowGroupReader trait exposes APIs for column projection, reading individual pages, etc... which then gets rather confusing if you have projection or predicate pushdown concepts on the Scan. I'd rather make a clean break than try to shoehorn in an existing API.

W.r.t. users constructing a Scan per row group, that is entirely up to their use case: if they want to they can, and if they don't want to, they don't need to. It was more me trying to demonstrate the flexibility afforded than prescribing anything.

@alamb
Contributor

alamb commented Apr 24, 2022

> As for SerializedRowGroupReader the challenge is the RowGroupReader trait exposes APIs for column projection, re

👍 makes sense

> I am somewhat apprehensive about providing an async version, as the whole intent is to let users handle what level of buffering/pre-fetch makes sense for their use case, but I guess with sufficient disclaimers...

Yeah, I am thinking about the "I want to try out reading parquet from s3 just to see if it works" case rather than the "I am set on using the parquet library and I am now trying to squeeze out the most performance" case.

> W.r.t. users constructing a Scan per row group, that is entirely up to their use case: if they want to they can, and if they don't want to, they don't need to. It was more me trying to demonstrate the flexibility afforded than prescribing anything.

Right -- I agree -- I was just confirming my understanding of what would be possible with the API

@tustvold
Contributor Author

tustvold commented Oct 6, 2022

The desired IO decoupling has been achieved through AsyncFileReader and its integration with ParquetRecordBatchStream
