Skip to content

Commit

Permalink
docid deltas while indexing
Browse files Browse the repository at this point in the history
Storing deltas is especially helpful for repetitive data like logs.
In those cases, recording a doc on a term used to cost 4 bytes; it now
costs 1 byte.

HDFS Indexing 1.1GB Total memory consumption:
Before:  760 MB
Now:     590 MB
  • Loading branch information
PSeitz committed Nov 12, 2023
1 parent 4837c78 commit c11ddec
Showing 1 changed file with 43 additions and 15 deletions.
58 changes: 43 additions & 15 deletions src/postings/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,14 @@ impl Recorder for DocIdRecorder {

/// Records a new document `doc` for this term by appending a vint to the
/// recorder's stack in `arena`.
///
/// The doc id is delta-encoded: the difference to the previously recorded
/// doc id (`self.current_doc`) is computed, and `self.current_doc` is then
/// updated to `doc`.
#[inline]
fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
// `u32::MAX` is special-cased: the doc id is used as-is rather than as a
// difference. Presumably `u32::MAX` is the "no doc recorded yet" sentinel
// for `current_doc` — confirm against the struct's initialization.
let delta = if self.current_doc == u32::MAX {
doc
} else {
// NOTE(review): plain `-` assumes `doc >= self.current_doc`, i.e. doc ids
// arrive in non-decreasing order — verify against callers.
doc - self.current_doc
};

self.current_doc = doc;
// NOTE(review): this text is a flattened diff. The next line (writing the
// absolute `doc`) looks like the pre-commit line that the `delta` write
// below replaces; only one of the two is expected in the real file — confirm.
self.stack.writer(arena).write_u32_vint(doc);
self.stack.writer(arena).write_u32_vint(delta);
}

#[inline]
Expand All @@ -125,18 +131,22 @@ impl Recorder for DocIdRecorder {
let (buffer, doc_ids) = buffer_lender.lend_all();
self.stack.read_to_end(arena, buffer);
// TODO avoid reading twice.
let mut prev_doc = 0;
if let Some(doc_id_map) = doc_id_map {
doc_ids.extend(
VInt32Reader::new(&buffer[..])
.map(|old_doc_id| doc_id_map.get_new_doc_id(old_doc_id)),
);
doc_ids.extend(VInt32Reader::new(&buffer[..]).map(|delta_doc_id| {
let old_doc_id = prev_doc + delta_doc_id;
prev_doc = old_doc_id;
doc_id_map.get_new_doc_id(old_doc_id)
}));
doc_ids.sort_unstable();

for doc in doc_ids {
serializer.write_doc(*doc, 0u32, &[][..]);
}
} else {
for doc in VInt32Reader::new(&buffer[..]) {
for delta_doc_id in VInt32Reader::new(&buffer[..]) {
let doc = prev_doc + delta_doc_id;
prev_doc = doc;
serializer.write_doc(doc, 0u32, &[][..]);
}
}
Expand Down Expand Up @@ -164,9 +174,14 @@ impl Recorder for TermFrequencyRecorder {

/// Records a new document `doc` for this term: increments the term's
/// document frequency (`self.term_doc_freq`) and appends a vint to the
/// recorder's stack in `arena`.
///
/// The doc id is delta-encoded: the difference to the previously recorded
/// doc id (`self.current_doc`) is computed, and `self.current_doc` is then
/// updated to `doc`.
#[inline]
fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
// `u32::MAX` is special-cased: the doc id is used as-is rather than as a
// difference. Presumably `u32::MAX` is a "no doc recorded yet" sentinel
// for `current_doc` — confirm against the struct's initialization.
let delta = if self.current_doc == u32::MAX {
doc
} else {
// NOTE(review): plain `-` assumes `doc >= self.current_doc`, i.e. doc ids
// arrive in non-decreasing order — verify against callers.
doc - self.current_doc
};
self.term_doc_freq += 1;
self.current_doc = doc;
// NOTE(review): this text is a flattened diff. The next line (writing the
// absolute `doc`) looks like the pre-commit line that the `delta` write
// below replaces; only one of the two is expected in the real file — confirm.
self.stack.writer(arena).write_u32_vint(doc);
self.stack.writer(arena).write_u32_vint(delta);
}

#[inline]
Expand All @@ -191,21 +206,26 @@ impl Recorder for TermFrequencyRecorder {
let buffer = buffer_lender.lend_u8();
self.stack.read_to_end(arena, buffer);
let mut u32_it = VInt32Reader::new(&buffer[..]);
let mut prev_doc = 0;
if let Some(doc_id_map) = doc_id_map {
let mut doc_id_and_tf = vec![];
while let Some(old_doc_id) = u32_it.next() {
while let Some(delta_doc_id) = u32_it.next() {
let doc_id = prev_doc + delta_doc_id;
prev_doc = doc_id;
let term_freq = u32_it.next().unwrap_or(self.current_tf);
doc_id_and_tf.push((doc_id_map.get_new_doc_id(old_doc_id), term_freq));
doc_id_and_tf.push((doc_id_map.get_new_doc_id(doc_id), term_freq));
}
doc_id_and_tf.sort_unstable_by_key(|&(doc_id, _)| doc_id);

for (doc_id, tf) in doc_id_and_tf {
serializer.write_doc(doc_id, tf, &[][..]);
}
} else {
while let Some(doc) = u32_it.next() {
while let Some(delta_doc_id) = u32_it.next() {
let doc_id = prev_doc + delta_doc_id;
prev_doc = doc_id;
let term_freq = u32_it.next().unwrap_or(self.current_tf);
serializer.write_doc(doc, term_freq, &[][..]);
serializer.write_doc(doc_id, term_freq, &[][..]);
}
}
}
Expand Down Expand Up @@ -241,9 +261,14 @@ impl Recorder for TfAndPositionRecorder {

/// Records a new document `doc` for this term: updates `self.current_doc`,
/// increments the term's document frequency (`self.term_doc_freq`) and
/// appends a vint to the recorder's stack in `arena`.
///
/// The doc id is delta-encoded: the difference to the previously recorded
/// doc id is computed before `self.current_doc` is overwritten with `doc`.
#[inline]
fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
// `u32::MAX` is special-cased: the doc id is used as-is rather than as a
// difference. Presumably `u32::MAX` is a "no doc recorded yet" sentinel
// for `current_doc` — confirm against the struct's initialization.
let delta = if self.current_doc == u32::MAX {
doc
} else {
// NOTE(review): plain `-` assumes `doc >= self.current_doc`, i.e. doc ids
// arrive in non-decreasing order — verify against callers.
doc - self.current_doc
};
self.current_doc = doc;
self.term_doc_freq += 1u32;
// NOTE(review): this text is a flattened diff. The next line (writing the
// absolute `doc`) looks like the pre-commit line that the `delta` write
// below replaces; only one of the two is expected in the real file — confirm.
self.stack.writer(arena).write_u32_vint(doc);
self.stack.writer(arena).write_u32_vint(delta);
}

#[inline]
Expand All @@ -269,7 +294,10 @@ impl Recorder for TfAndPositionRecorder {
self.stack.read_to_end(arena, buffer_u8);
let mut u32_it = VInt32Reader::new(&buffer_u8[..]);
let mut doc_id_and_positions = vec![];
while let Some(doc) = u32_it.next() {
let mut prev_doc = 0;
while let Some(delta_doc_id) = u32_it.next() {
let doc_id = prev_doc + delta_doc_id;
prev_doc = doc_id;
let mut prev_position_plus_one = 1u32;
buffer_positions.clear();
loop {
Expand All @@ -287,9 +315,9 @@ impl Recorder for TfAndPositionRecorder {
if let Some(doc_id_map) = doc_id_map {
// this simple variant to remap may consume too much memory
doc_id_and_positions
.push((doc_id_map.get_new_doc_id(doc), buffer_positions.to_vec()));
.push((doc_id_map.get_new_doc_id(doc_id), buffer_positions.to_vec()));
} else {
serializer.write_doc(doc, buffer_positions.len() as u32, buffer_positions);
serializer.write_doc(doc_id, buffer_positions.len() as u32, buffer_positions);
}
}
if doc_id_map.is_some() {
Expand Down

0 comments on commit c11ddec

Please sign in to comment.