Skip to content

Commit

Permalink
chore: lance-encoding test utility now properly aligns buffers (#3116)
Browse files Browse the repository at this point in the history
We align buffers in the file writer but we were not doing the same thing
in the test utility. This forced encodings to do extra copies. We remove
one such copy in this PR.

Closes #3115
  • Loading branch information
westonpace authored Nov 11, 2024
1 parent c237bcb commit 4ab270b
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 7 deletions.
12 changes: 10 additions & 2 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ use crate::{
use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
use std::collections::hash_map::RandomState;

/// The minimum alignment for a page buffer. Writers must respect this.
pub const MIN_PAGE_BUFFER_ALIGNMENT: u64 = 8;

/// An encoded array
///
/// Maps to a single Arrow array
Expand Down Expand Up @@ -1299,9 +1302,14 @@ pub async fn encode_batch(
encoding_strategy: &dyn FieldEncodingStrategy,
options: &EncodingOptions,
) -> Result<EncodedBatch> {
if !is_pwr_two(options.buffer_alignment) || options.buffer_alignment < 8 {
if !is_pwr_two(options.buffer_alignment) || options.buffer_alignment < MIN_PAGE_BUFFER_ALIGNMENT
{
return Err(Error::InvalidInput {
source: "buffer_alignment must be a power of two and at least 8".into(),
source: format!(
"buffer_alignment must be a power of two and at least {}",
MIN_PAGE_BUFFER_ALIGNMENT
)
.into(),
location: location!(),
});
}
Expand Down
1 change: 0 additions & 1 deletion rust/lance-encoding/src/encodings/physical/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,6 @@ impl MiniBlockDecompressor for BinaryMiniBlockDecompressor {
// it has so assertion can not be done here and the caller of `decompress` must ensure `num_values` <= number of values in the chunk.
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
assert!(data.len() >= 8);
let data = data.to_vec();
let offsets: &[u32] = try_cast_slice(&data)
.expect("casting buffer failed during BinaryMiniBlock decompression");

Expand Down
16 changes: 12 additions & 4 deletions rust/lance-encoding/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tokio::sync::mpsc::{self, UnboundedSender};

use lance_core::{
cache::{CapacityMode, FileMetadataCache},
utils::bit::pad_bytes,
Result,
};
use lance_datagen::{array, gen, ArrayGenerator, RowCount, Seed};
Expand All @@ -26,14 +27,15 @@ use crate::{
},
encoder::{
default_encoding_strategy, ColumnIndexSequence, EncodedColumn, EncodedPage,
EncodingOptions, FieldEncoder, OutOfLineBuffers,
EncodingOptions, FieldEncoder, OutOfLineBuffers, MIN_PAGE_BUFFER_ALIGNMENT,
},
repdef::RepDefBuilder,
version::LanceFileVersion,
EncodingsIo,
};

const MAX_PAGE_BYTES: u64 = 32 * 1024 * 1024;
const TEST_ALIGNMENT: usize = MIN_PAGE_BUFFER_ALIGNMENT as usize;

#[derive(Debug)]
pub(crate) struct SimulatedScheduler {
Expand Down Expand Up @@ -277,7 +279,7 @@ pub async fn check_round_trip_encoding_generated(
max_page_bytes: MAX_PAGE_BYTES,
cache_bytes_per_column: page_size,
keep_original_array: true,
buffer_alignment: 64,
buffer_alignment: MIN_PAGE_BUFFER_ALIGNMENT,
};
encoding_strategy
.create_field_encoder(
Expand Down Expand Up @@ -410,7 +412,7 @@ pub async fn check_round_trip_encoding_of_data(
cache_bytes_per_column: *page_size,
max_page_bytes: test_cases.get_max_page_size(),
keep_original_array: true,
buffer_alignment: 64,
buffer_alignment: MIN_PAGE_BUFFER_ALIGNMENT,
};
let encoder = encoding_strategy
.create_field_encoder(
Expand Down Expand Up @@ -443,11 +445,17 @@ impl SimulatedWriter {
let offset = self.encoded_data.len() as u64;
self.encoded_data.extend_from_slice(&buffer);
let size = self.encoded_data.len() as u64 - offset;
let pad_bytes = pad_bytes::<TEST_ALIGNMENT>(self.encoded_data.len());
self.encoded_data
.extend(std::iter::repeat(0).take(pad_bytes));
(offset, size)
}

fn write_lance_buffer(&mut self, buffer: LanceBuffer) {
self.encoded_data.extend_from_slice(&buffer);
let pad_bytes = pad_bytes::<TEST_ALIGNMENT>(self.encoded_data.len());
self.encoded_data
.extend(std::iter::repeat(0).take(pad_bytes));
}

fn write_page(&mut self, encoded_page: EncodedPage) {
Expand All @@ -471,7 +479,7 @@ impl SimulatedWriter {
}

fn new_external_buffers(&self) -> OutOfLineBuffers {
OutOfLineBuffers::new(self.encoded_data.len() as u64, /*buffer_alignment=*/ 1)
OutOfLineBuffers::new(self.encoded_data.len() as u64, MIN_PAGE_BUFFER_ALIGNMENT)
}
}

Expand Down

0 comments on commit 4ab270b

Please sign in to comment.