diff --git a/benchmarks/sift/lance_sift1m_stats.csv b/benchmarks/sift/lance_sift1m_stats.csv index e42f5e2350..cd0afeaef6 100644 --- a/benchmarks/sift/lance_sift1m_stats.csv +++ b/benchmarks/sift/lance_sift1m_stats.csv @@ -1,49 +1,49 @@ ivf,pq,nprobes,nsamples,topk,refine_factor,recall@k,mean_time_sec -512,16,1,100,10,,0.561,0.0011869692802429 -512,16,1,100,10,1.0,0.561,0.0011151361465454 -512,16,1,100,10,5.0,0.86,0.0013169264793395 -512,16,1,100,10,10.0,0.7559999999999999,0.0016109180450439 -512,16,10,100,10,,0.6409999999999999,0.0017412400245666 -512,16,10,100,10,1.0,0.653,0.0018005084991455 -512,16,10,100,10,5.0,0.954,0.0020255255699157 -512,16,10,100,10,10.0,0.984,0.0023340821266174 -512,16,50,100,10,,0.655,0.0047774505615234 -512,16,50,100,10,1.0,0.639,0.0047786116600036 -512,16,50,100,10,5.0,0.9700000000000002,0.005054075717926 -512,16,50,100,10,10.0,0.996,0.0054328942298889 -512,16,100,100,10,,0.6829999999999998,0.0095507001876831 -512,16,100,100,10,1.0,0.6609999999999999,0.0084998655319213 -512,16,100,100,10,5.0,0.9690000000000002,0.0087792038917541 -512,16,100,100,10,10.0,0.985,0.009198293685913 -1024,16,1,100,10,,0.5770000000000001,0.001229350566864 -1024,16,1,100,10,1.0,0.57,0.0013754272460937 -1024,16,1,100,10,5.0,0.714,0.0015950870513916 -1024,16,1,100,10,10.0,0.7800000000000001,0.0019168639183044 -1024,16,10,100,10,,0.665,0.0017744970321655 -1024,16,10,100,10,1.0,0.661,0.0018366694450378 -1024,16,10,100,10,5.0,0.95,0.0021376228332519 -1024,16,10,100,10,10.0,0.973,0.0024430847167968 -1024,16,50,100,10,,0.649,0.0042504262924194 -1024,16,50,100,10,1.0,0.632,0.0042378640174865 -1024,16,50,100,10,5.0,0.9640000000000002,0.0045270037651062 -1024,16,50,100,10,10.0,0.996,0.0049609613418579 -1024,16,100,100,10,,0.6819999999999999,0.0072804021835327 -1024,16,100,100,10,1.0,0.686,0.007310128211975 -1024,16,100,100,10,5.0,0.981,0.0076206445693969 -1024,16,100,100,10,10.0,0.996,0.0079639410972595 -2048,16,1,100,10,,0.59,0.0016197514533996 -2048,16,1,100,10,1.0,0.546,0.0016238856315612 -2048,16,1,100,10,5.0,0.674,0.0018658971786499 -2048,16,1,100,10,10.0,0.6980000000000001,0.0021486544609069 -2048,16,10,100,10,,0.6679999999999998,0.0020835614204406 -2048,16,10,100,10,1.0,0.69,0.0021107578277587 -2048,16,10,100,10,5.0,0.9460000000000002,0.0024298906326293 -2048,16,10,100,10,10.0,0.966,0.0026938366889953 -2048,16,50,100,10,,0.6350000000000001,0.0042195105552673 -2048,16,50,100,10,1.0,0.6809999999999999,0.0042567706108093 -2048,16,50,100,10,5.0,0.963,0.004510052204132 -2048,16,50,100,10,10.0,0.9920000000000002,0.0048563075065612 -2048,16,100,100,10,,0.6779999999999998,0.0068572688102722 -2048,16,100,100,10,1.0,0.6729999999999998,0.0068558740615844 -2048,16,100,100,10,5.0,0.975,0.0071770000457763 -2048,16,100,100,10,10.0,0.995,0.0075724363327026 +512,16,1,100,10,,0.5459999999999999,0.0012434267997741 +512,16,1,100,10,1.0,0.573,0.0012985253334045 +512,16,1,100,10,5.0,0.7799999999999998,0.0016234064102172 +512,16,1,100,10,10.0,0.81,0.0021506214141845 +512,16,10,100,10,,0.643,0.0019894814491271 +512,16,10,100,10,1.0,0.6429999999999998,0.0020099234580993 +512,16,10,100,10,5.0,0.968,0.0023579096794128 +512,16,10,100,10,10.0,0.975,0.0028011083602905 +512,16,50,100,10,,0.627,0.005469377040863 +512,16,50,100,10,1.0,0.647,0.0049861502647399 +512,16,50,100,10,5.0,0.972,0.0052753090858459 +512,16,50,100,10,10.0,0.995,0.0065333843231201 +512,16,100,100,10,,0.6499999999999999,0.0086667680740356 +512,16,100,100,10,1.0,0.6829999999999998,0.0090697479248046 +512,16,100,100,10,5.0,0.973,0.0090816569328308 +512,16,100,100,10,10.0,0.992,0.0094730043411254 +1024,16,1,100,10,,0.5720000000000001,0.0014924383163452 +1024,16,1,100,10,1.0,0.5980000000000001,0.0016877269744873 +1024,16,1,100,10,5.0,0.7610000000000001,0.0017683720588684 +1024,16,1,100,10,10.0,0.7209999999999998,0.002867727279663 +1024,16,10,100,10,,0.6729999999999998,0.0020465373992919 +1024,16,10,100,10,1.0,0.664,0.0019461870193481 +1024,16,10,100,10,5.0,0.975,0.0023498964309692 +1024,16,10,100,10,10.0,0.9870000000000002,0.0030326318740844 +1024,16,50,100,10,,0.669,0.0047199487686157 +1024,16,50,100,10,1.0,0.6560000000000001,0.0045324063301086 +1024,16,50,100,10,5.0,0.9740000000000002,0.0048948669433593 +1024,16,50,100,10,10.0,0.988,0.0055017662048339 +1024,16,100,100,10,,0.6579999999999999,0.0077169895172119 +1024,16,100,100,10,1.0,0.6580000000000001,0.0077496647834777 +1024,16,100,100,10,5.0,0.969,0.0079165744781494 +1024,16,100,100,10,10.0,0.991,0.008975670337677 +2048,16,1,100,10,,0.5010000000000001,0.0020228648185729 +2048,16,1,100,10,1.0,0.494,0.0022545051574707 +2048,16,1,100,10,5.0,0.627,0.0024846982955932 +2048,16,1,100,10,10.0,0.7119999999999999,0.0031368565559387 +2048,16,10,100,10,,0.6539999999999999,0.0028012990951538 +2048,16,10,100,10,1.0,0.6549999999999998,0.002736701965332 +2048,16,10,100,10,5.0,0.948,0.0028632616996765 +2048,16,10,100,10,10.0,0.97,0.0035367107391357 +2048,16,50,100,10,,0.662,0.0047123503684997 +2048,16,50,100,10,1.0,0.7040000000000001,0.0048845314979553 +2048,16,50,100,10,5.0,0.974,0.005355429649353 +2048,16,50,100,10,10.0,0.997,0.0055468249320983 +2048,16,100,100,10,,0.6699999999999998,0.0074122834205627 +2048,16,100,100,10,1.0,0.6629999999999999,0.0074411630630493 +2048,16,100,100,10,5.0,0.9770000000000002,0.0080564498901367 +2048,16,100,100,10,10.0,0.996,0.0083680939674377 diff --git a/protos/index.proto b/protos/index.proto index cd932f3cdc..ab37e46869 100644 --- a/protos/index.proto +++ b/protos/index.proto @@ -72,6 +72,23 @@ message PQ { repeated float codebook = 4; } +// Transform type +enum TransformType { + OPQ = 0; +} + +// A transform matrix to apply to a vector or vectors. +message Transform { + // The file offset the matrix is stored + uint64 position = 1; + + // Data shape of the matrix, [rows, cols]. + repeated uint32 shape = 2; + + // Transform type. + TransformType type = 3; +} + // Flat Index message Flat {} @@ -84,6 +101,8 @@ message VectorIndexStage { IVF ivf = 2; // Product Quantization PQ pq = 3; + // Transformer + Transform transform = 4; } } diff --git a/rust/src/arrow.rs b/rust/src/arrow.rs index 86257f794f..64b4b4656e 100644 --- a/rust/src/arrow.rs +++ b/rust/src/arrow.rs @@ -138,7 +138,7 @@ pub trait GenericListArrayExt where Offset::Native: OffsetSizeTrait, { - /// Create an [`ListArray`] from values and offsets. + /// Create an [`GenericListArray`] from values and offsets. /// /// ``` /// use arrow_array::{Int32Array, Int64Array, ListArray}; diff --git a/rust/src/arrow/linalg.rs b/rust/src/arrow/linalg.rs index 04937611e3..212377ec7a 100644 --- a/rust/src/arrow/linalg.rs +++ b/rust/src/arrow/linalg.rs @@ -23,7 +23,8 @@ use arrow::{ array::{as_primitive_array, Float32Builder}, datatypes::Float32Type, }; -use arrow_array::{Array, Float32Array}; +use arrow_array::{Array, FixedSizeListArray, Float32Array}; +use arrow_schema::DataType; use rand::{distributions::Standard, rngs::SmallRng, seq::IteratorRandom, Rng, SeedableRng}; #[allow(unused_imports)] @@ -130,6 +131,15 @@ impl MatrixView { } } + pub fn row(&self, i: usize) -> Option { + if i >= self.num_rows() { + None + } else { + let slice_arr = self.data.slice(i * self.num_columns(), self.num_columns()); + Some(as_primitive_array(slice_arr.as_ref()).clone()) + } + } + /// (Lazy) transpose of the matrix. /// pub fn transpose(&self) -> Self { @@ -225,6 +235,25 @@ impl MatrixView { } } +impl TryFrom<&FixedSizeListArray> for MatrixView { + type Error = Error; + + fn try_from(fsl: &FixedSizeListArray) -> Result { + if !matches!(fsl.value_type(), DataType::Float32) { + return Err(Error::Arrow(format!( + "Only support convert f32 FixedSizeListArray to MatrixView, got {}", + fsl.data_type() + ))); + } + let values = fsl.values(); + Ok(Self { + data: Arc::new(as_primitive_array(values.as_ref()).clone()), + num_columns: fsl.value_length() as usize, + transpose: false, + }) + } +} + /// Single Value Decomposition. /// /// diff --git a/rust/src/bin/lq.rs b/rust/src/bin/lq.rs index 62a13ab099..2eb64ceaae 100644 --- a/rust/src/bin/lq.rs +++ b/rust/src/bin/lq.rs @@ -181,7 +181,7 @@ async fn create_index( &[&col], lance::index::IndexType::Vector, name.clone(), - &VectorIndexParams::ivf_pq(*num_partitions, 8, *num_sub_vectors, mt), + &VectorIndexParams::ivf_pq(*num_partitions, 8, *num_sub_vectors, true, mt, 50), false, ) .await diff --git a/rust/src/dataset.rs b/rust/src/dataset.rs index 10bf62506f..c728b64640 100644 --- a/rust/src/dataset.rs +++ b/rust/src/dataset.rs @@ -36,10 +36,8 @@ use self::scanner::Scanner; use crate::arrow::*; use crate::datatypes::Schema; use crate::format::{pb, Fragment, Index, Manifest}; -use crate::index::{ - vector::{ivf::IvfPqIndexBuilder, VectorIndexParams}, - IndexBuilder, IndexParams, IndexType, -}; +use crate::index::vector::ivf::{build_ivf_pq_index, IvfBuildParams, PQBuildParams}; +use crate::index::{vector::VectorIndexParams, IndexParams, IndexType}; use crate::io::{ object_reader::{read_message, read_struct}, read_manifest, read_metadata_offset, write_manifest, FileReader, FileWriter, ObjectStore, @@ -290,7 +288,7 @@ impl Dataset { let base = object_store.base_path().clone(); Ok(Self { object_store, - base, + base: base.into(), manifest: Arc::new(manifest.clone()), }) } @@ -394,16 +392,27 @@ impl Dataset { } } - let builder = IvfPqIndexBuilder::try_new( + let ivf_params = IvfBuildParams { + num_partitions: vec_params.num_partitions as usize, + metric_type: vec_params.metric_type, + max_iters: vec_params.max_iterations, + }; + let pq_params = PQBuildParams { + num_sub_vectors: vec_params.num_sub_vectors as usize, + num_bits: 8, + metric_type: vec_params.metric_type, + use_opq: vec_params.use_opq, + max_iters: vec_params.max_iterations, + }; + build_ivf_pq_index( self, - index_id, - &index_name, column, - vec_params.num_partitions, - vec_params.num_sub_vectors, - vec_params.metric_type, - )?; - builder.build().await? + &index_name, + &index_id, + &ivf_params, + &pq_params, + ) + .await? } } @@ -534,6 +543,14 @@ impl Dataset { Ok(as_struct_array(&reordered).into()) } + /// Sample `n` rows from the dataset. + pub(crate) async fn sample(&self, n: usize, projection: &Schema) -> Result { + use rand::seq::IteratorRandom; + let num_rows = self.count_rows().await?; + let ids = (0..num_rows).choose_multiple(&mut rand::thread_rng(), n); + Ok(self.take(&ids[..], &projection).await?) + } + pub(crate) fn object_store(&self) -> &ObjectStore { &self.object_store } diff --git a/rust/src/index/vector.rs b/rust/src/index/vector.rs index faeff77a85..017832163f 100644 --- a/rust/src/index/vector.rs +++ b/rust/src/index/vector.rs @@ -30,12 +30,15 @@ mod kmeans; mod opq; mod pq; -use super::IndexParams; +use super::{pb, IndexParams}; use crate::{ + arrow::linalg::MatrixView, utils::distance::{cosine::cosine_distance, l2::l2_distance}, Error, Result, }; +const MAX_ITERATIONS: usize = 50; + /// Query parameters for the vector indices #[derive(Debug, Clone)] pub struct Query { @@ -79,6 +82,26 @@ pub trait VectorIndex { async fn search(&self, query: &Query) -> Result; } +/// Transformer on vectors. +#[async_trait] +pub trait Transformer: std::fmt::Debug + Sync + Send { + /// Train the transformer. + /// + /// Parameters: + /// - *data*: training vectors. + async fn train(&mut self, data: &MatrixView) -> Result<()>; + + /// Apply transform on the matrix `data`. + /// + /// Returns a new Matrix instead. + async fn transform(&self, data: &MatrixView) -> Result; + + /// Try to convert into protobuf. + /// + /// TODO: can we use TryFrom/TryInto as trait constrats? + fn try_into_pb(&self) -> Result; +} + /// Distance metrics type. #[derive(Debug, Copy, Clone, PartialEq)] pub enum MetricType { @@ -150,11 +173,17 @@ pub struct VectorIndexParams { /// the number of bits to present the centroids used in PQ. pub nbits: u8, + /// Use Optimized Product Quantizer. + pub use_opq: bool, + /// the number of sub vectors used in PQ. pub num_sub_vectors: u32, /// Vector distance metrics type. pub metric_type: MetricType, + + /// Max number of iterations to train a KMean model + pub max_iterations: usize, } impl VectorIndexParams { @@ -165,17 +194,22 @@ impl VectorIndexParams { /// - `num_partitions`: the number of IVF partitions. /// - `nbits`: the number of bits to present the centroids used in PQ. Can only be `8` for now. /// - `num_sub_vectors`: the number of sub vectors used in PQ. + /// - `metric_type`: how to compute distance, i.e., `L2` or `Cosine`. pub fn ivf_pq( num_partitions: u32, nbits: u8, num_sub_vectors: u32, + use_opq: bool, metric_type: MetricType, + max_iterations: usize, ) -> Self { Self { num_partitions, nbits, num_sub_vectors, + use_opq, metric_type, + max_iterations, } } } @@ -186,7 +220,9 @@ impl Default for VectorIndexParams { num_partitions: 32, nbits: 8, num_sub_vectors: 16, + use_opq: true, metric_type: MetricType::L2, + max_iterations: MAX_ITERATIONS, // Faiss } } } diff --git a/rust/src/index/vector/ivf.rs b/rust/src/index/vector/ivf.rs index 2979554413..350541e5ec 100644 --- a/rust/src/index/vector/ivf.rs +++ b/rust/src/index/vector/ivf.rs @@ -1,66 +1,60 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at +// Copyright 2023 Lance Developers. // -// http://www.apache.org/licenses/LICENSE-2.0 +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at // -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. //! IVF - Inverted File index. use std::sync::Arc; -use arrow_arith::aggregate::{max, min}; +use arrow::array::UInt32Builder; use arrow_arith::arithmetic::subtract_dyn; -use arrow_array::builder::Float32Builder; use arrow_array::{ + builder::Float32Builder, cast::{as_primitive_array, as_struct_array}, Array, ArrayRef, BooleanArray, FixedSizeListArray, Float32Array, RecordBatch, StructArray, UInt32Array, }; use arrow_ord::sort::sort_to_indices; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; -use arrow_select::{ - concat::{concat, concat_batches}, - filter::filter_record_batch, - take::take, -}; +use arrow_select::{concat::concat_batches, filter::filter_record_batch, take::take}; use async_trait::async_trait; use futures::{ stream::{self, StreamExt}, TryStreamExt, }; -use rand::SeedableRng; -use rand::{rngs::SmallRng, Rng}; +use rand::{rngs::SmallRng, SeedableRng}; use uuid::Uuid; use super::{ pq::{PQIndex, ProductQuantizer}, - MetricType, Query, VectorIndex, + MetricType, Query, Transformer, VectorIndex, }; -use crate::arrow::*; +use crate::arrow::{linalg::MatrixView, *}; +use crate::index::vector::opq::*; use crate::io::{ object_reader::{read_message, ObjectReader}, read_message_from_buf, read_metadata_offset, }; use crate::{ - dataset::{scanner::Scanner, Dataset, ROW_ID}, - index::{pb, pb::vector_index_stage::Stage, IndexBuilder, IndexType}, + dataset::{Dataset, ROW_ID}, + index::{pb, pb::vector_index_stage::Stage}, }; use crate::{Error, Result}; const INDEX_FILE_NAME: &str = "index.idx"; const PARTITION_ID_COLUMN: &str = "__ivf_part_id"; const RESIDUAL_COLUMN: &str = "__residual_vector"; +const PQ_CODE_COLUMN: &str = "__pq_code"; /// IVF PQ Index. pub struct IvfPQIndex<'a> { @@ -73,6 +67,9 @@ pub struct IvfPQIndex<'a> { pq: Arc, metric_type: MetricType, + + /// Transform applys to each vector. + transforms: Vec>, } impl<'a> IvfPQIndex<'a> { @@ -102,11 +99,32 @@ impl<'a> IvfPQIndex<'a> { }; let index_metadata = IvfPQIndexMetadata::try_from(&proto)?; + let reader_ref = reader.as_ref(); + let transforms = stream::iter(index_metadata.transforms) + .map(|tf| async move { + Ok::, Error>(Arc::new( + OptimizedProductQuantizer::load( + reader_ref, + tf.position as usize, + tf.shape + .iter() + .map(|s| *s as usize) + .collect::>() + .as_slice(), + ) + .await?, + )) + }) + .buffered(4) + .try_collect::>>() + .await?; + Ok(Self { reader, ivf: index_metadata.ivf, pq: index_metadata.pq, metric_type: index_metadata.metric_type, + transforms, }) } @@ -137,12 +155,18 @@ impl<'a> IvfPQIndex<'a> { #[async_trait] impl VectorIndex for IvfPQIndex<'_> { async fn search(&self, query: &Query) -> Result { + let mut mat = MatrixView::new(query.key.clone(), query.key.len()); + for transform in self.transforms.iter() { + mat = transform.transform(&mat).await?; + } + let key = mat.data(); + let key_ref = key.as_ref(); let partition_ids = self .ivf - .find_partitions(&query.key, query.nprobs, self.metric_type)?; + .find_partitions(key_ref, query.nprobs, self.metric_type)?; let candidates = stream::iter(partition_ids.values()) .then(|part_id| async move { - self.search_in_partition(*part_id as usize, &query.key, query.k) + self.search_in_partition(*part_id as usize, key_ref, query.k) .await }) .collect::>() @@ -184,20 +208,45 @@ pub struct IvfPQIndexMetadata { /// The version of dataset where this index was built. dataset_version: u64, + /// Metric to compute distance metric_type: MetricType, - // Ivf related + /// IVF model ivf: Ivf, /// Product Quantizer pq: Arc, + + /// Transforms to be applied before search. + transforms: Vec, } /// Convert a IvfPQIndex to protobuf payload impl TryFrom<&IvfPQIndexMetadata> for pb::Index { type Error = Error; - fn try_from(idx: &IvfPQIndexMetadata) -> std::result::Result { + fn try_from(idx: &IvfPQIndexMetadata) -> Result { + let mut stages: Vec = idx + .transforms + .iter() + .map(|tf| { + Ok(pb::VectorIndexStage { + stage: Some(pb::vector_index_stage::Stage::Transform(tf.clone())), + }) + }) + .collect::>>()?; + + stages.extend_from_slice(&[ + pb::VectorIndexStage { + stage: Some(pb::vector_index_stage::Stage::Ivf(pb::Ivf::try_from( + &idx.ivf, + )?)), + }, + pb::VectorIndexStage { + stage: Some(pb::vector_index_stage::Stage::Pq(idx.pq.as_ref().into())), + }, + ]); + Ok(Self { name: idx.name.clone(), columns: vec![idx.column.clone()], @@ -206,16 +255,7 @@ impl TryFrom<&IvfPQIndexMetadata> for pb::Index { implementation: Some(pb::index::Implementation::VectorIndex(pb::VectorIndex { spec_version: 1, dimension: idx.dimension, - stages: vec![ - pb::VectorIndexStage { - stage: Some(pb::vector_index_stage::Stage::Ivf(pb::Ivf::try_from( - &idx.ivf, - )?)), - }, - pb::VectorIndexStage { - stage: Some(pb::vector_index_stage::Stage::Pq(idx.pq.as_ref().into())), - }, - ], + stages, metric_type: match idx.metric_type { MetricType::L2 => pb::VectorMetricType::L2.into(), MetricType::Cosine => pb::VectorMetricType::Cosine.into(), @@ -238,8 +278,9 @@ impl TryFrom<&pb::Index> for IvfPQIndexMetadata { if let Some(idx_impl) = idx.implementation.as_ref() { match idx_impl { pb::index::Implementation::VectorIndex(vidx) => { - if vidx.stages.len() != 2 { - return Err(Error::IO("Only support IVF_PQ now".to_string())); + let num_stages = vidx.stages.len(); + if num_stages != 2 && num_stages != 3 { + return Err(Error::IO("Only support IVF_(O)PQ now".to_string())); }; let stage0 = vidx.stages[0].stage.as_ref().ok_or_else(|| { Error::IO("VectorIndex stage 0 is missing".to_string()) @@ -249,7 +290,7 @@ impl TryFrom<&pb::Index> for IvfPQIndexMetadata { _ => Err(Error::IO("Stage 0 only supports IVF".to_string())), }?; let stage1 = vidx.stages[1].stage.as_ref().ok_or_else(|| { - Error::IO("VectorIndex stage 0 is missing".to_string()) + Error::IO("VectorIndex stage 1 is missing".to_string()) })?; let pq = match stage1 { Stage::Pq(pq_proto) => Ok(Arc::new(pq_proto.into())), @@ -269,6 +310,7 @@ impl TryFrom<&pb::Index> for IvfPQIndexMetadata { .into(), ivf, pq, + transforms: vec![], }) } }? @@ -279,26 +321,6 @@ impl TryFrom<&pb::Index> for IvfPQIndexMetadata { } } -fn compute_residual( - centroids: Arc, - vector_array: &FixedSizeListArray, - partition_ids: &UInt32Array, -) -> Result { - let mut residual_builder = Float32Builder::new(); - for i in 0..vector_array.len() { - let vector = vector_array.value(i); - let centroids = centroids.value(partition_ids.value(i) as usize); - let residual_vector = subtract_dyn(vector.as_ref(), centroids.as_ref())?; - let residual_float32: &Float32Array = as_primitive_array(residual_vector.as_ref()); - residual_builder.append_slice(residual_float32.values()); - } - let values = residual_builder.finish(); - Ok(Arc::new(FixedSizeListArray::try_new( - values, - vector_array.value_length(), - )?)) -} - /// Ivf Model #[derive(Debug)] struct Ivf { @@ -329,6 +351,11 @@ impl Ivf { self.centroids.value_length() as usize } + /// Number of IVF partitions. + fn num_partitions(&self) -> usize { + self.centroids.len() + } + /// Use the query vector to find `nprobes` closest partitions. fn find_partitions( &self, @@ -360,84 +387,55 @@ impl Ivf { self.lengths.push(len); } - /// Scan the dataset and assign the partition ID for each row. + /// Compute the partition ID and residual vectors. + /// + /// Parameters + /// - *data*: input matrix to compute residual. + /// - *metric_type*: the metric type to compute distance. /// - /// Currently, it keeps batches in the memory. - async fn partition( + /// Returns a `RecordBatch` with schema `{__part_id: u32, __residual: FixedSizeList}` + pub fn compute_partition_and_residual( &self, - scanner: &Scanner, + data: &MatrixView, metric_type: MetricType, - ) -> Result> { - let schema = scanner.schema()?; - let column_name = schema.field(0).name(); - let batches_with_partition_id = scanner - .try_into_stream() - .await? - .map(|b| async move { - let batch = b?; - let arr = batch.column_by_name(column_name).ok_or_else(|| { - Error::IO(format!("Dataset does not have column {column_name}")) - })?; - let vectors = as_fixed_size_list_array(arr).clone(); - // let vec = vectors.clone(); - let values = self.centroids.values(); - let centroids = as_primitive_array(values.as_ref()).clone(); - let partition_ids = tokio::task::spawn_blocking(move || { - let dist_func = metric_type.func(); - (0..vectors.len()) - .map(|idx| { - let arr = vectors.value(idx); - let f: &Float32Array = as_primitive_array(&arr); - Ok(argmin(dist_func(f, ¢roids, f.len())?.as_ref()).unwrap()) - }) - .collect::>>() - }) - .await??; - let partition_column = Arc::new(UInt32Array::from(partition_ids)); - let batch_with_part_id = batch.try_with_column( - ArrowField::new(PARTITION_ID_COLUMN, DataType::UInt32, false), - partition_column, - )?; - Ok::(batch_with_part_id) - }) - .buffer_unordered(16) - .try_collect::>() - .await?; + ) -> Result { + let mut part_id_builder = UInt32Builder::with_capacity(data.num_rows()); + let mut residual_builder = + Float32Builder::with_capacity(data.num_columns() * data.num_rows()); - // Compute the residual vectors for every RecordBatch. - // let mut residual_batches = vec![]; - let residual_batches = stream::iter(batches_with_partition_id) - .map(|batch| async move { - let centorids = self.centroids.clone(); - let vector = batch.column_by_name(column_name).unwrap().clone(); - let partition_ids = batch.column_by_name(PARTITION_ID_COLUMN).unwrap().clone(); - let residual = tokio::task::spawn_blocking(move || { - compute_residual( - centorids.clone(), - as_fixed_size_list_array(vector.as_ref()), - as_primitive_array(partition_ids.as_ref()), - ) - }) - .await??; - let residual_schema = Arc::new(ArrowSchema::new(vec![ - ArrowField::new(RESIDUAL_COLUMN, residual.data_type().clone(), false), - ArrowField::new(PARTITION_ID_COLUMN, DataType::UInt32, false), - ArrowField::new(ROW_ID, DataType::UInt64, false), - ])); - let b = RecordBatch::try_new( - residual_schema, - vec![ - residual, - batch.column_by_name(PARTITION_ID_COLUMN).unwrap().clone(), - batch.column_by_name(ROW_ID).unwrap().clone(), - ], - )?; - Ok::(b) - }) - .buffer_unordered(16) - .try_collect::>() - .await?; - Ok(residual_batches) + let dim = data.num_columns(); + let dist_func = metric_type.func(); + let centroids: MatrixView = self.centroids.as_ref().try_into()?; + for i in 0..data.num_rows() { + let vector = data.row(i).unwrap(); + let part_id = argmin( + dist_func(&vector, centroids.data().as_ref(), dim) + .unwrap() + .as_ref(), + ) + .unwrap(); + part_id_builder.append_value(part_id); + let cent = centroids.row(part_id as usize).unwrap(); + let residual = subtract_dyn(&vector, ¢)?; + let resi_arr: &Float32Array = as_primitive_array(&residual); + residual_builder.append_slice(resi_arr.values()); + } + + let part_ids = part_id_builder.finish(); + let residuals = FixedSizeListArray::try_new(residual_builder.finish(), dim as i32)?; + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new(PARTITION_ID_COLUMN, DataType::UInt32, false), + ArrowField::new( + RESIDUAL_COLUMN, + DataType::FixedSizeList( + Box::new(ArrowField::new("item", DataType::Float32, true)), + dim as i32, + ), + false, + ), + ])); + let batch = RecordBatch::try_new(schema, vec![Arc::new(part_ids), Arc::new(residuals)])?; + Ok(batch) } } @@ -478,241 +476,313 @@ impl TryFrom<&pb::Ivf> for Ivf { } } -pub struct IvfPqIndexBuilder<'a> { - dataset: &'a Dataset, - - /// Unique id of the index. - uuid: Uuid, - - /// Index name - name: String, - - /// Vector column to search for. - column: String, +fn sanity_check(dataset: &Dataset, column: &str) -> Result<()> { + let Some(field) = dataset.schema().field(column) else { + return Err(Error::IO(format!( + "Building index: column {} does not exist in dataset: {:?}", + column, dataset + ))); + }; + if let DataType::FixedSizeList(elem_type, _) = field.data_type() { + if !matches!(elem_type.data_type(), DataType::Float32) { + return Err( + Error::Index( + format!("VectorIndex requires the column data type to be fixed size list of float32s, got {}", + elem_type.data_type()))); + } + } else { + return Err(Error::Index(format!( + "VectorIndex requires the column data type to be fixed size list of float32s, got {}", + field.data_type() + ))); + } + Ok(()) +} - dimension: usize, +/// Parameters to build IVF partitions +pub struct IvfBuildParams { + /// Number of partitions to build. + pub num_partitions: usize, - /// Metric type. - metric_type: MetricType, + /// Metric type, L2 or Cosine. + pub metric_type: MetricType, - /// Number of IVF partitions. - num_partitions: u32, - - // PQ parameters - nbits: u32, + // ---- kmeans parameters + /// Max number of iterations to train kmeans. + pub max_iters: usize, +} - num_sub_vectors: u32, +/// Parameters for building product quantization. +pub struct PQBuildParams { + /// Number of subvectors to build PQ code + pub num_sub_vectors: usize, - /// Max iterations to train a k-mean model. - kmeans_max_iters: u32, -} + /// The number of bits to present one PQ centroid. + pub num_bits: usize, -impl<'a> IvfPqIndexBuilder<'a> { - pub fn try_new( - dataset: &'a Dataset, - uuid: Uuid, - name: &str, - column: &str, - num_partitions: u32, - num_sub_vectors: u32, - metric_type: MetricType, - ) -> Result { - let field = dataset.schema().field(column).ok_or(Error::IO(format!( - "Column {column} does not exist in the dataset" - )))?; - let DataType::FixedSizeList(_, d) = field.data_type() else { - return Err(Error::IO(format!("Column {column} is not a vector type"))); - }; - Ok(Self { - dataset, - uuid, - name: name.to_string(), - column: column.to_string(), - dimension: d as usize, - metric_type, - num_partitions, - num_sub_vectors, - nbits: 8, - kmeans_max_iters: 100, - }) - } + /// Metric type, L2 or Cosine. + pub metric_type: MetricType, - fn sanity_check(&self) -> Result<()> { - // Step 1. Sanity check - let Some(field) = self.dataset.schema().field(&self.column) else { - return Err(Error::IO(format!( - "Building index: column {} does not exist in dataset: {:?}", - self.column, self.dataset - ))); -}; - if let DataType::FixedSizeList(elem_type, _) = field.data_type() { - if !matches!(elem_type.data_type(), DataType::Float32) { - return Err( - Error::Index( - format!("VectorIndex requires the column data type to be fixed size list of float32s, got {}", - elem_type.data_type()))); - } - } else { - return Err(Error::Index( - format!("VectorIndex requires the column data type to be fixed size list of float32s, got {}", - field.data_type()))); - } - Ok(()) - } + /// Train as optimized product quantization. + pub use_opq: bool, - /// Train IVF partitions using kmeans. - async fn train_ivf_model(&self) -> Result { - let mut scanner = self.dataset.scan(); - scanner.project(&[&self.column])?; - - let rng = SmallRng::from_entropy(); - Ok(Ivf::new( - train_kmeans_model( - &scanner, - self.dimension, - self.num_partitions as usize, - self.kmeans_max_iters, - rng.clone(), - self.metric_type, - ) - .await?, - )) - } + /// The max number of iterations for kmeans training. + pub max_iters: usize, } -#[async_trait] -impl IndexBuilder for IvfPqIndexBuilder<'_> { - fn index_type() -> IndexType { - IndexType::Vector - } - - /// Build the IVF_PQ index - async fn build(&self) -> Result<()> { - println!( - "Building vector index: IVF{},PQ{}, metric={}", - self.num_partitions, self.num_sub_vectors, self.metric_type, - ); - - // Step 1. Sanity check - self.sanity_check()?; - - // First, scan the dataset to train IVF models. - let mut ivf_model = self.train_ivf_model().await?; - - // A new scanner, with row id to build inverted index. - let mut scanner = self.dataset.scan(); - scanner.project(&[&self.column])?; - scanner.with_row_id(); - // Assign parition ID and compute residual vectors. - let partitioned_batches = ivf_model.partition(&scanner, self.metric_type).await?; - - // Train PQ - let mut pq = - ProductQuantizer::new(self.num_sub_vectors as usize, self.nbits, self.dimension); - let batch = concat_batches(&partitioned_batches[0].schema(), &partitioned_batches)?; - let residual_vector = batch.column_by_name(RESIDUAL_COLUMN).unwrap(); - - let pq_code = pq - .fit_transform(as_fixed_size_list_array(residual_vector), self.metric_type) +async fn maybe_sample_training_data( + dataset: &Dataset, + column: &str, + sample_size_hint: usize, +) -> Result { + let num_rows = dataset.count_rows().await?; + let projection = dataset.schema().project(&[&column])?; + let batch = if num_rows > sample_size_hint { + dataset.sample(sample_size_hint, &projection).await? + } else { + let mut scanner = dataset.scan(); + scanner.project(&[column])?; + let batches = scanner + .try_into_stream() + .await? + .try_collect::>() .await?; + concat_batches(&Arc::new(ArrowSchema::from(&projection)), &batches)? + }; + let array = batch.column_by_name(column).ok_or(Error::Index(format!( + "Sample training data: column {} does not exist in return", + column + )))?; + let fixed_size_array = as_fixed_size_list_array(array); + fixed_size_array.try_into() +} - const PQ_CODE_COLUMN: &str = "__pq_code"; - let pq_code_batch = RecordBatch::try_new( - Arc::new(ArrowSchema::new(vec![ - ArrowField::new(PQ_CODE_COLUMN, pq_code.data_type().clone(), false), - ArrowField::new(PARTITION_ID_COLUMN, DataType::UInt32, false), - ArrowField::new(ROW_ID, DataType::UInt64, false), - ])), - vec![ - Arc::new(pq_code), - batch.column_by_name(PARTITION_ID_COLUMN).unwrap().clone(), - batch.column_by_name(ROW_ID).unwrap().clone(), - ], - )?; - - let object_store = self.dataset.object_store(); - let path = self - .dataset - .indices_dir() - .child(self.uuid.to_string()) - .child(INDEX_FILE_NAME); - let mut writer = object_store.create(&path).await?; - - // Write each partition to disk. - let part_col = pq_code_batch - .column_by_name(PARTITION_ID_COLUMN) - .unwrap_or_else(|| panic!("{PARTITION_ID_COLUMN} does not exist")); - let partition_ids: &UInt32Array = as_primitive_array(part_col); - let min_id = min(partition_ids).unwrap_or(0); - let max_id = max(partition_ids).unwrap_or(1024 * 1024); - - for part_id in min_id..max_id + 1 { - let predicates = BooleanArray::from_unary(partition_ids, |x| x == part_id); - let parted_batch = filter_record_batch(&pq_code_batch, &predicates)?; - ivf_model.add_partition(writer.tell(), parted_batch.num_rows() as u32); - if parted_batch.num_rows() > 0 { - // Write one partition. - let pq_code = &parted_batch[PQ_CODE_COLUMN]; - writer.write_plain_encoded_array(pq_code.as_ref()).await?; - let row_ids = &parted_batch[ROW_ID]; - writer.write_plain_encoded_array(row_ids.as_ref()).await?; - } - } - - let metadata = IvfPQIndexMetadata { - name: self.name.clone(), - column: self.column.clone(), - dimension: self.dimension as u32, - dataset_version: self.dataset.version().version, - ivf: ivf_model, - pq: pq.into(), - metric_type: self.metric_type, - }; - - let metadata = pb::Index::try_from(&metadata)?; - let pos = writer.write_protobuf(&metadata).await?; - writer.write_magics(pos).await?; - writer.shutdown().await?; - - Ok(()) +/// Compute residual matrix. +/// +/// Parameters +/// - *data*: input matrix to compute residual. +/// - *centroids*: the centroids to compute residual vectors. +/// - *metric_type*: the metric type to compute distance. +fn compute_residual_matrix( + data: &MatrixView, + centroids: &MatrixView, + metric_type: MetricType, +) -> Result> { + assert_eq!(centroids.num_columns(), data.num_columns()); + let dist_func = metric_type.func(); + + let dim = data.num_columns(); + let mut builder = Float32Builder::with_capacity(data.data().len()); + for i in 0..data.num_rows() { + let row = data.row(i).unwrap(); + let part_id = argmin( + dist_func(&row, centroids.data().as_ref(), dim) + .unwrap() + .as_ref(), + ) + .unwrap(); + let centroid = centroids.row(part_id as usize).unwrap(); + let residual = subtract_dyn(&row, ¢roid)?; + let f32_residual_array: &Float32Array = as_primitive_array(&residual); + builder.append_slice(f32_residual_array.values()); } + Ok(Arc::new(builder.finish())) } -async fn train_kmeans_model( - scanner: &Scanner, - dimension: usize, - k: usize, - max_iterations: u32, - rng: impl Rng, - metric_type: MetricType, - // metric_type: Arc< - // dyn Fn(&Float32Array, &Float32Array, usize) -> Result> + Send + Sync, - // >, -) -> Result> { - let schema = scanner.schema()?; - assert_eq!(schema.fields.len(), 1); - let column_name = schema.fields[0].name(); - // Copy all to memory for now, optimize later. +/// Build IVF(PQ) index +pub async fn build_ivf_pq_index( + dataset: &Dataset, + column: &str, + index_name: &str, + uuid: &Uuid, + ivf_params: &IvfBuildParams, + pq_params: &PQBuildParams, +) -> Result<()> { + println!( + "Building vector index: IVF{},{}PQ{}, metric={}", + ivf_params.num_partitions, + if pq_params.use_opq { "O" } else { "" }, + pq_params.num_sub_vectors, + ivf_params.metric_type, + ); + + sanity_check(dataset, column)?; + + // Maximum to train 256 vectors per centroids, see Faiss. + let sample_size_hint = std::cmp::max( + ivf_params.num_partitions, + ProductQuantizer::num_centroids(pq_params.num_bits as u32), + ) * 256; + + let training_data = maybe_sample_training_data(dataset, column, sample_size_hint).await?; + + // Train IVF partitions. + let ivf_model = train_ivf_model(&training_data, ivf_params).await?; + + // Compute the residual vector for training PQ + let ivf_centroids = ivf_model.centroids.as_ref().try_into()?; + let residual_data = + compute_residual_matrix(&training_data, &ivf_centroids, ivf_params.metric_type)?; + let pq_training_data = + FixedSizeListArray::try_new(residual_data.as_ref(), training_data.num_columns() as i32)?; + + // The final train of PQ sub-vectors + let pq = train_pq(&pq_training_data, pq_params).await?; + + // Transform data, compute residuals and sort by partition ids. + let mut scanner = dataset.scan(); + scanner.project(&[column])?; + scanner.with_row_id(); + + let ivf = &ivf_model; + let pq_ref = &pq; + let metric_type = pq_params.metric_type; + + // Scan the dataset and compute residual, pq with with partition ID. + // For now, it loads all data into memory. let batches = scanner .try_into_stream() .await? + .map(|b| async move { + let batch = b?; + let arr = batch + .column_by_name(column) + .ok_or_else(|| Error::IO(format!("Dataset does not have column {column}")))?; + let vectors: MatrixView = as_fixed_size_list_array(arr).try_into()?; + let part_id_and_residual = ivf.compute_partition_and_residual(&vectors, metric_type)?; + + let residual_col = part_id_and_residual + .column_by_name(RESIDUAL_COLUMN) + .unwrap(); + let residual_data = as_fixed_size_list_array(&residual_col); + let pq_code = pq_ref + .transform(&residual_data.try_into()?, metric_type) + .await?; + + let row_ids = batch.column_by_name(ROW_ID).expect("Expect row id").clone(); + let part_ids = part_id_and_residual + .column_by_name(PARTITION_ID_COLUMN) + .expect("Expect partition ids column") + .clone(); + + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new(ROW_ID, DataType::UInt64, false), + ArrowField::new(PARTITION_ID_COLUMN, DataType::UInt32, false), + ArrowField::new( + PQ_CODE_COLUMN, + DataType::FixedSizeList( + Box::new(ArrowField::new("item", DataType::UInt8, true)), + pq_params.num_sub_vectors as i32, + ), + false, + ), + ])); + RecordBatch::try_new(schema.clone(), vec![row_ids, part_ids, Arc::new(pq_code)]) + }) + .buffered(num_cpus::get()) .try_collect::>() .await?; - let mut arr_list = vec![]; - for batch in batches { - let arr = batch.column_by_name(column_name).unwrap(); - let list_arr = as_fixed_size_list_array(&arr); - arr_list.push(list_arr.values().clone()); + + write_index_file( + dataset, + column, + index_name, + uuid, + ivf_model, + pq, + ivf_params.metric_type, + &batches, + ) + .await +} + +/// Write index into the file. +async fn write_index_file( + dataset: &Dataset, + column: &str, + index_name: &str, + uuid: &Uuid, + mut ivf: Ivf, + pq: ProductQuantizer, + metric_type: MetricType, + batches: &[RecordBatch], +) -> Result<()> { + let object_store = dataset.object_store(); + let path = dataset + .indices_dir() + .child(uuid.to_string()) + .child(INDEX_FILE_NAME); + let mut writer = object_store.create(&path).await?; + + // Write each partition to disk. + for part_id in 0..ivf.num_partitions() as u32 { + let mut batches_for_parq: Vec = vec![]; + for batch in batches.iter() { + let part_col = batch + .column_by_name(PARTITION_ID_COLUMN) + .unwrap_or_else(|| panic!("{PARTITION_ID_COLUMN} does not exist")); + let partition_ids: &UInt32Array = as_primitive_array(part_col); + let predicates = BooleanArray::from_unary(partition_ids, |x| x == part_id); + let parted_batch = filter_record_batch(&batch, &predicates)?; + batches_for_parq.push(parted_batch); + } + let parted_batch = concat_batches(&batches_for_parq[0].schema(), &batches_for_parq)?; + ivf.add_partition(writer.tell(), parted_batch.num_rows() as u32); + if parted_batch.num_rows() > 0 { + // Write one partition. + let pq_code = &parted_batch[PQ_CODE_COLUMN]; + writer.write_plain_encoded_array(pq_code.as_ref()).await?; + let row_ids = &parted_batch[ROW_ID]; + writer.write_plain_encoded_array(row_ids.as_ref()).await?; + } } - let arrays = arr_list.iter().map(|l| l.as_ref()).collect::>(); + let metadata = IvfPQIndexMetadata { + name: index_name.to_string(), + column: column.to_string(), + dimension: pq.dimension as u32, + dataset_version: dataset.version().version, + metric_type, + ivf, + pq: pq.into(), + transforms: vec![], + }; + + let metadata = pb::Index::try_from(&metadata)?; + let pos = writer.write_protobuf(&metadata).await?; + writer.write_magics(pos).await?; + writer.shutdown().await?; + + Ok(()) +} + +/// Train product quantization over (OPQ-rotated) residual vectors. +async fn train_pq(data: &FixedSizeListArray, params: &PQBuildParams) -> Result { + let mut pq = ProductQuantizer::new( + params.num_sub_vectors, + params.num_bits as u32, + data.value_length() as usize, + ); + let mat: MatrixView = data.try_into()?; + pq.train(&mat, params.metric_type, params.max_iters).await?; + Ok(pq) +} - let all_vectors = concat(&arrays)?; - let values: &Float32Array = as_primitive_array(&all_vectors); - let centroids = - super::kmeans::train_kmeans(values, dimension, k, max_iterations, rng, metric_type).await?; - Ok(Arc::new(FixedSizeListArray::try_new( +/// Train IVF partitions using kmeans. +async fn train_ivf_model(data: &MatrixView, params: &IvfBuildParams) -> Result { + let rng = SmallRng::from_entropy(); + + let centroids = super::kmeans::train_kmeans( + data.data().as_ref(), + data.num_columns(), + params.num_partitions, + params.max_iters as u32, + rng, + params.metric_type, + ) + .await?; + Ok(Ivf::new(Arc::new(FixedSizeListArray::try_new( centroids, - dimension as i32, - )?)) + data.num_columns() as i32, + )?))) } diff --git a/rust/src/index/vector/opq.rs b/rust/src/index/vector/opq.rs index a38e221be5..41829543f8 100644 --- a/rust/src/index/vector/opq.rs +++ b/rust/src/index/vector/opq.rs @@ -18,16 +18,21 @@ use std::sync::Arc; use arrow::array::{as_primitive_array, Float32Builder}; -use arrow_array::{Array, FixedSizeListArray, UInt8Array}; +use arrow_array::{Array, Float32Array, UInt8Array}; +use arrow_schema::DataType; +use async_trait::async_trait; -use super::{pq::ProductQuantizer, MetricType}; -use crate::arrow::{linalg::*, *}; -use crate::Result; +use super::{pq::ProductQuantizer, MetricType, Transformer}; +use crate::arrow::linalg::*; +use crate::index::pb::{Transform, TransformType}; +use crate::io::object_reader::{read_fixed_stride_array, ObjectReader}; +use crate::{Error, Result}; /// Rotation matrix `R` described in Optimized Product Quantization. /// /// [Optimized Product Quantization for Approximate Nearest Neighbor Search /// (CVPR' 13)](https://www.microsoft.com/en-us/research/wp-content/uploads/2013/11/pami13opq.pdf) +#[derive(Debug)] pub struct OptimizedProductQuantizer { num_sub_vectors: usize, @@ -35,10 +40,16 @@ pub struct OptimizedProductQuantizer { num_bits: u32, /// OPQ rotation - rotation: Option, + pub rotation: Option, - /// PQ - pq: Option, + /// The offset where the matrix is stored in the index file. + pub file_position: Option, + + /// The metric to compute the distance. + metric_type: MetricType, + + /// Number of iterations to train OPQ. + num_iters: usize, } impl OptimizedProductQuantizer { @@ -50,50 +61,38 @@ impl OptimizedProductQuantizer { /// - *dimension*: dimension of the training dataset. /// - *num_sub_vectors*: the number of sub vectors in the product quantization. /// - *num_iterations*: The number of iterations to train on OPQ rotation matrix. - pub fn new(num_sub_vectors: usize, num_bits: u32) -> Self { + pub fn new( + num_sub_vectors: usize, + num_bits: u32, + metric_type: MetricType, + num_iters: usize, + ) -> Self { Self { num_sub_vectors, num_bits, rotation: None, - pq: None, + file_position: None, + metric_type, + num_iters, } } - /// Train the opq - pub async fn train( - &mut self, - data: &MatrixView, - metric_type: MetricType, - num_iters: usize, - ) -> Result<()> { - let dim = data.num_columns(); - - let num_centroids = ProductQuantizer::num_centroids(self.num_bits); - // See in Faiss, it does not train more than `256*n_centroids` samples - let train = if data.num_rows() > num_centroids * 256 { - println!( - "Sample {} out of {} to train kmeans of {} dim, {} clusters", - 256 * num_centroids, - data.num_rows(), - data.num_columns(), - num_centroids, - ); - data.sample(num_centroids * 256) - } else { - data.clone() - }; - - // Initialize R (rotation matrix) - let mut rotation = MatrixView::identity(dim); - for _ in 0..num_iters { - // Training data, this is the `X`, described in CVPR' 13 - let train = train.dot(&rotation)?; - let (rot, pq) = self.train_once(&train, metric_type).await?; - rotation = rot; - self.pq = Some(pq); - } - self.rotation = Some(rotation); - Ok(()) + /// Load the optimized product quantizer. + pub async fn load(reader: &dyn ObjectReader, position: usize, shape: &[usize]) -> Result { + let dim = shape[0]; + let length = dim * dim; + let data = + read_fixed_stride_array(reader, &DataType::Float32, position, length, ..).await?; + let f32_data: Float32Array = as_primitive_array(data.as_ref()).clone(); + let rotation = Some(MatrixView::new(Arc::new(f32_data), dim)); + Ok(Self { + num_sub_vectors: 0, + num_bits: 0, + rotation, + file_position: None, + metric_type: MetricType::L2, + num_iters: 0, + }) } /// Train once and return the rotation matrix and PQ codebook. @@ -103,10 +102,11 @@ impl OptimizedProductQuantizer { metric_type: MetricType, ) -> Result<(MatrixView, ProductQuantizer)> { let dim = train.num_columns(); - // TODO: make PQ::fit_transform work with MatrixView. - let fixed_list = FixedSizeListArray::try_new(train.data().as_ref(), dim as i32)?; let mut pq = ProductQuantizer::new(self.num_sub_vectors, self.num_bits, dim); - let pq_code = pq.fit_transform(&fixed_list, metric_type).await?; + + // let data = FixedSizeListArray::try_new(mat.data().as_ref(), mat.num_columns() as i32)?; + pq.train(&train, metric_type, 50).await?; + let pq_code = pq.transform(&train, metric_type).await?; // Reconstruct Y let mut builder = Float32Builder::with_capacity(train.num_columns() * train.num_rows()); @@ -132,6 +132,68 @@ impl OptimizedProductQuantizer { } } +#[async_trait] +impl Transformer for OptimizedProductQuantizer { + async fn train(&mut self, data: &MatrixView) -> Result<()> { + let dim = data.num_columns(); + + let num_centroids = ProductQuantizer::num_centroids(self.num_bits); + // See in Faiss, it does not train more than `256*n_centroids` samples + let train = if data.num_rows() > num_centroids * 256 { + println!( + "Sample {} out of {} to train kmeans of {} dim, {} clusters", + 256 * num_centroids, + data.num_rows(), + data.num_columns(), + num_centroids, + ); + data.sample(num_centroids * 256) + } else { + data.clone() + }; + + // Initialize R (rotation matrix) + let mut rotation = MatrixView::identity(dim); + for _ in 0..self.num_iters { + // Training data, this is the `X`, described in CVPR' 13 + let train = train.dot(&rotation)?; + let (rot, _) = self.train_once(&train, self.metric_type).await?; + rotation = rot; + } + self.rotation = Some(rotation); + Ok(()) + } + + /// Apply OPQ transform + async fn transform(&self, data: &MatrixView) -> Result { + let rotation = self.rotation.as_ref().unwrap(); + Ok(rotation.dot(&data.transpose())?.transpose()) + } + + fn try_into_pb(&self) -> Result { + self.try_into() + } +} + +impl TryFrom<&OptimizedProductQuantizer> for Transform { + type Error = Error; + + fn try_from(opq: &OptimizedProductQuantizer) -> Result { + if opq.file_position.is_none() { + return Err(Error::Index("OPQ has not been persisted yet".to_string())); + } + let rotation = opq + .rotation + .as_ref() + .ok_or(Error::Index("OPQ is not trained".to_string()))?; + Ok(Transform { + position: opq.file_position.unwrap() as u64, + shape: vec![rotation.num_rows() as u32, rotation.num_columns() as u32], + r#type: TransformType::Opq.into(), + }) + } +} + #[cfg(test)] mod tests { use super::*; @@ -146,8 +208,8 @@ mod tests { let data = Arc::new(Float32Array::from_iter((0..12800).map(|v| v as f32))); let matrix = MatrixView::new(data, DIM); - let mut opq = OptimizedProductQuantizer::new(4, 8); - opq.train(&matrix, MetricType::L2, 10).await.unwrap(); + let mut opq = OptimizedProductQuantizer::new(4, 8, MetricType::L2, 10); + opq.train(&matrix).await.unwrap(); assert_eq!(opq.rotation.as_ref().unwrap().num_rows(), DIM); assert_eq!(opq.rotation.as_ref().unwrap().num_columns(), DIM); diff --git a/rust/src/index/vector/pq.rs b/rust/src/index/vector/pq.rs index ee3e4c1a2d..a05222de73 100644 --- a/rust/src/index/vector/pq.rs +++ b/rust/src/index/vector/pq.rs @@ -15,26 +15,24 @@ use std::sync::Arc; use arrow_array::{ - builder::Float32Builder, cast::as_primitive_array, Array, FixedSizeListArray, Float32Array, - RecordBatch, + builder::Float32Builder, cast::as_primitive_array, Array, ArrayRef, FixedSizeListArray, + Float32Array, RecordBatch, UInt64Array, UInt8Array, }; -use arrow_array::{ArrayRef, UInt64Array, UInt8Array}; use arrow_ord::sort::sort_to_indices; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; use arrow_select::take::take; use futures::stream::{self, StreamExt, TryStreamExt}; use rand::SeedableRng; +use super::MetricType; +use crate::arrow::linalg::MatrixView; use crate::arrow::*; -use crate::index::pb; -use crate::index::vector::kmeans::train_kmeans; +use crate::index::{pb, vector::kmeans::train_kmeans}; use crate::io::object_reader::{read_fixed_stride_array, ObjectReader}; use crate::utils::distance::compute::normalize; use crate::utils::distance::l2::l2_distance; use crate::Result; -use super::MetricType; - /// Product Quantization Index. /// pub struct PQIndex<'a> { @@ -223,7 +221,7 @@ pub struct ProductQuantizer { pub num_sub_vectors: usize, /// Vector dimension. - dimension: usize, + pub dimension: usize, /// PQ codebook /// @@ -302,11 +300,13 @@ impl ProductQuantizer { } /// Transform the vector array to PQ code array. - async fn transform( + pub async fn transform( &self, - sub_vectors: &[Arc], + data: &MatrixView, metric_type: MetricType, ) -> Result { + let sub_vectors = divide_to_subvectors(&data, self.num_sub_vectors); + assert_eq!(sub_vectors.len(), self.num_sub_vectors); let vectors = sub_vectors.to_vec(); @@ -353,19 +353,19 @@ impl ProductQuantizer { FixedSizeListArray::try_new(values, self.num_sub_vectors as i32) } - /// Train a [ProductQuantizer] using an array of vectors. - pub async fn fit_transform( + /// Train [`ProductQuantizer`] using vectors. + pub async fn train( &mut self, - data: &FixedSizeListArray, + data: &MatrixView, metric_type: MetricType, - ) -> Result { - assert!(data.value_length() % self.num_sub_vectors as i32 == 0); - assert_eq!(data.value_type(), DataType::Float32); - assert_eq!(data.null_count(), 0); + max_iters: usize, + ) -> Result<()> { + assert!(data.num_columns() % self.num_sub_vectors == 0); + assert_eq!(data.data().null_count(), 0); - let sub_vectors = divide_to_subvectors(data, self.num_sub_vectors as i32); + let sub_vectors = divide_to_subvectors(data, self.num_sub_vectors); let num_centroids = 2_usize.pow(self.num_bits); - let dimension = data.value_length() as usize; + let dimension = data.num_columns(); let sub_vector_dimension = dimension / self.num_sub_vectors; let mut codebook_builder = Float32Builder::with_capacity(num_centroids * dimension); @@ -380,7 +380,7 @@ impl ProductQuantizer { flatten_array, sub_vector_dimension, num_centroids, - 25, + max_iters as u32, rng.clone(), metric_type, ) @@ -392,7 +392,18 @@ impl ProductQuantizer { } let pd_centroids = codebook_builder.finish(); self.codebook = Some(Arc::new(pd_centroids)); - self.transform(&sub_vectors, metric_type).await + + Ok(()) + } + + /// Train a [ProductQuantizer] using an array of vectors. + pub async fn fit_transform( + &mut self, + mat: &MatrixView, + metric_type: MetricType, + ) -> Result { + self.train(mat, metric_type, 50).await?; + self.transform(mat, metric_type).await } } @@ -424,19 +435,19 @@ impl From<&ProductQuantizer> for pb::Pq { /// /// For example, for a `[1024x1M]` matrix, when `n = 8`, this function divides /// the matrix into `[128x1M; 8]` vector of matrix. -fn divide_to_subvectors(array: &FixedSizeListArray, m: i32) -> Vec> { - assert!(!array.is_empty()); +fn divide_to_subvectors(data: &MatrixView, m: usize) -> Vec> { + assert!(!data.num_rows() > 0); - let sub_vector_length = (array.value_length() / m) as usize; - let capacity = array.len() * sub_vector_length; + let sub_vector_length = (data.num_columns() / m) as usize; + let capacity = data.num_rows() * sub_vector_length; let mut subarrays = vec![]; // TODO: very intensive memory copy involved!!! But this is on the write path. // Optimize for memory copy later. for i in 0..m as usize { let mut builder = Float32Builder::with_capacity(capacity); - for j in 0..array.len() { - let arr = array.value(j); + for j in 0..data.num_rows() { + let arr = data.row(j).unwrap(); let row: &Float32Array = as_primitive_array(&arr); let start = i * sub_vector_length; @@ -462,7 +473,7 @@ mod tests { fn test_divide_to_subvectors() { let values = Float32Array::from_iter((0..320).map(|v| v as f32)); // A [10, 32] array. - let mat = FixedSizeListArray::try_new(values, 32).unwrap(); + let mat = MatrixView::new(values.into(), 32); let sub_vectors = divide_to_subvectors(&mat, 4); assert_eq!(sub_vectors.len(), 4); assert_eq!(sub_vectors[0].len(), 10);