fix: full text search index broken after optimize_indices() #3145

Merged · 8 commits · Nov 22, 2024
27 changes: 11 additions & 16 deletions rust/lance-index/src/scalar/inverted/builder.rs
@@ -22,11 +22,10 @@ use itertools::Itertools;
 use lance_arrow::iter_str_array;
 use lance_core::cache::FileMetadataCache;
 use lance_core::utils::tokio::{get_num_compute_intensive_cpus, CPU_RUNTIME};
-use lance_core::{Error, Result, ROW_ID};
+use lance_core::{Result, ROW_ID};
 use lance_io::object_store::ObjectStore;
 use lazy_static::lazy_static;
 use object_store::path::Path;
-use snafu::{location, Location};
 use tempfile::{tempdir, TempDir};
 use tracing::instrument;

@@ -83,12 +82,11 @@ impl InvertedIndexBuilder {
     }

     pub fn from_existing_index(
+        params: InvertedIndexParams,
         tokens: TokenSet,
         inverted_list: Arc<InvertedListReader>,
         docs: DocSet,
     ) -> Self {
-        let params = InvertedIndexParams::default().with_position(inverted_list.has_positions());
-
         Self {
             params,
             tokens,
@@ -345,6 +343,7 @@ impl InvertedIndexBuilder {
         }
         let mut merged_stream = stream::select_all(posting_streams);
         let mut last_num_rows = 0;
+        self.tokens = TokenSet::default();
         let start = std::time::Instant::now();
         while let Some(r) = merged_stream.try_next().await? {
             let (token, batch, max_score) = r?;
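
A note on the `self.tokens = TokenSet::default();` addition (my reading of the hunk, not stated in the PR): the merge rewrites every posting list, so the builder resets its token set and repopulates it from the merged stream rather than keeping ids that referenced the pre-optimize layout. An illustrative stand-in for the real TokenSet, assuming sequential id assignment:

    use std::collections::HashMap;

    // Hypothetical sketch, not the crate's TokenSet: token ids must line up
    // with the freshly written posting lists, so they are reassigned while
    // draining the merged stream.
    #[derive(Default)]
    struct TokenSetSketch {
        next_id: u32,
        tokens: HashMap<String, u32>,
    }

    impl TokenSetSketch {
        // Assign ids in the order tokens come off the merged stream; an id
        // carried over from before optimize_indices() could point at
        // posting-list offsets that no longer exist.
        fn add(&mut self, token: &str) -> u32 {
            if let Some(&id) = self.tokens.get(token) {
                return id;
            }
            let id = self.next_id;
            self.next_id += 1;
            self.tokens.insert(token.to_string(), id);
            id
        }
    }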
@@ -635,6 +634,7 @@ impl PostingReader {
             let schema = schema.clone();
             let docs = docs.clone();
             tokio::spawn(async move {
+                // read the posting lists from new data
                 let batches = offsets.into_iter().map(|(offset, length)| {
                     let reader = posting_reader.reader.clone();
                     let schema = schema.clone();
@@ -648,19 +648,14 @@
                 });
                 let mut batches = futures::future::try_join_all(batches).await?;

+                // read the posting lists from existing data
                 if let Some(inverted_list) = posting_reader.inverted_list_reader.as_ref() {
-                    let token_id =
-                        posting_reader
-                            .existing_tokens
-                            .get(&token)
-                            .ok_or(Error::Index {
-                                message: format!("token {} not found", token),
-                                location: location!(),
-                            })?;
-                    let batch = inverted_list
-                        .posting_batch(*token_id, inverted_list.has_positions())
-                        .await?;
-                    batches.push(batch);
+                    if let Some(token_id) = posting_reader.existing_tokens.get(&token) {
+                        let batch = inverted_list
+                            .posting_batch(*token_id, inverted_list.has_positions())
+                            .await?;
+                        batches.push(batch);
+                    }
                 }

                 let (batch, max_score) =
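
The change above is the crux of the "token not found" failure: after appending data, the merged stream can carry tokens that exist only in the new data, so a miss against the existing index's token map is expected rather than an error. A minimal sketch of the before/after lookup semantics, with a plain HashMap standing in for the PostingReader's existing-token map:

    use std::collections::HashMap;

    // Old behavior: any token absent from the existing index aborted the
    // merge with an index error.
    fn lookup_old(existing: &HashMap<String, u32>, token: &str) -> Result<u32, String> {
        existing
            .get(token)
            .copied()
            .ok_or_else(|| format!("token {} not found", token))
    }

    // New behavior: a missing token simply contributes no existing posting
    // list, and only the new data's batches are merged for it.
    fn lookup_new(existing: &HashMap<String, u32>, token: &str) -> Option<u32> {
        existing.get(token).copied()
    }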
17 changes: 12 additions & 5 deletions rust/lance-index/src/scalar/inverted/index.rs
@@ -37,7 +37,8 @@ use super::builder::inverted_list_schema;
 use super::{wand::*, InvertedIndexBuilder, TokenizerConfig};
 use crate::prefilter::{NoFilter, PreFilter};
 use crate::scalar::{
-    AnyQuery, FullTextSearchQuery, IndexReader, IndexStore, SargableQuery, ScalarIndex,
+    AnyQuery, FullTextSearchQuery, IndexReader, IndexStore, InvertedIndexParams, SargableQuery,
+    ScalarIndex,
 };
 use crate::Index;

@@ -70,6 +71,7 @@ lazy_static! {

 #[derive(Clone)]
 pub struct InvertedIndex {
+    params: InvertedIndexParams,
     tokenizer: tantivy::tokenizer::TextAnalyzer,
     tokens: TokenSet,
     inverted_list: Arc<InvertedListReader>,
@@ -181,7 +183,7 @@ impl InvertedIndex {
         let tokens = self.tokens.clone();
         let inverted_list = self.inverted_list.clone();
         let docs = self.docs.clone();
-        InvertedIndexBuilder::from_existing_index(tokens, inverted_list, docs)
+        InvertedIndexBuilder::from_existing_index(self.params.clone(), tokens, inverted_list, docs)
     }
 }
@@ -255,8 +257,7 @@ impl ScalarIndex for InvertedIndex {
                 .get("tokenizer")
                 .map(|s| serde_json::from_str::<TokenizerConfig>(s))
                 .transpose()?
-                .unwrap_or_default()
-                .build()?;
+                .unwrap_or_default();
             let tokens = TokenSet::load(token_reader).await?;
             Result::Ok((tokenizer, tokens))
         }
@@ -278,11 +279,17 @@
             }
         });

-        let (tokenizer, tokens) = tokens_fut.await??;
+        let (tokenizer_config, tokens) = tokens_fut.await??;
         let inverted_list = invert_list_fut.await??;
         let docs = docs_fut.await??;

+        let tokenizer = tokenizer_config.build()?;
+        let params = InvertedIndexParams {
+            with_position: inverted_list.has_positions(),
+            tokenizer_config,
+        };
         Ok(Arc::new(Self {
+            params,
             tokenizer,
             tokens,
             inverted_list,
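
Net effect of this file's changes (a plausible reading, assuming `InvertedIndexParams` has exactly the two fields shown in the diff): `load()` now reconstructs the params once, from the tokenizer config stored in the index metadata plus the positions flag of the stored posting lists, and `to_builder()` threads them through unchanged. Previously the builder rebuilt params from defaults, which dropped any non-default tokenizer configuration:

    use lance_index::scalar::inverted::TokenizerConfig;
    use lance_index::scalar::InvertedIndexParams;

    // Old path (from builder.rs before this PR): defaults plus a positions
    // flag, so a custom tokenizer config did not survive optimize_indices().
    fn rebuild_params_old(has_positions: bool) -> InvertedIndexParams {
        InvertedIndexParams::default().with_position(has_positions)
    }

    // New path: keep the config that load() read from the index metadata.
    fn rebuild_params_new(
        tokenizer_config: TokenizerConfig,
        has_positions: bool,
    ) -> InvertedIndexParams {
        InvertedIndexParams {
            with_position: has_positions,
            tokenizer_config,
        }
    }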
68 changes: 68 additions & 0 deletions rust/lance/src/index.rs
@@ -904,9 +904,12 @@ mod tests {

     use super::*;

+    use arrow::array::AsArray;
     use arrow_array::{FixedSizeListArray, RecordBatch, RecordBatchIterator, StringArray};
     use arrow_schema::{Field, Schema};
     use lance_arrow::*;
+    use lance_index::scalar::inverted::TokenizerConfig;
+    use lance_index::scalar::FullTextSearchQuery;
     use lance_index::vector::{
         hnsw::builder::HnswBuildParams, ivf::IvfBuildParams, sq::builder::SQBuildParams,
     };
@@ -1294,6 +1297,71 @@
         assert_eq!(stats["num_indices"], 1);
     }

+    #[tokio::test]
+    async fn test_optimize_fts() {
+        let words = ["apple", "banana", "cherry", "date"];
+
+        let dir = tempdir().unwrap();
+        let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
+        let data = StringArray::from_iter_values(words.iter().map(|s| s.to_string()));
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(data)]).unwrap();
+        let batch_iterator = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+
+        let mut dataset = Dataset::write(batch_iterator, dir.path().to_str().unwrap(), None)
+            .await
+            .unwrap();
+
+        let tokenizer_config = TokenizerConfig::default();
+        let params = InvertedIndexParams {
+            with_position: true,
+            tokenizer_config,
+        };
+        dataset
+            .create_index(&["text"], IndexType::Inverted, None, &params, true)
+            .await
+            .unwrap();
+
+        let new_words = ["elephant", "fig", "grape", "honeydew"];
+        let new_data = StringArray::from_iter_values(new_words.iter().map(|s| s.to_string()));
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(new_data)]).unwrap();
+        let batch_iter = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+        dataset.append(batch_iter, None).await.unwrap();
+
+        dataset
+            .optimize_indices(&OptimizeOptions {
+                num_indices_to_merge: 0,
+                index_names: None,
+            })
+            .await
+            .unwrap();
Comment on lines +1330 to +1336 (Contributor):
    If there's no new data, does optimize indices do anything? Will it in the future? Should you append some new data before running this?

Reply (Contributor Author):
    It's possible, e.g. merging existing indices, but I think it should do nothing for scalar indexes and FTS, because they always have only one delta index. Added new data here so the tests cover more cases.
+        for &word in words.iter().chain(new_words.iter()) {
+            let query_result = dataset
+                .scan()
+                .project(&["text"])
+                .unwrap()
+                .full_text_search(FullTextSearchQuery::new(word.to_string()))
+                .unwrap()
+                .limit(Some(10), None)
+                .unwrap()
+                .try_into_batch()
+                .await
+                .unwrap();
+
+            let texts = query_result["text"]
+                .as_string::<i32>()
+                .iter()
+                .map(|v| match v {
+                    None => "".to_string(),
+                    Some(v) => v.to_string(),
+                })
+                .collect::<Vec<String>>();
+
+            assert_eq!(texts.len(), 1);
+            assert_eq!(texts[0], word);
+        }
+    }
+
     #[tokio::test]
     async fn test_create_index_too_small_for_pq() {
         let test_dir = tempdir().unwrap();