diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index c56b1fd308cf..9d9ab0655a7e 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -684,7 +684,7 @@ mod tests { let record_batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap(); - assert_eq!(record_batch.get_array_memory_size(), 364); + assert_eq!(record_batch.get_array_memory_size(), 380); } fn check_batch(record_batch: RecordBatch, num_rows: usize) { diff --git a/arrow-buffer/src/buffer/null.rs b/arrow-buffer/src/buffer/null.rs index c79aef398059..7929f99a046d 100644 --- a/arrow-buffer/src/buffer/null.rs +++ b/arrow-buffer/src/buffer/null.rs @@ -15,10 +15,32 @@ // specific language governing permissions and limitations // under the License. +use std::sync::atomic::{AtomicI64, Ordering}; + use crate::bit_iterator::{BitIndexIterator, BitIterator, BitSliceIterator}; use crate::buffer::BooleanBuffer; use crate::{Buffer, MutableBuffer}; +const UNINITIALIZED_NULL_COUNT: i64 = -1; + +#[derive(Debug)] +pub enum NullCount { + Eager(usize), + Lazy(AtomicI64), +} + +impl Clone for NullCount { + fn clone(&self) -> Self { + match self { + Self::Eager(v) => Self::Eager(*v), + Self::Lazy(v) => { + let v = v.load(Ordering::Relaxed); + Self::Lazy(AtomicI64::new(v)) + } + } + } +} + /// A [`BooleanBuffer`] used to encode validity for arrow arrays /// /// As per the [Arrow specification], array validity is encoded in a packed bitmask with a @@ -26,16 +48,25 @@ use crate::{Buffer, MutableBuffer}; /// that it is null. /// /// [Arrow specification]: https://arrow.apache.org/docs/format/Columnar.html#validity-bitmaps -#[derive(Debug, Clone, Eq, PartialEq)] +#[derive(Debug, Clone)] pub struct NullBuffer { buffer: BooleanBuffer, - null_count: usize, + null_count: NullCount, +} + +impl PartialEq for NullBuffer { + fn eq(&self, other: &Self) -> bool { + self.buffer == other.buffer + } } +impl Eq for NullBuffer {} + impl NullBuffer { /// Create a new [`NullBuffer`] computing the null count pub fn new(buffer: BooleanBuffer) -> Self { - let null_count = buffer.len() - buffer.count_set_bits(); + // Expensive to calc the null count, we should lazily compute it when it is really needed + let null_count = NullCount::Lazy(AtomicI64::new(UNINITIALIZED_NULL_COUNT)); Self { buffer, null_count } } @@ -43,7 +74,7 @@ impl NullBuffer { pub fn new_null(len: usize) -> Self { Self { buffer: BooleanBuffer::new_unset(len), - null_count: len, + null_count: NullCount::Eager(len), } } @@ -53,7 +84,7 @@ impl NullBuffer { pub fn new_valid(len: usize) -> Self { Self { buffer: BooleanBuffer::new_set(len), - null_count: 0, + null_count: NullCount::Eager(0), } } @@ -63,7 +94,10 @@ impl NullBuffer { /// /// `buffer` must contain `null_count` `0` bits pub unsafe fn new_unchecked(buffer: BooleanBuffer, null_count: usize) -> Self { - Self { buffer, null_count } + Self { + buffer, + null_count: NullCount::Eager(null_count), + } } /// Computes the union of the nulls in two optional [`NullBuffer`] @@ -81,9 +115,12 @@ impl NullBuffer { /// Returns true if all nulls in `other` also exist in self pub fn contains(&self, other: &NullBuffer) -> bool { - if other.null_count == 0 { - return true; + if let NullCount::Eager(v) = &other.null_count { + if *v == 0 { + return true; + } } + let lhs = self.inner().bit_chunks().iter_padded(); let rhs = other.inner().bit_chunks().iter_padded(); lhs.zip(rhs).all(|(l, r)| (l & !r) == 0) @@ -106,9 +143,17 @@ impl NullBuffer { crate::bit_util::set_bit(buffer.as_mut(), i * count + j) } } + + let null_count = if let NullCount::Eager(v) = &self.null_count { + NullCount::Eager(v * count) + } else { + // TODO: not sure about if it is better to load the atomic and attempt to reuse the compute result + NullCount::Lazy(AtomicI64::new(UNINITIALIZED_NULL_COUNT)) + }; + Self { buffer: BooleanBuffer::new(buffer.into(), 0, capacity), - null_count: self.null_count * count, + null_count, } } @@ -131,9 +176,20 @@ impl NullBuffer { } /// Returns the null count for this [`NullBuffer`] - #[inline] pub fn null_count(&self) -> usize { - self.null_count + match &self.null_count { + NullCount::Eager(v) => *v, + NullCount::Lazy(v) => { + let cached_null_count = v.load(Ordering::Acquire); + if cached_null_count != UNINITIALIZED_NULL_COUNT { + return cached_null_count as usize; + } + + let computed_null_count = self.buffer.len() - self.buffer.count_set_bits(); + v.store(computed_null_count as i64, Ordering::Release); + computed_null_count + } + } } /// Returns `true` if the value at `idx` is not null @@ -189,8 +245,10 @@ impl NullBuffer { &self, f: F, ) -> Result<(), E> { - if self.null_count == self.len() { - return Ok(()); + if let NullCount::Eager(v) = &self.null_count { + if *v == self.len() { + return Ok(()); + } } self.valid_indices().try_for_each(f) }