feat: support remapping for IVF_FLAT, IVF_PQ and IVF_SQ #2708

Merged · 19 commits · Dec 20, 2024
Changes from all commits
1,420 changes: 787 additions & 633 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion python/Cargo.lock

Some generated files are not rendered by default.

20 changes: 20 additions & 0 deletions rust/lance-file/src/v2/writer.rs
@@ -21,9 +21,11 @@ use lance_encoding::encoder::{
};
use lance_encoding::repdef::RepDefBuilder;
use lance_encoding::version::LanceFileVersion;
use lance_io::object_store::ObjectStore;
use lance_io::object_writer::ObjectWriter;
use lance_io::traits::Writer;
use log::debug;
use object_store::path::Path;
use prost::Message;
use prost_types::Any;
use snafu::{location, Location};
@@ -143,6 +145,24 @@ impl FileWriter {
}
}

/// Write a series of record batches to a new file
///
/// Returns the number of rows written
pub async fn create_file_with_batches(
store: &ObjectStore,
path: &Path,
schema: lance_core::datatypes::Schema,
batches: impl Iterator<Item = RecordBatch> + Send,
options: FileWriterOptions,
) -> Result<usize> {
let writer = store.create(path).await?;
let mut writer = Self::try_new(writer, schema, options)?;
for batch in batches {
writer.write_batch(&batch).await?;
}
Ok(writer.finish().await? as usize)
}

async fn do_write_buffer(writer: &mut ObjectWriter, buf: &[u8]) -> Result<()> {
writer.write_all(buf).await?;
let pad_bytes = pad_bytes::<PAGE_BUFFER_ALIGNMENT>(buf.len());
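As a usage sketch (not from the PR), a caller could drive the new `FileWriter::create_file_with_batches` helper as follows; the in-memory store, the schema conversion, and the file path are illustrative assumptions:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use lance_file::v2::writer::{FileWriter, FileWriterOptions};
use lance_io::object_store::ObjectStore;
use object_store::path::Path;

async fn write_example() -> lance_core::Result<usize> {
    let arrow_schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
    let schema = lance_core::datatypes::Schema::try_from(&arrow_schema)?;
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(Arc::new(arrow_schema), vec![column])?;
    // Assumption: an in-memory store for illustration; any ObjectStore works.
    let store = ObjectStore::memory();
    // Writes both batches to one new file and returns the row count (6 here).
    FileWriter::create_file_with_batches(
        &store,
        &Path::from("indices/example.lance"),
        schema,
        vec![batch.clone(), batch].into_iter(),
        FileWriterOptions::default(),
    )
    .await
}
```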
7 changes: 6 additions & 1 deletion rust/lance-index/src/lib.rs
@@ -140,7 +140,12 @@ impl IndexType {
pub fn is_vector(&self) -> bool {
matches!(
self,
Self::Vector | Self::IvfPq | Self::IvfHnswSq | Self::IvfHnswPq
Self::Vector
| Self::IvfPq
| Self::IvfHnswSq
| Self::IvfHnswPq
| Self::IvfFlat
| Self::IvfSq
)
}
}
18 changes: 17 additions & 1 deletion rust/lance-index/src/vector.rs
@@ -11,9 +11,11 @@ use arrow_schema::Field;
use async_trait::async_trait;
use ivf::storage::IvfModel;
use lance_core::{Result, ROW_ID_FIELD};
use lance_io::object_store::ObjectStore;
use lance_io::traits::Reader;
use lance_linalg::distance::DistanceType;
use lazy_static::lazy_static;
use object_store::path::Path;
use quantizer::{QuantizationType, Quantizer};
use v3::subindex::SubIndexType;

@@ -182,7 +184,21 @@ pub trait VectorIndex: Send + Sync + std::fmt::Debug + Index {
///
/// If an old row id is not in the mapping then it should be
/// left alone.
fn remap(&mut self, mapping: &HashMap<u64, Option<u64>>) -> Result<()>;
async fn remap(&mut self, mapping: &HashMap<u64, Option<u64>>) -> Result<()>;

/// Remap the index according to mapping
///
/// Write the remapped index to `index_dir`.
/// This is available only for v3 indices.
async fn remap_to(
self: Arc<Self>,
_store: ObjectStore,
_mapping: &HashMap<u64, Option<u64>>,
_column: String,
_index_dir: Path,
) -> Result<()> {
unimplemented!("only for v3 index")
}

/// The metric type of this vector index.
fn metric_type(&self) -> DistanceType;
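To make the mapping contract above concrete, a small illustrative sketch (the row ids are invented):

```rust
use std::collections::HashMap;

fn main() {
    // Per the doc comment above:
    //   Some(new_id) => the row moved and its id must be rewritten,
    //   Some(None)   => the row was deleted and must be dropped,
    //   absent       => the row is untouched and must be left alone.
    let mut mapping: HashMap<u64, Option<u64>> = HashMap::new();
    mapping.insert(1, Some(100)); // row 1 became row 100 during compaction
    mapping.insert(2, None); // row 2 was deleted
    assert_eq!(mapping.get(&1), Some(&Some(100))); // rewrite
    assert_eq!(mapping.get(&2), Some(&None)); // drop
    assert_eq!(mapping.get(&3), None); // keep unchanged
}
```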
5 changes: 5 additions & 0 deletions rust/lance-index/src/vector/flat/index.rs
@@ -4,6 +4,7 @@
//! Flat Vector Index.
//!

use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::AsArray;
@@ -134,6 +135,10 @@ impl IvfSubIndex for FlatIndex {
Ok(Self {})
}

fn remap(&self, _: &HashMap<u64, Option<u64>>) -> Result<Self> {
Ok(self.clone())
Contributor: nit: let's add a warning log here?

Contributor: Oh wait, we should remap the sub index here, no?

Author: For v3 we need to remap the sub index & the vector storage. The flat index doesn't contain anything, so it simply returns itself here.

Contributor: The flat map is still { origin_vector: row_id }? If row ids change during compaction, don't we need to remap them?

Author: Remap on a vector index (v3) means:

  • remap the sub index
  • remap the storage

For IVF_FLAT, the sub index is FLAT and the storage is FlatStorage. The FLAT sub index doesn't contain any data, so there is nothing to do here; the remapping happens on FlatStorage.

}

fn to_batch(&self) -> Result<RecordBatch> {
Ok(RecordBatch::new_empty(Schema::empty().into()))
}
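As the thread above describes, a v3 remap is the composition of a sub-index remap and a storage remap. A hedged sketch of that shape, using the two trait methods introduced in this PR (the helper itself is hypothetical, not PR code):

```rust
use std::collections::HashMap;

use lance_core::Result;
use lance_index::vector::storage::VectorStore;
use lance_index::vector::v3::subindex::IvfSubIndex;

// Hypothetical helper: a v3 remap touches both halves. For IVF_FLAT the
// sub-index half is a no-op (FlatIndex holds no data), so only the
// FlatStorage half actually changes.
fn remap_v3_parts<S: IvfSubIndex, V: VectorStore>(
    sub: &S,
    store: &V,
    mapping: &HashMap<u64, Option<u64>>,
) -> Result<(S, V)> {
    let new_sub = sub.remap(mapping)?; // FlatIndex: returns a clone of itself
    let new_store = store.remap(mapping)?; // FlatStorage: rewrites/drops row ids
    Ok((new_sub, new_store))
}
```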
2 changes: 0 additions & 2 deletions rust/lance-index/src/vector/flat/storage.rs
@@ -1,8 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! In-memory graph representations.

use std::sync::Arc;

use crate::vector::quantizer::QuantizerStorage;
4 changes: 4 additions & 0 deletions rust/lance-index/src/vector/hnsw/builder.rs
@@ -749,6 +749,10 @@ impl IvfSubIndex for HNSW {
Ok(hnsw)
}

fn remap(&self, _mapping: &HashMap<u64, Option<u64>>) -> Result<Self> {
unimplemented!("HNSW remap is not supported yet");
}

/// Encode the sub index into a record batch
fn to_batch(&self) -> Result<RecordBatch> {
let mut vector_id_builder = UInt32Builder::with_capacity(self.len());
2 changes: 1 addition & 1 deletion rust/lance-index/src/vector/hnsw/index.rs
@@ -267,7 +267,7 @@ impl<Q: Quantization + Send + Sync + 'static> VectorIndex for HNSWIndex<Q> {
Box::new(self.storage.as_ref().unwrap().row_ids())
}

fn remap(&mut self, _mapping: &HashMap<u64, Option<u64>>) -> Result<()> {
async fn remap(&mut self, _mapping: &HashMap<u64, Option<u64>>) -> Result<()> {
Err(Error::Index {
message: "Remapping HNSW in this way not supported".to_string(),
location: location!(),
12 changes: 10 additions & 2 deletions rust/lance-index/src/vector/quantizer.rs
@@ -23,8 +23,16 @@ use super::flat::index::{FlatBinQuantizer, FlatQuantizer};
use super::pq::ProductQuantizer;
use super::{ivf::storage::IvfModel, sq::ScalarQuantizer, storage::VectorStore};

pub trait Quantization: Send + Sync + Debug + DeepSizeOf + Into<Quantizer> {
type BuildParams: QuantizerBuildParams;
pub trait Quantization:
Send
+ Sync
+ Clone
+ Debug
+ DeepSizeOf
+ Into<Quantizer>
+ TryFrom<Quantizer, Error = lance_core::Error>
{
type BuildParams: QuantizerBuildParams + Send + Sync;
type Metadata: QuantizerMetadata + Send + Sync;
type Storage: QuantizerStorage<Metadata = Self::Metadata> + VectorStore + Debug;

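The new `Clone` and `TryFrom<Quantizer>` bounds let remapping code round-trip a concrete quantizer through the type-erased `Quantizer`. A minimal sketch of what the bounds enable (the helper is hypothetical):

```rust
use lance_index::vector::quantizer::{Quantization, Quantizer};

// Hypothetical helper, not PR code: erase a concrete quantizer into the
// Quantizer enum and recover it, which is exactly what the new bounds permit.
fn roundtrip<Q: Quantization>(q: &Q) -> lance_core::Result<Q> {
    let erased: Quantizer = q.clone().into(); // needs Clone + Into<Quantizer>
    Q::try_from(erased) // needs TryFrom<Quantizer, Error = lance_core::Error>
}
```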
48 changes: 46 additions & 2 deletions rust/lance-index/src/vector/storage.rs
@@ -3,10 +3,13 @@

//! Vector Storage, holding (quantized) vectors and providing distance calculation.

use std::collections::HashMap;
use std::{any::Any, sync::Arc};

use arrow::array::AsArray;
use arrow::compute::concat_batches;
use arrow_array::{ArrayRef, RecordBatch};
use arrow::datatypes::UInt64Type;
use arrow_array::{ArrayRef, RecordBatch, UInt32Array, UInt64Array};
use arrow_schema::{Field, SchemaRef};
use deepsize::DeepSizeOf;
use futures::prelude::stream::TryStreamExt;
@@ -70,7 +73,44 @@ pub trait VectorStore: Send + Sync + Sized + Clone {

fn schema(&self) -> &SchemaRef;

fn to_batches(&self) -> Result<impl Iterator<Item = RecordBatch>>;
fn to_batches(&self) -> Result<impl Iterator<Item = RecordBatch> + Send>;

fn remap(&self, mapping: &HashMap<u64, Option<u64>>) -> Result<Self> {
let batches = self
.to_batches()?
.map(|b| {
let mut indices = Vec::with_capacity(b.num_rows());
let mut new_row_ids = Vec::with_capacity(b.num_rows());

let row_ids = b.column(0).as_primitive::<UInt64Type>().values();
for (i, row_id) in row_ids.iter().enumerate() {
match mapping.get(row_id) {
// Row id was remapped: keep the vector under its new id.
Some(Some(new_id)) => {
indices.push(i as u32);
new_row_ids.push(*new_id);
}
// Row was deleted: drop it from the storage.
Some(None) => {}
// Not in the mapping: leave the row unchanged.
None => {
indices.push(i as u32);
new_row_ids.push(*row_id);
}
}
}

let indices = UInt32Array::from(indices);
let new_row_ids = Arc::new(UInt64Array::from(new_row_ids));
let new_vectors = arrow::compute::take(b.column(1), &indices, None)?;

Ok(RecordBatch::try_new(
self.schema().clone(),
vec![new_row_ids, new_vectors],
)?)
})
.collect::<Result<Vec<_>>>()?;

let batch = concat_batches(self.schema(), batches.iter())?;
Self::try_from_batch(batch, self.distance_type())
Comment on lines +111 to +112
Contributor: I guess remap is already slow, so it probably doesn't matter, but it seems odd that we would need to concat here.

Author: Yeah, it's because try_from_batch is not trivial; e.g. for PQ storage, it would transpose the PQ codes.

}

fn len(&self) -> usize;

@@ -219,6 +259,10 @@ impl IvfQuantizationStorage {
Q::from_metadata(&metadata, self.distance_type)
}

pub fn schema(&self) -> SchemaRef {
Arc::new(self.reader.schema().as_ref().into())
}

/// Get the number of partitions in the storage.
pub fn num_partitions(&self) -> usize {
self.ivf.num_partitions()
32 changes: 28 additions & 4 deletions rust/lance-index/src/vector/v3/shuffler.rs
@@ -8,15 +8,18 @@ use std::sync::Arc;

use arrow::{array::AsArray, compute::sort_to_indices};
use arrow_array::{RecordBatch, UInt32Array};
use arrow_schema::Schema;
use future::join_all;
use futures::prelude::*;
use lance_arrow::RecordBatchExt;
use itertools::Itertools;
use lance_arrow::{RecordBatchExt, SchemaExt};
use lance_core::{
cache::FileMetadataCache,
utils::tokio::{get_num_compute_intensive_cpus, spawn_cpu},
Error, Result,
};
use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
use lance_file::v2::reader::ReaderProjection;
use lance_file::v2::{
reader::{FileReader, FileReaderOptions},
writer::FileWriter,
@@ -256,14 +259,35 @@ impl ShuffleReader for IvfShufflerReader {
FileReaderOptions::default(),
)
.await?;
let schema = reader.schema().as_ref().into();

let schema: Schema = reader.schema().as_ref().into();
let projection = schema
.fields()
.iter()
.enumerate()
.filter_map(|(index, f)| {
if f.name() != PART_ID_COLUMN {
Some(index)
} else {
None
}
})
.collect::<Vec<_>>();
let schema = schema.project(&projection)?;
let projection = ReaderProjection::from_column_names(
reader.schema().as_ref(),
&schema
.field_names()
.into_iter()
.map(|s| s.as_ref())
.collect_vec(),
)?;
Ok(Some(Box::new(RecordBatchStreamAdapter::new(
Arc::new(schema),
reader.read_stream(
reader.read_stream_projected(
lance_io::ReadBatchParams::RangeFull,
4096,
16,
projection,
Author: We don't need the part_id in the batch, so we just don't read it, to save resources.

FilterExpression::no_filter(),
)?,
))))
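The projection dance above just drops `PART_ID_COLUMN` before reading. The same column-dropping step, reduced to plain Arrow terms (illustrative only, not PR code):

```rust
use arrow_schema::{ArrowError, Schema};

// Keep every field except the named column, mirroring the filter_map over
// enumerated fields in the diff above.
fn drop_column(schema: &Schema, name: &str) -> Result<Schema, ArrowError> {
    let keep: Vec<usize> = schema
        .fields()
        .iter()
        .enumerate()
        .filter(|(_, f)| f.name() != name)
        .map(|(i, _)| i)
        .collect();
    schema.project(&keep)
}
```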
7 changes: 6 additions & 1 deletion rust/lance-index/src/vector/v3/subindex.rs
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

@@ -15,7 +16,7 @@ use crate::{prefilter::PreFilter, vector::Query};
/// A sub index for IVF index
pub trait IvfSubIndex: Send + Sync + Debug + DeepSizeOf {
type QueryParams: Send + Sync + for<'a> From<&'a Query>;
type BuildParams: Clone;
type BuildParams: Clone + Send + Sync;

/// Load the sub index from a record batch with a single row
fn load(data: RecordBatch) -> Result<Self>
@@ -49,6 +50,10 @@ pub trait IvfSubIndex: Send + Sync + Debug + DeepSizeOf {
where
Self: Sized;

fn remap(&self, mapping: &HashMap<u64, Option<u64>>) -> Result<Self>
where
Self: Sized;

/// Encode the sub index into a record batch
fn to_batch(&self) -> Result<RecordBatch>;
}
3 changes: 2 additions & 1 deletion rust/lance-linalg/src/distance.rs
@@ -23,6 +23,7 @@ pub mod norm_l2;
pub use cosine::*;
use deepsize::DeepSizeOf;
pub use dot::*;
use hamming::hamming_distance_arrow_batch;
pub use l2::*;
pub use norm_l2::*;

@@ -55,7 +56,7 @@ impl DistanceType {
Self::L2 => l2_distance_arrow_batch,
Self::Cosine => cosine_distance_arrow_batch,
Self::Dot => dot_distance_arrow_batch,
Self::Hamming => todo!(),
Self::Hamming => hamming_distance_arrow_batch,
}
}

Expand Down
9 changes: 6 additions & 3 deletions rust/lance-linalg/src/distance/hamming.rs
@@ -8,7 +8,7 @@ use std::sync::Arc;
use crate::{Error, Result};
use arrow_array::cast::AsArray;
use arrow_array::types::UInt8Type;
use arrow_array::{Array, Float32Array};
use arrow_array::{Array, FixedSizeListArray, Float32Array};
use arrow_schema::DataType;

pub trait Hamming {
@@ -62,11 +62,14 @@ pub fn hamming_distance_batch<'a>(
Box::new(to.chunks_exact(dimension).map(|v| hamming(from, v)))
}

pub fn hamming_distance_arrow_batch(from: &dyn Array, to: &dyn Array) -> Result<Arc<Float32Array>> {
pub fn hamming_distance_arrow_batch(
from: &dyn Array,
to: &FixedSizeListArray,
) -> Result<Arc<Float32Array>> {
let dists = match *from.data_type() {
DataType::UInt8 => hamming_distance_batch(
from.as_primitive::<UInt8Type>().values(),
to.as_primitive::<UInt8Type>().values(),
to.values().as_primitive::<UInt8Type>().values(),
from.len(),
),
_ => {
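For intuition, what `hamming_distance_batch` computes per chunk is the bit-count of the XOR between the query and each `dimension`-byte vector. A dependency-free sketch (values are invented):

```rust
// Dependency-free illustration of the batch Hamming distance above.
fn hamming(a: &[u8], b: &[u8]) -> f32 {
    a.iter().zip(b).map(|(x, y)| (x ^ y).count_ones()).sum::<u32>() as f32
}

fn main() {
    let query = [0b1010_1010u8, 0xFF];
    // Two 2-byte database vectors, flattened as hamming_distance_batch expects.
    let db = [0b1010_1010u8, 0x00, 0b0101_0101, 0xFF];
    let dists: Vec<f32> = db.chunks_exact(2).map(|v| hamming(&query, v)).collect();
    assert_eq!(dists, vec![8.0, 8.0]); // popcount of XORed bytes per vector
}
```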
13 changes: 11 additions & 2 deletions rust/lance/src/dataset/scanner.rs
@@ -1689,6 +1689,15 @@ impl Scanner {
// Check if we've created new versions since the index was built.
let unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?;
if !unindexed_fragments.is_empty() {
// need to set the metric type to be the same as the index's
// to make sure the distances are comparable.
let idx = self
.dataset
.open_vector_index(q.column.as_str(), &index.uuid.to_string())
.await?;
let mut q = q.clone();
q.metric_type = idx.metric_type();
Author: This fixes a bug where, with unindexed data, the flat search could compute distances with a different distance type.

// If the vector column is not present, we need to take the vector column, so
// that the distance value is comparable with the flat search ones.
if knn_node.schema().column_with_name(&q.column).is_none() {
@@ -1725,7 +1734,7 @@ impl Scanner {
scan_node = Arc::new(FilterExec::try_new(physical_refine_expr, scan_node)?);
}
// first we do flat search on just the new data
let topk_appended = self.flat_knn(scan_node, q)?;
let topk_appended = self.flat_knn(scan_node, &q)?;

// To do a union, we need to make the schemas match. Right now
// knn_node: _distance, _rowid, vector
@@ -1740,7 +1749,7 @@
datafusion::physical_plan::Partitioning::RoundRobinBatch(1),
)?;
// then we do a flat search on KNN(new data) + ANN(indexed data)
return self.flat_knn(Arc::new(unioned), q);
return self.flat_knn(Arc::new(unioned), &q);
}

Ok(knn_node)
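The fix above matters because distances from different metrics are not on a common scale, so the union of flat-search and ANN results cannot be ranked together. A dependency-free illustration (no Lance APIs):

```rust
// The same pair of vectors is "identical" under cosine but far apart under L2,
// so sorting a merged top-k list produced under two metrics ranks neighbors
// incorrectly; forcing the query metric to match the index prevents that.
fn l2_squared(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(x, y)| (x - y) * (x - y)).sum()
}

fn cosine_distance(a: &[f32], b: &[f32]) -> f32 {
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let norm = |v: &[f32]| v.iter().map(|x| x * x).sum::<f32>().sqrt();
    1.0 - dot / (norm(a) * norm(b))
}

fn main() {
    let (a, b) = ([3.0f32, 4.0], [6.0f32, 8.0]);
    // Parallel vectors: cosine distance 0.0, squared L2 distance 25.0.
    println!("l2 = {}, cosine = {}", l2_squared(&a, &b), cosine_distance(&a, &b));
}
```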