Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose collect_block buffer size #2326

Merged
merged 2 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/aggregation/bucket/term_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ pub struct TermsAggregation {
///
/// Defaults to 10 * size.
#[serde(skip_serializing_if = "Option::is_none", default)]
#[serde(alias = "segment_size")]
#[serde(alias = "shard_size")]
#[serde(alias = "split_size")]
pub shard_size: Option<u32>,
pub segment_size: Option<u32>,

/// If you set the `show_term_doc_count_error` parameter to true, the terms aggregation will
/// include doc_count_error_upper_bound, which is an upper bound to the error on the
Expand Down Expand Up @@ -196,7 +196,7 @@ impl TermsAggregationInternal {
pub(crate) fn from_req(req: &TermsAggregation) -> Self {
let size = req.size.unwrap_or(10);

let mut segment_size = req.shard_size.unwrap_or(size * 10);
let mut segment_size = req.segment_size.unwrap_or(size * 10);

let order = req.order.clone().unwrap_or_default();
segment_size = segment_size.max(size);
Expand Down
4 changes: 4 additions & 0 deletions src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ pub trait SegmentCollector: 'static {
fn collect(&mut self, doc: DocId, score: Score);

/// The query pushes the scored document to the collector via this method.
/// This method is used when the collector does not require scoring.
///
/// See [`COLLECT_BLOCK_BUFFER_LEN`](crate::COLLECT_BLOCK_BUFFER_LEN) for the
/// buffer size passed to the collector.
fn collect_block(&mut self, docs: &[DocId]) {
for doc in docs {
self.collect(*doc, 0.0);
Expand Down
9 changes: 6 additions & 3 deletions src/docset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use crate::DocId;
/// to compare `[u32; 4]`.
pub const TERMINATED: DocId = i32::MAX as u32;

pub const BUFFER_LEN: usize = 64;
/// The collect_block method on `SegmentCollector` uses a buffer of this size.
/// Results passed to `collect_block` will not exceed this size, and will be
/// exactly this size as long as the buffer can be filled.
pub const COLLECT_BLOCK_BUFFER_LEN: usize = 64;

/// Represents an iterable set of sorted doc ids.
pub trait DocSet: Send {
Expand Down Expand Up @@ -61,7 +64,7 @@ pub trait DocSet: Send {
/// This method is only here for specific high-performance
/// use cases where batching helps. The normal way to
/// go through the `DocId`'s is to call `.advance()`.
fn fill_buffer(&mut self, buffer: &mut [DocId; BUFFER_LEN]) -> usize {
fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
if self.doc() == TERMINATED {
return 0;
}
Expand Down Expand Up @@ -151,7 +154,7 @@ impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
unboxed.seek(target)
}

fn fill_buffer(&mut self, buffer: &mut [DocId; BUFFER_LEN]) -> usize {
fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
let unboxed: &mut TDocSet = self.borrow_mut();
unboxed.fill_buffer(buffer)
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ pub use common::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64, HasLen};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};

pub use self::docset::{DocSet, TERMINATED};
pub use self::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN, TERMINATED};
#[deprecated(
since = "0.22.0",
note = "Will be removed in tantivy 0.23. Use export from snippet module instead"
Expand Down
20 changes: 10 additions & 10 deletions src/query/all_query.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::docset::{DocSet, BUFFER_LEN, TERMINATED};
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN, TERMINATED};
use crate::index::SegmentReader;
use crate::query::boost_query::BoostScorer;
use crate::query::explanation::does_not_match;
Expand Down Expand Up @@ -54,7 +54,7 @@ impl DocSet for AllScorer {
self.doc
}

fn fill_buffer(&mut self, buffer: &mut [DocId; BUFFER_LEN]) -> usize {
fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
if self.doc() == TERMINATED {
return 0;
}
Expand Down Expand Up @@ -96,7 +96,7 @@ impl Scorer for AllScorer {
#[cfg(test)]
mod tests {
use super::AllQuery;
use crate::docset::{DocSet, BUFFER_LEN, TERMINATED};
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN, TERMINATED};
use crate::query::{AllScorer, EnableScoring, Query};
use crate::schema::{Schema, TEXT};
use crate::{Index, IndexWriter};
Expand Down Expand Up @@ -162,16 +162,16 @@ mod tests {
pub fn test_fill_buffer() {
let mut postings = AllScorer {
doc: 0u32,
max_doc: BUFFER_LEN as u32 * 2 + 9,
max_doc: COLLECT_BLOCK_BUFFER_LEN as u32 * 2 + 9,
};
let mut buffer = [0u32; BUFFER_LEN];
assert_eq!(postings.fill_buffer(&mut buffer), BUFFER_LEN);
for i in 0u32..BUFFER_LEN as u32 {
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
assert_eq!(postings.fill_buffer(&mut buffer), COLLECT_BLOCK_BUFFER_LEN);
for i in 0u32..COLLECT_BLOCK_BUFFER_LEN as u32 {
assert_eq!(buffer[i as usize], i);
}
assert_eq!(postings.fill_buffer(&mut buffer), BUFFER_LEN);
for i in 0u32..BUFFER_LEN as u32 {
assert_eq!(buffer[i as usize], i + BUFFER_LEN as u32);
assert_eq!(postings.fill_buffer(&mut buffer), COLLECT_BLOCK_BUFFER_LEN);
for i in 0u32..COLLECT_BLOCK_BUFFER_LEN as u32 {
assert_eq!(buffer[i as usize], i + COLLECT_BLOCK_BUFFER_LEN as u32);
}
assert_eq!(postings.fill_buffer(&mut buffer), 9);
}
Expand Down
4 changes: 2 additions & 2 deletions src/query/boolean_query/boolean_weight.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;

use crate::docset::BUFFER_LEN;
use crate::docset::COLLECT_BLOCK_BUFFER_LEN;
use crate::index::SegmentReader;
use crate::postings::FreqReadingOption;
use crate::query::explanation::does_not_match;
Expand Down Expand Up @@ -228,7 +228,7 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombin
callback: &mut dyn FnMut(&[DocId]),
) -> crate::Result<()> {
let scorer = self.complex_scorer(reader, 1.0, || DoNothingCombiner)?;
let mut buffer = [0u32; BUFFER_LEN];
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];

match scorer {
SpecializedScorer::TermUnion(term_scorers) => {
Expand Down
4 changes: 2 additions & 2 deletions src/query/boost_query.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt;

use crate::docset::BUFFER_LEN;
use crate::docset::COLLECT_BLOCK_BUFFER_LEN;
use crate::fastfield::AliveBitSet;
use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight};
use crate::{DocId, DocSet, Score, SegmentReader, Term};
Expand Down Expand Up @@ -105,7 +105,7 @@ impl<S: Scorer> DocSet for BoostScorer<S> {
self.underlying.seek(target)
}

fn fill_buffer(&mut self, buffer: &mut [DocId; BUFFER_LEN]) -> usize {
fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
self.underlying.fill_buffer(buffer)
}

Expand Down
4 changes: 2 additions & 2 deletions src/query/const_score_query.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt;

use crate::docset::BUFFER_LEN;
use crate::docset::COLLECT_BLOCK_BUFFER_LEN;
use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight};
use crate::{DocId, DocSet, Score, SegmentReader, TantivyError, Term};

Expand Down Expand Up @@ -119,7 +119,7 @@ impl<TDocSet: DocSet> DocSet for ConstScorer<TDocSet> {
self.docset.seek(target)
}

fn fill_buffer(&mut self, buffer: &mut [DocId; BUFFER_LEN]) -> usize {
fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
self.docset.fill_buffer(buffer)
}

Expand Down
4 changes: 2 additions & 2 deletions src/query/term_query/term_weight.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::term_scorer::TermScorer;
use crate::docset::{DocSet, BUFFER_LEN};
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
use crate::fieldnorm::FieldNormReader;
use crate::index::SegmentReader;
use crate::postings::SegmentPostings;
Expand Down Expand Up @@ -64,7 +64,7 @@ impl Weight for TermWeight {
callback: &mut dyn FnMut(&[DocId]),
) -> crate::Result<()> {
let mut scorer = self.specialized_scorer(reader, 1.0)?;
let mut buffer = [0u32; BUFFER_LEN];
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
for_each_docset_buffered(&mut scorer, &mut buffer, callback);
Ok(())
}
Expand Down
16 changes: 8 additions & 8 deletions src/query/vec_docset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl HasLen for VecDocSet {
pub mod tests {

use super::*;
use crate::docset::{DocSet, BUFFER_LEN};
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
use crate::DocId;

#[test]
Expand All @@ -72,16 +72,16 @@ pub mod tests {

#[test]
pub fn test_fill_buffer() {
let doc_ids: Vec<DocId> = (1u32..=(BUFFER_LEN as u32 * 2 + 9)).collect();
let doc_ids: Vec<DocId> = (1u32..=(COLLECT_BLOCK_BUFFER_LEN as u32 * 2 + 9)).collect();
let mut postings = VecDocSet::from(doc_ids);
let mut buffer = [0u32; BUFFER_LEN];
assert_eq!(postings.fill_buffer(&mut buffer), BUFFER_LEN);
for i in 0u32..BUFFER_LEN as u32 {
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
assert_eq!(postings.fill_buffer(&mut buffer), COLLECT_BLOCK_BUFFER_LEN);
for i in 0u32..COLLECT_BLOCK_BUFFER_LEN as u32 {
assert_eq!(buffer[i as usize], i + 1);
}
assert_eq!(postings.fill_buffer(&mut buffer), BUFFER_LEN);
for i in 0u32..BUFFER_LEN as u32 {
assert_eq!(buffer[i as usize], i + 1 + BUFFER_LEN as u32);
assert_eq!(postings.fill_buffer(&mut buffer), COLLECT_BLOCK_BUFFER_LEN);
for i in 0u32..COLLECT_BLOCK_BUFFER_LEN as u32 {
assert_eq!(buffer[i as usize], i + 1 + COLLECT_BLOCK_BUFFER_LEN as u32);
}
assert_eq!(postings.fill_buffer(&mut buffer), 9);
}
Expand Down
6 changes: 3 additions & 3 deletions src/query/weight.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::Scorer;
use crate::docset::BUFFER_LEN;
use crate::docset::COLLECT_BLOCK_BUFFER_LEN;
use crate::index::SegmentReader;
use crate::query::Explanation;
use crate::{DocId, DocSet, Score, TERMINATED};
Expand All @@ -22,7 +22,7 @@ pub(crate) fn for_each_scorer<TScorer: Scorer + ?Sized>(
#[inline]
pub(crate) fn for_each_docset_buffered<T: DocSet + ?Sized>(
docset: &mut T,
buffer: &mut [DocId; BUFFER_LEN],
buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
mut callback: impl FnMut(&[DocId]),
) {
loop {
Expand Down Expand Up @@ -105,7 +105,7 @@ pub trait Weight: Send + Sync + 'static {
) -> crate::Result<()> {
let mut docset = self.scorer(reader, 1.0)?;

let mut buffer = [0u32; BUFFER_LEN];
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
for_each_docset_buffered(&mut docset, &mut buffer, callback);
Ok(())
}
Expand Down
Loading