Skip to content

Commit

Permalink
chore: chunked SQ storage (lancedb#2437)
Browse files Browse the repository at this point in the history
* Use chunked storage for `ScalarQuantizationStorage`, so that we can
append new values cheaply.
* It does auto-compaction after a hard-coded threshold is met.
  • Loading branch information
eddyxu authored Jun 6, 2024
1 parent aaddcd8 commit 6b95cd5
Show file tree
Hide file tree
Showing 25 changed files with 589 additions and 234 deletions.
16 changes: 12 additions & 4 deletions python/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::sync::Arc;

use arrow::compute::concat;
use arrow::compute::{concat, concat_batches};
use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use arrow_array::{
cast::AsArray, Array, FixedSizeListArray, Float32Array, UInt32Array, UInt64Array,
Expand All @@ -28,8 +28,10 @@ use lance::{
};
use lance_arrow::FixedSizeListArrayExt;
use lance_file::writer::FileWriter;
use lance_index::vector::hnsw::builder::HnswBuildParams;
use lance_index::vector::hnsw::HNSW;
use lance_index::vector::{
hnsw::{builder::HnswBuildParams, HNSW},
v3::storage::VectorStore,
};
use lance_linalg::kmeans::compute_partitions;
use lance_linalg::{
distance::DistanceType,
Expand Down Expand Up @@ -237,5 +239,11 @@ pub fn build_sq_storage(
lance_index::vector::sq::ScalarQuantizer::with_bounds(8, dim, lower_bound..upper_bound);
let storage = sq::build_sq_storage(DistanceType::L2, row_ids, vectors, quantizer)
.map_err(|e| PyIOError::new_err(e.to_string()))?;
storage.batch().clone().to_pyarrow(py)
let batches = storage
.to_batches()
.map_err(|e| PyIOError::new_err(e.to_string()))?
.collect::<Vec<_>>();
let batch = concat_batches(&batches[0].schema(), &batches)
.map_err(|e| PyIOError::new_err(e.to_string()))?;
batch.to_pyarrow(py)
}
4 changes: 2 additions & 2 deletions rust/lance-core/src/utils/mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ impl RowIdMask {
}

/// Return the indices of the input row ids that were valid
pub fn selected_indices(&self, row_ids: &[u64]) -> Vec<u64> {
let enumerated_ids = row_ids.iter().enumerate();
pub fn selected_indices<'a>(&self, row_ids: impl Iterator<Item = &'a u64> + 'a) -> Vec<u64> {
let enumerated_ids = row_ids.enumerate();
match (&self.block_list, &self.allow_list) {
(Some(block_list), Some(allow_list)) => {
// Only take rows that are both in the allow list and not in the block list
Expand Down
4 changes: 4 additions & 0 deletions rust/lance-index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,7 @@ harness = false
[[bench]]
name = "hnsw"
harness = false

[[bench]]
name = "sq"
harness = false
2 changes: 2 additions & 0 deletions rust/lance-index/benches/find_partitions.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

mod sq;

use arrow_array::Float32Array;
use arrow_array::{types::Float32Type, FixedSizeListArray};
use lance_arrow::FixedSizeListArrayExt;
Expand Down
113 changes: 113 additions & 0 deletions rust/lance-index/benches/sq.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Scalar Quantization Benchmarks
use std::{iter::repeat_with, ops::Range, sync::Arc, time::Duration};

use arrow_array::{FixedSizeListArray, RecordBatch, UInt64Array, UInt8Array};
use arrow_schema::{DataType, Field, Schema};
use criterion::{criterion_group, criterion_main, Criterion};
use lance_arrow::{FixedSizeListArrayExt, RecordBatchExt};
use lance_core::ROW_ID;
use lance_index::vector::{
sq::storage::ScalarQuantizationStorage, v3::storage::VectorStore, SQ_CODE_COLUMN,
};
use lance_linalg::distance::DistanceType;
use lance_testing::datagen::generate_random_array;
#[cfg(target_os = "linux")]
use pprof::criterion::{Output, PProfProfiler};
use rand::prelude::*;

fn create_full_batch(range: Range<u64>, dim: usize) -> RecordBatch {
let mut rng = rand::thread_rng();
let row_ids = UInt64Array::from_iter_values(range);
let sq_code =
UInt8Array::from_iter_values(repeat_with(|| rng.gen::<u8>()).take(row_ids.len() * dim));
let sq_code_fsl = FixedSizeListArray::try_new_from_values(sq_code, dim as i32).unwrap();

let vector_data = generate_random_array(row_ids.len() * dim);
let vector_fsl = FixedSizeListArray::try_new_from_values(vector_data, dim as i32).unwrap();

let schema = Arc::new(Schema::new(vec![
Field::new(ROW_ID, DataType::UInt64, false),
Field::new(
SQ_CODE_COLUMN,
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::UInt8, true)),
dim as i32,
),
false,
),
Field::new(
"vector",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
dim as i32,
),
false,
),
]));
RecordBatch::try_new(
schema,
vec![
Arc::new(row_ids),
Arc::new(sq_code_fsl),
Arc::new(vector_fsl),
],
)
.unwrap()
}

/// Same as [`create_full_batch`] but with the raw `vector` column removed,
/// leaving only row ids and SQ codes — the shape stored by
/// `ScalarQuantizationStorage`.
fn create_sq_batch(row_id_range: Range<u64>, dim: usize) -> RecordBatch {
    create_full_batch(row_id_range, dim)
        .drop_column("vector")
        .unwrap()
}

#[allow(dead_code)]
/// Benchmark random-access `distance_between` lookups on a
/// `ScalarQuantizationStorage` built from a varying number of chunks,
/// holding the total row count fixed at 8M.
///
/// Each iteration picks two random vector indices and computes their
/// quantized distance, so the measurement reflects chunked-storage lookup
/// overhead as the chunk count grows.
pub fn bench_storage(c: &mut Criterion) {
    let mut rng = rand::thread_rng();

    const TOTAL: usize = 8 * 1024 * 1024; // 8M rows

    for num_chunks in [1, 32, 128, 1024] {
        // Rows per chunk; TOTAL is an exact multiple of every chunk count.
        let chunk_size = TOTAL / num_chunks;
        let storage = ScalarQuantizationStorage::try_new(
            8,
            DistanceType::L2,
            -1.0..1.0,
            repeat_with(|| create_sq_batch(0..chunk_size as u64, 512)).take(num_chunks),
        )
        .unwrap();
        c.bench_function(
            // Report the actual chunk size: the previous label hard-coded
            // "10K", which was wrong for every configuration (8M..8K here).
            format!(
                "ScalarQuantizationStorage,chunks={}x{}",
                num_chunks, chunk_size
            )
            .as_str(),
            |b| {
                let total = storage.len();
                b.iter(|| {
                    let a = rng.gen_range(0..total as u32);
                    let b = rng.gen_range(0..total as u32);
                    storage.distance_between(a, b)
                });
            },
        );
    }
}

// On Linux, attach the pprof profiler so each benchmark also emits a
// flamegraph alongside criterion's own report.
#[cfg(target_os = "linux")]
criterion_group!(
    name=benches;
    config = Criterion::default()
        .measurement_time(Duration::from_secs(10))
        .sample_size(10)
        .with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
    targets = bench_storage);

// Non-linux version does not support pprof.
#[cfg(not(target_os = "linux"))]
criterion_group!(
    name=benches;
    config = Criterion::default()
        .measurement_time(Duration::from_secs(10))
        .sample_size(10);
    targets = bench_storage);

criterion_main!(benches);
2 changes: 1 addition & 1 deletion rust/lance-index/src/prefilter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ pub trait PreFilter: Send + Sync {
/// also known as a selection vector.
///
/// This method must be called after `wait_for_ready`
fn filter_row_ids(&self, row_ids: &[u64]) -> Vec<u64>;
fn filter_row_ids<'a>(&self, row_ids: Box<dyn Iterator<Item = &'a u64> + 'a>) -> Vec<u64>;
}
2 changes: 1 addition & 1 deletion rust/lance-index/src/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ pub trait VectorIndex: Send + Sync + std::fmt::Debug + Index {
}

/// Return the IDs of rows in the index.
fn row_ids(&self) -> &[u64];
fn row_ids(&self) -> Box<dyn Iterator<Item = &'_ u64> + '_>;

/// Remap the index according to mapping
///
Expand Down
4 changes: 2 additions & 2 deletions rust/lance-index/src/vector/flat/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl IvfSubIndex for FlatIndex {
) -> Result<RecordBatch> {
let dist_calc = storage.dist_calculator_from_native(query);
let (row_ids, dists): (Vec<u64>, Vec<f32>) = (0..storage.len())
.filter(|&id| !prefilter.should_drop(storage.row_ids()[id]))
.filter(|&id| !prefilter.should_drop(storage.row_id(id as u32)))
.map(|id| OrderedNode {
id: id as u32,
dist: OrderedFloat(dist_calc.distance(id as u32)),
Expand All @@ -69,7 +69,7 @@ impl IvfSubIndex for FlatIndex {
|OrderedNode {
id,
dist: OrderedFloat(dist),
}| (storage.row_ids()[id as usize], dist),
}| (storage.row_id(id), dist),
)
.unzip();

Expand Down
38 changes: 26 additions & 12 deletions rust/lance-index/src/vector/flat/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ use crate::vector::quantizer::QuantizerStorage;
use crate::vector::utils::prefetch_arrow_array;
use crate::vector::v3::storage::{DistCalculator, VectorStore};
use arrow::array::AsArray;
use arrow::compute::concat_batches;
use arrow::datatypes::UInt64Type;
use arrow_array::{types::Float32Type, RecordBatch};
use arrow_array::{Array, ArrayRef, FixedSizeListArray, UInt64Array};
use arrow_schema::DataType;
use arrow_schema::{DataType, SchemaRef};
use deepsize::DeepSizeOf;
use lance_core::{Error, Result, ROW_ID};
use lance_file::reader::FileReader;
Expand All @@ -30,7 +31,7 @@ pub struct FlatStorage {
distance_type: DistanceType,

// helper fields
row_ids: Arc<UInt64Array>,
pub(super) row_ids: Arc<UInt64Array>,
vectors: Arc<FixedSizeListArray>,
}

Expand Down Expand Up @@ -81,10 +82,7 @@ impl FlatStorage {
impl VectorStore for FlatStorage {
type DistanceCalculator<'a> = FlatDistanceCal;

fn try_from_batch(batch: RecordBatch, distance_type: DistanceType) -> Result<Self>
where
Self: Sized,
{
fn try_from_batch(batch: RecordBatch, distance_type: DistanceType) -> Result<Self> {
let row_ids = Arc::new(
batch
.column_by_name(ROW_ID)
Expand Down Expand Up @@ -113,8 +111,20 @@ impl VectorStore for FlatStorage {
})
}

fn to_batch(&self) -> Result<RecordBatch> {
Ok(self.batch.clone())
    /// Yield the stored data as record batches.
    ///
    /// `FlatStorage` keeps everything in a single batch, so this returns a
    /// one-element iterator over a clone of it.
    fn to_batches(&self) -> Result<impl Iterator<Item = RecordBatch>> {
        Ok([self.batch.clone()].into_iter())
    }

    /// Return a new storage with `batch` appended after the existing rows.
    ///
    /// Concatenates the old and new batches into one, so this copies all
    /// existing data (O(n)); the TODO below is about avoiding that copy.
    /// NOTE(review): concatenation uses `batch.schema()`, so this presumably
    /// assumes `batch` matches the stored schema — confirm against callers.
    /// `_vector_column` is unused here.
    fn append_batch(&self, batch: RecordBatch, _vector_column: &str) -> Result<Self> {
        // TODO: use chunked storage
        let new_batch = concat_batches(&batch.schema(), vec![&self.batch, &batch].into_iter())?;
        let mut storage = self.clone();
        storage.batch = new_batch;
        Ok(storage)
    }

    /// Schema of the underlying record batch.
    fn schema(&self) -> &SchemaRef {
        self.batch.schema_ref()
    }

fn as_any(&self) -> &dyn std::any::Any {
Expand All @@ -125,12 +135,16 @@ impl VectorStore for FlatStorage {
self.vectors.len()
}

fn row_ids(&self) -> &[u64] {
self.row_ids.values()
fn distance_type(&self) -> DistanceType {
self.distance_type
}

fn metric_type(&self) -> DistanceType {
self.distance_type
fn row_id(&self, id: u32) -> u64 {
self.row_ids.values()[id as usize]
}

fn row_ids(&self) -> impl Iterator<Item = &u64> {
self.row_ids.values().iter()
}

fn dist_calculator(&self, query: ArrayRef) -> Self::DistanceCalculator<'_> {
Expand Down
Loading

0 comments on commit 6b95cd5

Please sign in to comment.