diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs
index 9e99f1483c..6b99e7c917 100644
--- a/rust/lance-index/src/scalar/inverted/builder.rs
+++ b/rust/lance-index/src/scalar/inverted/builder.rs
@@ -22,11 +22,10 @@ use itertools::Itertools;
 use lance_arrow::iter_str_array;
 use lance_core::cache::FileMetadataCache;
 use lance_core::utils::tokio::{get_num_compute_intensive_cpus, CPU_RUNTIME};
-use lance_core::{Error, Result, ROW_ID};
+use lance_core::{Result, ROW_ID};
 use lance_io::object_store::ObjectStore;
 use lazy_static::lazy_static;
 use object_store::path::Path;
-use snafu::{location, Location};
 use tempfile::{tempdir, TempDir};
 use tracing::instrument;
 
@@ -83,12 +82,11 @@ impl InvertedIndexBuilder {
     }
 
     pub fn from_existing_index(
+        params: InvertedIndexParams,
         tokens: TokenSet,
         inverted_list: Arc<InvertedListReader>,
         docs: DocSet,
     ) -> Self {
-        let params = InvertedIndexParams::default().with_position(inverted_list.has_positions());
-
         Self {
             params,
             tokens,
@@ -345,6 +343,7 @@
         }
         let mut merged_stream = stream::select_all(posting_streams);
         let mut last_num_rows = 0;
+        self.tokens = TokenSet::default();
         let start = std::time::Instant::now();
         while let Some(r) = merged_stream.try_next().await? {
             let (token, batch, max_score) = r?;
@@ -635,6 +634,7 @@
             let schema = schema.clone();
             let docs = docs.clone();
             tokio::spawn(async move {
+                // read the posting lists from new data
                 let batches = offsets.into_iter().map(|(offset, length)| {
                     let reader = posting_reader.reader.clone();
                     let schema = schema.clone();
@@ -648,19 +648,14 @@
                 });
                 let mut batches = futures::future::try_join_all(batches).await?;
 
+                // read the posting lists from existing data
                 if let Some(inverted_list) = posting_reader.inverted_list_reader.as_ref() {
-                    let token_id =
-                        posting_reader
-                            .existing_tokens
-                            .get(&token)
-                            .ok_or(Error::Index {
-                                message: format!("token {} not found", token),
-                                location: location!(),
-                            })?;
-                    let batch = inverted_list
-                        .posting_batch(*token_id, inverted_list.has_positions())
-                        .await?;
-                    batches.push(batch);
+                    if let Some(token_id) = posting_reader.existing_tokens.get(&token) {
+                        let batch = inverted_list
+                            .posting_batch(*token_id, inverted_list.has_positions())
+                            .await?;
+                        batches.push(batch);
+                    }
                 }
 
                 let (batch, max_score) =
diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs
index db9880c618..1987e3a0da 100644
--- a/rust/lance-index/src/scalar/inverted/index.rs
+++ b/rust/lance-index/src/scalar/inverted/index.rs
@@ -37,7 +37,8 @@ use super::builder::inverted_list_schema;
 use super::{wand::*, InvertedIndexBuilder, TokenizerConfig};
 use crate::prefilter::{NoFilter, PreFilter};
 use crate::scalar::{
-    AnyQuery, FullTextSearchQuery, IndexReader, IndexStore, SargableQuery, ScalarIndex,
+    AnyQuery, FullTextSearchQuery, IndexReader, IndexStore, InvertedIndexParams, SargableQuery,
+    ScalarIndex,
 };
 use crate::Index;
 
@@ -70,6 +71,7 @@ lazy_static! {
 
 #[derive(Clone)]
 pub struct InvertedIndex {
+    params: InvertedIndexParams,
     tokenizer: tantivy::tokenizer::TextAnalyzer,
     tokens: TokenSet,
     inverted_list: Arc<InvertedListReader>,
@@ -181,7 +183,7 @@ impl InvertedIndex {
 
         let tokens = self.tokens.clone();
         let inverted_list = self.inverted_list.clone();
        let docs = self.docs.clone();
-        InvertedIndexBuilder::from_existing_index(tokens, inverted_list, docs)
+        InvertedIndexBuilder::from_existing_index(self.params.clone(), tokens, inverted_list, docs)
     }
 }
@@ -255,8 +257,7 @@ impl ScalarIndex for InvertedIndex {
                 .get("tokenizer")
                 .map(|s| serde_json::from_str::<TokenizerConfig>(s))
                 .transpose()?
-                .unwrap_or_default()
-                .build()?;
+                .unwrap_or_default();
             let tokens = TokenSet::load(token_reader).await?;
             Result::Ok((tokenizer, tokens))
         }
@@ -278,11 +279,17 @@ impl ScalarIndex for InvertedIndex {
             }
         });
 
-        let (tokenizer, tokens) = tokens_fut.await??;
+        let (tokenizer_config, tokens) = tokens_fut.await??;
         let inverted_list = invert_list_fut.await??;
         let docs = docs_fut.await??;
 
+        let tokenizer = tokenizer_config.build()?;
+        let params = InvertedIndexParams {
+            with_position: inverted_list.has_positions(),
+            tokenizer_config,
+        };
         Ok(Arc::new(Self {
+            params,
             tokenizer,
             tokens,
             inverted_list,
diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs
index 7edb2771d1..b5eaf22b5c 100644
--- a/rust/lance/src/index.rs
+++ b/rust/lance/src/index.rs
@@ -904,9 +904,12 @@ mod tests {
 
     use super::*;
 
+    use arrow::array::AsArray;
     use arrow_array::{FixedSizeListArray, RecordBatch, RecordBatchIterator, StringArray};
     use arrow_schema::{Field, Schema};
     use lance_arrow::*;
+    use lance_index::scalar::inverted::TokenizerConfig;
+    use lance_index::scalar::FullTextSearchQuery;
     use lance_index::vector::{
         hnsw::builder::HnswBuildParams, ivf::IvfBuildParams, sq::builder::SQBuildParams,
     };
@@ -1294,6 +1297,71 @@ mod tests {
         assert_eq!(stats["num_indices"], 1);
     }
 
+    #[tokio::test]
+    async fn test_optimize_fts() {
+        let words = ["apple", "banana", "cherry", "date"];
+
+        let dir = tempdir().unwrap();
+        let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
+        let data = StringArray::from_iter_values(words.iter().map(|s| s.to_string()));
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(data)]).unwrap();
+        let batch_iterator = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+
+        let mut dataset = Dataset::write(batch_iterator, dir.path().to_str().unwrap(), None)
+            .await
+            .unwrap();
+
+        let tokenizer_config = TokenizerConfig::default();
+        let params = InvertedIndexParams {
+            with_position: true,
+            tokenizer_config,
+        };
+        dataset
+            .create_index(&["text"], IndexType::Inverted, None, &params, true)
+            .await
+            .unwrap();
+
+        let new_words = ["elephant", "fig", "grape", "honeydew"];
+        let new_data = StringArray::from_iter_values(new_words.iter().map(|s| s.to_string()));
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(new_data)]).unwrap();
+        let batch_iter = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+        dataset.append(batch_iter, None).await.unwrap();
+
+        dataset
+            .optimize_indices(&OptimizeOptions {
+                num_indices_to_merge: 0,
+                index_names: None,
+            })
+            .await
+            .unwrap();
+
+        for &word in words.iter().chain(new_words.iter()) {
+            let query_result = dataset
+                .scan()
+                .project(&["text"])
+                .unwrap()
+                .full_text_search(FullTextSearchQuery::new(word.to_string()))
+                .unwrap()
+                .limit(Some(10), None)
+                .unwrap()
+                .try_into_batch()
+                .await
+                .unwrap();
+
+            let texts = query_result["text"]
+                .as_string::<i32>()
+                .iter()
+                .map(|v| match v {
+                    None => "".to_string(),
+                    Some(v) => v.to_string(),
+                })
+                .collect::<Vec<_>>();
+
+            assert_eq!(texts.len(), 1);
+            assert_eq!(texts[0], word);
+        }
+    }
+
     #[tokio::test]
     async fn test_create_index_too_small_for_pq() {
         let test_dir = tempdir().unwrap();
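For context on the fix (not part of the patch itself): previously, `InvertedIndexBuilder::from_existing_index` rebuilt its parameters as `InvertedIndexParams::default().with_position(...)`, which preserved positions but silently dropped any non-default tokenizer configuration whenever index deltas were merged; the merge path also returned a "token {} not found" error for tokens that exist only in the newly written data. Below is a minimal sketch of the end-to-end flow that the new `test_optimize_fts` test exercises, using only APIs that appear in the patch; the import paths (`lance::Dataset`, `lance_index::optimize::OptimizeOptions`, `lance_index::DatasetIndexExt`) and the boxed-error plumbing are assumptions about the crate layout, not part of the change.

```rust
use std::sync::Arc;

use arrow_array::{RecordBatch, RecordBatchIterator, StringArray};
use arrow_schema::{DataType, Field, Schema};
use lance::Dataset;
use lance_index::optimize::OptimizeOptions;
use lance_index::scalar::inverted::TokenizerConfig;
use lance_index::scalar::InvertedIndexParams;
use lance_index::{DatasetIndexExt, IndexType};

// Sketch of the flow exercised by test_optimize_fts; the import paths above
// are assumptions and error handling is reduced to `?`.
async fn index_append_optimize(uri: &str) -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(StringArray::from_iter_values(["apple", "banana"]))],
    )?;
    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
    let mut dataset = Dataset::write(reader, uri, None).await?;

    // Positions enabled plus an explicit tokenizer config: exactly the state
    // that the old merge path reset to `InvertedIndexParams::default()`.
    let params = InvertedIndexParams {
        with_position: true,
        tokenizer_config: TokenizerConfig::default(),
    };
    dataset
        .create_index(&["text"], IndexType::Inverted, None, &params, true)
        .await?;

    // Append rows whose tokens the existing index has never seen, then bring
    // the index up to date (same options as the test). With the patch, the
    // stored params are reused and tokens absent from the existing token set
    // no longer abort the merge with a "token not found" error.
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(StringArray::from_iter_values(["cherry", "date"]))],
    )?;
    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
    dataset.append(reader, None).await?;
    dataset
        .optimize_indices(&OptimizeOptions {
            num_indices_to_merge: 0,
            index_names: None,
        })
        .await?;
    Ok(())
}
```

Passing the params into `from_existing_index` instead of re-deriving them works because `InvertedIndex::load` now reconstructs the full `InvertedIndexParams` from the stored tokenizer metadata and `inverted_list.has_positions()`, so a merged index is rebuilt with the same tokenizer it was created with.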