Minor readability refactoring in the SegmentDocIdMapping #1451

Merged · 1 commit · Aug 22, 2022
44 changes: 18 additions & 26 deletions src/indexer/doc_id_mapping.rs
@@ -2,35 +2,42 @@
//! to get mappings from old doc_id to new doc_id and vice versa, after sorting

use std::cmp::Reverse;
use std::ops::Index;

use super::SegmentWriter;
use crate::schema::{Field, Schema};
use crate::{DocId, IndexSortByField, Order, SegmentOrdinal, TantivyError};
use crate::{DocAddress, DocId, IndexSortByField, Order, TantivyError};

/// Struct to provide mapping from new doc_id to old doc_id and segment.
#[derive(Clone)]
pub(crate) struct SegmentDocIdMapping {
new_doc_id_to_old_and_segment: Vec<(DocId, SegmentOrdinal)>,
new_doc_id_to_old_doc_addr: Vec<DocAddress>,
is_trivial: bool,
}

impl SegmentDocIdMapping {
pub(crate) fn new(
new_doc_id_to_old_and_segment: Vec<(DocId, SegmentOrdinal)>,
is_trivial: bool,
) -> Self {
pub(crate) fn new(new_doc_id_to_old_and_segment: Vec<DocAddress>, is_trivial: bool) -> Self {
Self {
new_doc_id_to_old_and_segment,
new_doc_id_to_old_doc_addr: new_doc_id_to_old_and_segment,
is_trivial,
}
}
pub(crate) fn iter(&self) -> impl Iterator<Item = &(DocId, SegmentOrdinal)> {
self.new_doc_id_to_old_and_segment.iter()

/// Returns an iterator over the old document addresses, ordered by the new document ids.
///
/// In the returned `DocAddress`, the `segment_ord` is the ordinal of the targeted segment
/// in the list of merged segments.
pub(crate) fn iter_old_doc_addrs(&self) -> impl Iterator<Item = DocAddress> + '_ {
self.new_doc_id_to_old_doc_addr.iter().copied()
}

pub(crate) fn len(&self) -> usize {
self.new_doc_id_to_old_and_segment.len()
self.new_doc_id_to_old_doc_addr.len()
}

pub(crate) fn get_old_doc_addr(&self, new_doc_id: DocId) -> DocAddress {
self.new_doc_id_to_old_doc_addr[new_doc_id as usize]
}

/// This flag means the segments are simply stacked in the order of their ordinal.
/// e.g. [(0, 1), .. (n, 1), (0, 2)..., (m, 2)]
///
@@ -39,21 +46,6 @@ impl SegmentDocIdMapping {
self.is_trivial
}
}
impl Index<usize> for SegmentDocIdMapping {
type Output = (DocId, SegmentOrdinal);

fn index(&self, idx: usize) -> &Self::Output {
&self.new_doc_id_to_old_and_segment[idx]
}
}
impl IntoIterator for SegmentDocIdMapping {
type Item = (DocId, SegmentOrdinal);
type IntoIter = std::vec::IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
self.new_doc_id_to_old_and_segment.into_iter()
}
}

/// Struct to provide mapping from old doc_id to new doc_id and vice versa within a segment.
pub struct DocIdMapping {
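To make the shape of the refactored API easier to see outside of the diff, here is a minimal, self-contained sketch of SegmentDocIdMapping after this change. It is an illustration only, not the tantivy implementation: DocId and SegmentOrdinal are assumed to be u32 aliases here, the struct is reduced to the two fields touched by the PR, and visibility modifiers are dropped.

// Minimal sketch with simplified stand-ins for the real tantivy types.
type DocId = u32;
type SegmentOrdinal = u32;

#[derive(Clone, Copy, Debug)]
struct DocAddress {
    segment_ord: SegmentOrdinal,
    doc_id: DocId,
}

struct SegmentDocIdMapping {
    new_doc_id_to_old_doc_addr: Vec<DocAddress>,
    is_trivial: bool,
}

impl SegmentDocIdMapping {
    fn new(new_doc_id_to_old_doc_addr: Vec<DocAddress>, is_trivial: bool) -> Self {
        Self { new_doc_id_to_old_doc_addr, is_trivial }
    }

    // Old doc addresses in new doc id order; replaces iterating `&(DocId, SegmentOrdinal)` tuples.
    fn iter_old_doc_addrs(&self) -> impl Iterator<Item = DocAddress> + '_ {
        self.new_doc_id_to_old_doc_addr.iter().copied()
    }

    // Keyed lookup that replaces the removed `Index<usize>` impl.
    fn get_old_doc_addr(&self, new_doc_id: DocId) -> DocAddress {
        self.new_doc_id_to_old_doc_addr[new_doc_id as usize]
    }
}

fn main() {
    // Two docs from segment 0 followed by one doc from segment 1 ("trivial" stacking).
    let mapping = SegmentDocIdMapping::new(
        vec![
            DocAddress { segment_ord: 0, doc_id: 0 },
            DocAddress { segment_ord: 0, doc_id: 1 },
            DocAddress { segment_ord: 1, doc_id: 0 },
        ],
        true,
    );
    for (new_doc_id, old) in mapping.iter_old_doc_addrs().enumerate() {
        println!("new {} -> segment {}, old doc {}", new_doc_id, old.segment_ord, old.doc_id);
    }
    assert_eq!(mapping.get_old_doc_addr(2).segment_ord, 1);
    assert!(mapping.is_trivial);
}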
127 changes: 67 additions & 60 deletions src/indexer/merger.rs
@@ -21,8 +21,8 @@ use crate::schema::{Cardinality, Field, FieldType, Schema};
use crate::store::StoreWriter;
use crate::termdict::{TermMerger, TermOrdinal};
use crate::{
DocId, IndexSettings, IndexSortByField, InvertedIndexReader, Order, SegmentComponent,
SegmentOrdinal,
DocAddress, DocId, IndexSettings, IndexSortByField, InvertedIndexReader, Order,
SegmentComponent, SegmentOrdinal,
};

/// Segment's max doc must be `< MAX_DOC_LIMIT`.
@@ -260,9 +260,9 @@ impl IndexMerger {
.iter()
.map(|reader| reader.get_fieldnorms_reader(field))
.collect::<Result<_, _>>()?;
for (doc_id, reader_ordinal) in doc_id_mapping.iter() {
let fieldnorms_reader = &fieldnorms_readers[*reader_ordinal as usize];
let fieldnorm_id = fieldnorms_reader.fieldnorm_id(*doc_id);
for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
let fieldnorms_reader = &fieldnorms_readers[old_doc_addr.segment_ord as usize];
let fieldnorm_id = fieldnorms_reader.fieldnorm_id(old_doc_addr.doc_id);
fieldnorms_data.push(fieldnorm_id);
}

@@ -377,18 +377,21 @@
}
impl<'a> FastFieldDataAccess for SortedDocIdFieldAccessProvider<'a> {
fn get_val(&self, doc: u64) -> u64 {
let (doc_id, reader_ordinal) = self.doc_id_mapping[doc as usize];
self.fast_field_readers[reader_ordinal as usize].get(doc_id)
let DocAddress {
doc_id,
segment_ord,
} = self.doc_id_mapping.get_old_doc_addr(doc as u32);
self.fast_field_readers[segment_ord as usize].get(doc_id)
}
}
let fastfield_accessor = SortedDocIdFieldAccessProvider {
doc_id_mapping,
fast_field_readers: &fast_field_readers,
};
let iter_gen = || {
doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| {
let fast_field_reader = &fast_field_readers[*reader_ordinal as usize];
fast_field_reader.get(*doc_id)
doc_id_mapping.iter_old_doc_addrs().map(|old_doc_addr| {
let fast_field_reader = &fast_field_readers[old_doc_addr.segment_ord as usize];
fast_field_reader.get(old_doc_addr.doc_id)
})
};
fast_field_serializer.create_auto_detect_u64_fast_field(
@@ -469,15 +472,11 @@ impl IndexMerger {
let doc_id_reader_pair =
reader_ordinal_and_field_accessors
.iter()
.map(|reader_and_field_accessor| {
let reader = &self.readers[reader_and_field_accessor.0 as usize];
reader.doc_ids_alive().map(move |doc_id| {
(
doc_id,
reader_and_field_accessor.0,
&reader_and_field_accessor.1,
)
})
.map(|(reader_ord, ff_reader)| {
let reader = &self.readers[*reader_ord as usize];
reader
.doc_ids_alive()
.map(move |doc_id| (doc_id, reader_ord, ff_reader))
});

let total_num_new_docs = self
@@ -486,7 +485,7 @@
.map(|reader| reader.num_docs() as usize)
.sum();

let mut sorted_doc_ids = Vec::with_capacity(total_num_new_docs);
let mut sorted_doc_ids: Vec<DocAddress> = Vec::with_capacity(total_num_new_docs);

// create iterator tuple of (old doc_id, reader) in order of the new doc_ids
sorted_doc_ids.extend(
@@ -501,7 +500,10 @@
val1 > val2
}
})
.map(|(doc_id, reader_with_id, _)| (doc_id, reader_with_id)),
.map(|(doc_id, &segment_ord, _)| DocAddress {
doc_id,
segment_ord,
}),
);
Ok(SegmentDocIdMapping::new(sorted_doc_ids, false))
}
@@ -550,10 +552,10 @@

let mut offsets = Vec::with_capacity(doc_id_mapping.len());
let mut offset = 0;
for (doc_id, reader) in doc_id_mapping.iter() {
let reader = &reader_and_field_accessors[*reader as usize].1;
for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
let reader = &reader_and_field_accessors[old_doc_addr.segment_ord as usize].1;
offsets.push(offset);
offset += reader.get_len(*doc_id) as u64;
offset += reader.get_len(old_doc_addr.doc_id) as u64;
}
offsets.push(offset);

@@ -631,12 +633,12 @@ impl IndexMerger {
fast_field_serializer.new_u64_fast_field_with_idx(field, 0u64, max_term_ord, 1)?;
let mut vals = Vec::with_capacity(100);

for (old_doc_id, reader_ordinal) in doc_id_mapping.iter() {
for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
let term_ordinal_mapping: &[TermOrdinal] =
term_ordinal_mappings.get_segment(*reader_ordinal as usize);
term_ordinal_mappings.get_segment(old_doc_addr.segment_ord as usize);

let ff_reader = &fast_field_reader[*reader_ordinal as usize];
ff_reader.get_vals(*old_doc_id, &mut vals);
let ff_reader = &fast_field_reader[old_doc_addr.segment_ord as usize];
ff_reader.get_vals(old_doc_addr.doc_id, &mut vals);
for &prev_term_ord in &vals {
let new_term_ord = term_ordinal_mapping[prev_term_ord as usize];
serialize_vals.add_val(new_term_ord)?;
@@ -657,16 +659,17 @@ impl IndexMerger {
.map(|reader| reader.num_docs() as usize)
.sum();

let mut mapping = Vec::with_capacity(total_num_new_docs);
let mut mapping: Vec<DocAddress> = Vec::with_capacity(total_num_new_docs);

mapping.extend(
self.readers
.iter()
.enumerate()
.flat_map(|(reader_ordinal, reader)| {
reader
.doc_ids_alive()
.map(move |doc_id| (doc_id, reader_ordinal as SegmentOrdinal))
.flat_map(|(segment_ord, reader)| {
reader.doc_ids_alive().map(move |doc_id| DocAddress {
segment_ord: segment_ord as u32,
doc_id,
})
}),
);
Ok(SegmentDocIdMapping::new(mapping, true))
@@ -740,22 +743,24 @@ impl IndexMerger {
fn get_val(&self, pos: u64) -> u64 {
// use the offsets index to find the doc_id which will contain the position.
// the offsets are strictly increasing so we can do a simple search on it.
let new_doc_id = self
.offsets
.iter()
.position(|&offset| offset > pos)
.expect("pos is out of bounds")
- 1;
let new_doc_id: DocId =
self.offsets
.iter()
.position(|&offset| offset > pos)
.expect("pos is out of bounds") as DocId
- 1u32;

// now we need to find the position of `pos` in the multivalued bucket
let num_pos_covered_until_now = self.offsets[new_doc_id];
let num_pos_covered_until_now = self.offsets[new_doc_id as usize];
let pos_in_values = pos - num_pos_covered_until_now;

let (old_doc_id, reader_ordinal) = self.doc_id_mapping[new_doc_id as usize];
let num_vals = self.fast_field_readers[reader_ordinal as usize].get_len(old_doc_id);
let old_doc_addr = self.doc_id_mapping.get_old_doc_addr(new_doc_id);
let num_vals = self.fast_field_readers[old_doc_addr.segment_ord as usize]
.get_len(old_doc_addr.doc_id);
assert!(num_vals >= pos_in_values);
let mut vals = vec![];
self.fast_field_readers[reader_ordinal as usize].get_vals(old_doc_id, &mut vals);
let mut vals = Vec::new();
self.fast_field_readers[old_doc_addr.segment_ord as usize]
.get_vals(old_doc_addr.doc_id, &mut vals);

vals[pos_in_values as usize]
}
@@ -766,12 +771,14 @@ impl IndexMerger {
offsets,
};
let iter_gen = || {
doc_id_mapping.iter().flat_map(|(doc_id, reader_ordinal)| {
let ff_reader = &ff_readers[*reader_ordinal as usize];
let mut vals = vec![];
ff_reader.get_vals(*doc_id, &mut vals);
vals.into_iter()
})
doc_id_mapping
.iter_old_doc_addrs()
.flat_map(|old_doc_addr| {
let ff_reader = &ff_readers[old_doc_addr.segment_ord as usize];
let mut vals = Vec::new();
ff_reader.get_vals(old_doc_addr.doc_id, &mut vals);
vals.into_iter()
})
};
fast_field_serializer.create_auto_detect_u64_fast_field_with_idx(
field,
@@ -810,9 +817,9 @@ impl IndexMerger {
)?;
let mut serialize_vals = fast_field_serializer.new_bytes_fast_field_with_idx(field, 1);

for (doc_id, reader_ordinal) in doc_id_mapping.iter() {
let bytes_reader = &reader_and_field_accessors[*reader_ordinal as usize].1;
let val = bytes_reader.get_bytes(*doc_id);
for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
let bytes_reader = &reader_and_field_accessors[old_doc_addr.segment_ord as usize].1;
let val = bytes_reader.get_bytes(old_doc_addr.doc_id);
serialize_vals.write_all(val)?;
}

@@ -868,9 +875,9 @@ impl IndexMerger {
segment_local_map
})
.collect();
for (new_doc_id, (old_doc_id, segment_ord)) in doc_id_mapping.iter().enumerate() {
let segment_map = &mut merged_doc_id_map[*segment_ord as usize];
segment_map[*old_doc_id as usize] = Some(new_doc_id as DocId);
for (new_doc_id, old_doc_addr) in doc_id_mapping.iter_old_doc_addrs().enumerate() {
let segment_map = &mut merged_doc_id_map[old_doc_addr.segment_ord as usize];
segment_map[old_doc_addr.doc_id as usize] = Some(new_doc_id as DocId);
}

// Note that the total number of tokens is not exact.
@@ -1045,15 +1052,15 @@ impl IndexMerger {
.map(|(i, store)| store.iter_raw(self.readers[i].alive_bitset()))
.collect();

for (old_doc_id, reader_ordinal) in doc_id_mapping.iter() {
let doc_bytes_it = &mut document_iterators[*reader_ordinal as usize];
for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
let doc_bytes_it = &mut document_iterators[old_doc_addr.segment_ord as usize];
if let Some(doc_bytes_res) = doc_bytes_it.next() {
let doc_bytes = doc_bytes_res?;
store_writer.store_bytes(&doc_bytes)?;
} else {
return Err(DataCorruption::comment_only(&format!(
"unexpected missing document in docstore on merge, doc id {:?}",
old_doc_id
"unexpected missing document in docstore on merge, doc address \
{old_doc_addr:?}",
))
.into());
}
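The merger.rs changes above repeat two closely related patterns: stacking each reader's alive doc ids into the new-doc-id -> old DocAddress mapping (the trivial case), and inverting it into one old -> new table per segment before merging the inverted index. The sketch below is a standalone illustration with simplified stand-in types rather than the real SegmentReader and DocAddress, showing both patterns under those assumptions.

type DocId = u32;

#[derive(Clone, Copy, Debug)]
struct DocAddress {
    segment_ord: u32,
    doc_id: DocId,
}

fn main() {
    // Alive doc ids per segment, as each reader's doc_ids_alive() would yield them.
    let alive_docs_per_segment: Vec<Vec<DocId>> = vec![vec![0, 2], vec![0, 1, 3]];

    // New -> old: stack the segments in ordinal order (the `is_trivial == true` case).
    let new_to_old: Vec<DocAddress> = alive_docs_per_segment
        .iter()
        .enumerate()
        .flat_map(|(segment_ord, alive)| {
            alive.iter().map(move |&doc_id| DocAddress {
                segment_ord: segment_ord as u32,
                doc_id,
            })
        })
        .collect();

    // Old -> new: one Option<DocId> slot per old doc (None = deleted),
    // mirroring the merged_doc_id_map built before merging the inverted index.
    let max_doc_per_segment = [3usize, 4usize];
    let mut old_to_new: Vec<Vec<Option<DocId>>> = max_doc_per_segment
        .iter()
        .map(|&max_doc| vec![None; max_doc])
        .collect();
    for (new_doc_id, old) in new_to_old.iter().enumerate() {
        old_to_new[old.segment_ord as usize][old.doc_id as usize] = Some(new_doc_id as DocId);
    }

    println!("new -> old: {:?}", new_to_old);
    println!("old -> new: {:?}", old_to_new);
}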
15 changes: 7 additions & 8 deletions src/indexer/merger_sorted_index_test.rs
@@ -484,7 +484,7 @@ mod bench_sorted_index_merge {
// use crate::schema;
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader};
use crate::indexer::merger::IndexMerger;
use crate::schema::{Cardinality, Document, NumericOptions, Schema};
use crate::schema::{Cardinality, NumericOptions, Schema};
use crate::{IndexSettings, IndexSortByField, IndexWriter, Order};
fn create_index(sort_by_field: Option<IndexSortByField>) -> Index {
let mut schema_builder = Schema::builder();
@@ -503,9 +503,7 @@
{
let mut index_writer = index.writer_for_tests().unwrap();
let index_doc = |index_writer: &mut IndexWriter, val: u64| {
let mut doc = Document::default();
doc.add_u64(int_field, val);
index_writer.add_document(doc).unwrap();
index_writer.add_document(doc!(int_field=>val)).unwrap();
};
// 3 segments with 10_000 values in the fast fields
for _ in 0..3 {
@@ -518,6 +516,7 @@
}
index
}

#[bench]
fn create_sorted_index_walk_overkmerge_on_merge_fastfield(
b: &mut Bencher,
@@ -533,19 +532,19 @@
IndexMerger::open(index.schema(), index.settings().clone(), &segments[..])?;
let doc_id_mapping = merger.generate_doc_id_mapping(&sort_by_field).unwrap();
b.iter(|| {
let sorted_doc_ids = doc_id_mapping.iter().map(|(doc_id, ordinal)| {
let reader = &merger.readers[*ordinal as usize];
let sorted_doc_ids = doc_id_mapping.iter_old_doc_addrs().map(|doc_addr| {
let reader = &merger.readers[doc_addr.segment_ord as usize];
let u64_reader: DynamicFastFieldReader<u64> =
reader.fast_fields().typed_fast_field_reader(field).expect(
"Failed to find a reader for single fast field. This is a tantivy bug and \
it should never happen.",
);
(doc_id, reader, u64_reader)
(doc_addr.doc_id, reader, u64_reader)
});
// add values in order of the new doc_ids
let mut val = 0;
for (doc_id, _reader, field_reader) in sorted_doc_ids {
val = field_reader.get(*doc_id);
val = field_reader.get(doc_id);
}

val
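Finally, the multivalued fast field accessor touched in merger.rs keeps its original lookup idea: map a flat value position back to the owning (new) document by searching the offsets table, then index into that document's values through the old DocAddress. Below is a hedged, standalone sketch of just that lookup, reduced to a free function with a hypothetical name; the real code performs this inside get_val.

// Standalone sketch (hypothetical helper, not the tantivy code) of the
// offsets lookup used when merging multivalued fast fields.
// offsets[i] = number of values before doc i; the last entry is the total.
fn doc_for_position(offsets: &[u64], pos: u64) -> (u32, u64) {
    // The first doc whose end offset lies past `pos` owns that position.
    let new_doc_id = offsets
        .iter()
        .position(|&offset| offset > pos)
        .expect("pos is out of bounds") as u32
        - 1;
    let pos_in_values = pos - offsets[new_doc_id as usize];
    (new_doc_id, pos_in_values)
}

fn main() {
    // Doc 0 has 2 values, doc 1 has 0, doc 2 has 3, so the offsets are built as [0, 2, 2, 5].
    let offsets = vec![0u64, 2, 2, 5];
    assert_eq!(doc_for_position(&offsets, 0), (0, 0));
    assert_eq!(doc_for_position(&offsets, 2), (2, 0));
    assert_eq!(doc_for_position(&offsets, 4), (2, 2));
    println!("offsets lookup ok");
}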