-
Notifications
You must be signed in to change notification settings - Fork 245
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support remapping for IVF_FLAT, IVF_PQ and IVF_SQ #2708
Changes from 14 commits
e24cbed
9d6e8f1
b5b6252
4c9ea83
0d34f8b
dfa1663
36959a6
deb0b25
3f4b06d
c26696b
77a6cd5
78cdb17
17c973d
7f5fae7
c7cb809
8ed78c8
1873581
e3175e6
c18b4dc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,10 +3,13 @@ | |
|
||
//! Vector Storage, holding (quantized) vectors and providing distance calculation. | ||
|
||
use std::collections::HashMap; | ||
use std::{any::Any, sync::Arc}; | ||
|
||
use arrow::array::AsArray; | ||
use arrow::compute::concat_batches; | ||
use arrow_array::{ArrayRef, RecordBatch}; | ||
use arrow::datatypes::UInt64Type; | ||
use arrow_array::{ArrayRef, RecordBatch, UInt32Array, UInt64Array}; | ||
use arrow_schema::{Field, SchemaRef}; | ||
use deepsize::DeepSizeOf; | ||
use futures::prelude::stream::TryStreamExt; | ||
|
@@ -72,6 +75,43 @@ pub trait VectorStore: Send + Sync + Sized + Clone { | |
|
||
fn to_batches(&self) -> Result<impl Iterator<Item = RecordBatch>>; | ||
|
||
fn remap(&self, mapping: &HashMap<u64, Option<u64>>) -> Result<Self> { | ||
let batches = self | ||
.to_batches()? | ||
.map(|b| { | ||
let mut indices = Vec::with_capacity(b.num_rows()); | ||
let mut new_row_ids = Vec::with_capacity(b.num_rows()); | ||
|
||
let row_ids = b.column(0).as_primitive::<UInt64Type>().values(); | ||
for (i, row_id) in row_ids.iter().enumerate() { | ||
match mapping.get(row_id) { | ||
Some(Some(new_id)) => { | ||
indices.push(i as u32); | ||
new_row_ids.push(*new_id); | ||
} | ||
Some(None) => {} | ||
None => { | ||
indices.push(i as u32); | ||
new_row_ids.push(*row_id); | ||
} | ||
} | ||
} | ||
|
||
let indices = UInt32Array::from(indices); | ||
let new_row_ids = Arc::new(UInt64Array::from(new_row_ids)); | ||
let new_vectors = arrow::compute::take(b.column(1), &indices, None)?; | ||
|
||
Ok(RecordBatch::try_new( | ||
self.schema().clone(), | ||
vec![new_row_ids, new_vectors], | ||
)?) | ||
}) | ||
.collect::<Result<Vec<_>>>()?; | ||
|
||
let batch = concat_batches(self.schema(), batches.iter())?; | ||
Self::try_from_batch(batch, self.distance_type()) | ||
Comment on lines
+111
to
+112
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess remap is already slow so it probably doesn't matter but it seems odd we would need to concat here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah it's because |
||
} | ||
|
||
fn len(&self) -> usize; | ||
|
||
/// Returns true if this graph is empty. | ||
|
@@ -219,6 +259,10 @@ impl IvfQuantizationStorage { | |
Q::from_metadata(&metadata, self.distance_type) | ||
} | ||
|
||
pub fn schema(&self) -> SchemaRef { | ||
Arc::new(self.reader.schema().as_ref().into()) | ||
} | ||
|
||
/// Get the number of partitions in the storage. | ||
pub fn num_partitions(&self) -> usize { | ||
self.ivf.num_partitions() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,15 +8,18 @@ use std::sync::Arc; | |
|
||
use arrow::{array::AsArray, compute::sort_to_indices}; | ||
use arrow_array::{RecordBatch, UInt32Array}; | ||
use arrow_schema::Schema; | ||
use future::join_all; | ||
use futures::prelude::*; | ||
use lance_arrow::RecordBatchExt; | ||
use itertools::Itertools; | ||
use lance_arrow::{RecordBatchExt, SchemaExt}; | ||
use lance_core::{ | ||
cache::FileMetadataCache, | ||
utils::tokio::{get_num_compute_intensive_cpus, spawn_cpu}, | ||
Error, Result, | ||
}; | ||
use lance_encoding::decoder::{DecoderPlugins, FilterExpression}; | ||
use lance_file::v2::reader::ReaderProjection; | ||
use lance_file::v2::{ | ||
reader::{FileReader, FileReaderOptions}, | ||
writer::FileWriter, | ||
|
@@ -256,14 +259,35 @@ impl ShuffleReader for IvfShufflerReader { | |
FileReaderOptions::default(), | ||
) | ||
.await?; | ||
let schema = reader.schema().as_ref().into(); | ||
|
||
let schema: Schema = reader.schema().as_ref().into(); | ||
let projection = schema | ||
.fields() | ||
.iter() | ||
.enumerate() | ||
.filter_map(|(index, f)| { | ||
if f.name() != PART_ID_COLUMN { | ||
Some(index) | ||
} else { | ||
None | ||
} | ||
}) | ||
.collect::<Vec<_>>(); | ||
let schema = schema.project(&projection)?; | ||
let projection = ReaderProjection::from_column_names( | ||
reader.schema().as_ref(), | ||
&schema | ||
.field_names() | ||
.into_iter() | ||
.map(|s| s.as_ref()) | ||
.collect_vec(), | ||
)?; | ||
Ok(Some(Box::new(RecordBatchStreamAdapter::new( | ||
Arc::new(schema), | ||
reader.read_stream( | ||
reader.read_stream_projected( | ||
lance_io::ReadBatchParams::RangeFull, | ||
4096, | ||
16, | ||
projection, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we don't need the |
||
FilterExpression::no_filter(), | ||
)?, | ||
)))) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,7 @@ use object_store::path::Path; | |
use snafu::{location, Location}; | ||
use tempfile::tempdir; | ||
use tracing::instrument; | ||
use utils::get_vector_element_type; | ||
use uuid::Uuid; | ||
|
||
use self::{ivf::*, pq::PQIndex}; | ||
|
@@ -253,57 +254,39 @@ pub(crate) async fn build_vector_index( | |
let temp_dir_path = Path::from_filesystem_path(temp_dir.path())?; | ||
let shuffler = IvfShuffler::new(temp_dir_path, ivf_params.num_partitions); | ||
if is_ivf_flat(stages) { | ||
let data_type = dataset | ||
.schema() | ||
.field(column) | ||
.ok_or(Error::Schema { | ||
message: format!("Column {} not found in schema", column), | ||
location: location!(), | ||
})? | ||
.data_type(); | ||
match data_type { | ||
DataType::FixedSizeList(f, _) => match f.data_type() { | ||
DataType::Float16 | DataType::Float32 | DataType::Float64 => { | ||
IvfIndexBuilder::<FlatIndex, FlatQuantizer>::new( | ||
dataset.clone(), | ||
column.to_owned(), | ||
dataset.indices_dir().child(uuid), | ||
params.metric_type, | ||
Box::new(shuffler), | ||
Some(ivf_params.clone()), | ||
Some(()), | ||
(), | ||
)? | ||
.build() | ||
.await?; | ||
} | ||
DataType::UInt8 => { | ||
IvfIndexBuilder::<FlatIndex, FlatBinQuantizer>::new( | ||
dataset.clone(), | ||
column.to_owned(), | ||
dataset.indices_dir().child(uuid), | ||
params.metric_type, | ||
Box::new(shuffler), | ||
Some(ivf_params.clone()), | ||
Some(()), | ||
(), | ||
)? | ||
.build() | ||
.await?; | ||
} | ||
_ => { | ||
return Err(Error::Index { | ||
message: format!( | ||
"Build Vector Index: invalid data type: {:?}", | ||
f.data_type() | ||
), | ||
location: location!(), | ||
}); | ||
} | ||
}, | ||
let element_type = get_vector_element_type(dataset, column)?; | ||
match element_type { | ||
DataType::Float16 | DataType::Float32 | DataType::Float64 => { | ||
IvfIndexBuilder::<FlatIndex, FlatQuantizer>::new( | ||
dataset.clone(), | ||
column.to_owned(), | ||
dataset.indices_dir().child(uuid), | ||
params.metric_type, | ||
Box::new(shuffler), | ||
Some(ivf_params.clone()), | ||
Some(()), | ||
(), | ||
)? | ||
.build() | ||
.await?; | ||
} | ||
DataType::UInt8 => { | ||
IvfIndexBuilder::<FlatIndex, FlatBinQuantizer>::new( | ||
dataset.clone(), | ||
column.to_owned(), | ||
dataset.indices_dir().child(uuid), | ||
params.metric_type, | ||
Box::new(shuffler), | ||
Some(ivf_params.clone()), | ||
Some(()), | ||
(), | ||
)? | ||
.build() | ||
.await?; | ||
} | ||
Comment on lines
+257
to
+286
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did this change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I noticed that many lines do the same thing: get the vector data type / value type and check it. |
||
_ => { | ||
return Err(Error::Index { | ||
message: format!("Build Vector Index: invalid data type: {:?}", data_type), | ||
message: format!("Build Vector Index: invalid data type: {:?}", element_type), | ||
location: location!(), | ||
}); | ||
} | ||
|
@@ -416,30 +399,35 @@ pub(crate) async fn remap_vector_index( | |
.open_vector_index(column, &old_uuid.to_string()) | ||
.await?; | ||
old_index.check_can_remap()?; | ||
let ivf_index: &IVFIndex = | ||
old_index | ||
.as_any() | ||
.downcast_ref() | ||
.ok_or_else(|| Error::NotSupported { | ||
source: "Only IVF indexes can be remapped currently".into(), | ||
location: location!(), | ||
})?; | ||
|
||
remap_index_file( | ||
dataset.as_ref(), | ||
&old_uuid.to_string(), | ||
&new_uuid.to_string(), | ||
old_metadata.dataset_version, | ||
ivf_index, | ||
mapping, | ||
old_metadata.name.clone(), | ||
column.to_string(), | ||
// We can safely assume there are no transforms today. We assert above that the | ||
// top stage is IVF and IVF does not support transforms between IVF and PQ. This | ||
// will be fixed in the future. | ||
vec![], | ||
) | ||
.await?; | ||
|
||
if let Some(ivf_index) = old_index.as_any().downcast_ref::<IVFIndex>() { | ||
remap_index_file( | ||
dataset.as_ref(), | ||
&old_uuid.to_string(), | ||
&new_uuid.to_string(), | ||
old_metadata.dataset_version, | ||
ivf_index, | ||
mapping, | ||
old_metadata.name.clone(), | ||
column.to_string(), | ||
// We can safely assume there are no transforms today. We assert above that the | ||
// top stage is IVF and IVF does not support transforms between IVF and PQ. This | ||
// will be fixed in the future. | ||
vec![], | ||
) | ||
.await?; | ||
} else { | ||
// it's v3 index | ||
remap_index_file_v3( | ||
dataset.as_ref(), | ||
&new_uuid.to_string(), | ||
old_index, | ||
mapping, | ||
column.to_string(), | ||
) | ||
.await?; | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: let's add a warning log here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh wait, we should remap the sub-index here, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For v3 we need to remap the sub-index and the vector storage. The flat index doesn't contain anything, so it simply returns itself here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The flat map is still `{ origin_vector: row_id }`? If the `row id` changes during compaction, don't we need to remap them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remap on a vector index (v3) works as follows: for IVF_FLAT, the sub-index is `FLAT` and the storage is `FlatStorage`. The FLAT sub-index doesn't contain any data, so there is nothing to do here; the remapping happens on `FlatStorage`.