From 64fd5b87f2ed2da32bd59725e00b9fb67f2c1150 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Tue, 25 Apr 2023 17:31:55 +0200 Subject: [PATCH] interleave docstore fetch --- quickwit/quickwit-search/src/fetch_docs.rs | 80 +++++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index 693784f8a67..6bb08909785 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -187,7 +187,8 @@ async fn fetch_docs_in_split( None }; - let doc_futures = global_doc_addrs.into_iter().map(|global_doc_addr| { + let global_doc_addrs = interleave(global_doc_addrs, NUM_CONCURRENT_REQUESTS); + let doc_futures = global_doc_addrs.map(|global_doc_addr| { let moved_searcher = searcher.clone(); let moved_doc_mapper = doc_mapper.clone(); let fields_snippet_generator_opt_clone = fields_snippet_generator_opt.clone(); @@ -248,6 +249,51 @@ async fn fetch_docs_in_split( .await } +fn interleave( + mut data: Vec, + interleave_size: usize, +) -> impl Iterator { + let empty_address = GlobalDocAddress { + split: String::new(), + doc_addr: tantivy::DocAddress { + segment_ord: 0, + doc_id: 0, + }, + }; + + interleave_sequence_generator(interleave_size, data.len()) + .map(move |index| std::mem::replace(&mut data[index], empty_address.clone())) +} + +/// This function generates an order of access where element X+1 is accessed as far as possible +/// after element X, while keeping them less than `interleave_size` interval apparts. +/// The goal is to maximize the time elem has to be processed and fill the cache before X+1 gets +/// processed. The algorithm used is suboptimal on batch limit. 
// For an interleave_size of 3, the following sequence gets generated:
//
// position:  |0|1|2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|
// batch_id:  |        0        |            1            |
// sub_batch: |  0  |  1  |  2  |   0   |   1    |   2    |
// sub_pos:   |0|1|2|0|1|2|0|1|2|0| 1| 2| 0| 1| 2| 0| 1| 2|
// generated: |0|3|6|1|4|7|2|5|8|9|12|15|10|13|16|11|14|17|
//
// Parameters:
// - `interleave_size`: stride between two consecutively generated indices inside a batch.
//   A batch spans `interleave_size * interleave_size` positions. A value of 0 is treated
//   as 1, which yields the plain `0..length` order.
// - `length`: number of indices to generate; every value in `0..length` is yielded once.
fn interleave_sequence_generator(
    interleave_size: usize,
    length: usize,
) -> impl Iterator<Item = usize> {
    // With a stride of 0 the batch size would be 0 and the divisions below would panic.
    // Clamping to 1 degrades gracefully to the identity order.
    let interleave_size = interleave_size.max(1);
    let batch_size = interleave_size * interleave_size;

    // Walk one extra batch worth of positions: the last batch is usually only partially
    // covered by `length`, and its out-of-range indices are dropped by the filter below.
    (0..batch_size + length)
        .map(move |position| {
            let batch_id = position / batch_size;
            let batch_pos = position % batch_size;
            // `batch_size` is a multiple of `interleave_size`, so this is also the offset of
            // the position inside its sub-batch.
            let sub_pos = position % interleave_size;
            let sub_batch = batch_pos / interleave_size;

            // Transpose the `interleave_size` x `interleave_size` grid of the current batch.
            sub_batch + sub_pos * interleave_size + batch_id * batch_size
        })
        .filter(move |index| *index < length)
}

// (Patch context not reproduced here: the snippet-generator struct declared with
// `#[derive(Clone)]` and the tail of `create_snippet_generator` appear in the patch only as
// unchanged context lines.)

#[cfg(test)]
mod tests {
    use super::interleave_sequence_generator;

    #[test]
    fn test_interleave_sequence_generator() {
        let expected_3 = [0, 3, 6, 1, 4, 7, 2, 5, 8];

        for i in 0..32 {
            let mut expected = Vec::new();
            for j in 0..4 {
                expected.extend(expected_3.iter().map(|e| j * 9 + e).filter(|e| *e < i));
            }
            assert_eq!(expected.len(), i);

            let res: Vec<_> = interleave_sequence_generator(3, i).collect();
            assert_eq!(res, expected);
        }

        let expected_4 = [0, 4, 8, 12, 1, 5, 9, 13, 2, 6, 10, 14, 3, 7, 11, 15];
        for i in 0..40 {
            let mut expected = Vec::new();
            for j in 0..3 {
                expected.extend(expected_4.iter().map(|e| j * 16 + e).filter(|e| *e < i));
            }
            assert_eq!(expected.len(), i);

            let res: Vec<_> = interleave_sequence_generator(4, i).collect();
            assert_eq!(res, expected);
        }
    }

    #[test]
    fn test_interleave_sequence_generator_zero_size() {
        let res: Vec<_> = interleave_sequence_generator(0, 4).collect();
        assert_eq!(res, vec![0, 1, 2, 3]);
    }
}