Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement faster arrow array reader #384

Merged
merged 8 commits into from
Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions arrow/src/array/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,11 @@ impl ArrayDataBuilder {
self
}

pub fn null_count(mut self, null_count: usize) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're intentionally missing this function here because we were avoiding a situation where a user could specify a null count != the actual count in the null buffer. Is there a way of avoiding it @yordan-pavlov?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without this null_count method, count_set_bits_offset would be called unnecessarily (because we already know the null count) a second time in ArrayData::new when value_array_data: ArrayData is created

self.null_count = Some(null_count);
self
}

pub fn null_bit_buffer(mut self, buf: Buffer) -> Self {
self.null_bit_buffer = Some(buf);
self
Expand Down
66 changes: 49 additions & 17 deletions arrow/src/array/transform/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::mem;

use super::{
data::{into_buffers, new_buffers},
ArrayData,
ArrayData, ArrayDataBuilder,
};
use crate::array::StringOffsetSizeTrait;

Expand Down Expand Up @@ -63,7 +63,7 @@ struct _MutableArrayData<'a> {
}

impl<'a> _MutableArrayData<'a> {
fn freeze(self, dictionary: Option<ArrayData>) -> ArrayData {
fn freeze(self, dictionary: Option<ArrayData>) -> ArrayDataBuilder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jorgecarleitao are you fine with returning a builder here?

let buffers = into_buffers(&self.data_type, self.buffer1, self.buffer2);

let child_data = match self.data_type {
Expand All @@ -76,19 +76,19 @@ impl<'a> _MutableArrayData<'a> {
child_data
}
};
ArrayData::new(
self.data_type,
self.len,
Some(self.null_count),
if self.null_count > 0 {
Some(self.null_buffer.into())
} else {
None
},
0,
buffers,
child_data,
)

let mut array_data_builder = ArrayDataBuilder::new(self.data_type)
.offset(0)
.len(self.len)
.null_count(self.null_count)
.buffers(buffers)
.child_data(child_data);
if self.null_count > 0 {
array_data_builder =
array_data_builder.null_bit_buffer(self.null_buffer.into());
}

array_data_builder
}
}

Expand Down Expand Up @@ -552,8 +552,13 @@ impl<'a> MutableArrayData<'a> {
.map(|array| build_extend_null_bits(array, use_nulls))
.collect();

let null_bytes = bit_util::ceil(array_capacity, 8);
let null_buffer = MutableBuffer::from_len_zeroed(null_bytes);
let null_buffer = if use_nulls {
let null_bytes = bit_util::ceil(array_capacity, 8);
MutableBuffer::from_len_zeroed(null_bytes)
} else {
// create 0 capacity mutable buffer with the intention that it won't be used
MutableBuffer::with_capacity(0)
};

let extend_values = match &data_type {
DataType::Dictionary(_, _) => {
Expand Down Expand Up @@ -605,13 +610,40 @@ impl<'a> MutableArrayData<'a> {

/// Extends this [MutableArrayData] with null elements, disregarding the bound arrays
pub fn extend_nulls(&mut self, len: usize) {
// TODO: null_buffer should probably be extended here as well
// otherwise is_valid() could later panic
// add test to confirm
self.data.null_count += len;
(self.extend_nulls)(&mut self.data, len);
self.data.len += len;
}

/// Returns the current length
#[inline]
pub fn len(&self) -> usize {
self.data.len
}

/// Returns true if len is 0
#[inline]
pub fn is_empty(&self) -> bool {
self.data.len == 0
}

/// Returns the current null count
#[inline]
pub fn null_count(&self) -> usize {
self.data.null_count
}

/// Creates a [ArrayData] from the pushed regions up to this point, consuming `self`.
pub fn freeze(self) -> ArrayData {
self.data.freeze(self.dictionary).build()
}

/// Creates a [ArrayDataBuilder] from the pushed regions up to this point, consuming `self`.
/// This is useful for extending the default behavior of MutableArrayData.
pub fn into_builder(self) -> ArrayDataBuilder {
self.data.freeze(self.dictionary)
}
}
Expand Down
4 changes: 2 additions & 2 deletions arrow/src/buffer/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,10 @@ impl MutableBuffer {
}

/// Extends the buffer with a new item, without checking for sufficient capacity
/// Safety
/// # Safety
/// Caller must ensure that the capacity()-len()>=size_of<T>()
#[inline]
unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
pub unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
let additional = std::mem::size_of::<T>();
let dst = self.data.as_ptr().add(self.len) as *mut T;
std::ptr::write(dst, item);
Expand Down
32 changes: 17 additions & 15 deletions arrow/src/compute/kernels/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ enum State {
/// slots of a [BooleanArray] are true. Each interval corresponds to a contiguous region of memory to be
/// "taken" from an array to be filtered.
#[derive(Debug)]
pub(crate) struct SlicesIterator<'a> {
pub struct SlicesIterator<'a> {
iter: Enumerate<BitChunkIterator<'a>>,
state: State,
filter_count: usize,
filter: &'a BooleanArray,
remainder_mask: u64,
remainder_len: usize,
chunk_len: usize,
Expand All @@ -59,19 +59,14 @@ pub(crate) struct SlicesIterator<'a> {
}

impl<'a> SlicesIterator<'a> {
pub(crate) fn new(filter: &'a BooleanArray) -> Self {
pub fn new(filter: &'a BooleanArray) -> Self {
let values = &filter.data_ref().buffers()[0];

// this operation is performed before iteration
// because it is fast and allows reserving all the needed memory
let filter_count = values.count_set_bits_offset(filter.offset(), filter.len());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this mean that the count is done multiple times now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good question @Dandandan , in my opinion calculating filter_count should not be done in the SlicesIterator because it's not used here. It's just a convenience for many of the clients of the SlicesIterator. Also having filter_count calculated in SlicesIterator::new is inflexible and in the use case of the new ArrowArrayReader would have meant that counting would be performed twice unnecessarily. That's why I have moved it to a filter_count() method instead - keep this convenience for users of SlicesIterator, but make it more flexible and allow more use-cases. Where I have had to change existing code, I was careful to only invoke filter_count() a single time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe adding a docstring to the new filter_count() would be good enough

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done - added docstring for filter_count()


let chunks = values.bit_chunks(filter.offset(), filter.len());

Self {
iter: chunks.iter().enumerate(),
state: State::Chunks,
filter_count,
filter,
remainder_len: chunks.remainder_len(),
chunk_len: chunks.chunk_len(),
remainder_mask: chunks.remainder_bits(),
Expand All @@ -83,6 +78,12 @@ impl<'a> SlicesIterator<'a> {
}
}

/// Counts the number of set bits in the filter array.
fn filter_count(&self) -> usize {
let values = self.filter.values();
values.count_set_bits_offset(self.filter.offset(), self.filter.len())
}

#[inline]
fn current_start(&self) -> usize {
self.current_chunk * 64 + self.current_bit
Expand Down Expand Up @@ -193,7 +194,7 @@ impl<'a> Iterator for SlicesIterator<'a> {
/// Therefore, it is considered undefined behavior to pass `filter` with null values.
pub fn build_filter(filter: &BooleanArray) -> Result<Filter> {
let iter = SlicesIterator::new(filter);
let filter_count = iter.filter_count;
let filter_count = iter.filter_count();
let chunks = iter.collect::<Vec<_>>();

Ok(Box::new(move |array: &ArrayData| {
Expand Down Expand Up @@ -253,7 +254,8 @@ pub fn filter(array: &Array, predicate: &BooleanArray) -> Result<ArrayRef> {
}

let iter = SlicesIterator::new(predicate);
match iter.filter_count {
let filter_count = iter.filter_count();
match filter_count {
0 => {
// return empty
Ok(new_empty_array(array.data_type()))
Expand All @@ -266,7 +268,7 @@ pub fn filter(array: &Array, predicate: &BooleanArray) -> Result<ArrayRef> {
_ => {
// actually filter
let mut mutable =
MutableArrayData::new(vec![array.data_ref()], false, iter.filter_count);
MutableArrayData::new(vec![array.data_ref()], false, filter_count);
iter.for_each(|(start, end)| mutable.extend(0, start, end));
let data = mutable.freeze();
Ok(make_array(data))
Expand Down Expand Up @@ -599,7 +601,7 @@ mod tests {
let filter = BooleanArray::from(filter_values);

let iter = SlicesIterator::new(&filter);
let filter_count = iter.filter_count;
let filter_count = iter.filter_count();
let chunks = iter.collect::<Vec<_>>();

assert_eq!(chunks, vec![(1, 2)]);
Expand All @@ -612,7 +614,7 @@ mod tests {
let filter = BooleanArray::from(filter_values);

let iter = SlicesIterator::new(&filter);
let filter_count = iter.filter_count;
let filter_count = iter.filter_count();
let chunks = iter.collect::<Vec<_>>();

assert_eq!(chunks, vec![(0, 1), (2, 64)]);
Expand All @@ -625,7 +627,7 @@ mod tests {
let filter = BooleanArray::from(filter_values);

let iter = SlicesIterator::new(&filter);
let filter_count = iter.filter_count;
let filter_count = iter.filter_count();
let chunks = iter.collect::<Vec<_>>();

assert_eq!(chunks, vec![(1, 62), (63, 124), (125, 130)]);
Expand Down
5 changes: 5 additions & 0 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ arrow = { path = "../arrow", version = "5.0.0-SNAPSHOT", optional = true }
base64 = { version = "0.13", optional = true }
clap = { version = "2.33.3", optional = true }
serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
rand = "0.8"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if a new dependency was not needed for the main crate (it seems like it is only needed for test_util -- so perhaps we could mark test_util as [#cfg(test)] or something -- though I suspect this doesn't actually add any new dependency

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't like having to add this new dependency, but couldn't get the benchmarks to compile without it; I am more than happy to remove or restrict if someone knows how

Copy link
Contributor

@alamb alamb Jun 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only way I can think of is to move test_util to a new crate (and then add it as a dev dependency)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try this over the weekend


[dev-dependencies]
criterion = "0.3"
Expand Down Expand Up @@ -76,3 +77,7 @@ required-features = ["cli"]
[[bench]]
name = "arrow_writer"
harness = false

[[bench]]
name = "arrow_array_reader"
harness = false
Loading