feat: support remapping for IVF_FLAT, IVF_PQ and IVF_SQ #2708
Changes from 17 commits
```diff
@@ -3,10 +3,13 @@
 //! Vector Storage, holding (quantized) vectors and providing distance calculation.
 
+use std::collections::HashMap;
 use std::{any::Any, sync::Arc};
 
 use arrow::array::AsArray;
-use arrow_array::{ArrayRef, RecordBatch};
+use arrow::compute::concat_batches;
+use arrow::datatypes::UInt64Type;
+use arrow_array::{ArrayRef, RecordBatch, UInt32Array, UInt64Array};
 use arrow_schema::{Field, SchemaRef};
 use deepsize::DeepSizeOf;
 use futures::prelude::stream::TryStreamExt;
```
```diff
@@ -70,7 +73,44 @@ pub trait VectorStore: Send + Sync + Sized + Clone {
     fn schema(&self) -> &SchemaRef;
 
-    fn to_batches(&self) -> Result<impl Iterator<Item = RecordBatch>>;
+    fn to_batches(&self) -> Result<impl Iterator<Item = RecordBatch> + Send>;
+
+    fn remap(&self, mapping: &HashMap<u64, Option<u64>>) -> Result<Self> {
+        let batches = self
+            .to_batches()?
+            .map(|b| {
+                let mut indices = Vec::with_capacity(b.num_rows());
+                let mut new_row_ids = Vec::with_capacity(b.num_rows());
+
+                let row_ids = b.column(0).as_primitive::<UInt64Type>().values();
+                for (i, row_id) in row_ids.iter().enumerate() {
+                    match mapping.get(row_id) {
+                        // Row moved during compaction: keep it under the new id.
+                        Some(Some(new_id)) => {
+                            indices.push(i as u32);
+                            new_row_ids.push(*new_id);
+                        }
+                        // Row deleted: drop it from the storage.
+                        Some(None) => {}
+                        // Row untouched: keep it as-is.
+                        None => {
+                            indices.push(i as u32);
+                            new_row_ids.push(*row_id);
+                        }
+                    }
+                }
+
+                let indices = UInt32Array::from(indices);
+                let new_row_ids = Arc::new(UInt64Array::from(new_row_ids));
+                let new_vectors = arrow::compute::take(b.column(1), &indices, None)?;
+
+                Ok(RecordBatch::try_new(
+                    self.schema().clone(),
+                    vec![new_row_ids, new_vectors],
+                )?)
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let batch = concat_batches(self.schema(), batches.iter())?;
+        Self::try_from_batch(batch, self.distance_type())
+    }
 
     fn len(&self) -> usize;
```

Review thread on lines +111 to +112 (the concat_batches call and try_from_batch):

Reviewer: I guess remap is already slow, so it probably doesn't matter, but it seems odd that we would need to concat here.

Author: yeah, it's because try_from_batch takes a single RecordBatch.
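To make the mapping semantics above concrete, here is a minimal, hypothetical sketch (not part of the PR) of how a caller such as compaction might build the `mapping` argument: `Some(new_id)` for a row that moved, `None` for a row that was deleted, and no entry at all for a row that was left alone.

```rust
use std::collections::HashMap;

fn main() {
    // Hypothetical row-id mapping produced by a compaction:
    //   row 1 moved to row 100, row 2 was deleted, row 3 is untouched.
    let mut mapping: HashMap<u64, Option<u64>> = HashMap::new();
    mapping.insert(1, Some(100)); // moved: remap keeps the vector under id 100
    mapping.insert(2, None); // deleted: remap drops the vector entirely

    // These are exactly the three cases the match in `remap` distinguishes:
    assert_eq!(mapping.get(&1), Some(&Some(100)));
    assert_eq!(mapping.get(&2), Some(&None));
    assert_eq!(mapping.get(&3), None); // untouched rows have no entry
}
```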
```diff
@@ -219,6 +259,10 @@ impl IvfQuantizationStorage {
         Q::from_metadata(&metadata, self.distance_type)
     }
 
+    pub fn schema(&self) -> SchemaRef {
+        Arc::new(self.reader.schema().as_ref().into())
+    }
+
     /// Get the number of partitions in the storage.
     pub fn num_partitions(&self) -> usize {
         self.ivf.num_partitions()
```
```diff
@@ -8,15 +8,18 @@ use std::sync::Arc;
 use arrow::{array::AsArray, compute::sort_to_indices};
 use arrow_array::{RecordBatch, UInt32Array};
+use arrow_schema::Schema;
 use future::join_all;
 use futures::prelude::*;
-use lance_arrow::RecordBatchExt;
+use itertools::Itertools;
+use lance_arrow::{RecordBatchExt, SchemaExt};
 use lance_core::{
     cache::FileMetadataCache,
     utils::tokio::{get_num_compute_intensive_cpus, spawn_cpu},
     Error, Result,
 };
 use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
+use lance_file::v2::reader::ReaderProjection;
 use lance_file::v2::{
     reader::{FileReader, FileReaderOptions},
     writer::FileWriter,
```
```diff
@@ -256,14 +259,35 @@ impl ShuffleReader for IvfShufflerReader {
             FileReaderOptions::default(),
         )
         .await?;
-        let schema = reader.schema().as_ref().into();
+        let schema: Schema = reader.schema().as_ref().into();
+        let projection = schema
+            .fields()
+            .iter()
+            .enumerate()
+            .filter_map(|(index, f)| {
+                if f.name() != PART_ID_COLUMN {
+                    Some(index)
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<_>>();
+        let schema = schema.project(&projection)?;
+        let projection = ReaderProjection::from_column_names(
+            reader.schema().as_ref(),
+            &schema
+                .field_names()
+                .into_iter()
+                .map(|s| s.as_ref())
+                .collect_vec(),
+        )?;
         Ok(Some(Box::new(RecordBatchStreamAdapter::new(
             Arc::new(schema),
-            reader.read_stream(
+            reader.read_stream_projected(
                 lance_io::ReadBatchParams::RangeFull,
                 4096,
                 16,
+                projection,
                 FilterExpression::no_filter(),
             )?,
         ))))
```

Review comment on the projection change:

Author: we don't need the PART_ID_COLUMN here.
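The projection above just drops the partition-id column before streaming batches back to the caller. The same drop-a-column-by-index pattern works with plain arrow APIs; a standalone sketch follows (not from the PR, and assuming the partition-id column is named "__ivf_part_id"):

```rust
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch, UInt64Array};
use arrow_schema::{ArrowError, DataType, Field, Schema};

// Find every column except the one we want to drop, then project
// the batch down to those indices -- the same idea as the reader code.
fn drop_column(batch: &RecordBatch, name: &str) -> Result<RecordBatch, ArrowError> {
    let indices = batch
        .schema()
        .fields()
        .iter()
        .enumerate()
        .filter_map(|(i, f)| (f.name() != name).then_some(i))
        .collect::<Vec<_>>();
    batch.project(&indices)
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("__ivf_part_id", DataType::Int32, false),
        Field::new("row_id", DataType::UInt64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![0, 1])),
            Arc::new(UInt64Array::from(vec![10, 11])),
        ],
    )?;

    let projected = drop_column(&batch, "__ivf_part_id")?;
    assert_eq!(projected.num_columns(), 1);
    assert_eq!(projected.schema().field(0).name(), "row_id");
    Ok(())
}
```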
```diff
@@ -1689,6 +1689,15 @@ impl Scanner {
         // Check if we've created new versions since the index was built.
         let unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?;
         if !unindexed_fragments.is_empty() {
+            // need to set the metric type to be the same as the index
+            // to make sure the distance is comparable.
+            let idx = self
+                .dataset
+                .open_vector_index(q.column.as_str(), &index.uuid.to_string())
+                .await?;
+            let mut q = q.clone();
+            q.metric_type = idx.metric_type();
+
             // If the vector column is not present, we need to take the vector column, so
             // that the distance value is comparable with the flat search ones.
             if knn_node.schema().column_with_name(&q.column).is_none() {
```

Review comment on the metric type override:

Author: this fixes a bug where, with unindexed data, the flat search could compute distances under a different distance type than the one the index was built with.
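Why the override matters: distances computed under different metrics are on different scales, so merging flat-search results over unindexed fragments with ANN results from the index would silently rank the top-k wrong. A self-contained illustration (not from the PR):

```rust
// The same pair of vectors yields values on completely different scales
// under L2 and cosine distance.
fn l2(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(x, y)| (x - y) * (x - y)).sum()
}

fn cosine(a: &[f32], b: &[f32]) -> f32 {
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let na: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let nb: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    1.0 - dot / (na * nb)
}

fn main() {
    let a = [1.0_f32, 0.0];
    let b = [10.0_f32, 0.0];
    // Same direction, very different magnitude:
    println!("l2 = {}", l2(&a, &b)); // 81.0
    println!("cosine = {}", cosine(&a, &b)); // 0.0
}
```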
```diff
@@ -1725,7 +1734,7 @@ impl Scanner {
                 scan_node = Arc::new(FilterExec::try_new(physical_refine_expr, scan_node)?);
             }
             // first we do flat search on just the new data
-            let topk_appended = self.flat_knn(scan_node, q)?;
+            let topk_appended = self.flat_knn(scan_node, &q)?;
 
             // To do a union, we need to make the schemas match. Right now
             // knn_node: _distance, _rowid, vector
@@ -1740,7 +1749,7 @@ impl Scanner {
                 datafusion::physical_plan::Partitioning::RoundRobinBatch(1),
             )?;
             // then we do a flat search on KNN(new data) + ANN(indexed data)
-            return self.flat_knn(Arc::new(unioned), q);
+            return self.flat_knn(Arc::new(unioned), &q);
         }
 
         Ok(knn_node)
```
Review thread on the remap change:

Reviewer: nit: let's add a warning log here?

Reviewer: oh wait, we should remap the sub index here, no?

Author: for v3 we need to remap the subindex & the vector storage. The flat index doesn't contain anything, so it simply returns itself here.

Reviewer: the flat map is still { origin_vector: row_id }? If a row id changes during compaction, we need to remap it?

Author: remap on a (v3) vector index works like this: for IVF_FLAT, the sub index is FLAT and the storage is FlatStorage. The FLAT sub index doesn't contain any data, so there is nothing to do here; the remapping happens on FlatStorage.
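To illustrate the division of labor described in that last reply, here is a deliberately simplified sketch. The trait and both types are hypothetical stand-ins, not the actual lance-index API: the FLAT sub index carries no per-row state, so its remap is the identity, while the storage is where row ids are rewritten and deleted rows are dropped.

```rust
use std::collections::HashMap;

// Hypothetical, simplified version of the split described above; the real
// lance traits have more methods and different signatures.
trait Remappable: Sized {
    fn remap(&self, mapping: &HashMap<u64, Option<u64>>) -> Self;
}

// The FLAT sub index stores no per-row state (no graph, no codes),
// so remapping is a no-op: it just returns itself.
#[derive(Clone)]
struct FlatSubIndex;

impl Remappable for FlatSubIndex {
    fn remap(&self, _mapping: &HashMap<u64, Option<u64>>) -> Self {
        self.clone()
    }
}

// The storage owns the (row_id, vector) pairs, so this is where deleted
// rows are dropped and moved rows receive their new ids.
#[derive(Clone)]
struct FlatStorage {
    row_ids: Vec<u64>,
    vectors: Vec<Vec<f32>>,
}

impl Remappable for FlatStorage {
    fn remap(&self, mapping: &HashMap<u64, Option<u64>>) -> Self {
        let mut row_ids = Vec::new();
        let mut vectors = Vec::new();
        for (id, vec) in self.row_ids.iter().zip(&self.vectors) {
            match mapping.get(id) {
                Some(Some(new_id)) => {
                    row_ids.push(*new_id);
                    vectors.push(vec.clone());
                }
                Some(None) => {} // deleted: drop the row
                None => {
                    row_ids.push(*id);
                    vectors.push(vec.clone());
                }
            }
        }
        Self { row_ids, vectors }
    }
}

fn main() {
    let storage = FlatStorage {
        row_ids: vec![1, 2, 3],
        vectors: vec![vec![0.0], vec![1.0], vec![2.0]],
    };
    let mapping = HashMap::from([(1, Some(100)), (2, None)]);

    let _sub = FlatSubIndex.remap(&mapping); // identity, nothing to rewrite
    let remapped = storage.remap(&mapping);
    assert_eq!(remapped.row_ids, vec![100, 3]); // row 2 dropped, row 1 renamed
}
```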