encode some part of posting list as -1 instead of direct values (#2185)
* add support for delta-1 encoding of posting lists

* encode term frequencies minus one

* don't emit term frequencies for JSON integer terms

* make the skip reader no longer pub(crate) mutable (expose a read-only accessor instead)
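The idea behind the first two bullets, as a standalone sketch (illustrative Rust, not tantivy's actual API): doc ids within a bitpacked block are sorted and strictly increasing, so every delta from the previous doc is at least 1 and can be stored as delta - 1; a term frequency is likewise at least 1 and can be stored as tf - 1. Smaller stored values can shave a bit per value off the bitpacked width. The baseline for the first value is simplified here:

// Illustrative sketch of delta-1 ("strict delta") encoding; not tantivy's API.
// `previous_doc` stands in for the last doc of the previous block.
fn encode_strict_deltas(doc_ids: &[u32], previous_doc: u32) -> Vec<u32> {
    let mut prev = previous_doc;
    doc_ids
        .iter()
        .map(|&doc| {
            let encoded = doc - prev - 1; // the gap between sorted doc ids is always >= 1
            prev = doc;
            encoded
        })
        .collect()
}

fn decode_strict_deltas(encoded: &[u32], previous_doc: u32) -> Vec<u32> {
    let mut prev = previous_doc;
    encoded
        .iter()
        .map(|&e| {
            prev += e + 1; // undo the minus one on decode
            prev
        })
        .collect()
}

fn main() {
    let docs = vec![3u32, 4, 5, 9];
    let encoded = encode_strict_deltas(&docs, 0);
    assert_eq!(encoded, vec![2, 0, 0, 3]);
    assert_eq!(decode_strict_deltas(&encoded, 0), docs);
    // term frequencies are >= 1 as well, so they get the same treatment: store tf - 1.
}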
trinity-1686a authored Oct 20, 2023
1 parent c2b0469 commit 0d45892
Showing 9 changed files with 240 additions and 65 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -37,7 +37,7 @@ uuid = { version = "1.0.0", features = ["v4", "serde"] }
 crossbeam-channel = "0.5.4"
 rust-stemmers = "1.2.0"
 downcast-rs = "1.2.0"
-bitpacking = { version = "0.8.4", default-features = false, features = ["bitpacker4x"] }
+bitpacking = { git = "https://github.com/quickwit-oss/bitpacking", rev = "f730b75", default-features = false, features = ["bitpacker4x"] }
 census = "0.4.0"
 rustc-hash = "1.1.0"
 thiserror = "1.0.30"
29 changes: 29 additions & 0 deletions src/indexer/mod.rs
@@ -109,6 +109,35 @@ mod tests_mmap {
         }
     }
 
+    #[test]
+    fn test_json_field_number() {
+        // this test was added specifically to exercise the case where a json field, with
+        // frequency enabled, stores integers, and enough documents contain a given integer
+        // for its posting list to be bitpacked.
+        let mut schema_builder = Schema::builder();
+
+        let json_field = schema_builder.add_json_field("json", TEXT);
+        let index = Index::create_in_ram(schema_builder.build());
+        let mut index_writer = index.writer_for_tests().unwrap();
+        for _ in 0..256 {
+            let json = serde_json::json!({"somekey": 1u64, "otherkey": -2i64});
+            index_writer.add_document(doc!(json_field=>json)).unwrap();
+
+            let json = serde_json::json!({"somekey": "1str", "otherkey": "2str"});
+            index_writer.add_document(doc!(json_field=>json)).unwrap();
+        }
+        index_writer.commit().unwrap();
+        let reader = index.reader().unwrap();
+        let searcher = reader.searcher();
+        assert_eq!(searcher.num_docs(), 512);
+        let parse_query = QueryParser::for_index(&index, Vec::new());
+        {
+            let query = parse_query.parse_query(r"json.somekey:1").unwrap();
+            let num_docs = searcher.search(&query, &Count).unwrap();
+            assert_eq!(num_docs, 256);
+        }
+    }
+
     #[test]
     fn test_json_field_expand_dots_enabled_dot_escape_not_required() {
         let mut schema_builder = Schema::builder();
2 changes: 1 addition & 1 deletion src/positions/reader.rs
@@ -92,7 +92,7 @@ impl PositionReader {
             // that block is bitpacked.
             let bit_width = bit_widths[block_rel_id];
             self.block_decoder
-                .uncompress_block_unsorted(compressed_data, bit_width);
+                .uncompress_block_unsorted(compressed_data, bit_width, false);
         } else {
            // that block is vint encoded.
            self.block_decoder
5 changes: 3 additions & 2 deletions src/positions/serializer.rs
@@ -62,8 +62,9 @@ impl<W: io::Write> PositionSerializer<W> {
             return;
         }
         if self.block.len() == COMPRESSION_BLOCK_SIZE {
-            let (bit_width, block_encoded): (u8, &[u8]) =
-                self.block_encoder.compress_block_unsorted(&self.block[..]);
+            let (bit_width, block_encoded): (u8, &[u8]) = self
+                .block_encoder
+                .compress_block_unsorted(&self.block[..], false);
             self.bit_widths.push(bit_width);
             self.positions_buffer.extend(block_encoded);
         } else {
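Both positions call sites above pass `false` for the new boolean, leaving the positions format unchanged; the minus-one transform only activates for doc-id deltas and term frequencies. Conceptually (an illustrative sketch, not the patched bitpacking API), the flag toggles a minus-one transform around bitpacking:

// Illustrative sketch of what the strict_delta flag means; not the real bitpacking API.
// When true, each value is assumed to be >= 1 and is stored as value - 1.
fn transform_for_pack(values: &mut [u32], strict_delta: bool) {
    if strict_delta {
        for v in values.iter_mut() {
            *v -= 1; // safe: strict deltas and term frequencies are always >= 1
        }
    }
}

fn transform_after_unpack(values: &mut [u32], strict_delta: bool) {
    if strict_delta {
        for v in values.iter_mut() {
            *v += 1;
        }
    }
}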
84 changes: 59 additions & 25 deletions src/postings/block_segment_postings.rs
@@ -24,13 +24,13 @@ fn max_score<I: Iterator<Item = Score>>(mut it: I) -> Option<Score> {
 #[derive(Clone)]
 pub struct BlockSegmentPostings {
     pub(crate) doc_decoder: BlockDecoder,
-    loaded_offset: usize,
+    block_loaded: bool,
     freq_decoder: BlockDecoder,
     freq_reading_option: FreqReadingOption,
     block_max_score_cache: Option<Score>,
     doc_freq: u32,
     data: OwnedBytes,
-    pub(crate) skip_reader: SkipReader,
+    skip_reader: SkipReader,
 }
 
 fn decode_bitpacked_block(
@@ -40,10 +40,16 @@ fn decode_bitpacked_block(
     doc_offset: DocId,
     doc_num_bits: u8,
     tf_num_bits: u8,
+    strict_delta: bool,
 ) {
-    let num_consumed_bytes = doc_decoder.uncompress_block_sorted(data, doc_offset, doc_num_bits);
+    let num_consumed_bytes =
+        doc_decoder.uncompress_block_sorted(data, doc_offset, doc_num_bits, strict_delta);
     if let Some(freq_decoder) = freq_decoder_opt {
-        freq_decoder.uncompress_block_unsorted(&data[num_consumed_bytes..], tf_num_bits);
+        freq_decoder.uncompress_block_unsorted(
+            &data[num_consumed_bytes..],
+            tf_num_bits,
+            strict_delta,
+        );
     }
 }
 
@@ -57,11 +63,15 @@ fn decode_vint_block(
     let num_consumed_bytes =
         doc_decoder.uncompress_vint_sorted(data, doc_offset, num_vint_docs, TERMINATED);
     if let Some(freq_decoder) = freq_decoder_opt {
-        freq_decoder.uncompress_vint_unsorted(
-            &data[num_consumed_bytes..],
-            num_vint_docs,
-            TERMINATED,
-        );
+        // if it's a json term with freqs enabled, containing fewer than 256 docs, we can
+        // reach this point thinking we have freqs to decode, despite not really having any.
+        if data.len() > num_consumed_bytes {
+            freq_decoder.uncompress_vint_unsorted(
+                &data[num_consumed_bytes..],
+                num_vint_docs,
+                TERMINATED,
+            );
+        }
     }
 }
 
@@ -78,28 +88,46 @@ fn split_into_skips_and_postings(
 }
 
 impl BlockSegmentPostings {
+    /// Opens a `BlockSegmentPostings`.
+    /// `doc_freq` is the number of documents in the posting list.
+    /// `record_option` is the amount of data available according to the schema.
+    /// `requested_option` is the amount of data requested by the user.
+    /// If, for instance, term frequencies are not requested, this function will not
+    /// decompress term frequency blocks.
     pub(crate) fn open(
         doc_freq: u32,
         data: FileSlice,
-        record_option: IndexRecordOption,
+        mut record_option: IndexRecordOption,
         requested_option: IndexRecordOption,
     ) -> io::Result<BlockSegmentPostings> {
-        let freq_reading_option = match (record_option, requested_option) {
-            (IndexRecordOption::Basic, _) => FreqReadingOption::NoFreq,
-            (_, IndexRecordOption::Basic) => FreqReadingOption::SkipFreq,
-            (_, _) => FreqReadingOption::ReadFreq,
-        };
-
         let bytes = data.read_bytes()?;
         let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, bytes)?;
         let skip_reader = match skip_data_opt {
-            Some(skip_data) => SkipReader::new(skip_data, doc_freq, record_option),
+            Some(skip_data) => {
+                let block_count = doc_freq as usize / COMPRESSION_BLOCK_SIZE;
+                // 8 bytes is the minimum size of a skip entry for a block with frequencies
+                // (it can be more if positions are stored too).
+                if skip_data.len() < 8 * block_count {
+                    // the field may be declared with frequencies, but this particular term
+                    // was written without them. This can happen for JSON fields with term
+                    // frequencies enabled:
+                    // - text terms are encoded with term freqs.
+                    // - numerical terms are encoded without term freqs.
+                    record_option = IndexRecordOption::Basic;
+                }
+                SkipReader::new(skip_data, doc_freq, record_option)
+            }
             None => SkipReader::new(OwnedBytes::empty(), doc_freq, record_option),
         };
 
+        let freq_reading_option = match (record_option, requested_option) {
+            (IndexRecordOption::Basic, _) => FreqReadingOption::NoFreq,
+            (_, IndexRecordOption::Basic) => FreqReadingOption::SkipFreq,
+            (_, _) => FreqReadingOption::ReadFreq,
+        };
+
         let mut block_segment_postings = BlockSegmentPostings {
             doc_decoder: BlockDecoder::with_val(TERMINATED),
-            loaded_offset: usize::MAX,
+            block_loaded: false,
             freq_decoder: BlockDecoder::with_val(1),
             freq_reading_option,
             block_max_score_cache: None,
@@ -169,7 +197,7 @@ impl BlockSegmentPostings {
             split_into_skips_and_postings(doc_freq, postings_data)?;
         self.data = postings_data;
         self.block_max_score_cache = None;
-        self.loaded_offset = usize::MAX;
+        self.block_loaded = false;
         if let Some(skip_data) = skip_data_opt {
             self.skip_reader.reset(skip_data, doc_freq);
         } else {
@@ -265,22 +293,23 @@ impl BlockSegmentPostings {
     pub(crate) fn shallow_seek(&mut self, target_doc: DocId) {
         if self.skip_reader.seek(target_doc) {
             self.block_max_score_cache = None;
+            self.block_loaded = false;
         }
     }
 
     pub(crate) fn block_is_loaded(&self) -> bool {
-        self.loaded_offset == self.skip_reader.byte_offset()
+        self.block_loaded
     }
 
     pub(crate) fn load_block(&mut self) {
-        let offset = self.skip_reader.byte_offset();
-        if self.loaded_offset == offset {
+        if self.block_is_loaded() {
             return;
         }
-        self.loaded_offset = offset;
         match self.skip_reader.block_info() {
             BlockInfo::BitPacked {
                 doc_num_bits,
+                strict_delta_encoded,
                 tf_num_bits,
                 ..
             } => {
@@ -295,6 +324,7 @@
                     self.skip_reader.last_doc_in_previous_block,
                     doc_num_bits,
                     tf_num_bits,
+                    strict_delta_encoded,
                 );
             }
             BlockInfo::VInt { num_docs } => {
@@ -318,13 +348,13 @@
                 );
             }
         }
+        self.block_loaded = true;
     }
 
     /// Advance to the next block.
     ///
     /// Returns false if and only if there is no remaining block.
     pub fn advance(&mut self) {
         self.skip_reader.advance();
+        self.block_loaded = false;
         self.block_max_score_cache = None;
         self.load_block();
     }
@@ -333,7 +363,7 @@
     pub fn empty() -> BlockSegmentPostings {
         BlockSegmentPostings {
             doc_decoder: BlockDecoder::with_val(TERMINATED),
-            loaded_offset: 0,
+            block_loaded: true,
             freq_decoder: BlockDecoder::with_val(1),
             freq_reading_option: FreqReadingOption::NoFreq,
             block_max_score_cache: None,
@@ -342,6 +372,10 @@
             skip_reader: SkipReader::new(OwnedBytes::empty(), 0, IndexRecordOption::Basic),
         }
     }
+
+    pub(crate) fn skip_reader(&self) -> &SkipReader {
+        &self.skip_reader
+    }
 }
 
 #[cfg(test)]
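A worked version of the skip-data length check in `open` above, as a sketch with illustrative names (the 8-byte minimum comes from the comment in the diff; COMPRESSION_BLOCK_SIZE is 128 in tantivy):

// Sketch of the heuristic used in `open`; names and structure are illustrative.
const COMPRESSION_BLOCK_SIZE: u32 = 128; // tantivy's bitpacked block size
const MIN_SKIP_ENTRY_WITH_TF: usize = 8; // minimum skip entry size when freqs are stored

fn term_was_written_with_freqs(skip_data_len: usize, doc_freq: u32) -> bool {
    let block_count = (doc_freq / COMPRESSION_BLOCK_SIZE) as usize;
    skip_data_len >= MIN_SKIP_ENTRY_WITH_TF * block_count
}

fn main() {
    // 300 docs -> 2 fully bitpacked blocks (the 44-doc remainder is vint encoded).
    // Skip data shorter than 16 bytes means this particular term carries no term
    // frequencies (the numerical-term-in-a-JSON-field case) and is read as Basic.
    assert!(!term_was_written_with_freqs(10, 300));
    assert!(term_was_written_with_freqs(16, 300));
}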