Skip to content

Commit

Permalink
Rewrite to take BTreeSet for addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
GodTamIt committed Nov 14, 2023
1 parent 4657510 commit f0ff5eb
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 118 deletions.
209 changes: 95 additions & 114 deletions src/core/searcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
use std::{fmt, io};

Expand Down Expand Up @@ -96,69 +96,63 @@ impl Searcher {
///
/// This method is more efficient than calling [`doc`](Self::doc) multiple times, as it batches
/// overlapping requests to segments and blocks.
///
/// The documents are returned in the same order as the `doc_addresses` slice.
pub fn docs<D: DocumentDeserialize + Clone>(
pub fn docs<D: DocumentDeserialize>(
&self,
doc_addresses: &[DocAddress],
) -> crate::Result<Vec<D>> {
// Ordering by segment ordinal allows reuse of a store reader for documents in the same
// segment.
let mut doc_addrs_by_segment_ord = HashMap::new();
for (idx, doc_address) in doc_addresses.iter().enumerate() {
doc_addrs_by_segment_ord
.entry(doc_address.segment_ord as usize)
.or_insert_with(HashMap::new)
.entry(doc_address)
.or_insert_with(Vec::new)
.push(idx);
}

let mut docs = Vec::with_capacity(doc_addresses.len());

for (segment_ord, doc_addr_to_idxs) in doc_addrs_by_segment_ord {
let store_reader = &self.inner.store_readers[segment_ord];
doc_addresses: &BTreeSet<DocAddress>,
) -> crate::Result<HashMap<DocAddress, D>> {
let mut results = HashMap::<DocAddress, D>::with_capacity(doc_addresses.len());
let mut segment_ord_set = Vec::<&DocAddress>::new();

// Helper function that gets the docs for a set of `DocAddress`es that all have the same
// segment ordinal.
let mut get_docs_for_segment_ord_set =
|doc_addrs: &mut Vec<&DocAddress>| -> crate::Result<()> {
let segment_ord = doc_addrs.first().unwrap().segment_ord;
let store_reader = &self.inner.store_readers[segment_ord as usize];

let doc_ids = std::mem::take(doc_addrs)
.into_iter()
.map(|doc_address| doc_address.doc_id)
.collect();

results.extend(
store_reader
.get_many(doc_ids)?
.into_iter()
.map(|(doc_id, doc)| {
(
DocAddress {
segment_ord,
doc_id,
},
doc,
)
}),
);

Ok(())
};

let doc_ids = doc_addr_to_idxs
.keys()
.map(|doc_address| doc_address.doc_id)
.collect();
let mut docs_in_segment: HashMap<u32, D> = store_reader.get_many(doc_ids)?;

for (doc_address, idxs) in doc_addr_to_idxs {
match idxs.as_slice() {
[idx] => {
debug_assert!(*idx < docs.capacity());

// Safety: The index is valid because it was derived from `doc_addresses`.
unsafe {
*docs.get_unchecked_mut(*idx) =
docs_in_segment.remove(&doc_address.doc_id).unwrap();
}
}
idxs_slice => {
let doc = docs_in_segment.remove(&doc_address.doc_id).unwrap();
for idx in idxs_slice {
debug_assert!(*idx < docs.capacity());

// Safety: The index is valid because it was derived from
// `doc_addresses`.
unsafe {
*docs.get_unchecked_mut(*idx) = doc.clone();
}
}
}
for doc_addr in doc_addresses {
if let Some(cur_set_doc_addr) = segment_ord_set.first() {
if doc_addr.segment_ord != cur_set_doc_addr.segment_ord {
// The new `doc_addr`'s segment ordinal doesn't match that of the current set,
// so grab the set's docs and then start a new set.
get_docs_for_segment_ord_set(&mut segment_ord_set)?;
}
}

segment_ord_set.push(doc_addr);
}

// Safety: The length is valid because every element is fetched and populated in the loop
// above.
unsafe {
docs.set_len(doc_addresses.len());
if !segment_ord_set.is_empty() {
get_docs_for_segment_ord_set(&mut segment_ord_set)?;
}

Ok(docs)
// Debug assert to ensure that all `DocAddress`es were processed.
debug_assert!(segment_ord_set.is_empty());

Ok(results)
}

/// The cache stats for the underlying store reader.
Expand Down Expand Up @@ -188,79 +182,66 @@ impl Searcher {
///
/// This method is more efficient than calling [`doc_async`](Self::doc_async) multiple times, as
/// it batches overlapping requests to segments and blocks.
///
/// The documents are returned in the same order as the `doc_addresses` slice.
#[cfg(feature = "quickwit")]
pub async fn docs_async<D: DocumentDeserialize + Clone>(
pub async fn docs_async<D: DocumentDeserialize>(
&self,
doc_addresses: &[DocAddress],
) -> crate::Result<Vec<D>> {
// Ordering by segment ordinal allows reuse of a store reader for documents in the same
// segment.
let mut doc_addrs_by_segment_ord = HashMap::new();
for (idx, doc_address) in doc_addresses.iter().enumerate() {
doc_addrs_by_segment_ord
.entry(doc_address.segment_ord as usize)
.or_insert_with(HashMap::new)
.entry(doc_address)
.or_insert_with(Vec::new)
.push(idx);
}

doc_addresses: &BTreeSet<DocAddress>,
) -> crate::Result<HashMap<DocAddress, D>> {
let mut segment_ord_set = Vec::<&DocAddress>::new();
let mut futures = futures_util::stream::FuturesUnordered::new();

for (segment_ord, doc_addr_to_idxs) in doc_addrs_by_segment_ord {
let store_reader = &self.inner.store_readers[segment_ord];
// Helper function that creates a future to fetch the docs for a set of `DocAddress`es that
// all have the same segment ordinal.
let get_docs_for_segment_ord_set = |doc_addrs: &mut Vec<&DocAddress>| {
let segment_ord = doc_addrs.first().unwrap().segment_ord;
let store_reader = &self.inner.store_readers[segment_ord as usize];

let doc_ids = doc_addr_to_idxs
.keys()
let doc_ids = std::mem::take(doc_addrs)
.into_iter()
.map(|doc_address| doc_address.doc_id)
.collect();
let doc_future = || async move {
let docs_in_segment: HashMap<u32, D> = store_reader.get_many_async(doc_ids).await?;
Ok::<_, crate::TantivyError>((docs_in_segment, doc_addr_to_idxs))

let get_docs_future = || async move {
let docs = store_reader.get_many_async(doc_ids).await?;
Ok::<_, crate::TantivyError>(docs.into_iter().map(move |(doc_id, doc)| {
(
DocAddress {
segment_ord,
doc_id,
},
doc,
)
}))
};
futures.push(doc_future());
}

let mut docs = Vec::with_capacity(doc_addresses.len());

while let Some(result) = futures.next().await {
let (mut docs_in_segment, doc_addr_to_idxs) = result?;
for (doc_address, idxs) in doc_addr_to_idxs {
match idxs.as_slice() {
[idx] => {
debug_assert!(*idx < docs.capacity());

// Safety: The index is valid because it was derived from `doc_addresses`.
unsafe {
*docs.get_unchecked_mut(*idx) =
docs_in_segment.remove(&doc_address.doc_id).unwrap();
}
}
idxs_slice => {
let doc = docs_in_segment.remove(&doc_address.doc_id).unwrap();
for idx in idxs_slice {
debug_assert!(*idx < docs.capacity());

// Safety: The index is valid because it was derived from
// `doc_addresses`.
unsafe {
*docs.get_unchecked_mut(*idx) = doc.clone();
}
}
}
futures.push(get_docs_future());
};

for doc_addr in doc_addresses {
if let Some(cur_set_doc_addr) = segment_ord_set.first() {
if doc_addr.segment_ord != cur_set_doc_addr.segment_ord {
// The new `doc_addr`'s segment ordinal doesn't match that of the current set,
// so grab the set's docs and then start a new set.
get_docs_for_segment_ord_set(&mut segment_ord_set);
}
}

segment_ord_set.push(doc_addr);
}

// Safety: The length is valid because every element is fetched and populated in the loop
// above.
unsafe {
docs.set_len(doc_addresses.len());
if !segment_ord_set.is_empty() {
get_docs_for_segment_ord_set(&mut segment_ord_set);
}

// Debug assert to ensure that all `DocAddress`es were processed.
debug_assert!(segment_ord_set.is_empty());

let mut results = HashMap::<DocAddress, D>::with_capacity(doc_addresses.len());
while let Some(docs_result) = futures.next().await {
results.extend(docs_result?);
}

Ok(docs)
Ok(results)
}

/// Access the schema associated with the index of this searcher.
Expand Down
6 changes: 4 additions & 2 deletions src/core/tests.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::BTreeSet;

use crate::collector::Count;
use crate::directory::{RamDirectory, WatchCallback};
use crate::indexer::NoMergePolicy;
Expand Down Expand Up @@ -367,12 +369,12 @@ fn test_get_many_docs() -> crate::Result<()> {

let doc_addresses = (0..10)
.map(|i| DocAddress::new(0u32, i))
.collect::<Vec<_>>();
.collect::<BTreeSet<_>>();

let docs = searcher.docs::<TantivyDocument>(&doc_addresses)?;
let mut doc_nums = Vec::new();

for doc in docs {
for (_doc_addr, doc) in docs {
let num_value = doc.get_first(num_field).unwrap();

if let OwnedValue::U64(num) = num_value {
Expand Down
4 changes: 2 additions & 2 deletions src/store/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use common::{BinarySerializable, OwnedBytes};
#[cfg(feature = "quickwit")]
use futures_util::StreamExt;
use lru::LruCache;

use super::footer::DocStoreFooter;
Expand Down Expand Up @@ -423,8 +425,6 @@ impl StoreReader {
&self,
mut doc_ids: BTreeSet<DocId>,
) -> crate::Result<HashMap<DocId, D>> {
use futures_util::StreamExt;

let mut results = HashMap::with_capacity(doc_ids.len());

// Helper function to deserialize a document from bytes.
Expand Down

0 comments on commit f0ff5eb

Please sign in to comment.