Minor readability refactoring in the SegmentDocIdMapping (#1451)
fulmicoton authored Aug 22, 2022
1 parent 8edcd6f commit 7f9ba0e
Showing 3 changed files with 92 additions and 94 deletions.
44 changes: 18 additions & 26 deletions src/indexer/doc_id_mapping.rs
@@ -2,35 +2,42 @@
 //! to get mappings from old doc_id to new doc_id and vice versa, after sorting
 use std::cmp::Reverse;
-use std::ops::Index;
 
 use super::SegmentWriter;
 use crate::schema::{Field, Schema};
-use crate::{DocId, IndexSortByField, Order, SegmentOrdinal, TantivyError};
+use crate::{DocAddress, DocId, IndexSortByField, Order, TantivyError};
 
 /// Struct to provide mapping from new doc_id to old doc_id and segment.
 #[derive(Clone)]
 pub(crate) struct SegmentDocIdMapping {
-    new_doc_id_to_old_and_segment: Vec<(DocId, SegmentOrdinal)>,
+    new_doc_id_to_old_doc_addr: Vec<DocAddress>,
     is_trivial: bool,
 }
 
 impl SegmentDocIdMapping {
-    pub(crate) fn new(
-        new_doc_id_to_old_and_segment: Vec<(DocId, SegmentOrdinal)>,
-        is_trivial: bool,
-    ) -> Self {
+    pub(crate) fn new(new_doc_id_to_old_and_segment: Vec<DocAddress>, is_trivial: bool) -> Self {
         Self {
-            new_doc_id_to_old_and_segment,
+            new_doc_id_to_old_doc_addr: new_doc_id_to_old_and_segment,
             is_trivial,
         }
     }
-    pub(crate) fn iter(&self) -> impl Iterator<Item = &(DocId, SegmentOrdinal)> {
-        self.new_doc_id_to_old_and_segment.iter()
+
+    /// Returns an iterator over the old document addresses, ordered by the new document ids.
+    ///
+    /// In the returned `DocAddress`, the `segment_ord` is the ordinal of the targeted segment
+    /// in the list of merged segments.
+    pub(crate) fn iter_old_doc_addrs(&self) -> impl Iterator<Item = DocAddress> + '_ {
+        self.new_doc_id_to_old_doc_addr.iter().copied()
     }
 
     pub(crate) fn len(&self) -> usize {
-        self.new_doc_id_to_old_and_segment.len()
+        self.new_doc_id_to_old_doc_addr.len()
     }
+
+    pub(crate) fn get_old_doc_addr(&self, new_doc_id: DocId) -> DocAddress {
+        self.new_doc_id_to_old_doc_addr[new_doc_id as usize]
+    }
 
     /// This flag means the segments are simply stacked in the order of their ordinal.
     /// e.g. [(0, 1), .. (n, 1), (0, 2)..., (m, 2)]
     ///
@@ -39,21 +46,6 @@ impl SegmentDocIdMapping {
         self.is_trivial
     }
 }
-impl Index<usize> for SegmentDocIdMapping {
-    type Output = (DocId, SegmentOrdinal);
-
-    fn index(&self, idx: usize) -> &Self::Output {
-        &self.new_doc_id_to_old_and_segment[idx]
-    }
-}
-impl IntoIterator for SegmentDocIdMapping {
-    type Item = (DocId, SegmentOrdinal);
-    type IntoIter = std::vec::IntoIter<Self::Item>;
-
-    fn into_iter(self) -> Self::IntoIter {
-        self.new_doc_id_to_old_and_segment.into_iter()
-    }
-}
 
 /// Struct to provide mapping from old doc_id to new doc_id and vice versa within a segment.
 pub struct DocIdMapping {
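The net effect of this first file: tuple indexing (the deleted `Index` and `IntoIterator` impls over `(DocId, SegmentOrdinal)`) gives way to named `DocAddress` accessors. A self-contained sketch of the resulting access pattern — the types below are simplified stand-ins, since the real `SegmentDocIdMapping` is crate-private:

// Simplified stand-ins for the crate-private types touched by this commit.
#[derive(Clone, Copy, Debug, PartialEq)]
struct DocAddress {
    segment_ord: u32,
    doc_id: u32,
}

struct SegmentDocIdMapping {
    new_doc_id_to_old_doc_addr: Vec<DocAddress>,
}

impl SegmentDocIdMapping {
    // Old addresses in new-doc-id order, mirroring the added `iter_old_doc_addrs`.
    fn iter_old_doc_addrs(&self) -> impl Iterator<Item = DocAddress> + '_ {
        self.new_doc_id_to_old_doc_addr.iter().copied()
    }

    // Random access by new doc id, replacing the deleted `Index` impl.
    fn get_old_doc_addr(&self, new_doc_id: u32) -> DocAddress {
        self.new_doc_id_to_old_doc_addr[new_doc_id as usize]
    }
}

fn main() {
    // Two merged segments: segment 0 contributes one doc, segment 1 contributes two.
    let mapping = SegmentDocIdMapping {
        new_doc_id_to_old_doc_addr: vec![
            DocAddress { segment_ord: 0, doc_id: 0 },
            DocAddress { segment_ord: 1, doc_id: 0 },
            DocAddress { segment_ord: 1, doc_id: 1 },
        ],
    };
    for (new_doc_id, old) in mapping.iter_old_doc_addrs().enumerate() {
        println!("new doc {new_doc_id} <- segment {} doc {}", old.segment_ord, old.doc_id);
    }
    assert_eq!(
        mapping.get_old_doc_addr(2),
        DocAddress { segment_ord: 1, doc_id: 1 }
    );
}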
127 changes: 67 additions & 60 deletions src/indexer/merger.rs
@@ -21,8 +21,8 @@ use crate::schema::{Cardinality, Field, FieldType, Schema};
 use crate::store::StoreWriter;
 use crate::termdict::{TermMerger, TermOrdinal};
 use crate::{
-    DocId, IndexSettings, IndexSortByField, InvertedIndexReader, Order, SegmentComponent,
-    SegmentOrdinal,
+    DocAddress, DocId, IndexSettings, IndexSortByField, InvertedIndexReader, Order,
+    SegmentComponent, SegmentOrdinal,
 };
 
 /// Segment's max doc must be `< MAX_DOC_LIMIT`.
@@ -260,9 +260,9 @@ impl IndexMerger {
             .iter()
             .map(|reader| reader.get_fieldnorms_reader(field))
             .collect::<Result<_, _>>()?;
-        for (doc_id, reader_ordinal) in doc_id_mapping.iter() {
-            let fieldnorms_reader = &fieldnorms_readers[*reader_ordinal as usize];
-            let fieldnorm_id = fieldnorms_reader.fieldnorm_id(*doc_id);
+        for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
+            let fieldnorms_reader = &fieldnorms_readers[old_doc_addr.segment_ord as usize];
+            let fieldnorm_id = fieldnorms_reader.fieldnorm_id(old_doc_addr.doc_id);
             fieldnorms_data.push(fieldnorm_id);
         }
 
@@ -377,18 +377,21 @@ impl IndexMerger {
         }
         impl<'a> FastFieldDataAccess for SortedDocIdFieldAccessProvider<'a> {
             fn get_val(&self, doc: u64) -> u64 {
-                let (doc_id, reader_ordinal) = self.doc_id_mapping[doc as usize];
-                self.fast_field_readers[reader_ordinal as usize].get(doc_id)
+                let DocAddress {
+                    doc_id,
+                    segment_ord,
+                } = self.doc_id_mapping.get_old_doc_addr(doc as u32);
+                self.fast_field_readers[segment_ord as usize].get(doc_id)
             }
         }
         let fastfield_accessor = SortedDocIdFieldAccessProvider {
             doc_id_mapping,
             fast_field_readers: &fast_field_readers,
         };
         let iter_gen = || {
-            doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| {
-                let fast_field_reader = &fast_field_readers[*reader_ordinal as usize];
-                fast_field_reader.get(*doc_id)
+            doc_id_mapping.iter_old_doc_addrs().map(|old_doc_addr| {
+                let fast_field_reader = &fast_field_readers[old_doc_addr.segment_ord as usize];
+                fast_field_reader.get(old_doc_addr.doc_id)
             })
         };
         fast_field_serializer.create_auto_detect_u64_fast_field(
@@ -469,15 +472,11 @@ impl IndexMerger {
         let doc_id_reader_pair =
             reader_ordinal_and_field_accessors
                 .iter()
-                .map(|reader_and_field_accessor| {
-                    let reader = &self.readers[reader_and_field_accessor.0 as usize];
-                    reader.doc_ids_alive().map(move |doc_id| {
-                        (
-                            doc_id,
-                            reader_and_field_accessor.0,
-                            &reader_and_field_accessor.1,
-                        )
-                    })
+                .map(|(reader_ord, ff_reader)| {
+                    let reader = &self.readers[*reader_ord as usize];
+                    reader
+                        .doc_ids_alive()
+                        .map(move |doc_id| (doc_id, reader_ord, ff_reader))
                 });
 
         let total_num_new_docs = self
@@ -486,7 +485,7 @@ impl IndexMerger {
             .map(|reader| reader.num_docs() as usize)
             .sum();
 
-        let mut sorted_doc_ids = Vec::with_capacity(total_num_new_docs);
+        let mut sorted_doc_ids: Vec<DocAddress> = Vec::with_capacity(total_num_new_docs);
 
         // create iterator tuple of (old doc_id, reader) in order of the new doc_ids
         sorted_doc_ids.extend(
@@ -501,7 +500,10 @@ impl IndexMerger {
                     val1 > val2
                 }
             })
-            .map(|(doc_id, reader_with_id, _)| (doc_id, reader_with_id)),
+            .map(|(doc_id, &segment_ord, _)| DocAddress {
+                doc_id,
+                segment_ord,
+            }),
         );
         Ok(SegmentDocIdMapping::new(sorted_doc_ids, false))
     }
@@ -550,10 +552,10 @@ impl IndexMerger {
 
         let mut offsets = Vec::with_capacity(doc_id_mapping.len());
         let mut offset = 0;
-        for (doc_id, reader) in doc_id_mapping.iter() {
-            let reader = &reader_and_field_accessors[*reader as usize].1;
+        for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
+            let reader = &reader_and_field_accessors[old_doc_addr.segment_ord as usize].1;
             offsets.push(offset);
-            offset += reader.get_len(*doc_id) as u64;
+            offset += reader.get_len(old_doc_addr.doc_id) as u64;
         }
         offsets.push(offset);
 
@@ -631,12 +633,12 @@ impl IndexMerger {
             fast_field_serializer.new_u64_fast_field_with_idx(field, 0u64, max_term_ord, 1)?;
         let mut vals = Vec::with_capacity(100);
 
-        for (old_doc_id, reader_ordinal) in doc_id_mapping.iter() {
+        for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
             let term_ordinal_mapping: &[TermOrdinal] =
-                term_ordinal_mappings.get_segment(*reader_ordinal as usize);
+                term_ordinal_mappings.get_segment(old_doc_addr.segment_ord as usize);
 
-            let ff_reader = &fast_field_reader[*reader_ordinal as usize];
-            ff_reader.get_vals(*old_doc_id, &mut vals);
+            let ff_reader = &fast_field_reader[old_doc_addr.segment_ord as usize];
+            ff_reader.get_vals(old_doc_addr.doc_id, &mut vals);
             for &prev_term_ord in &vals {
                 let new_term_ord = term_ordinal_mapping[prev_term_ord as usize];
                 serialize_vals.add_val(new_term_ord)?;
@@ -657,16 +659,17 @@ impl IndexMerger {
            .map(|reader| reader.num_docs() as usize)
            .sum();
 
-        let mut mapping = Vec::with_capacity(total_num_new_docs);
+        let mut mapping: Vec<DocAddress> = Vec::with_capacity(total_num_new_docs);
 
         mapping.extend(
             self.readers
                 .iter()
                 .enumerate()
-                .flat_map(|(reader_ordinal, reader)| {
-                    reader
-                        .doc_ids_alive()
-                        .map(move |doc_id| (doc_id, reader_ordinal as SegmentOrdinal))
+                .flat_map(|(segment_ord, reader)| {
+                    reader.doc_ids_alive().map(move |doc_id| DocAddress {
+                        segment_ord: segment_ord as u32,
+                        doc_id,
+                    })
                 }),
         );
         Ok(SegmentDocIdMapping::new(mapping, true))
@@ -740,22 +743,24 @@ impl IndexMerger {
             fn get_val(&self, pos: u64) -> u64 {
                 // use the offsets index to find the doc_id which will contain the position.
                 // the offsets are strictly increasing so we can do a simple search on it.
-                let new_doc_id = self
-                    .offsets
-                    .iter()
-                    .position(|&offset| offset > pos)
-                    .expect("pos is out of bounds")
-                    - 1;
+                let new_doc_id: DocId =
+                    self.offsets
+                        .iter()
+                        .position(|&offset| offset > pos)
+                        .expect("pos is out of bounds") as DocId
+                        - 1u32;
 
                 // now we need to find the position of `pos` in the multivalued bucket
-                let num_pos_covered_until_now = self.offsets[new_doc_id];
+                let num_pos_covered_until_now = self.offsets[new_doc_id as usize];
                 let pos_in_values = pos - num_pos_covered_until_now;
 
-                let (old_doc_id, reader_ordinal) = self.doc_id_mapping[new_doc_id as usize];
-                let num_vals = self.fast_field_readers[reader_ordinal as usize].get_len(old_doc_id);
+                let old_doc_addr = self.doc_id_mapping.get_old_doc_addr(new_doc_id);
+                let num_vals = self.fast_field_readers[old_doc_addr.segment_ord as usize]
+                    .get_len(old_doc_addr.doc_id);
                 assert!(num_vals >= pos_in_values);
-                let mut vals = vec![];
-                self.fast_field_readers[reader_ordinal as usize].get_vals(old_doc_id, &mut vals);
+                let mut vals = Vec::new();
+                self.fast_field_readers[old_doc_addr.segment_ord as usize]
+                    .get_vals(old_doc_addr.doc_id, &mut vals);
 
                 vals[pos_in_values as usize]
             }
@@ -766,12 +771,14 @@ impl IndexMerger {
             offsets,
         };
         let iter_gen = || {
-            doc_id_mapping.iter().flat_map(|(doc_id, reader_ordinal)| {
-                let ff_reader = &ff_readers[*reader_ordinal as usize];
-                let mut vals = vec![];
-                ff_reader.get_vals(*doc_id, &mut vals);
-                vals.into_iter()
-            })
+            doc_id_mapping
+                .iter_old_doc_addrs()
+                .flat_map(|old_doc_addr| {
+                    let ff_reader = &ff_readers[old_doc_addr.segment_ord as usize];
+                    let mut vals = Vec::new();
+                    ff_reader.get_vals(old_doc_addr.doc_id, &mut vals);
+                    vals.into_iter()
+                })
         };
         fast_field_serializer.create_auto_detect_u64_fast_field_with_idx(
             field,
@@ -810,9 +817,9 @@ impl IndexMerger {
         )?;
         let mut serialize_vals = fast_field_serializer.new_bytes_fast_field_with_idx(field, 1);
 
-        for (doc_id, reader_ordinal) in doc_id_mapping.iter() {
-            let bytes_reader = &reader_and_field_accessors[*reader_ordinal as usize].1;
-            let val = bytes_reader.get_bytes(*doc_id);
+        for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
+            let bytes_reader = &reader_and_field_accessors[old_doc_addr.segment_ord as usize].1;
+            let val = bytes_reader.get_bytes(old_doc_addr.doc_id);
             serialize_vals.write_all(val)?;
         }
 
@@ -868,9 +875,9 @@ impl IndexMerger {
                 segment_local_map
             })
            .collect();
-        for (new_doc_id, (old_doc_id, segment_ord)) in doc_id_mapping.iter().enumerate() {
-            let segment_map = &mut merged_doc_id_map[*segment_ord as usize];
-            segment_map[*old_doc_id as usize] = Some(new_doc_id as DocId);
+        for (new_doc_id, old_doc_addr) in doc_id_mapping.iter_old_doc_addrs().enumerate() {
+            let segment_map = &mut merged_doc_id_map[old_doc_addr.segment_ord as usize];
+            segment_map[old_doc_addr.doc_id as usize] = Some(new_doc_id as DocId);
         }
 
         // Note that the total number of tokens is not exact.
@@ -1045,15 +1052,15 @@ impl IndexMerger {
            .map(|(i, store)| store.iter_raw(self.readers[i].alive_bitset()))
            .collect();
 
-        for (old_doc_id, reader_ordinal) in doc_id_mapping.iter() {
-            let doc_bytes_it = &mut document_iterators[*reader_ordinal as usize];
+        for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
+            let doc_bytes_it = &mut document_iterators[old_doc_addr.segment_ord as usize];
             if let Some(doc_bytes_res) = doc_bytes_it.next() {
                 let doc_bytes = doc_bytes_res?;
                 store_writer.store_bytes(&doc_bytes)?;
             } else {
                 return Err(DataCorruption::comment_only(&format!(
-                    "unexpected missing document in docstore on merge, doc id {:?}",
-                    old_doc_id
+                    "unexpected missing document in docstore on merge, doc address \
+                     {old_doc_addr:?}",
                 ))
                 .into());
            }
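The subtlest hunk in merger.rs is the multivalued `get_val`: it maps a flat value position back to a document via the strictly increasing `offsets` array built two hunks earlier (one cumulative entry per new doc, plus a final total). A self-contained sketch of that lookup — the function name and sample data are illustrative, not from the commit:

// Sketch of the offsets lookup in the multivalue accessor above. offsets[d] holds the
// number of values contributed by all docs before doc d, with the grand total appended,
// so the first offset strictly greater than `pos` marks the doc *after* the hit.
fn doc_for_position(offsets: &[u64], pos: u64) -> (u32, u64) {
    let new_doc_id = offsets
        .iter()
        .position(|&offset| offset > pos)
        .expect("pos is out of bounds") as u32
        - 1;
    // Subtract the values covered by earlier docs to get the position inside the doc.
    let pos_in_values = pos - offsets[new_doc_id as usize];
    (new_doc_id, pos_in_values)
}

fn main() {
    // Doc 0 holds 2 values, doc 1 holds 0, doc 2 holds 3 -> offsets = [0, 2, 2, 5].
    let offsets = [0u64, 2, 2, 5];
    assert_eq!(doc_for_position(&offsets, 1), (0, 1));
    // Doc 1 is empty, so flat position 2 falls into doc 2.
    assert_eq!(doc_for_position(&offsets, 2), (2, 0));
    assert_eq!(doc_for_position(&offsets, 4), (2, 2));
}

Since `offsets` is sorted, the linear `position` scan could equally be a binary search (e.g. `partition_point`); the patch keeps the original linear scan and only changes how the old address is fetched.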
15 changes: 7 additions & 8 deletions src/indexer/merger_sorted_index_test.rs
@@ -484,7 +484,7 @@ mod bench_sorted_index_merge {
     // use crate::schema;
     use crate::fastfield::{DynamicFastFieldReader, FastFieldReader};
     use crate::indexer::merger::IndexMerger;
-    use crate::schema::{Cardinality, Document, NumericOptions, Schema};
+    use crate::schema::{Cardinality, NumericOptions, Schema};
     use crate::{IndexSettings, IndexSortByField, IndexWriter, Order};
     fn create_index(sort_by_field: Option<IndexSortByField>) -> Index {
         let mut schema_builder = Schema::builder();
@@ -503,9 +503,7 @@ mod bench_sorted_index_merge {
         {
             let mut index_writer = index.writer_for_tests().unwrap();
             let index_doc = |index_writer: &mut IndexWriter, val: u64| {
-                let mut doc = Document::default();
-                doc.add_u64(int_field, val);
-                index_writer.add_document(doc).unwrap();
+                index_writer.add_document(doc!(int_field=>val)).unwrap();
             };
             // 3 segments with 10_000 values in the fast fields
             for _ in 0..3 {
@@ -518,6 +516,7 @@ mod bench_sorted_index_merge {
         }
         index
     }
+
     #[bench]
     fn create_sorted_index_walk_overkmerge_on_merge_fastfield(
         b: &mut Bencher,
@@ -533,19 +532,19 @@ mod bench_sorted_index_merge {
             IndexMerger::open(index.schema(), index.settings().clone(), &segments[..])?;
         let doc_id_mapping = merger.generate_doc_id_mapping(&sort_by_field).unwrap();
         b.iter(|| {
-            let sorted_doc_ids = doc_id_mapping.iter().map(|(doc_id, ordinal)| {
-                let reader = &merger.readers[*ordinal as usize];
+            let sorted_doc_ids = doc_id_mapping.iter_old_doc_addrs().map(|doc_addr| {
+                let reader = &merger.readers[doc_addr.segment_ord as usize];
                 let u64_reader: DynamicFastFieldReader<u64> =
                     reader.fast_fields().typed_fast_field_reader(field).expect(
                         "Failed to find a reader for single fast field. This is a tantivy bug and \
                          it should never happen.",
                     );
-                (doc_id, reader, u64_reader)
+                (doc_addr.doc_id, reader, u64_reader)
             });
             // add values in order of the new doc_ids
             let mut val = 0;
             for (doc_id, _reader, field_reader) in sorted_doc_ids {
-                val = field_reader.get(*doc_id);
+                val = field_reader.get(doc_id);
             }
 
             val
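The test-side change swaps manual `Document` construction for the `doc!` macro. A minimal sketch of the two equivalent styles against the tantivy API of this era — the "intval" field, in-RAM index, and heap size are illustrative, not from the commit:

// Contrast of manual Document construction with the `doc!` macro used in the patched bench.
use tantivy::schema::{Document, Schema, FAST};
use tantivy::{doc, Index};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let int_field = schema_builder.add_u64_field("intval", FAST);
    let index = Index::create_in_ram(schema_builder.build());
    let mut writer = index.writer(50_000_000)?;

    // Before: build the document field by field.
    let mut doc = Document::default();
    doc.add_u64(int_field, 1u64);
    writer.add_document(doc)?;

    // After: the `doc!` macro expresses the same document in one line.
    writer.add_document(doc!(int_field => 2u64))?;

    writer.commit()?;
    Ok(())
}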
