Skip to content

Commit

Permalink
Support fetching multiple documents
Browse files Browse the repository at this point in the history
  • Loading branch information
GodTamIt committed Nov 14, 2023
1 parent b60d862 commit 4657510
Show file tree
Hide file tree
Showing 3 changed files with 283 additions and 4 deletions.
153 changes: 152 additions & 1 deletion src/core/searcher.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::{fmt, io};

#[cfg(feature = "quickwit")]
use futures_util::StreamExt;

use crate::collector::Collector;
use crate::core::{Executor, SegmentReader};
use crate::query::{Bm25StatisticsProvider, EnableScoring, Query};
Expand Down Expand Up @@ -89,6 +92,75 @@ impl Searcher {
store_reader.get(doc_address.doc_id)
}

/// Fetches documents from tantivy's store given a list of [`DocAddress`].
///
/// This method is more efficient than calling [`doc`](Self::doc) multiple times, as it batches
/// overlapping requests to segments and blocks.
///
/// The documents are returned in the same order as the `doc_addresses` slice. An address that
/// appears more than once is fetched a single time and cloned into each of its positions.
///
/// # Errors
///
/// Propagates any error returned by the segment store reader's `get_many`.
///
/// # Panics
///
/// Panics if a `segment_ord` does not index into the searcher's store readers, or if the
/// store reader does not hand back a document for a requested doc id.
pub fn docs<D: DocumentDeserialize + Clone>(
    &self,
    doc_addresses: &[DocAddress],
) -> crate::Result<Vec<D>> {
    // Group the output positions by segment ordinal, then by doc id, so that each segment's
    // store reader is queried once and each distinct document is fetched once.
    let mut idxs_by_segment_ord: HashMap<usize, HashMap<u32, Vec<usize>>> = HashMap::new();
    for (idx, doc_address) in doc_addresses.iter().enumerate() {
        idxs_by_segment_ord
            .entry(doc_address.segment_ord as usize)
            .or_default()
            .entry(doc_address.doc_id)
            .or_default()
            .push(idx);
    }

    // Documents arrive grouped by segment rather than in input order, so they are written
    // into pre-sized `Option` slots. This keeps the scatter step entirely in safe code: no
    // slot is ever read or dropped while uninitialized.
    let mut slots: Vec<Option<D>> = Vec::new();
    slots.resize_with(doc_addresses.len(), || None);

    for (segment_ord, idxs_by_doc_id) in idxs_by_segment_ord {
        let store_reader = &self.inner.store_readers[segment_ord];

        let doc_ids = idxs_by_doc_id.keys().copied().collect();
        let mut docs_in_segment: HashMap<u32, D> = store_reader.get_many(doc_ids)?;

        for (doc_id, idxs) in idxs_by_doc_id {
            let doc = docs_in_segment
                .remove(&doc_id)
                .expect("store reader must return every requested doc id");

            // `idxs` always holds at least one position (it is only created by `push`).
            // Clone for every position but the last, then move the original into the last.
            if let Some((&last_idx, other_idxs)) = idxs.split_last() {
                for &idx in other_idxs {
                    slots[idx] = Some(doc.clone());
                }
                slots[last_idx] = Some(doc);
            }
        }
    }

    // Every input position was recorded in exactly one `idxs` list above, so all slots are
    // filled at this point.
    Ok(slots
        .into_iter()
        .map(|slot| slot.expect("every requested position must have been populated"))
        .collect())
}

/// The cache stats for the underlying store reader.
///
/// Aggregates the sum for each segment store reader.
Expand All @@ -112,6 +184,85 @@ impl Searcher {
store_reader.get_async(doc_address.doc_id).await
}

/// Fetches a set of documents in an asynchronous manner.
///
/// This method is more efficient than calling [`doc_async`](Self::doc_async) multiple times, as
/// it batches overlapping requests to segments and blocks.
///
/// The documents are returned in the same order as the `doc_addresses` slice. An address that
/// appears more than once is fetched a single time and cloned into each of its positions.
///
/// # Errors
///
/// Propagates any error returned by a segment store reader's `get_many_async`.
///
/// # Panics
///
/// Panics if a `segment_ord` does not index into the searcher's store readers, or if a
/// store reader does not hand back a document for a requested doc id.
#[cfg(feature = "quickwit")]
pub async fn docs_async<D: DocumentDeserialize + Clone>(
    &self,
    doc_addresses: &[DocAddress],
) -> crate::Result<Vec<D>> {
    // Group the output positions by segment ordinal, then by doc id, so that each segment's
    // store reader is queried once and each distinct document is fetched once.
    let mut idxs_by_segment_ord: HashMap<usize, HashMap<u32, Vec<usize>>> = HashMap::new();
    for (idx, doc_address) in doc_addresses.iter().enumerate() {
        idxs_by_segment_ord
            .entry(doc_address.segment_ord as usize)
            .or_default()
            .entry(doc_address.doc_id)
            .or_default()
            .push(idx);
    }

    // One future per segment; they are polled together and complete in arbitrary order.
    let mut futures = futures_util::stream::FuturesUnordered::new();
    for (segment_ord, idxs_by_doc_id) in idxs_by_segment_ord {
        let store_reader = &self.inner.store_readers[segment_ord];

        let doc_ids = idxs_by_doc_id.keys().copied().collect();
        futures.push(async move {
            let docs_in_segment: HashMap<u32, D> =
                store_reader.get_many_async(doc_ids).await?;
            Ok::<_, crate::TantivyError>((docs_in_segment, idxs_by_doc_id))
        });
    }

    // Segments finish out of order, so documents are written into pre-sized `Option` slots.
    // This keeps the scatter step entirely in safe code: no slot is ever read or dropped
    // while uninitialized.
    let mut slots: Vec<Option<D>> = Vec::new();
    slots.resize_with(doc_addresses.len(), || None);

    while let Some(result) = futures.next().await {
        let (mut docs_in_segment, idxs_by_doc_id) = result?;
        for (doc_id, idxs) in idxs_by_doc_id {
            let doc = docs_in_segment
                .remove(&doc_id)
                .expect("store reader must return every requested doc id");

            // `idxs` always holds at least one position (it is only created by `push`).
            // Clone for every position but the last, then move the original into the last.
            if let Some((&last_idx, other_idxs)) = idxs.split_last() {
                for &idx in other_idxs {
                    slots[idx] = Some(doc.clone());
                }
                slots[last_idx] = Some(doc);
            }
        }
    }

    // Every input position was recorded in exactly one `idxs` list above, so all slots are
    // filled at this point.
    Ok(slots
        .into_iter()
        .map(|slot| slot.expect("every requested position must have been populated"))
        .collect())
}

/// Access the schema associated with the index of this searcher.
pub fn schema(&self) -> &Schema {
&self.inner.schema
Expand Down
49 changes: 46 additions & 3 deletions src/core/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use crate::collector::Count;
use crate::directory::{RamDirectory, WatchCallback};
use crate::indexer::NoMergePolicy;
use crate::query::TermQuery;
use crate::schema::{Field, IndexRecordOption, Schema, INDEXED, STRING, TEXT};
use crate::schema::{Field, IndexRecordOption, OwnedValue, Schema, INDEXED, STORED, STRING, TEXT};
use crate::tokenizer::TokenizerManager;
use crate::{
Directory, Index, IndexBuilder, IndexReader, IndexSettings, IndexWriter, ReloadPolicy,
SegmentId, TantivyDocument, Term,
Directory, DocAddress, Index, IndexBuilder, IndexReader, IndexSettings, IndexWriter,
ReloadPolicy, SegmentId, TantivyDocument, Term,
};

#[test]
Expand Down Expand Up @@ -344,3 +344,46 @@ fn test_merging_segment_update_docfreq() {
let term_info = inv_index.get_term_info(&term).unwrap().unwrap();
assert_eq!(term_info.doc_freq, 12);
}

#[test]
fn test_get_many_docs() -> crate::Result<()> {
    // Index with a single stored u64 field.
    let mut builder = Schema::builder();
    let num_field = builder.add_u64_field("num", STORED);
    let index = Index::create_in_ram(builder.build());

    // Write ten documents holding the values 0..10 and commit them.
    let mut writer: IndexWriter = index.writer_for_tests()?;
    writer.set_merge_policy(Box::new(NoMergePolicy));
    for value in 0..10u64 {
        writer.add_document(doc!(num_field=>value))?;
    }
    writer.commit()?;

    // Merge all searchable segments so every document is addressed through segment ordinal 0.
    let segment_ids = index.searchable_segment_ids()?;
    writer.merge(&segment_ids).wait().unwrap();

    let searcher = index.reader()?.searcher();
    assert_eq!(searcher.num_docs(), 10);

    let addresses: Vec<DocAddress> = (0..10)
        .map(|doc_id| DocAddress::new(0u32, doc_id))
        .collect();

    // Fetch everything in one batch and pull the stored number out of each document.
    let mut fetched_nums: Vec<u64> = searcher
        .docs::<TantivyDocument>(&addresses)?
        .into_iter()
        .map(|doc| match doc.get_first(num_field).unwrap() {
            OwnedValue::U64(num) => *num,
            _ => panic!("Expected u64 value"),
        })
        .collect();

    // Compared as a sorted list: the check is on the set of values, not on their order.
    fetched_nums.sort();
    assert_eq!(fetched_nums, (0..10).collect::<Vec<u64>>());

    Ok(())
}
85 changes: 85 additions & 0 deletions src/store/reader.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::{BTreeSet, HashMap};
use std::io;
use std::iter::Sum;
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -206,6 +207,44 @@ impl StoreReader {
D::deserialize(deserializer).map_err(crate::TantivyError::from)
}

/// Reads several documents in one call.
///
/// Prefer [`get_many`](Self::get_many) over repeated calls to [`get`](Self::get): every block
/// that holds at least one requested document is loaded once and shared by all requested
/// documents located in it.
///
/// Returns a map from each requested doc id to its deserialized document.
pub fn get_many<D: DocumentDeserialize>(
    &self,
    mut doc_ids: BTreeSet<DocId>,
) -> crate::Result<HashMap<DocId, D>> {
    let mut docs_by_id = HashMap::with_capacity(doc_ids.len());

    // Work from the highest remaining doc id downwards, one block per iteration.
    while let Some(highest_doc_id) = doc_ids.pop_last() {
        let checkpoint = self.block_checkpoint(highest_doc_id)?;
        let block = self.read_block(&checkpoint)?;

        // `highest_doc_id` was the maximum of the set, so every remaining id at or above the
        // block's first doc id is served by this same block.
        let same_block_doc_ids = doc_ids.split_off(&checkpoint.doc_range.start);

        for doc_id in std::iter::once(highest_doc_id).chain(same_block_doc_ids) {
            let mut doc_bytes =
                Self::get_document_bytes_from_block(block.clone(), doc_id, &checkpoint)?;
            let deserializer = BinaryDocumentDeserializer::from_reader(&mut doc_bytes)
                .map_err(crate::TantivyError::from)?;
            let doc = D::deserialize(deserializer).map_err(crate::TantivyError::from)?;
            docs_by_id.insert(doc_id, doc);
        }
    }

    Ok(docs_by_id)
}

/// Returns raw bytes of a given document.
///
/// Calling `.get(doc)` is relatively costly as it requires
Expand Down Expand Up @@ -377,6 +416,52 @@ impl StoreReader {
.map_err(crate::TantivyError::from)?;
D::deserialize(deserializer).map_err(crate::TantivyError::from)
}

/// Asynchronous counterpart of [`get_many`](Self::get_many).
///
/// One block-read future is created per block that holds a requested document, and all of
/// them are polled together, so block reads may overlap.
///
/// Returns a map from each requested doc id to its deserialized document.
pub async fn get_many_async<D: DocumentDeserialize>(
    &self,
    mut doc_ids: BTreeSet<DocId>,
) -> crate::Result<HashMap<DocId, D>> {
    use futures_util::StreamExt;

    let mut docs_by_id = HashMap::with_capacity(doc_ids.len());
    let mut pending_blocks = futures_util::stream::FuturesUnordered::new();

    // Partition the requested ids by block, working from the highest id downwards, and queue
    // one read per block.
    while let Some(highest_doc_id) = doc_ids.pop_last() {
        let checkpoint = self.block_checkpoint(highest_doc_id)?;

        // `highest_doc_id` was the maximum of the set, so every remaining id at or above the
        // block's first doc id belongs to this block as well.
        let mut block_doc_ids = doc_ids.split_off(&checkpoint.doc_range.start);
        block_doc_ids.insert(highest_doc_id);

        pending_blocks.push(async move {
            let block = self.read_block_async(&checkpoint).await?;
            Ok::<_, io::Error>((block, checkpoint, block_doc_ids))
        });
    }

    // Deserialize the documents of each block as soon as its read completes.
    while let Some(block_result) = pending_blocks.next().await {
        let (block, checkpoint, block_doc_ids) = block_result?;

        for doc_id in block_doc_ids {
            let mut doc_bytes =
                Self::get_document_bytes_from_block(block.clone(), doc_id, &checkpoint)?;
            let deserializer = BinaryDocumentDeserializer::from_reader(&mut doc_bytes)
                .map_err(crate::TantivyError::from)?;
            let doc = D::deserialize(deserializer).map_err(crate::TantivyError::from)?;
            docs_by_id.insert(doc_id, doc);
        }
    }

    Ok(docs_by_id)
}
}

#[cfg(test)]
Expand Down

0 comments on commit 4657510

Please sign in to comment.