From a81c823836bdc195271f775429314f4c8850bae0 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Mon, 13 Nov 2023 05:14:27 +0100 Subject: [PATCH] docid deltas while indexing (#2249) * docid deltas while indexing storing deltas is especially helpful for repetitive data like logs. In those cases, recording a doc on a term used to cost 4 bytes; it now costs 1 byte. HDFS Indexing 1.1GB Total memory consumption: Before: 760 MB Now: 590 MB * use scan for delta decoding --- src/postings/recorder.rs | 77 ++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 38 deletions(-) diff --git a/src/postings/recorder.rs b/src/postings/recorder.rs index bc65010d91..9620f155b6 100644 --- a/src/postings/recorder.rs +++ b/src/postings/recorder.rs @@ -82,21 +82,12 @@ pub(crate) trait Recorder: Copy + Default + Send + Sync + 'static { } /// Only records the doc ids -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Default)] pub struct DocIdRecorder { stack: ExpUnrolledLinkedList, current_doc: DocId, } -impl Default for DocIdRecorder { - fn default() -> Self { - DocIdRecorder { - stack: ExpUnrolledLinkedList::default(), - current_doc: u32::MAX, - } - } -} - impl Recorder for DocIdRecorder { #[inline] fn current_doc(&self) -> DocId { @@ -105,8 +96,9 @@ impl Recorder for DocIdRecorder { #[inline] fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) { + let delta = doc - self.current_doc; self.current_doc = doc; - self.stack.writer(arena).write_u32_vint(doc); + self.stack.writer(arena).write_u32_vint(delta); } #[inline] @@ -123,21 +115,20 @@ impl Recorder for DocIdRecorder { buffer_lender: &mut BufferLender, ) { let (buffer, doc_ids) = buffer_lender.lend_all(); - self.stack.read_to_end(arena, buffer); // TODO avoid reading twice. 
+ self.stack.read_to_end(arena, buffer); if let Some(doc_id_map) = doc_id_map { - doc_ids.extend( - VInt32Reader::new(&buffer[..]) - .map(|old_doc_id| doc_id_map.get_new_doc_id(old_doc_id)), - ); + let iter = get_sum_reader(VInt32Reader::new(&buffer[..])); + doc_ids.extend(iter.map(|old_doc_id| doc_id_map.get_new_doc_id(old_doc_id))); doc_ids.sort_unstable(); for doc in doc_ids { serializer.write_doc(*doc, 0u32, &[][..]); } } else { - for doc in VInt32Reader::new(&buffer[..]) { - serializer.write_doc(doc, 0u32, &[][..]); + let iter = get_sum_reader(VInt32Reader::new(&buffer[..])); + for doc_id in iter { + serializer.write_doc(doc_id, 0u32, &[][..]); } } } @@ -147,6 +138,15 @@ impl Recorder for DocIdRecorder { } } +/// Takes an Iterator of delta encoded elements and returns an iterator +/// that yields the sum of the elements. +fn get_sum_reader(iter: impl Iterator) -> impl Iterator { + iter.scan(0, |state, delta| { + *state += delta; + Some(*state) + }) +} + /// Recorder encoding document ids, and term frequencies #[derive(Clone, Copy, Default)] pub struct TermFrequencyRecorder { @@ -164,9 +164,10 @@ impl Recorder for TermFrequencyRecorder { #[inline] fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) { + let delta = doc - self.current_doc; self.term_doc_freq += 1; self.current_doc = doc; - self.stack.writer(arena).write_u32_vint(doc); + self.stack.writer(arena).write_u32_vint(delta); } #[inline] @@ -193,9 +194,12 @@ impl Recorder for TermFrequencyRecorder { let mut u32_it = VInt32Reader::new(&buffer[..]); if let Some(doc_id_map) = doc_id_map { let mut doc_id_and_tf = vec![]; - while let Some(old_doc_id) = u32_it.next() { + let mut prev_doc = 0; + while let Some(delta_doc_id) = u32_it.next() { + let doc_id = prev_doc + delta_doc_id; + prev_doc = doc_id; let term_freq = u32_it.next().unwrap_or(self.current_tf); - doc_id_and_tf.push((doc_id_map.get_new_doc_id(old_doc_id), term_freq)); + doc_id_and_tf.push((doc_id_map.get_new_doc_id(doc_id), term_freq)); } 
doc_id_and_tf.sort_unstable_by_key(|&(doc_id, _)| doc_id); @@ -203,9 +207,12 @@ impl Recorder for TermFrequencyRecorder { serializer.write_doc(doc_id, tf, &[][..]); } } else { - while let Some(doc) = u32_it.next() { + let mut prev_doc = 0; + while let Some(delta_doc_id) = u32_it.next() { + let doc_id = prev_doc + delta_doc_id; + prev_doc = doc_id; let term_freq = u32_it.next().unwrap_or(self.current_tf); - serializer.write_doc(doc, term_freq, &[][..]); + serializer.write_doc(doc_id, term_freq, &[][..]); } } } @@ -216,23 +223,13 @@ impl Recorder for TermFrequencyRecorder { } /// Recorder encoding term frequencies as well as positions. -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Default)] pub struct TfAndPositionRecorder { stack: ExpUnrolledLinkedList, current_doc: DocId, term_doc_freq: u32, } -impl Default for TfAndPositionRecorder { - fn default() -> Self { - TfAndPositionRecorder { - stack: ExpUnrolledLinkedList::default(), - current_doc: u32::MAX, - term_doc_freq: 0u32, - } - } -} - impl Recorder for TfAndPositionRecorder { #[inline] fn current_doc(&self) -> DocId { @@ -241,9 +238,10 @@ impl Recorder for TfAndPositionRecorder { #[inline] fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) { + let delta = doc - self.current_doc; self.current_doc = doc; self.term_doc_freq += 1u32; - self.stack.writer(arena).write_u32_vint(doc); + self.stack.writer(arena).write_u32_vint(delta); } #[inline] @@ -269,7 +267,10 @@ impl Recorder for TfAndPositionRecorder { self.stack.read_to_end(arena, buffer_u8); let mut u32_it = VInt32Reader::new(&buffer_u8[..]); let mut doc_id_and_positions = vec![]; - while let Some(doc) = u32_it.next() { + let mut prev_doc = 0; + while let Some(delta_doc_id) = u32_it.next() { + let doc_id = prev_doc + delta_doc_id; + prev_doc = doc_id; let mut prev_position_plus_one = 1u32; buffer_positions.clear(); loop { @@ -287,9 +288,9 @@ impl Recorder for TfAndPositionRecorder { if let Some(doc_id_map) = doc_id_map { // this simple variant to remap 
may consume to much memory doc_id_and_positions - .push((doc_id_map.get_new_doc_id(doc), buffer_positions.to_vec())); + .push((doc_id_map.get_new_doc_id(doc_id), buffer_positions.to_vec())); } else { - serializer.write_doc(doc, buffer_positions.len() as u32, buffer_positions); + serializer.write_doc(doc_id, buffer_positions.len() as u32, buffer_positions); } } if doc_id_map.is_some() {