Support fetching multiple documents #2252
The diff (against `main`) adds batched document fetching to `Searcher` and to the doc store's `StoreReader`:
```diff
@@ -1,7 +1,10 @@
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, BTreeSet, HashMap};
 use std::sync::Arc;
 use std::{fmt, io};
 
+#[cfg(feature = "quickwit")]
+use futures_util::StreamExt;
+
 use crate::collector::Collector;
 use crate::core::{Executor, SegmentReader};
 use crate::query::{Bm25StatisticsProvider, EnableScoring, Query};
```
```diff
@@ -89,6 +92,23 @@ impl Searcher {
         store_reader.get(doc_address.doc_id)
     }
 
+    /// Fetches documents from tantivy's store given a list of [`DocAddress`].
+    ///
+    /// This method is more efficient than calling [`doc`](Self::doc) multiple times, as it
+    /// batches overlapping requests to segments and blocks.
+    pub fn docs<D: DocumentDeserialize>(
+        &self,
+        doc_addresses: &BTreeSet<DocAddress>,
+    ) -> crate::Result<HashMap<DocAddress, D>> {
+        // This implementation assumes that the `BlockCache` inside the `StoreReader` has
+        // non-zero capacity. This, combined with the fact that iteration of `doc_addresses`
+        // is ordered, allows for blocks to be re-used.
+        doc_addresses
+            .iter()
+            .map(|doc_address| Ok((*doc_address, self.doc(*doc_address)?)))
+            .collect()
+    }
+
     /// The cache stats for the underlying store reader.
     ///
     /// Aggregates the sum for each segment store reader.
```
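For orientation (not part of the diff), a minimal usage sketch of the new `docs` method, assuming a `searcher` and `query` are already in scope; whether `TantivyDocument` is the right concrete document type depends on the tantivy version in use:

```rust
use std::collections::BTreeSet;

use tantivy::collector::TopDocs;
use tantivy::{DocAddress, TantivyDocument};

// Collect the hits into a BTreeSet so they are visited in (segment_ord, doc_id)
// order; consecutive fetches from the same store block then hit the BlockCache.
let hits = searcher.search(&query, &TopDocs::with_limit(10))?;
let addresses: BTreeSet<DocAddress> = hits.into_iter().map(|(_score, addr)| addr).collect();

// One batched call instead of up to ten individual `searcher.doc(addr)` calls.
let docs = searcher.docs::<TantivyDocument>(&addresses)?;
```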
```diff
@@ -112,6 +132,72 @@ impl Searcher {
         store_reader.get_async(doc_address.doc_id).await
     }
 
+    /// Fetches a set of documents in an asynchronous manner.
+    ///
+    /// This method is more efficient than calling [`doc_async`](Self::doc_async) multiple times,
+    /// as it batches overlapping requests to segments and blocks.
+    #[cfg(feature = "quickwit")]
+    pub async fn docs_async<D: DocumentDeserialize>(
+        &self,
+        doc_addresses: &BTreeSet<DocAddress>,
+    ) -> crate::Result<HashMap<DocAddress, D>> {
+        let mut segment_ord_set = Vec::<&DocAddress>::new();
+        let mut futures = futures_util::stream::FuturesUnordered::new();
+
+        // Helper function that creates a future to fetch the docs for a set of
+        // `DocAddress`es that all have the same segment ordinal.
+        let get_docs_for_segment_ord_set = |doc_addrs: &mut Vec<&DocAddress>| {
+            let segment_ord = doc_addrs.first().unwrap().segment_ord;
+            let store_reader = &self.inner.store_readers[segment_ord as usize];
+
+            let doc_ids = std::mem::take(doc_addrs)
+                .into_iter()
+                .map(|doc_address| doc_address.doc_id)
+                .collect();
+
+            let get_docs_future = || async move {
+                let docs = store_reader.get_many_async(doc_ids).await?;
+                Ok::<_, crate::TantivyError>(docs.into_iter().map(move |(doc_id, doc)| {
+                    (
+                        DocAddress {
+                            segment_ord,
+                            doc_id,
+                        },
+                        doc,
+                    )
+                }))
+            };
+
+            futures.push(get_docs_future());
+        };
+
+        for doc_addr in doc_addresses {
+            if let Some(cur_set_doc_addr) = segment_ord_set.first() {
+                if doc_addr.segment_ord != cur_set_doc_addr.segment_ord {
+                    // The new `doc_addr`'s segment ordinal doesn't match that of the current
+                    // set, so grab the set's docs and then start a new set.
+                    get_docs_for_segment_ord_set(&mut segment_ord_set);
+                }
+            }
+
+            segment_ord_set.push(doc_addr);
+        }
+
+        if !segment_ord_set.is_empty() {
+            get_docs_for_segment_ord_set(&mut segment_ord_set);
+        }
+
+        // Debug assert to ensure that all `DocAddress`es were processed.
+        debug_assert!(segment_ord_set.is_empty());
+
+        let mut results = HashMap::<DocAddress, D>::with_capacity(doc_addresses.len());
+        while let Some(docs_result) = futures.next().await {
+            results.extend(docs_result?);
+        }
+
+        Ok(results)
+    }
+
     /// Access the schema associated with the index of this searcher.
     pub fn schema(&self) -> &Schema {
         &self.inner.schema
```

Reviewer: I understand that this is not a particularly technical objection, but I do wonder whether it is helpful to add this as additional API surface to Tantivy itself. User code can already sort its document addresses. For example, a service serving multiple users would probably aim for increased locality by handling a single user request as a single task accessing documents in address order to improve cache efficiency. Such a service would handle parallelism and concurrency by handling multiple users in multiple tasks. Furthermore, I have a hard time seeing a situation where additional within-segment concurrency is better than serial cache-friendly access patterns. Also, the additional data structures required to track this additional concurrency seem ripe with trade-offs which user code is just in a better position to make. Long story short: personally, I am not convinced this API is pulling its weight in terms of complexity and implied trade-offs.

Another commenter: Not reviewing the PR, but just commenting on why I think there is a need for something in that area. In the sync world, the best access pattern is pretty evident: make the cache store one docstore block, and sort your doc ids. If multiple docs need the same block, every doc but the first of each block comes free (no IO, no decompression, just some quick deserialization). No duplicated work, maximum cache hits. Now enter the async world. If you sort your docs and fetch multiple docs concurrently, you might end up fetching multiple docs from the same block, concurrently. They will each see a cache miss, issue an IO, decompress the result, put that in the cache, and get the doc of their interest. In quickwit-oss/quickwit#3226 it was found that this very much happens. In this case, user code has pretty much no idea what the best order is: random might cause more cache misses, and sorted does cause more cache misses. Issuing requests in some predetermined order that maximizes the chance that the cache is already warm, while not making accesses so far apart that blocks get evicted, was judged an "obscure trick" (which I don't disagree with). I don't know what the best solution is. Maybe it's a function to fetch multiple docs at once (originally my suggestion), but that makes tantivy choose a trade-off which might not be the best for your application. Maybe it's a function that gives insight into what order to use (something which, given a list of doc ids, tells you which can be fetched concurrently, and which should be fetched once another doc has already warmed the cache), letting you choose your trade-off with the full picture. But one way or another, the status quo is that short of obscure tricks, you are leaving some performance on the table for no good reason.

Reviewer: I would say that the main problem is that user code lacks the information required to choose the best access pattern, for example a function which informs user code about the blocks for a given document (opaquely, e.g. via a block identifier).

Author: While I agree that it may be beneficial to have the fine-grained control to choose the trade-off, from an API design perspective I believe the default behavior should be that users don't have to know about these implementation details; the fine-grained control should be an opt-in API. Otherwise, it'd be nice if the non-fine-tuned path had sane, reasonably performant defaults.

This is very good to know. At least in this change, I try to group all documents within the same block into one future to minimize this. Of course, multiple separate calls will not benefit from this grouping, but I'd want to see some cases where that is a common enough pattern to warrant further optimization.

Reviewer: I think the point here is exactly that those futures run concurrently against the same cache and hence will evict each other's blocks. After all, fetching multiple blocks concurrently is sort of the point of concurrency here. Hence, I think we should start with cheap cloning. (Concurrency, in contrast to parallelism, likely does little in a normal Tantivy local-disk-based setup, as opposed to a directory implementation calling into e.g. S3. Parallelism, on the other hand, would also benefit CPU-bound decompression, but usually needs integration with the runtime to spawn tasks.)

Reviewer: I proposed some preliminary API additions in #2276. Using these, I think fetching multiple documents should consist of grouping the addresses per segment and per block and then fetching each group. Obviously there are quite a few trade-offs implied here, the big one being concurrency versus parallelism. Some can possibly be side-stepped, e.g. always using a size-1 block cache for each group and spawning as many futures/tasks as there are groups, which would always avoid conflicts at the cost of more in-memory data structures. Similarly, we could forgo block reuse at the same cost of setting up in-memory data structures.

Reviewer: Maybe we can side-step the concurrency/parallelism trade-off here by providing an API that just yields the futures and requires the caller to poll them to completion, i.e. a signature like

`async fn docs_async(&self, doc_addresses: impl IntoIterator<Item = DocAddress>) -> impl Iterator<Item = impl Future<Output = crate::Result<Vec<D>>>>;`

which internally sets up the per-segment-per-block groups (e.g. sorting them into a `HashMap`).

Reviewer: Sorry for somewhat hijacking this change, but I pushed an implementation of the above to #2276 which, as discussed, is still a specific trade-off, but should avoid cache thrashing while still reusing the existing block cache.
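To make the "yield the futures and let the caller poll them" idea above concrete, here is a hedged sketch of how such an API might be consumed. `docs_futures` is a hypothetical method standing in for the signature sketched in the thread (nothing by that name exists in tantivy), and the concurrency limit of 4 is arbitrary:

```rust
use futures_util::{stream, StreamExt, TryStreamExt};

// Hypothetical: `docs_futures` yields one unpolled future per (segment, block)
// group, so the caller, not tantivy, picks the concurrency trade-off.
let per_block_futures = searcher.docs_futures::<TantivyDocument>(doc_addresses);

// `buffered(1)` would be strictly serial and maximally cache-friendly; a larger
// limit overlaps I/O and decompression at the risk of more cache misses.
let docs: Vec<TantivyDocument> = stream::iter(per_block_futures)
    .buffered(4)
    .try_concat() // concatenate each block's Vec of documents into one Vec
    .await?;
```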
In the store reader, the diff adds the `get_many` and `get_many_async` building blocks:
```diff
@@ -1,3 +1,4 @@
+use std::collections::{BTreeSet, HashMap};
 use std::io;
 use std::iter::Sum;
 use std::num::NonZeroUsize;

@@ -6,6 +7,8 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 
 use common::{BinarySerializable, OwnedBytes};
+#[cfg(feature = "quickwit")]
+use futures_util::StreamExt;
 use lru::LruCache;
 
 use super::footer::DocStoreFooter;
```
```diff
@@ -206,6 +209,44 @@ impl StoreReader {
         D::deserialize(deserializer).map_err(crate::TantivyError::from)
     }
 
+    /// Reads a set of given documents.
+    ///
+    /// Calling [`get_many`](Self::get_many) is more efficient than calling [`get`](Self::get)
+    /// multiple times, as it will only read and decompress a block once for all documents
+    /// within a block.
+    pub fn get_many<D: DocumentDeserialize>(
+        &self,
+        mut doc_ids: BTreeSet<DocId>,
+    ) -> crate::Result<HashMap<DocId, D>> {
+        let mut results = HashMap::with_capacity(doc_ids.len());
+
+        // Helper function to deserialize a document from bytes.
+        let deserialize_from_bytes = |doc_bytes: &mut OwnedBytes| {
+            let deserializer = BinaryDocumentDeserializer::from_reader(doc_bytes)
+                .map_err(crate::TantivyError::from)?;
+            D::deserialize(deserializer).map_err(crate::TantivyError::from)
+        };
+
+        while let Some(doc_id) = doc_ids.pop_last() {
+            let checkpoint = self.block_checkpoint(doc_id)?;
+            let block = self.read_block(&checkpoint)?;
+            let mut doc_bytes =
+                Self::get_document_bytes_from_block(block.clone(), doc_id, &checkpoint)?;
+
+            results.insert(doc_id, deserialize_from_bytes(&mut doc_bytes)?);
+
+            // Split off all doc ids that are in the same block and read them in as well.
+            let additional_doc_ids = doc_ids.split_off(&checkpoint.doc_range.start);
+            for doc_id in additional_doc_ids {
+                let mut doc_bytes =
+                    Self::get_document_bytes_from_block(block.clone(), doc_id, &checkpoint)?;
+                results.insert(doc_id, deserialize_from_bytes(&mut doc_bytes)?);
+            }
+        }
+
+        Ok(results)
+    }
+
     /// Returns raw bytes of a given document.
     ///
     /// Calling `.get(doc)` is relatively costly as it requires
```

Reviewer, on `get_many`: This does not seem to be used any more?
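The `pop_last`/`split_off` pairing is the heart of `get_many`. As a standalone illustration, here is a sketch of the same grouping logic under the simplifying (and unrealistic) assumption of fixed-size blocks; in the real store, block boundaries come from checkpoints and vary in size:

```rust
use std::collections::BTreeSet;

// Sketch: `pop_last` takes the largest remaining id, and `split_off(&block_start)`
// peels off every other id in the same block, so each block is visited exactly once.
fn group_by_block(mut doc_ids: BTreeSet<u32>, block_len: u32) -> Vec<Vec<u32>> {
    let mut groups = Vec::new();
    while let Some(doc_id) = doc_ids.pop_last() {
        let block_start = doc_id - doc_id % block_len; // assumed fixed-size blocks
        // `split_off` returns everything >= block_start, i.e. the rest of this block.
        let mut group: Vec<u32> = doc_ids.split_off(&block_start).into_iter().collect();
        group.push(doc_id);
        groups.push(group);
    }
    groups
}

// group_by_block({1, 2, 11, 12, 13}, 10) yields [[11, 12, 13], [1, 2]]:
// two block reads instead of five.
```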
```diff
@@ -377,6 +418,50 @@
             .map_err(crate::TantivyError::from)?;
         D::deserialize(deserializer).map_err(crate::TantivyError::from)
     }
+
+    /// Fetches a set of documents asynchronously. Async version of [`get_many`](Self::get_many),
+    /// except that it may read blocks in parallel.
+    pub async fn get_many_async<D: DocumentDeserialize>(
+        &self,
+        mut doc_ids: BTreeSet<DocId>,
+    ) -> crate::Result<HashMap<DocId, D>> {
+        // Capture the count up front: the loop below drains `doc_ids`, so calling
+        // `doc_ids.len()` afterwards would always yield zero.
+        let num_docs = doc_ids.len();
+
+        // Helper function to deserialize a document from bytes.
+        let deserialize_from_bytes = |doc_bytes: &mut OwnedBytes| {
+            let deserializer = BinaryDocumentDeserializer::from_reader(doc_bytes)
+                .map_err(crate::TantivyError::from)?;
+            D::deserialize(deserializer).map_err(crate::TantivyError::from)
+        };
+
+        let mut read_block_futures = futures_util::stream::FuturesUnordered::new();
+
+        // Spawn a future for each block to read.
+        while let Some(doc_id) = doc_ids.pop_last() {
+            let checkpoint = self.block_checkpoint(doc_id)?;
+
+            // Group all remaining doc ids that live in the same block with this one.
+            let mut checkpoint_doc_ids = doc_ids.split_off(&checkpoint.doc_range.start);
+            checkpoint_doc_ids.insert(doc_id);
+
+            let read_block_future = || async move {
+                let block = self.read_block_async(&checkpoint).await?;
+                Ok::<_, io::Error>((block, checkpoint, checkpoint_doc_ids))
+            };
+            read_block_futures.push(read_block_future());
+        }
+
+        let mut results = HashMap::with_capacity(num_docs);
+
+        while let Some(read_block_result) = read_block_futures.next().await {
+            let (block, checkpoint, checkpoint_doc_ids) = read_block_result?;
+
+            for doc_id in checkpoint_doc_ids {
+                let mut doc_bytes =
+                    Self::get_document_bytes_from_block(block.clone(), doc_id, &checkpoint)?;
+                results.insert(doc_id, deserialize_from_bytes(&mut doc_bytes)?);
+            }
+        }
+
+        Ok(results)
+    }
 }
 
 #[cfg(test)]
```
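For orientation, a hedged sketch of reaching a `StoreReader` through the public API and exercising the new `get_many` on one segment; it assumes an in-scope `searcher`, `TantivyDocument` as the document type, and that `get_store_reader` takes the block-cache size as it does in tantivy around the time of this PR:

```rust
use std::collections::{BTreeSet, HashMap};

use tantivy::{DocId, TantivyDocument};

// Read several documents from the first segment's store with a one-block cache.
// The BTreeSet keeps the ids sorted, so each block is decompressed at most once.
let store_reader = searcher.segment_readers()[0].get_store_reader(1)?;
let doc_ids: BTreeSet<DocId> = [0u32, 1, 2, 57].into_iter().collect();
let docs: HashMap<DocId, TantivyDocument> = store_reader.get_many(doc_ids)?;
```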
Reviewer: This function could be simplified if we rely on the `BlockCache`: since the `doc_addresses` are already sorted, we can just call `doc` and always hit the cache. The downside would be that this only works if the `BlockCache` parameter `cache_num_blocks` is not zero.

Author: I don't love relying on this invariant, since the dependency is non-obvious, but I've left a comment. It certainly does simplify the code here, though. In the future, we could also use a thread pool to parallelize access to blocks, which may require us to revive the previous code, but until then this is fine.

Reviewer: As written, I don't think this pulls its weight as an API addition, especially since it enforces a data structure (`BTreeSet`) that stays sorted under modification instead of, for example, just sorting a slice of `DocAddress`es. Maybe a performance hint on `Searcher::doc` to access multiple documents in address order if possible, to improve cache hit rates, would serve the same purpose?
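The alternative suggested in that last comment, sketched under the same assumptions as the earlier examples (in-scope `searcher`, hits from a prior search, non-zero `cache_num_blocks`):

```rust
// No new API surface: sort a plain Vec of addresses and fetch serially. DocAddress
// ordering puts same-segment, same-block documents next to each other, so the
// BlockCache turns all but the first access per block into cheap hits.
let mut addresses: Vec<DocAddress> = hits.into_iter().map(|(_score, addr)| addr).collect();
addresses.sort_unstable();

let mut docs = Vec::with_capacity(addresses.len());
for addr in addresses {
    docs.push(searcher.doc::<TantivyDocument>(addr)?);
}
```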