Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement faster arrow array reader #384

Merged
merged 8 commits into from
Jun 10, 2021

Conversation

yordan-pavlov
Copy link
Contributor

Which issue does this PR close?

Closes #200.

Rationale for this change

This PR attempts to implement a new, more efficient and also more generic ArrowArrayReader, as a replacement to both the PrimitiveArrayReader and ComplexObjectArrayReader that exist today. The basic idea behind the new ArrowArrayReader
is to copy contiguous byte slices from parquet page buffers to arrow array buffers as directly as possible, while avoiding unnecessary memory allocation as much as possible. While for primitive types such as Int32, the performance improvements are small in most cases, for complex types such as strings the performance improvements can be significant (up to 6 times faster). See benchmark results below.

I did try initially to use iterators end-to-end as suggested by the linked issue, but this required a more complex and less efficient implementation which was ultimately slower. This is why in this PR iterators are only used to map parquet pages to implementations of the ValueDecoder trait trait which know how to read / decode byte slices for batches of values.

What changes are included in this PR?

This PR implements the new ArrowArrayReader and converters for strings and primitive types, but is only used / enabled for strings. The plan is to enable / use the new ArrowArrayReader for more types in subsequent PRs. Also note that ValueDecoders for only PLAIN and RLE_DICTIONARY encodings are currently implemented.

Are there any user-facing changes?

There are some non-breaking changes to MutableArrayData and SlicesIterator, @jorgecarleitao let me know what you think about those.

Here are the benchmark results:
read Int32Array, plain encoded, mandatory, no NULLs - old: time: [9.0238 us 9.1121 us 9.2100 us]
read Int32Array, plain encoded, mandatory, no NULLs - new: time: [6.9506 us 7.1606 us 7.4062 us]

read Int32Array, plain encoded, optional, no NULLs - old: time: [247.66 us 252.08 us 257.12 us]
read Int32Array, plain encoded, optional, no NULLs - new: time: [40.322 us 40.736 us 41.215 us]

read Int32Array, plain encoded, optional, half NULLs - old: time: [434.25 us 438.25 us 442.92 us]
read Int32Array, plain encoded, optional, half NULLs - new: time: [326.37 us 331.68 us 337.07 us]

read Int32Array, dictionary encoded, mandatory, no NULLs - old: time: [38.876 us 39.698 us 40.805 us]
read Int32Array, dictionary encoded, mandatory, no NULLs - new: time: [150.62 us 152.38 us 154.29 us]

read Int32Array, dictionary encoded, optional, no NULLs - old: time: [265.18 us 267.54 us 270.16 us]
read Int32Array, dictionary encoded, optional, no NULLs - new: time: [167.54 us 169.15 us 170.99 us]

read Int32Array, dictionary encoded, optional, half NULLs - old: time: [442.66 us 446.42 us 450.47 us]
read Int32Array, dictionary encoded, optional, half NULLs - new: time: [418.46 us 421.81 us 425.37 us]

read StringArray, plain encoded, mandatory, no NULLs - old: time: [1.6670 ms 1.6773 ms 1.6895 ms]
read StringArray, plain encoded, mandatory, no NULLs - new: time: [264.44 us 269.63 us 275.39 us]

read StringArray, plain encoded, optional, no NULLs - old: time: [1.8602 ms 1.8753 ms 1.8913 ms]
read StringArray, plain encoded, optional, no NULLs - new: time: [363.59 us 367.03 us 370.63 us]

read StringArray, plain encoded, optional, half NULLs - old: time: [1.5216 ms 1.5346 ms 1.5486 ms]
read StringArray, plain encoded, optional, half NULLs - new: time: [514.01 us 518.68 us 524.09 us]

read StringArray, dictionary encoded, mandatory, no NULLs - old: time: [1.4903 ms 1.5129 ms 1.5358 ms]
read StringArray, dictionary encoded, mandatory, no NULLs - new: time: [218.30 us 220.54 us 223.17 us]

read StringArray, dictionary encoded, optional, no NULLs - old: time: [1.5652 ms 1.5776 ms 1.5912 ms]
read StringArray, dictionary encoded, optional, no NULLs - new: time: [249.53 us 254.14 us 258.99 us]

read StringArray, dictionary encoded, optional, half NULLs - old: time: [1.3585 ms 1.3945 ms 1.4318 ms]
read StringArray, dictionary encoded, optional, half NULLs - new: time: [496.27 us 508.28 us 522.43 us]

@nevi-me @alamb @Dandandan let me know what you think.

@codecov-commenter
Copy link

codecov-commenter commented May 30, 2021

Codecov Report

Merging #384 (80a7984) into master (0c00776) will decrease coverage by 0.05%.
The diff coverage is 78.53%.

❗ Current head 80a7984 differs from pull request most recent head d5173db. Consider uploading reports for the commit d5173db to get more accurate results
Impacted file tree graph

@@            Coverage Diff             @@
##           master     #384      +/-   ##
==========================================
- Coverage   82.71%   82.65%   -0.06%     
==========================================
  Files         163      164       +1     
  Lines       44795    45468     +673     
==========================================
+ Hits        37051    37581     +530     
- Misses       7744     7887     +143     
Impacted Files Coverage Δ
parquet/src/arrow/record_reader.rs 93.44% <0.00%> (-0.54%) ⬇️
parquet/src/column/page.rs 98.68% <ø> (ø)
parquet/src/column/reader.rs 74.36% <0.00%> (-0.38%) ⬇️
parquet/src/errors.rs 18.51% <ø> (ø)
parquet/src/schema/types.rs 88.07% <ø> (ø)
parquet/src/util/memory.rs 91.03% <50.00%> (+1.46%) ⬆️
parquet/src/arrow/arrow_array_reader.rs 78.12% <78.12%> (ø)
arrow/src/compute/kernels/filter.rs 91.98% <90.00%> (+0.07%) ⬆️
parquet/src/util/test_common/page_util.rs 91.00% <90.00%> (-0.67%) ⬇️
arrow/src/array/transform/mod.rs 86.06% <90.47%> (-0.09%) ⬇️
... and 7 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 0c00776...d5173db. Read the comment docs.


// this operation is performed before iteration
// because it is fast and allows reserving all the needed memory
let filter_count = values.count_set_bits_offset(filter.offset(), filter.len());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this mean that the count is done multiple times now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good question @Dandandan , in my opinion calculating filter_count should not be done in the SlicesIterator because it's not used here. It's just a convenience for many of the clients of the SlicesIterator. Also having filter_count calculated in SlicesIterator::new is inflexible and in the use case of the new ArrowArrayReader would have meant that counting would be performed twice unnecessarily. That's why I have moved it to a filter_count() method instead - keep this convenience for users of SlicesIterator, but make it more flexible and allow more use-cases. Where I have had to change existing code, I was careful to only invoke filter_count() a single time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe adding a docstring to the new filter_count() would be good enough

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done - added docstring for filter_count()

@alamb
Copy link
Contributor

alamb commented Jun 1, 2021

Thanks @yordan-pavlov -- I will try and set time aside tomorrow to review this PR. Sorry for the delay

@nevi-me nevi-me self-requested a review June 2, 2021 00:44
Copy link
Contributor

@nevi-me nevi-me left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments from an initial review. No immediate action needed on them though

parquet/src/arrow/arrow_array_reader.rs Show resolved Hide resolved
parquet/src/arrow/arrow_array_reader.rs Outdated Show resolved Hide resolved
}
}

use arrow::datatypes::ArrowPrimitiveType;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reminder to move all imports to the top

@@ -506,6 +506,11 @@ impl ArrayDataBuilder {
self
}

pub fn null_count(mut self, null_count: usize) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're intentionally missing this function here because we were avoiding a situation where a user could specify a null count != the actual count in the null buffer. Is there a way of avoiding it @yordan-pavlov?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without this null_count method, count_set_bits_offset would be called unnecessarily (because we already know the null count) a second time in ArrayData::new when value_array_data: ArrayData is created

@@ -63,7 +63,7 @@ struct _MutableArrayData<'a> {
}

impl<'a> _MutableArrayData<'a> {
fn freeze(self, dictionary: Option<ArrayData>) -> ArrayData {
fn freeze(self, dictionary: Option<ArrayData>) -> ArrayDataBuilder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jorgecarleitao are you fine with returning a builder here?

consume_source_item: fn(source_item: Source, state: &mut State) -> Target,
}

impl<Source, Target, State> UnzipIter<Source, Target, State>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the approach of unzipping the iterator into 3 iterators. My first pass review was to look at the implementation, but not yet the finer details.

This looks great, I like the approach; and I think it won't be difficult to implement it for lists.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went though this PR @yordan-pavlov really nice. 👍 👍

The high level approach looks clear and solid from my perspective. I had some minor structural things that would like to be be improved such as avoiding a new dependency in the parquet crate and removing commented out code, but I also wouldn't be opposed to merging this as is.

I think @nevi-me is the expert here. If he is cool with this approach then so am I 👍

cc @carols10cents since you and @shepmaster worked on the array reader a bit as well.

arrow/src/compute/kernels/filter.rs Outdated Show resolved Hide resolved

// this operation is performed before iteration
// because it is fast and allows reserving all the needed memory
let filter_count = values.count_set_bits_offset(filter.offset(), filter.len());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe adding a docstring to the new filter_count() would be good enough

@@ -45,6 +45,7 @@ arrow = { path = "../arrow", version = "5.0.0-SNAPSHOT", optional = true }
base64 = { version = "0.13", optional = true }
clap = { version = "2.33.3", optional = true }
serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
rand = "0.8"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if a new dependency was not needed for the main crate (it seems like it is only needed for test_util -- so perhaps we could mark test_util as [#cfg(test)] or something -- though I suspect this doesn't actually add any new dependency

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't like having to add this new dependency, but couldn't get the benchmarks to compile without it; I am more than happy to remove or restrict if someone knows how

Copy link
Contributor

@alamb alamb Jun 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only way I can think of is to move test_util to a new crate (and then add it as a dev dependency)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try this over the weekend

// primitive / int32 benchmarks
// =============================
let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(schema.clone(), mandatory_int32_column_desc.clone(), 0.0);
// group.bench_function("clone benchmark data", |b| b.iter(|| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a reason thus bench is commented out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was curious what's the cost of just cloning the benchmark data; I left it commented out in case someone else is curious about this as well, but I am happy to remove it

parquet/benches/arrow_array_reader.rs Outdated Show resolved Hide resolved
Utf8Converter,
>::new(
page_iterator,
use crate::arrow::arrow_array_reader::{StringArrayConverter, ArrowArrayReader};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the change to move these use statements from the top of the module to here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the only reason for the local use statement is because currently ArrowArrayReader is (intentionally) only used here for strings; once it's used for more types it would make sense to move most / all of these use statements to the top.

@@ -22,6 +22,4 @@ pub mod bit_util;
mod bit_packing;
pub mod cursor;
pub mod hash_util;

#[cfg(test)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This chang means that test_common becomes part of the public parquet API

Was this needed to use test_common stuff in the benchmarks? Maybe it might make sense (as a follow on PR) to move test_common into its own (unpublished) crate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I had to make this change to make test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder} available in the benchmark crate; I don't like making this public either, but haven't been able to find a way to only make it available to tests and benches; if anyone knows how this could be done I am more than happy to change it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed this to

pub(crate) mod test_common;
pub use self::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};

in order to limit new public types to only InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder which are used in benchmarks. I noticed that this approach is already used here https://github.com/apache/arrow-rs/blob/master/parquet/src/lib.rs#L45 and thought this would be a much simpler solution compared to a new library crate.
@alamb let me know what you think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks reasonable to me. Thank you @yordan-pavlov

@alamb
Copy link
Contributor

alamb commented Jun 4, 2021

@nevi-me is this something you can take on reviewing / approving? I am not very familiar with this code -- it looked good to me but I don't feel super confident of approving it. However, if you don't have the time I will do the best I can

@nevi-me
Copy link
Contributor

nevi-me commented Jun 4, 2021

@nevi-me is this something you can take on reviewing / approving? I am not very familiar with this code -- it looked good to me but I don't feel super confident of approving it. However, if you don't have the time I will do the best I can

I'll complete my review over the weekend, I like the approach; the RefCell makes it feel complicated, but I appreciate why.

I don't think I'll have any major items to raise though

@alamb
Copy link
Contributor

alamb commented Jun 4, 2021

🙏 Thank you @nevi-me !

@alamb
Copy link
Contributor

alamb commented Jun 8, 2021

FYI I plan to make a release candidate for Arrow 4.3 on Thursday or Friday this week and release early next week. So if we want to try and get this PR into 4.3 that is the schedule.

It is large enough, however, that delaying until 4.4 and giving it some more bake time is not a bad idea either

@yordan-pavlov yordan-pavlov force-pushed the fast_arrow_array_reader branch from 04448d1 to d5173db Compare June 9, 2021 19:03
@yordan-pavlov
Copy link
Contributor Author

thanks for the heads up @alamb, I have rebased and cleaned up the code in preparation for merging, but still waiting for review by @nevi-me and @jorgecarleitao

Utf8Converter,
>::new(
page_iterator,
use crate::arrow::arrow_array_reader::{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: please move these imports to the top of the file, for consistency

Copy link
Contributor

@nevi-me nevi-me left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yordan-pavlov @alamb I'm happy with merging this, so that we can follow up with documentation (and I'd like to try out list write support)

@alamb
Copy link
Contributor

alamb commented Jun 10, 2021

Merging this in and we can figure out if we want to try and put it into arrow 4.4.0. Thanks @yordan-pavlov !

@alamb
Copy link
Contributor

alamb commented Jun 10, 2021

And thanks @nevi-me for the epic review

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Use iterators to increase performance of creating Arrow arrays
5 participants