Skip to content

Commit

Permalink
impl lazily compute for null count when it is expensive to calc it ea…
Browse files Browse the repository at this point in the history
…gerly.
  • Loading branch information
Rachelint committed Jul 29, 2024
1 parent 0e99e3a commit f8a6d00
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 15 deletions.
84 changes: 71 additions & 13 deletions arrow-buffer/src/buffer/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,66 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::atomic::{AtomicI64, Ordering};

use crate::bit_iterator::{BitIndexIterator, BitIterator, BitSliceIterator};
use crate::buffer::BooleanBuffer;
use crate::{Buffer, MutableBuffer};

/// Sentinel stored in [`NullCount::Lazy`] while the null count has not been computed yet.
const UNINITIALIZED_NULL_COUNT: i64 = -1;

/// The number of nulls in a validity buffer, either known up front or cached on demand.
#[derive(Debug)]
pub enum NullCount {
    /// The count is already known.
    Eager(usize),
    /// The count is held in an atomic cell, which contains
    /// `UNINITIALIZED_NULL_COUNT` until a value has been stored.
    Lazy(AtomicI64),
}

impl Clone for NullCount {
    /// Produces an independent copy; a `Lazy` count is snapshotted into a fresh atomic.
    fn clone(&self) -> Self {
        match *self {
            Self::Eager(count) => Self::Eager(count),
            Self::Lazy(ref cached) => {
                // The cell only ever carries a plain integer, so a relaxed read is enough
                // to take the snapshot.
                Self::Lazy(AtomicI64::new(cached.load(Ordering::Relaxed)))
            }
        }
    }
}

/// A [`BooleanBuffer`] used to encode validity for arrow arrays
///
/// As per the [Arrow specification], array validity is encoded in a packed bitmask with a
/// `true` value indicating the corresponding slot is not null, and `false` indicating
/// that it is null.
///
/// [Arrow specification]: https://arrow.apache.org/docs/format/Columnar.html#validity-bitmaps
#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone)]
pub struct NullBuffer {
buffer: BooleanBuffer,
null_count: usize,
null_count: NullCount,
}

/// Equality looks only at the validity bitmask; the `null_count` field is not compared.
impl PartialEq for NullBuffer {
    fn eq(&self, rhs: &Self) -> bool {
        let (lhs_bits, rhs_bits) = (&self.buffer, &rhs.buffer);
        lhs_bits == rhs_bits
    }
}

impl Eq for NullBuffer {}

impl NullBuffer {
/// Create a new [`NullBuffer`] from a validity bitmask
///
/// The null count is not computed here; it starts out uninitialized and is computed
/// and cached by [`NullBuffer::null_count`] when first requested.
pub fn new(buffer: BooleanBuffer) -> Self {
    // Counting the set bits is expensive, so defer it until the count is first
    // requested instead of paying for it on every construction.
    let null_count = NullCount::Lazy(AtomicI64::new(UNINITIALIZED_NULL_COUNT));
    Self { buffer, null_count }
}

/// Create a new [`NullBuffer`] of length `len` where all values are null
pub fn new_null(len: usize) -> Self {
    Self {
        buffer: BooleanBuffer::new_unset(len),
        // Every bit is unset, so the null count is known without scanning.
        null_count: NullCount::Eager(len),
    }
}

Expand All @@ -53,7 +84,7 @@ impl NullBuffer {
pub fn new_valid(len: usize) -> Self {
    Self {
        buffer: BooleanBuffer::new_set(len),
        // Every bit is set, so the null count is known without scanning.
        null_count: NullCount::Eager(0),
    }
}

Expand All @@ -63,7 +94,10 @@ impl NullBuffer {
///
/// `buffer` must contain `null_count` `0` bits
pub unsafe fn new_unchecked(buffer: BooleanBuffer, null_count: usize) -> Self {
    Self {
        buffer,
        // The caller supplies the count, so it is stored eagerly and never recomputed.
        null_count: NullCount::Eager(null_count),
    }
}

/// Computes the union of the nulls in two optional [`NullBuffer`]
Expand All @@ -81,9 +115,12 @@ impl NullBuffer {

/// Returns true if all nulls in `other` also exist in self
pub fn contains(&self, other: &NullBuffer) -> bool {
if other.null_count == 0 {
return true;
if let NullCount::Eager(v) = &other.null_count {
if *v == 0 {
return true;
}
}

let lhs = self.inner().bit_chunks().iter_padded();
let rhs = other.inner().bit_chunks().iter_padded();
lhs.zip(rhs).all(|(l, r)| (l & !r) == 0)
Expand All @@ -106,9 +143,17 @@ impl NullBuffer {
crate::bit_util::set_bit(buffer.as_mut(), i * count + j)
}
}

let null_count = if let NullCount::Eager(v) = &self.null_count {
NullCount::Eager(v * count)
} else {
// TODO: consider loading the atomic here and reusing an already-computed count instead of starting uninitialized
NullCount::Lazy(AtomicI64::new(UNINITIALIZED_NULL_COUNT))
};

Self {
buffer: BooleanBuffer::new(buffer.into(), 0, capacity),
null_count: self.null_count * count,
null_count,
}
}

Expand All @@ -131,9 +176,20 @@ impl NullBuffer {
}

/// Returns the null count for this [`NullBuffer`]
///
/// An eagerly known count is returned directly. A lazy count is read from its cache;
/// if nothing has been cached yet, the count is computed from the bitmask as
/// `len - set bits`, stored in the cache, and returned.
#[inline]
pub fn null_count(&self) -> usize {
    match &self.null_count {
        NullCount::Eager(v) => *v,
        NullCount::Lazy(v) => {
            let cached_null_count = v.load(Ordering::Acquire);
            if cached_null_count != UNINITIALIZED_NULL_COUNT {
                // Anything other than the sentinel is a count stored by an earlier call.
                return cached_null_count as usize;
            }

            // No lock is taken: callers that all see the sentinel each compute the count
            // from the same bitmask and store the same value.
            let computed_null_count = self.buffer.len() - self.buffer.count_set_bits();
            v.store(computed_null_count as i64, Ordering::Release);
            computed_null_count
        }
    }
}

/// Returns `true` if the value at `idx` is not null
Expand Down Expand Up @@ -189,8 +245,10 @@ impl NullBuffer {
&self,
f: F,
) -> Result<(), E> {
if self.null_count == self.len() {
return Ok(());
if let NullCount::Eager(v) = &self.null_count {
if *v == self.len() {
return Ok(());
}
}
self.valid_indices().try_for_each(f)
}
Expand Down
2 changes: 1 addition & 1 deletion parquet-testing
Submodule parquet-testing updated 32 files
+6 −368 data/README.md
+0 −1 data/_KEY_MATERIAL_FOR_external_key_material_java.parquet.encrypted.json
+ data/bloom_filter.xxhash.bin
+ data/byte_stream_split.zstd.parquet
+ data/byte_stream_split_extended.gzip.parquet
+ data/concatenated_gzip_members.parquet
+ data/data_index_bloom_encoding_with_length.parquet
+ data/datapage_v1-corrupt-checksum.parquet
+ data/datapage_v1-snappy-compressed-checksum.parquet
+ data/datapage_v1-uncompressed-checksum.parquet
+ data/delta_length_byte_array.parquet
+ data/encrypt_columns_and_footer.parquet.encrypted
+ data/encrypt_columns_and_footer_aad.parquet.encrypted
+ data/encrypt_columns_and_footer_ctr.parquet.encrypted
+ data/encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted
+ data/encrypt_columns_plaintext_footer.parquet.encrypted
+ data/external_key_material_java.parquet.encrypted
+0 −73 data/fixed_length_byte_array.md
+ data/fixed_length_byte_array.parquet
+ data/float16_nonzeros_and_nans.parquet
+ data/float16_zeros_and_nans.parquet
+ data/incorrect_map_schema.parquet
+0 −73 data/int32_with_null_pages.md
+ data/int32_with_null_pages.parquet
+ data/large_string_map.brotli.parquet
+ data/nan_in_stats.parquet
+ data/overflow_i16_page_cnt.parquet
+ data/plain-dict-uncompressed-checksum.parquet
+ data/rle-dict-snappy-checksum.parquet
+ data/rle-dict-uncompressed-corrupt-checksum.parquet
+ data/rle_boolean_encoding.parquet
+ data/uniform_encryption.parquet.encrypted

0 comments on commit f8a6d00

Please sign in to comment.