Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support remapping for IVF_FLAT, IVF_PQ and IVF_SQ #2708

Merged
merged 19 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,420 changes: 787 additions & 633 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion rust/lance-index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@ impl IndexType {
pub fn is_vector(&self) -> bool {
matches!(
self,
Self::Vector | Self::IvfPq | Self::IvfHnswSq | Self::IvfHnswPq
Self::Vector
| Self::IvfPq
| Self::IvfHnswSq
| Self::IvfHnswPq
| Self::IvfFlat
| Self::IvfSq
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ pub trait VectorIndex: Send + Sync + std::fmt::Debug + Index {
///
/// If an old row id is not in the mapping then it should be
/// left alone.
fn remap(&mut self, mapping: &HashMap<u64, Option<u64>>) -> Result<()>;
async fn remap(&mut self, mapping: &HashMap<u64, Option<u64>>) -> Result<()>;

/// The metric type of this vector index.
fn metric_type(&self) -> DistanceType;
Expand Down
5 changes: 5 additions & 0 deletions rust/lance-index/src/vector/flat/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! Flat Vector Index.
//!

use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::AsArray;
Expand Down Expand Up @@ -134,6 +135,10 @@ impl IvfSubIndex for FlatIndex {
Ok(Self {})
}

fn remap(&self, _: &HashMap<u64, Option<u64>>) -> Result<Self> {
Ok(self.clone())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: let's add a warning log here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh w8, we should remap sub index here no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for v3 we need to remap the subindex & vector storage. flat index doesn't contain anything so it simply returns itself here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flat map is still { origin_vector: row_id }? if row id changes during compaction, we need to remap them ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remap on an vector index (v3) is:

  • remap the sub index
  • remap the storage
    for IVF_FLAT, the sub index is FLAT and storage is FlatStorage. FLAT sub index doesn't contain any data so no need to do anything here. the remapping happens on FlatStorage

}

fn to_batch(&self) -> Result<RecordBatch> {
Ok(RecordBatch::new_empty(Schema::empty().into()))
}
Expand Down
2 changes: 0 additions & 2 deletions rust/lance-index/src/vector/flat/storage.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! In-memory graph representations.

use std::sync::Arc;

use crate::vector::quantizer::QuantizerStorage;
Expand Down
4 changes: 4 additions & 0 deletions rust/lance-index/src/vector/hnsw/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,10 @@ impl IvfSubIndex for HNSW {
Ok(hnsw)
}

fn remap(&self, _mapping: &HashMap<u64, Option<u64>>) -> Result<Self> {
unimplemented!("HNSW remap is not supported yet");
}

/// Encode the sub index into a record batch
fn to_batch(&self) -> Result<RecordBatch> {
let mut vector_id_builder = UInt32Builder::with_capacity(self.len());
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/vector/hnsw/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ impl<Q: Quantization + Send + Sync + 'static> VectorIndex for HNSWIndex<Q> {
Box::new(self.storage.as_ref().unwrap().row_ids())
}

fn remap(&mut self, _mapping: &HashMap<u64, Option<u64>>) -> Result<()> {
async fn remap(&mut self, _mapping: &HashMap<u64, Option<u64>>) -> Result<()> {
Err(Error::Index {
message: "Remapping HNSW in this way not supported".to_string(),
location: location!(),
Expand Down
4 changes: 3 additions & 1 deletion rust/lance-index/src/vector/quantizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use super::flat::index::{FlatBinQuantizer, FlatQuantizer};
use super::pq::ProductQuantizer;
use super::{ivf::storage::IvfModel, sq::ScalarQuantizer, storage::VectorStore};

pub trait Quantization: Send + Sync + Debug + DeepSizeOf + Into<Quantizer> {
pub trait Quantization:
Send + Sync + Debug + DeepSizeOf + Into<Quantizer> + TryFrom<Quantizer, Error = lance_core::Error>
{
type BuildParams: QuantizerBuildParams;
type Metadata: QuantizerMetadata + Send + Sync;
type Storage: QuantizerStorage<Metadata = Self::Metadata> + VectorStore + Debug;
Expand Down
46 changes: 45 additions & 1 deletion rust/lance-index/src/vector/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@

//! Vector Storage, holding (quantized) vectors and providing distance calculation.

use std::collections::HashMap;
use std::{any::Any, sync::Arc};

use arrow::array::AsArray;
use arrow::compute::concat_batches;
use arrow_array::{ArrayRef, RecordBatch};
use arrow::datatypes::UInt64Type;
use arrow_array::{ArrayRef, RecordBatch, UInt32Array, UInt64Array};
use arrow_schema::{Field, SchemaRef};
use deepsize::DeepSizeOf;
use futures::prelude::stream::TryStreamExt;
Expand Down Expand Up @@ -72,6 +75,43 @@ pub trait VectorStore: Send + Sync + Sized + Clone {

fn to_batches(&self) -> Result<impl Iterator<Item = RecordBatch>>;

fn remap(&self, mapping: &HashMap<u64, Option<u64>>) -> Result<Self> {
let batches = self
.to_batches()?
.map(|b| {
let mut indices = Vec::with_capacity(b.num_rows());
let mut new_row_ids = Vec::with_capacity(b.num_rows());

let row_ids = b.column(0).as_primitive::<UInt64Type>().values();
for (i, row_id) in row_ids.iter().enumerate() {
match mapping.get(row_id) {
Some(Some(new_id)) => {
indices.push(i as u32);
new_row_ids.push(*new_id);
}
Some(None) => {}
None => {
indices.push(i as u32);
new_row_ids.push(*row_id);
}
}
}

let indices = UInt32Array::from(indices);
let new_row_ids = Arc::new(UInt64Array::from(new_row_ids));
let new_vectors = arrow::compute::take(b.column(1), &indices, None)?;

Ok(RecordBatch::try_new(
self.schema().clone(),
vec![new_row_ids, new_vectors],
)?)
})
.collect::<Result<Vec<_>>>()?;

let batch = concat_batches(self.schema(), batches.iter())?;
Self::try_from_batch(batch, self.distance_type())
Comment on lines +111 to +112
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess remap is already slow so it probably doesn't matter but it seems odd we would need to concat here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it's because try_from_batch is not trivial, e.g. for PQ storage, it would transpose the pq codes

}

fn len(&self) -> usize;

/// Returns true if this graph is empty.
Expand Down Expand Up @@ -219,6 +259,10 @@ impl IvfQuantizationStorage {
Q::from_metadata(&metadata, self.distance_type)
}

pub fn schema(&self) -> SchemaRef {
Arc::new(self.reader.schema().as_ref().into())
}

/// Get the number of partitions in the storage.
pub fn num_partitions(&self) -> usize {
self.ivf.num_partitions()
Expand Down
32 changes: 28 additions & 4 deletions rust/lance-index/src/vector/v3/shuffler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@ use std::sync::Arc;

use arrow::{array::AsArray, compute::sort_to_indices};
use arrow_array::{RecordBatch, UInt32Array};
use arrow_schema::Schema;
use future::join_all;
use futures::prelude::*;
use lance_arrow::RecordBatchExt;
use itertools::Itertools;
use lance_arrow::{RecordBatchExt, SchemaExt};
use lance_core::{
cache::FileMetadataCache,
utils::tokio::{get_num_compute_intensive_cpus, spawn_cpu},
Error, Result,
};
use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
use lance_file::v2::reader::ReaderProjection;
use lance_file::v2::{
reader::{FileReader, FileReaderOptions},
writer::FileWriter,
Expand Down Expand Up @@ -256,14 +259,35 @@ impl ShuffleReader for IvfShufflerReader {
FileReaderOptions::default(),
)
.await?;
let schema = reader.schema().as_ref().into();

let schema: Schema = reader.schema().as_ref().into();
let projection = schema
.fields()
.iter()
.enumerate()
.filter_map(|(index, f)| {
if f.name() != PART_ID_COLUMN {
Some(index)
} else {
None
}
})
.collect::<Vec<_>>();
let schema = schema.project(&projection)?;
let projection = ReaderProjection::from_column_names(
reader.schema().as_ref(),
&schema
.field_names()
.into_iter()
.map(|s| s.as_ref())
.collect_vec(),
)?;
Ok(Some(Box::new(RecordBatchStreamAdapter::new(
Arc::new(schema),
reader.read_stream(
reader.read_stream_projected(
lance_io::ReadBatchParams::RangeFull,
4096,
16,
projection,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need the part_id in batch, just don't read it to save resources

FilterExpression::no_filter(),
)?,
))))
Expand Down
5 changes: 5 additions & 0 deletions rust/lance-index/src/vector/v3/subindex.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

Expand Down Expand Up @@ -49,6 +50,10 @@ pub trait IvfSubIndex: Send + Sync + Debug + DeepSizeOf {
where
Self: Sized;

fn remap(&self, mapping: &HashMap<u64, Option<u64>>) -> Result<Self>
where
Self: Sized;

/// Encode the sub index into a record batch
fn to_batch(&self) -> Result<RecordBatch>;
}
Expand Down
134 changes: 61 additions & 73 deletions rust/lance/src/index/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use object_store::path::Path;
use snafu::{location, Location};
use tempfile::tempdir;
use tracing::instrument;
use utils::get_vector_element_type;
use uuid::Uuid;

use self::{ivf::*, pq::PQIndex};
Expand Down Expand Up @@ -253,57 +254,39 @@ pub(crate) async fn build_vector_index(
let temp_dir_path = Path::from_filesystem_path(temp_dir.path())?;
let shuffler = IvfShuffler::new(temp_dir_path, ivf_params.num_partitions);
if is_ivf_flat(stages) {
let data_type = dataset
.schema()
.field(column)
.ok_or(Error::Schema {
message: format!("Column {} not found in schema", column),
location: location!(),
})?
.data_type();
match data_type {
DataType::FixedSizeList(f, _) => match f.data_type() {
DataType::Float16 | DataType::Float32 | DataType::Float64 => {
IvfIndexBuilder::<FlatIndex, FlatQuantizer>::new(
dataset.clone(),
column.to_owned(),
dataset.indices_dir().child(uuid),
params.metric_type,
Box::new(shuffler),
Some(ivf_params.clone()),
Some(()),
(),
)?
.build()
.await?;
}
DataType::UInt8 => {
IvfIndexBuilder::<FlatIndex, FlatBinQuantizer>::new(
dataset.clone(),
column.to_owned(),
dataset.indices_dir().child(uuid),
params.metric_type,
Box::new(shuffler),
Some(ivf_params.clone()),
Some(()),
(),
)?
.build()
.await?;
}
_ => {
return Err(Error::Index {
message: format!(
"Build Vector Index: invalid data type: {:?}",
f.data_type()
),
location: location!(),
});
}
},
let element_type = get_vector_element_type(dataset, column)?;
match element_type {
DataType::Float16 | DataType::Float32 | DataType::Float64 => {
IvfIndexBuilder::<FlatIndex, FlatQuantizer>::new(
dataset.clone(),
column.to_owned(),
dataset.indices_dir().child(uuid),
params.metric_type,
Box::new(shuffler),
Some(ivf_params.clone()),
Some(()),
(),
)?
.build()
.await?;
}
DataType::UInt8 => {
IvfIndexBuilder::<FlatIndex, FlatBinQuantizer>::new(
dataset.clone(),
column.to_owned(),
dataset.indices_dir().child(uuid),
params.metric_type,
Box::new(shuffler),
Some(ivf_params.clone()),
Some(()),
(),
)?
.build()
.await?;
}
Comment on lines +257 to +286
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed there are many lines are doing the same thing: get the vector data type / value type and check it.
so just made the function get_vector_element_type to do this

_ => {
return Err(Error::Index {
message: format!("Build Vector Index: invalid data type: {:?}", data_type),
message: format!("Build Vector Index: invalid data type: {:?}", element_type),
location: location!(),
});
}
Expand Down Expand Up @@ -416,30 +399,35 @@ pub(crate) async fn remap_vector_index(
.open_vector_index(column, &old_uuid.to_string())
.await?;
old_index.check_can_remap()?;
let ivf_index: &IVFIndex =
old_index
.as_any()
.downcast_ref()
.ok_or_else(|| Error::NotSupported {
source: "Only IVF indexes can be remapped currently".into(),
location: location!(),
})?;

remap_index_file(
dataset.as_ref(),
&old_uuid.to_string(),
&new_uuid.to_string(),
old_metadata.dataset_version,
ivf_index,
mapping,
old_metadata.name.clone(),
column.to_string(),
// We can safely assume there are no transforms today. We assert above that the
// top stage is IVF and IVF does not support transforms between IVF and PQ. This
// will be fixed in the future.
vec![],
)
.await?;

if let Some(ivf_index) = old_index.as_any().downcast_ref::<IVFIndex>() {
remap_index_file(
dataset.as_ref(),
&old_uuid.to_string(),
&new_uuid.to_string(),
old_metadata.dataset_version,
ivf_index,
mapping,
old_metadata.name.clone(),
column.to_string(),
// We can safely assume there are no transforms today. We assert above that the
// top stage is IVF and IVF does not support transforms between IVF and PQ. This
// will be fixed in the future.
vec![],
)
.await?;
} else {
// it's v3 index
remap_index_file_v3(
dataset.as_ref(),
&new_uuid.to_string(),
old_index,
mapping,
column.to_string(),
)
.await?;
}

Ok(())
}

Expand Down
Loading
Loading