Skip to content

Commit

Permalink
feat: support remapping for IVF_FLAT, IVF_PQ and IVF_SQ (#2708)
Browse files Browse the repository at this point in the history
not support IVF_HNSW_* index yet

---------

Signed-off-by: BubbleCal <[email protected]>
  • Loading branch information
BubbleCal authored Dec 20, 2024
1 parent 2b29487 commit 72ae355
Show file tree
Hide file tree
Showing 24 changed files with 1,322 additions and 821 deletions.
1,420 changes: 787 additions & 633 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions rust/lance-file/src/v2/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ use lance_encoding::encoder::{
};
use lance_encoding::repdef::RepDefBuilder;
use lance_encoding::version::LanceFileVersion;
use lance_io::object_store::ObjectStore;
use lance_io::object_writer::ObjectWriter;
use lance_io::traits::Writer;
use log::debug;
use object_store::path::Path;
use prost::Message;
use prost_types::Any;
use snafu::{location, Location};
Expand Down Expand Up @@ -143,6 +145,24 @@ impl FileWriter {
}
}

/// Write a series of record batches to a new file
///
/// Returns the number of rows written
pub async fn create_file_with_batches(
store: &ObjectStore,
path: &Path,
schema: lance_core::datatypes::Schema,
batches: impl Iterator<Item = RecordBatch> + Send,
options: FileWriterOptions,
) -> Result<usize> {
let writer = store.create(path).await?;
let mut writer = Self::try_new(writer, schema, options)?;
for batch in batches {
writer.write_batch(&batch).await?;
}
Ok(writer.finish().await? as usize)
}

async fn do_write_buffer(writer: &mut ObjectWriter, buf: &[u8]) -> Result<()> {
writer.write_all(buf).await?;
let pad_bytes = pad_bytes::<PAGE_BUFFER_ALIGNMENT>(buf.len());
Expand Down
7 changes: 6 additions & 1 deletion rust/lance-index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@ impl IndexType {
pub fn is_vector(&self) -> bool {
matches!(
self,
Self::Vector | Self::IvfPq | Self::IvfHnswSq | Self::IvfHnswPq
Self::Vector
| Self::IvfPq
| Self::IvfHnswSq
| Self::IvfHnswPq
| Self::IvfFlat
| Self::IvfSq
)
}
}
Expand Down
18 changes: 17 additions & 1 deletion rust/lance-index/src/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ use arrow_schema::Field;
use async_trait::async_trait;
use ivf::storage::IvfModel;
use lance_core::{Result, ROW_ID_FIELD};
use lance_io::object_store::ObjectStore;
use lance_io::traits::Reader;
use lance_linalg::distance::DistanceType;
use lazy_static::lazy_static;
use object_store::path::Path;
use quantizer::{QuantizationType, Quantizer};
use v3::subindex::SubIndexType;

Expand Down Expand Up @@ -182,7 +184,21 @@ pub trait VectorIndex: Send + Sync + std::fmt::Debug + Index {
///
/// If an old row id is not in the mapping then it should be
/// left alone.
fn remap(&mut self, mapping: &HashMap<u64, Option<u64>>) -> Result<()>;
async fn remap(&mut self, mapping: &HashMap<u64, Option<u64>>) -> Result<()>;

/// Remap the index according to mapping
///
/// write the remapped index to the index_dir
/// this is available for only v3 index
async fn remap_to(
self: Arc<Self>,
_store: ObjectStore,
_mapping: &HashMap<u64, Option<u64>>,
_column: String,
_index_dir: Path,
) -> Result<()> {
unimplemented!("only for v3 index")
}

/// The metric type of this vector index.
fn metric_type(&self) -> DistanceType;
Expand Down
5 changes: 5 additions & 0 deletions rust/lance-index/src/vector/flat/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! Flat Vector Index.
//!
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::AsArray;
Expand Down Expand Up @@ -134,6 +135,10 @@ impl IvfSubIndex for FlatIndex {
Ok(Self {})
}

fn remap(&self, _: &HashMap<u64, Option<u64>>) -> Result<Self> {
Ok(self.clone())
}

fn to_batch(&self) -> Result<RecordBatch> {
Ok(RecordBatch::new_empty(Schema::empty().into()))
}
Expand Down
2 changes: 0 additions & 2 deletions rust/lance-index/src/vector/flat/storage.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! In-memory graph representations.
use std::sync::Arc;

use crate::vector::quantizer::QuantizerStorage;
Expand Down
4 changes: 4 additions & 0 deletions rust/lance-index/src/vector/hnsw/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,10 @@ impl IvfSubIndex for HNSW {
Ok(hnsw)
}

fn remap(&self, _mapping: &HashMap<u64, Option<u64>>) -> Result<Self> {
unimplemented!("HNSW remap is not supported yet");
}

/// Encode the sub index into a record batch
fn to_batch(&self) -> Result<RecordBatch> {
let mut vector_id_builder = UInt32Builder::with_capacity(self.len());
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/vector/hnsw/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ impl<Q: Quantization + Send + Sync + 'static> VectorIndex for HNSWIndex<Q> {
Box::new(self.storage.as_ref().unwrap().row_ids())
}

fn remap(&mut self, _mapping: &HashMap<u64, Option<u64>>) -> Result<()> {
async fn remap(&mut self, _mapping: &HashMap<u64, Option<u64>>) -> Result<()> {
Err(Error::Index {
message: "Remapping HNSW in this way not supported".to_string(),
location: location!(),
Expand Down
12 changes: 10 additions & 2 deletions rust/lance-index/src/vector/quantizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,16 @@ use super::flat::index::{FlatBinQuantizer, FlatQuantizer};
use super::pq::ProductQuantizer;
use super::{ivf::storage::IvfModel, sq::ScalarQuantizer, storage::VectorStore};

pub trait Quantization: Send + Sync + Debug + DeepSizeOf + Into<Quantizer> {
type BuildParams: QuantizerBuildParams;
pub trait Quantization:
Send
+ Sync
+ Clone
+ Debug
+ DeepSizeOf
+ Into<Quantizer>
+ TryFrom<Quantizer, Error = lance_core::Error>
{
type BuildParams: QuantizerBuildParams + Send + Sync;
type Metadata: QuantizerMetadata + Send + Sync;
type Storage: QuantizerStorage<Metadata = Self::Metadata> + VectorStore + Debug;

Expand Down
48 changes: 46 additions & 2 deletions rust/lance-index/src/vector/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@

//! Vector Storage, holding (quantized) vectors and providing distance calculation.
use std::collections::HashMap;
use std::{any::Any, sync::Arc};

use arrow::array::AsArray;
use arrow::compute::concat_batches;
use arrow_array::{ArrayRef, RecordBatch};
use arrow::datatypes::UInt64Type;
use arrow_array::{ArrayRef, RecordBatch, UInt32Array, UInt64Array};
use arrow_schema::{Field, SchemaRef};
use deepsize::DeepSizeOf;
use futures::prelude::stream::TryStreamExt;
Expand Down Expand Up @@ -70,7 +73,44 @@ pub trait VectorStore: Send + Sync + Sized + Clone {

fn schema(&self) -> &SchemaRef;

fn to_batches(&self) -> Result<impl Iterator<Item = RecordBatch>>;
fn to_batches(&self) -> Result<impl Iterator<Item = RecordBatch> + Send>;

fn remap(&self, mapping: &HashMap<u64, Option<u64>>) -> Result<Self> {
let batches = self
.to_batches()?
.map(|b| {
let mut indices = Vec::with_capacity(b.num_rows());
let mut new_row_ids = Vec::with_capacity(b.num_rows());

let row_ids = b.column(0).as_primitive::<UInt64Type>().values();
for (i, row_id) in row_ids.iter().enumerate() {
match mapping.get(row_id) {
Some(Some(new_id)) => {
indices.push(i as u32);
new_row_ids.push(*new_id);
}
Some(None) => {}
None => {
indices.push(i as u32);
new_row_ids.push(*row_id);
}
}
}

let indices = UInt32Array::from(indices);
let new_row_ids = Arc::new(UInt64Array::from(new_row_ids));
let new_vectors = arrow::compute::take(b.column(1), &indices, None)?;

Ok(RecordBatch::try_new(
self.schema().clone(),
vec![new_row_ids, new_vectors],
)?)
})
.collect::<Result<Vec<_>>>()?;

let batch = concat_batches(self.schema(), batches.iter())?;
Self::try_from_batch(batch, self.distance_type())
}

fn len(&self) -> usize;

Expand Down Expand Up @@ -219,6 +259,10 @@ impl IvfQuantizationStorage {
Q::from_metadata(&metadata, self.distance_type)
}

pub fn schema(&self) -> SchemaRef {
Arc::new(self.reader.schema().as_ref().into())
}

/// Get the number of partitions in the storage.
pub fn num_partitions(&self) -> usize {
self.ivf.num_partitions()
Expand Down
32 changes: 28 additions & 4 deletions rust/lance-index/src/vector/v3/shuffler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@ use std::sync::Arc;

use arrow::{array::AsArray, compute::sort_to_indices};
use arrow_array::{RecordBatch, UInt32Array};
use arrow_schema::Schema;
use future::join_all;
use futures::prelude::*;
use lance_arrow::RecordBatchExt;
use itertools::Itertools;
use lance_arrow::{RecordBatchExt, SchemaExt};
use lance_core::{
cache::FileMetadataCache,
utils::tokio::{get_num_compute_intensive_cpus, spawn_cpu},
Error, Result,
};
use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
use lance_file::v2::reader::ReaderProjection;
use lance_file::v2::{
reader::{FileReader, FileReaderOptions},
writer::FileWriter,
Expand Down Expand Up @@ -256,14 +259,35 @@ impl ShuffleReader for IvfShufflerReader {
FileReaderOptions::default(),
)
.await?;
let schema = reader.schema().as_ref().into();

let schema: Schema = reader.schema().as_ref().into();
let projection = schema
.fields()
.iter()
.enumerate()
.filter_map(|(index, f)| {
if f.name() != PART_ID_COLUMN {
Some(index)
} else {
None
}
})
.collect::<Vec<_>>();
let schema = schema.project(&projection)?;
let projection = ReaderProjection::from_column_names(
reader.schema().as_ref(),
&schema
.field_names()
.into_iter()
.map(|s| s.as_ref())
.collect_vec(),
)?;
Ok(Some(Box::new(RecordBatchStreamAdapter::new(
Arc::new(schema),
reader.read_stream(
reader.read_stream_projected(
lance_io::ReadBatchParams::RangeFull,
4096,
16,
projection,
FilterExpression::no_filter(),
)?,
))))
Expand Down
7 changes: 6 additions & 1 deletion rust/lance-index/src/vector/v3/subindex.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

Expand All @@ -15,7 +16,7 @@ use crate::{prefilter::PreFilter, vector::Query};
/// A sub index for IVF index
pub trait IvfSubIndex: Send + Sync + Debug + DeepSizeOf {
type QueryParams: Send + Sync + for<'a> From<&'a Query>;
type BuildParams: Clone;
type BuildParams: Clone + Send + Sync;

/// Load the sub index from a record batch with a single row
fn load(data: RecordBatch) -> Result<Self>
Expand Down Expand Up @@ -49,6 +50,10 @@ pub trait IvfSubIndex: Send + Sync + Debug + DeepSizeOf {
where
Self: Sized;

fn remap(&self, mapping: &HashMap<u64, Option<u64>>) -> Result<Self>
where
Self: Sized;

/// Encode the sub index into a record batch
fn to_batch(&self) -> Result<RecordBatch>;
}
Expand Down
3 changes: 2 additions & 1 deletion rust/lance-linalg/src/distance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub mod norm_l2;
pub use cosine::*;
use deepsize::DeepSizeOf;
pub use dot::*;
use hamming::hamming_distance_arrow_batch;
pub use l2::*;
pub use norm_l2::*;

Expand Down Expand Up @@ -55,7 +56,7 @@ impl DistanceType {
Self::L2 => l2_distance_arrow_batch,
Self::Cosine => cosine_distance_arrow_batch,
Self::Dot => dot_distance_arrow_batch,
Self::Hamming => todo!(),
Self::Hamming => hamming_distance_arrow_batch,
}
}

Expand Down
9 changes: 6 additions & 3 deletions rust/lance-linalg/src/distance/hamming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::sync::Arc;
use crate::{Error, Result};
use arrow_array::cast::AsArray;
use arrow_array::types::UInt8Type;
use arrow_array::{Array, Float32Array};
use arrow_array::{Array, FixedSizeListArray, Float32Array};
use arrow_schema::DataType;

pub trait Hamming {
Expand Down Expand Up @@ -62,11 +62,14 @@ pub fn hamming_distance_batch<'a>(
Box::new(to.chunks_exact(dimension).map(|v| hamming(from, v)))
}

pub fn hamming_distance_arrow_batch(from: &dyn Array, to: &dyn Array) -> Result<Arc<Float32Array>> {
pub fn hamming_distance_arrow_batch(
from: &dyn Array,
to: &FixedSizeListArray,
) -> Result<Arc<Float32Array>> {
let dists = match *from.data_type() {
DataType::UInt8 => hamming_distance_batch(
from.as_primitive::<UInt8Type>().values(),
to.as_primitive::<UInt8Type>().values(),
to.values().as_primitive::<UInt8Type>().values(),
from.len(),
),
_ => {
Expand Down
13 changes: 11 additions & 2 deletions rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1689,6 +1689,15 @@ impl Scanner {
// Check if we've created new versions since the index was built.
let unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?;
if !unindexed_fragments.is_empty() {
// need to set the metric type to be the same as the index
// to make sure the distance is comparable.
let idx = self
.dataset
.open_vector_index(q.column.as_str(), &index.uuid.to_string())
.await?;
let mut q = q.clone();
q.metric_type = idx.metric_type();

// If the vector column is not present, we need to take the vector column, so
// that the distance value is comparable with the flat search ones.
if knn_node.schema().column_with_name(&q.column).is_none() {
Expand Down Expand Up @@ -1725,7 +1734,7 @@ impl Scanner {
scan_node = Arc::new(FilterExec::try_new(physical_refine_expr, scan_node)?);
}
// first we do flat search on just the new data
let topk_appended = self.flat_knn(scan_node, q)?;
let topk_appended = self.flat_knn(scan_node, &q)?;

// To do a union, we need to make the schemas match. Right now
// knn_node: _distance, _rowid, vector
Expand All @@ -1740,7 +1749,7 @@ impl Scanner {
datafusion::physical_plan::Partitioning::RoundRobinBatch(1),
)?;
// then we do a flat search on KNN(new data) + ANN(indexed data)
return self.flat_knn(Arc::new(unioned), q);
return self.flat_knn(Arc::new(unioned), &q);
}

Ok(knn_node)
Expand Down
Loading

0 comments on commit 72ae355

Please sign in to comment.