Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push ChunkReader into SerializedPageReader (#2463) #2464

Merged
merged 1 commit into from
Aug 17, 2022

Conversation

tustvold
Copy link
Contributor

Which issue does this PR close?

Closes #2463

Rationale for this change

This pushes ChunkReader into SerializedPageReader. This not only simplifies the code, but will allow removing the InMemoryColumnChunkReader in async_reader.rs and implementing page skipping the absence of a PageIndex #2460

What changes are included in this PR?

Are there any user-facing changes?

This changes the interface of SerializedPageReader, which is public, albeit a very low-level API

@tustvold tustvold added the api-change Changes to the arrow API label Aug 16, 2022
@github-actions github-actions bot added the parquet Changes to the parquet crate label Aug 16, 2022
let row_count = calculate_row_count(
offset_index,
*seen_num_data_pages,
self.total_num_values,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was actually previously incorrect, it was using total_num_values, instead of total_row_count. We don't currently support skipping with lists, #2122 but this would cause issues if we did

/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read
fn read_page_header_len<T: Read>(input: &mut T) -> Result<(usize, PageHeader)> {
/// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
struct TrackedRead<R> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of ugly, but the length of the header isn't stored anywhere

Ok(())
}
match &mut self.state {
SerializedPageReaderState::Values {..} =>{ Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader.")) },
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should now be possible to read the header, and then skip by just incrementing the offset @Ted-Jiang

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool ! 👍

.build()
.unwrap();

let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is somewhat cumbersome, but I opted to take ColumnChunkMetadata in the constructor as it is more future-proof, not to mention slightly more obvious what the values are. In particular it avoids ambiguity over whether values includes nulls (it does).


let page_len = front.compressed_page_size as usize;

// TODO: Add ChunkReader get_bytes to potentially avoid copy
Copy link
Contributor Author

@tustvold tustvold Aug 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what will allow removing InMemoryRowGroupReader and just using SerializedPageReader. I intend to do this as a follow up

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean InMemoryColumnChunkReader?

If the idea that we make InMemoryColumnChunk impl ChunkReader?

Copy link
Contributor Author

@tustvold tustvold Aug 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could just replace InMemoryColumnChunk with SerializedPageReader<Bytes>

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, I was working on #2426. If you want I can rebase onto this branch and do that as part of that ticket

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be fantastic if you could 👍

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks reasonable to me but I have not been down in this code for a while. I would defer to @thinkharderdev @Ted-Jiang @nevi-me or @sunchao

Thanks for pushing this along @tustvold 👍

7,
Compression::UNCOMPRESSED,
Type::INT32,
Arc::new(Bytes::from(buf)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought Bytes were already ref counted -- is there any need to wrap this in an additional Arc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the signature needs Arc<T: ChunkReader> because ChunkReader doesn't impl Clone

// The file source buffer which references exactly the bytes for the column trunk
// to be read by this page reader.
buf: SerializedPages<T>,
pub struct SerializedPageReader<R: ChunkReader> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a non trivial change, right? To make this different than std::io::Read? But on the other hand the SerializedFileReader isn't changed -- https://docs.rs/parquet/20.0.0/parquet/file/index.html#example-of-reading-an-existing-file

Maybe to ease the transition we can add some docstring example showing how to create a ChunkReader from a std::io::Read? Or maybe it doesn't matter.

I think we should be sensitive and over communicate a change like this in the SerializedPageReader.

@zeevm do you have any comments on this change (especially in regards to your comments on #2394 (comment))?

Copy link
Contributor Author

@tustvold tustvold Aug 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a non trivial change, right?

It brings SerializedPageReader into line with all the other readers, so I'm not really sure I agree that this is a major change. I suspect almost all users are using RowGroupReader::get_column_page_reader and not calling this constructor. Tbh I'm not entirely sure why this method is even public... Perhaps I should take the opportunity to make it crate private as part of slowly reducing the amount of implementation detail that leaks out of the crate?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps I should take the opportunity to make it crate private as part of slowly reducing the amount of implementation detail that leaks out of the crate?

github codesearch https://cs.github.com/?q=SerializedPageReader%20language%3ARust&scopeName=All%20repos&scope= seems to suggest that most of the uses of this structure are forks of the arrow-rs codebase in various states of divergence.

Not that github codesearch would find all the possible issues, but it is a good sanity check that this isn't widel used

Copy link
Contributor

@thinkharderdev thinkharderdev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Ok(())
}
match &mut self.state {
SerializedPageReaderState::Values {..} =>{ Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader.")) },
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool ! 👍

Ok(result)
}
let decompressor = create_codec(meta.compression())?;
let (start, len) = meta.byte_range();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i find

  /// Returns the offset and length in bytes of the column chunk within the file
    pub fn byte_range(&self) -> (u64, u64) {
        let col_start = match self.dictionary_page_offset() {
            Some(dictionary_page_offset) => dictionary_page_offset,
            None => self.data_page_offset(),
        };
        let col_len = self.compressed_size();
        assert!(
            col_start >= 0 && col_len >= 0,
            "column start and length should not be negative"
        );
        (col_start as u64, col_len as u64)
    }

It may return the dictionary_page_start, which is not right in checking wether is dictPage

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is intentional, we want it to return the dictionary_page_start if there is one as we use this to construct the dictionary page location.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh! forgot there is no dict_offset in locations 😂

Copy link
Member

@Ted-Jiang Ted-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! @tustvold Thanks a lot 😊 !
This change simplifies the code and help me implementing the skip row without pageIndex in SerializedPageReader.
I think the situation in skip row without pageIndex, is not coverd in fuzz test. I will working one this file one issue #2475

Ok(result)
}
let decompressor = create_codec(meta.compression())?;
let (start, len) = meta.byte_range();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not correct, as it will potentially miss the dictionary page. I originally tried to use the presence of the dictionary_page_offset to infer the existence of a dictionary page, but this isn't consistently set. In particular "alltypes_tiny_pages.parquet" does not set the dictionary page offset, and yet the first page is a dictionary page

@tustvold tustvold merged commit 63ab69e into apache:master Aug 17, 2022
@ursabot
Copy link

ursabot commented Aug 17, 2022

Benchmark runs are scheduled for baseline = ecc6210 and contender = 63ab69e. 63ab69e is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@alamb alamb changed the title Push ChunkReader into SerializedPageReader (#2463) Push ChunkReader into SerializedPageReader (#2463) Aug 17, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api-change Changes to the arrow API parquet Changes to the parquet crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Push ChunkReader into SerializedPageReader
5 participants