From a205094f6efcda70cf00169932eb472ffc428b9c Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Wed, 20 Nov 2024 19:04:11 +0800 Subject: [PATCH 1/7] fix: FTS index broken after optimize_indices() Signed-off-by: BubbleCal --- .../src/scalar/inverted/builder.rs | 8 +- rust/lance-index/src/scalar/inverted/index.rs | 17 ++-- rust/lance/src/index.rs | 84 ++++++++++++++++++- 3 files changed, 99 insertions(+), 10 deletions(-) diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index 9e99f1483c..b5b193e220 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -83,12 +83,11 @@ impl InvertedIndexBuilder { } pub fn from_existing_index( + params: InvertedIndexParams, tokens: TokenSet, inverted_list: Arc, docs: DocSet, ) -> Self { - let params = InvertedIndexParams::default().with_position(inverted_list.has_positions()); - Self { params, tokens, @@ -345,10 +344,11 @@ impl InvertedIndexBuilder { } let mut merged_stream = stream::select_all(posting_streams); let mut last_num_rows = 0; + self.tokens = TokenSet::default(); let start = std::time::Instant::now(); while let Some(r) = merged_stream.try_next().await? { let (token, batch, max_score) = r?; - self.tokens.add(token); + self.tokens.add(token.clone()); offsets.push(num_rows); max_scores.push(max_score); num_rows += batch.num_rows(); @@ -635,6 +635,7 @@ impl PostingReader { let schema = schema.clone(); let docs = docs.clone(); tokio::spawn(async move { + // read the posting lists from new data let batches = offsets.into_iter().map(|(offset, length)| { let reader = posting_reader.reader.clone(); let schema = schema.clone(); @@ -648,6 +649,7 @@ impl PostingReader { }); let mut batches = futures::future::try_join_all(batches).await?; + // read the posting lists from existing data if let Some(inverted_list) = posting_reader.inverted_list_reader.as_ref() { let token_id = posting_reader diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index db9880c618..9b913fb1b7 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -37,7 +37,8 @@ use super::builder::inverted_list_schema; use super::{wand::*, InvertedIndexBuilder, TokenizerConfig}; use crate::prefilter::{NoFilter, PreFilter}; use crate::scalar::{ - AnyQuery, FullTextSearchQuery, IndexReader, IndexStore, SargableQuery, ScalarIndex, + AnyQuery, FullTextSearchQuery, IndexReader, IndexStore, InvertedIndexParams, SargableQuery, + ScalarIndex, }; use crate::Index; @@ -70,6 +71,7 @@ lazy_static! { #[derive(Clone)] pub struct InvertedIndex { + params: InvertedIndexParams, tokenizer: tantivy::tokenizer::TextAnalyzer, tokens: TokenSet, inverted_list: Arc, @@ -181,7 +183,7 @@ impl InvertedIndex { let tokens = self.tokens.clone(); let inverted_list = self.inverted_list.clone(); let docs = self.docs.clone(); - InvertedIndexBuilder::from_existing_index(tokens, inverted_list, docs) + InvertedIndexBuilder::from_existing_index(self.params.clone(), tokens, inverted_list, docs) } } @@ -255,8 +257,7 @@ impl ScalarIndex for InvertedIndex { .get("tokenizer") .map(|s| serde_json::from_str::(s)) .transpose()? - .unwrap_or_default() - .build()?; + .unwrap_or_default(); let tokens = TokenSet::load(token_reader).await?; Result::Ok((tokenizer, tokens)) } @@ -278,11 +279,17 @@ impl ScalarIndex for InvertedIndex { } }); - let (tokenizer, tokens) = tokens_fut.await??; + let (tokenizer_config, tokens) = tokens_fut.await??; let inverted_list = invert_list_fut.await??; let docs = docs_fut.await??; + let tokenizer = tokenizer_config.build()?; + let params = InvertedIndexParams { + with_position: inverted_list.has_positions(), + tokenizer_config: tokenizer_config, + }; Ok(Arc::new(Self { + params, tokenizer, tokens, inverted_list, diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 7edb2771d1..1493f754de 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -904,9 +904,14 @@ mod tests { use super::*; - use arrow_array::{FixedSizeListArray, RecordBatch, RecordBatchIterator, StringArray}; - use arrow_schema::{Field, Schema}; + use arrow::array::StringBuilder; + use arrow_array::{ + ArrayRef, FixedSizeListArray, RecordBatch, RecordBatchIterator, StringArray, + }; + use arrow_schema::{Field, Schema, SchemaRef}; use lance_arrow::*; + use lance_index::scalar::inverted::TokenizerConfig; + use lance_index::scalar::FullTextSearchQuery; use lance_index::vector::{ hnsw::builder::HnswBuildParams, ivf::IvfBuildParams, sq::builder::SQBuildParams, }; @@ -1294,6 +1299,81 @@ mod tests { assert_eq!(stats["num_indices"], 1); } + #[tokio::test] + async fn test_full_text_search() { + let words = ["spotted", "crib", "irritating", "disturbed"]; + + let dir = tempdir().unwrap(); + let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)])); + let mut builder = StringBuilder::new(); + for word in words.iter() { + builder.append_value(word); + } + let array = builder.finish(); + let fields = vec![Arc::new(array) as ArrayRef]; + let record_batch = RecordBatch::try_new(schema.clone(), fields).unwrap(); + let iter_schema: SchemaRef = schema.clone(); + let batches = vec![Ok(record_batch)].into_iter(); + let batch_iterator = RecordBatchIterator::new(batches, iter_schema); + + let mut dataset = Dataset::write(batch_iterator, dir.path().to_str().unwrap(), None) + .await + .unwrap(); + + let mut tokenizer_config = TokenizerConfig::default(); + tokenizer_config = tokenizer_config + .language("English") + .unwrap_or(TokenizerConfig::default()); + tokenizer_config = tokenizer_config.remove_stop_words(false); + tokenizer_config = tokenizer_config.stem(false); + let mut params = InvertedIndexParams::default(); + params.tokenizer_config = tokenizer_config; + dataset + .create_index(&["text"], IndexType::Inverted, None, ¶ms, true) + .await + .unwrap(); + + dataset + .optimize_indices(&OptimizeOptions { + num_indices_to_merge: 0, + index_names: None, + }) + .await + .unwrap(); + let query_result = dataset + .scan() + .project(&["text"]) + .unwrap() + .full_text_search(FullTextSearchQuery::new("spotted".to_owned())) + .unwrap() + .limit(Some(10), None) + .unwrap() + .try_into_batch() + .await + .unwrap(); + + let column_downcast = query_result + .column_by_name("text") + .unwrap() + .as_any() + .downcast_ref::(); + + assert!(column_downcast.is_some()); + + let texts = column_downcast + .unwrap() + .iter() + .map(|v| match v { + None => "".to_string(), + Some(v) => v.to_string(), + }) + .collect::>(); + + assert_eq!(texts.len(), 1); + + assert_eq!(texts[0], "spotted"); + } + #[tokio::test] async fn test_create_index_too_small_for_pq() { let test_dir = tempdir().unwrap(); From a3558af17de613155813d3d15e613cb644f6975b Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Wed, 20 Nov 2024 19:09:53 +0800 Subject: [PATCH 2/7] fix Signed-off-by: BubbleCal --- rust/lance-index/src/scalar/inverted/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index b5b193e220..ea17481aa7 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -348,7 +348,7 @@ impl InvertedIndexBuilder { let start = std::time::Instant::now(); while let Some(r) = merged_stream.try_next().await? { let (token, batch, max_score) = r?; - self.tokens.add(token.clone()); + self.tokens.add(token); offsets.push(num_rows); max_scores.push(max_score); num_rows += batch.num_rows(); From 7b24ea3d741c0cd3491ecc66414f12b0a810a0fb Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Wed, 20 Nov 2024 19:15:25 +0800 Subject: [PATCH 3/7] fmt Signed-off-by: BubbleCal --- rust/lance-index/src/scalar/inverted/index.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index 9b913fb1b7..1987e3a0da 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -286,7 +286,7 @@ impl ScalarIndex for InvertedIndex { let tokenizer = tokenizer_config.build()?; let params = InvertedIndexParams { with_position: inverted_list.has_positions(), - tokenizer_config: tokenizer_config, + tokenizer_config, }; Ok(Arc::new(Self { params, From a856abac0a2784d28658ccb21c1ad3417ce37934 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Wed, 20 Nov 2024 19:20:44 +0800 Subject: [PATCH 4/7] fmt Signed-off-by: BubbleCal --- rust/lance/src/index.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 1493f754de..634ca1c7d8 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -1326,8 +1326,10 @@ mod tests { .unwrap_or(TokenizerConfig::default()); tokenizer_config = tokenizer_config.remove_stop_words(false); tokenizer_config = tokenizer_config.stem(false); - let mut params = InvertedIndexParams::default(); - params.tokenizer_config = tokenizer_config; + let params = InvertedIndexParams { + with_position: true, + tokenizer_config: tokenizer_config, + }; dataset .create_index(&["text"], IndexType::Inverted, None, ¶ms, true) .await From 8d61feba1bf6307fd05ecdcbfaa4ed05ffbc6de9 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Wed, 20 Nov 2024 19:25:12 +0800 Subject: [PATCH 5/7] fmt Signed-off-by: BubbleCal --- rust/lance/src/index.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 634ca1c7d8..26a9fe6c03 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -1328,7 +1328,7 @@ mod tests { tokenizer_config = tokenizer_config.stem(false); let params = InvertedIndexParams { with_position: true, - tokenizer_config: tokenizer_config, + tokenizer_config, }; dataset .create_index(&["text"], IndexType::Inverted, None, ¶ms, true) From 480ebabe345b9349d77ee4a9c3c6947dc257d152 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Thu, 21 Nov 2024 08:51:31 +0800 Subject: [PATCH 6/7] fix Signed-off-by: BubbleCal --- .../src/scalar/inverted/builder.rs | 24 ++--- rust/lance/src/index.rs | 92 ++++++++----------- 2 files changed, 49 insertions(+), 67 deletions(-) diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index ea17481aa7..55df5a4e52 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -22,11 +22,10 @@ use itertools::Itertools; use lance_arrow::iter_str_array; use lance_core::cache::FileMetadataCache; use lance_core::utils::tokio::{get_num_compute_intensive_cpus, CPU_RUNTIME}; -use lance_core::{Error, Result, ROW_ID}; +use lance_core::{Result, ROW_ID}; use lance_io::object_store::ObjectStore; use lazy_static::lazy_static; use object_store::path::Path; -use snafu::{location, Location}; use tempfile::{tempdir, TempDir}; use tracing::instrument; @@ -651,18 +650,15 @@ impl PostingReader { // read the posting lists from existing data if let Some(inverted_list) = posting_reader.inverted_list_reader.as_ref() { - let token_id = - posting_reader - .existing_tokens - .get(&token) - .ok_or(Error::Index { - message: format!("token {} not found", token), - location: location!(), - })?; - let batch = inverted_list - .posting_batch(*token_id, inverted_list.has_positions()) - .await?; - batches.push(batch); + match posting_reader.existing_tokens.get(&token) { + Some(token_id) => { + let batch = inverted_list + .posting_batch(*token_id, inverted_list.has_positions()) + .await?; + batches.push(batch); + } + None => {} + } } let (batch, max_score) = diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 26a9fe6c03..b5eaf22b5c 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -904,11 +904,9 @@ mod tests { use super::*; - use arrow::array::StringBuilder; - use arrow_array::{ - ArrayRef, FixedSizeListArray, RecordBatch, RecordBatchIterator, StringArray, - }; - use arrow_schema::{Field, Schema, SchemaRef}; + use arrow::array::AsArray; + use arrow_array::{FixedSizeListArray, RecordBatch, RecordBatchIterator, StringArray}; + use arrow_schema::{Field, Schema}; use lance_arrow::*; use lance_index::scalar::inverted::TokenizerConfig; use lance_index::scalar::FullTextSearchQuery; @@ -1300,32 +1298,20 @@ mod tests { } #[tokio::test] - async fn test_full_text_search() { - let words = ["spotted", "crib", "irritating", "disturbed"]; + async fn test_optimize_fts() { + let words = ["apple", "banana", "cherry", "date"]; let dir = tempdir().unwrap(); let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)])); - let mut builder = StringBuilder::new(); - for word in words.iter() { - builder.append_value(word); - } - let array = builder.finish(); - let fields = vec![Arc::new(array) as ArrayRef]; - let record_batch = RecordBatch::try_new(schema.clone(), fields).unwrap(); - let iter_schema: SchemaRef = schema.clone(); - let batches = vec![Ok(record_batch)].into_iter(); - let batch_iterator = RecordBatchIterator::new(batches, iter_schema); + let data = StringArray::from_iter_values(words.iter().map(|s| s.to_string())); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(data)]).unwrap(); + let batch_iterator = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); let mut dataset = Dataset::write(batch_iterator, dir.path().to_str().unwrap(), None) .await .unwrap(); - let mut tokenizer_config = TokenizerConfig::default(); - tokenizer_config = tokenizer_config - .language("English") - .unwrap_or(TokenizerConfig::default()); - tokenizer_config = tokenizer_config.remove_stop_words(false); - tokenizer_config = tokenizer_config.stem(false); + let tokenizer_config = TokenizerConfig::default(); let params = InvertedIndexParams { with_position: true, tokenizer_config, @@ -1335,6 +1321,12 @@ mod tests { .await .unwrap(); + let new_words = ["elephant", "fig", "grape", "honeydew"]; + let new_data = StringArray::from_iter_values(new_words.iter().map(|s| s.to_string())); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(new_data)]).unwrap(); + let batch_iter = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + dataset.append(batch_iter, None).await.unwrap(); + dataset .optimize_indices(&OptimizeOptions { num_indices_to_merge: 0, @@ -1342,38 +1334,32 @@ mod tests { }) .await .unwrap(); - let query_result = dataset - .scan() - .project(&["text"]) - .unwrap() - .full_text_search(FullTextSearchQuery::new("spotted".to_owned())) - .unwrap() - .limit(Some(10), None) - .unwrap() - .try_into_batch() - .await - .unwrap(); - - let column_downcast = query_result - .column_by_name("text") - .unwrap() - .as_any() - .downcast_ref::(); - - assert!(column_downcast.is_some()); - let texts = column_downcast - .unwrap() - .iter() - .map(|v| match v { - None => "".to_string(), - Some(v) => v.to_string(), - }) - .collect::>(); - - assert_eq!(texts.len(), 1); + for &word in words.iter().chain(new_words.iter()) { + let query_result = dataset + .scan() + .project(&["text"]) + .unwrap() + .full_text_search(FullTextSearchQuery::new(word.to_string())) + .unwrap() + .limit(Some(10), None) + .unwrap() + .try_into_batch() + .await + .unwrap(); - assert_eq!(texts[0], "spotted"); + let texts = query_result["text"] + .as_string::() + .iter() + .map(|v| match v { + None => "".to_string(), + Some(v) => v.to_string(), + }) + .collect::>(); + + assert_eq!(texts.len(), 1); + assert_eq!(texts[0], word); + } } #[tokio::test] From 714024104772941a48f8257739df37af90482d75 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Thu, 21 Nov 2024 09:00:10 +0800 Subject: [PATCH 7/7] fmt Signed-off-by: BubbleCal --- rust/lance-index/src/scalar/inverted/builder.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index 55df5a4e52..6b99e7c917 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -650,14 +650,11 @@ impl PostingReader { // read the posting lists from existing data if let Some(inverted_list) = posting_reader.inverted_list_reader.as_ref() { - match posting_reader.existing_tokens.get(&token) { - Some(token_id) => { - let batch = inverted_list - .posting_batch(*token_id, inverted_list.has_positions()) - .await?; - batches.push(batch); - } - None => {} + if let Some(token_id) = posting_reader.existing_tokens.get(&token) { + let batch = inverted_list + .posting_batch(*token_id, inverted_list.has_positions()) + .await?; + batches.push(batch); } }