From 6d9a123cf238061e9009ac48afc7b50886d7a0b1 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 4 Oct 2022 12:01:25 +0800 Subject: [PATCH 1/2] remove get_val in serialization remove get_val in serialization and mark as unimplemented!() replace get_val with iter in linear codec remove MultivalueStartIndexRandomSeeker replace MultivalueStartIndexIter with closure Sample 100 values in linear codec --- fastfield_codecs/src/lib.rs | 2 +- fastfield_codecs/src/line.rs | 38 ++++-- fastfield_codecs/src/linear.rs | 22 ++-- src/fastfield/multivalued/writer.rs | 121 +++++------------- src/fastfield/writer.rs | 11 +- src/indexer/doc_id_mapping.rs | 4 - src/indexer/merger.rs | 28 ++-- src/indexer/sorted_doc_id_column.rs | 18 +-- .../sorted_doc_id_multivalue_column.rs | 37 ++---- 9 files changed, 102 insertions(+), 179 deletions(-) diff --git a/fastfield_codecs/src/lib.rs b/fastfield_codecs/src/lib.rs index 4205a323a5..1f66a27e9b 100644 --- a/fastfield_codecs/src/lib.rs +++ b/fastfield_codecs/src/lib.rs @@ -312,7 +312,7 @@ mod tests { #[test] fn estimation_test_bad_interpolation_case_monotonically_increasing() { - let mut data: Vec = (200..=20000_u64).collect(); + let mut data: Vec = (201..=20000_u64).collect(); data.push(1_000_000); let data: VecColumn = data.as_slice().into(); diff --git a/fastfield_codecs/src/line.rs b/fastfield_codecs/src/line.rs index c1eb558e57..4119da9ee2 100644 --- a/fastfield_codecs/src/line.rs +++ b/fastfield_codecs/src/line.rs @@ -68,29 +68,37 @@ impl Line { } // Same as train, but the intercept is only estimated from provided sample positions - pub fn estimate(ys: &dyn Column, sample_positions: &[u64]) -> Self { + pub fn estimate(sample_positions_and_values: &[(u64, u64)]) -> Self { + let first_val = sample_positions_and_values[0].1; + let last_val = sample_positions_and_values[sample_positions_and_values.len() - 1].1; + let num_vals = sample_positions_and_values[sample_positions_and_values.len() - 1].0 + 1; Self::train_from( - ys, - sample_positions - .iter() - .cloned() - .map(|pos| (pos, ys.get_val(pos))), + first_val, + last_val, + num_vals, + sample_positions_and_values.iter().cloned(), ) } // Intercept is only computed from provided positions - fn train_from(ys: &dyn Column, positions_and_values: impl Iterator) -> Self { - let num_vals = if let Some(num_vals) = NonZeroU64::new(ys.num_vals() - 1) { - num_vals + fn train_from( + first_val: u64, + last_val: u64, + num_vals: u64, + positions_and_values: impl Iterator, + ) -> Self { + // TODO replace with let else + let idx_last_val = if let Some(idx_last_val) = NonZeroU64::new(num_vals - 1) { + idx_last_val } else { return Line::default(); }; - let y0 = ys.get_val(0); - let y1 = ys.get_val(num_vals.get()); + let y0 = first_val; + let y1 = last_val; // We first independently pick our slope. - let slope = compute_slope(y0, y1, num_vals); + let slope = compute_slope(y0, y1, idx_last_val); // We picked our slope. Note that it does not have to be perfect. // Now we need to compute the best intercept. @@ -138,8 +146,12 @@ impl Line { /// This function is only invariable by translation if all of the /// `ys` are packaged into half of the space. (See heuristic below) pub fn train(ys: &dyn Column) -> Self { + let first_val = ys.iter().next().unwrap(); + let last_val = ys.iter().nth(ys.num_vals() as usize - 1).unwrap(); Self::train_from( - ys, + first_val, + last_val, + ys.num_vals(), ys.iter().enumerate().map(|(pos, val)| (pos as u64, val)), ) } diff --git a/fastfield_codecs/src/linear.rs b/fastfield_codecs/src/linear.rs index ad6f923a1a..2926f45fca 100644 --- a/fastfield_codecs/src/linear.rs +++ b/fastfield_codecs/src/linear.rs @@ -126,18 +126,21 @@ impl FastFieldCodec for LinearCodec { return None; // disable compressor for this case } - // let's sample at 0%, 5%, 10% .. 95%, 100% - let num_vals = column.num_vals() as f32 / 100.0; - let sample_positions = (0..20) - .map(|pos| (num_vals * pos as f32 * 5.0) as u64) - .collect::>(); + let limit_num_vals = column.num_vals().min(100_000); + + let num_samples = 100; + let step_size = (limit_num_vals / num_samples).max(1); // 20 samples + let mut sample_positions_and_values: Vec<_> = Vec::new(); + for (idx, val) in column.iter().step_by(step_size as usize).enumerate() { + let pos = idx as u64 * step_size; + sample_positions_and_values.push((pos, val)); + } - let line = Line::estimate(column, &sample_positions); + let line = Line::estimate(&sample_positions_and_values); - let estimated_bit_width = sample_positions + let estimated_bit_width = sample_positions_and_values .into_iter() - .map(|pos| { - let actual_value = column.get_val(pos); + .map(|(pos, actual_value)| { let interpolated_val = line.eval(pos as u64); actual_value.wrapping_sub(interpolated_val) }) @@ -146,6 +149,7 @@ impl FastFieldCodec for LinearCodec { .max() .unwrap_or(0); + // Extrapolate to whole column let num_bits = (estimated_bit_width as u64 * column.num_vals() as u64) + 64; let num_bits_uncompressed = 64 * column.num_vals(); Some(num_bits as f32 / num_bits_uncompressed as f32) diff --git a/src/fastfield/multivalued/writer.rs b/src/fastfield/multivalued/writer.rs index 38aad73245..957c002f95 100644 --- a/src/fastfield/multivalued/writer.rs +++ b/src/fastfield/multivalued/writer.rs @@ -1,5 +1,4 @@ use std::io; -use std::sync::Mutex; use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn}; use fnv::FnvHashMap; @@ -204,112 +203,68 @@ impl MultiValuedFastFieldWriter { pub(crate) struct MultivalueStartIndex<'a, C: Column> { column: &'a C, doc_id_map: &'a DocIdMapping, - min_max_opt: Mutex>, - random_seeker: Mutex>, -} - -struct MultivalueStartIndexRandomSeeker<'a, C: Column> { - seek_head: MultivalueStartIndexIter<'a, C>, - seek_next_id: u64, -} -impl<'a, C: Column> MultivalueStartIndexRandomSeeker<'a, C> { - fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self { - Self { - seek_head: MultivalueStartIndexIter { - column, - doc_id_map, - new_doc_id: 0, - offset: 0u64, - }, - seek_next_id: 0u64, - } - } + min: u64, + max: u64, } impl<'a, C: Column> MultivalueStartIndex<'a, C> { pub fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self { assert_eq!(column.num_vals(), doc_id_map.num_old_doc_ids() as u64 + 1); + let (min, max) = + tantivy_bitpacker::minmax(iter_remapped_multivalue_index(doc_id_map, column)) + .unwrap_or((0u64, 0u64)); MultivalueStartIndex { column, doc_id_map, - min_max_opt: Mutex::default(), - random_seeker: Mutex::new(MultivalueStartIndexRandomSeeker::new(column, doc_id_map)), - } - } - - fn minmax(&self) -> (u64, u64) { - if let Some((min, max)) = *self.min_max_opt.lock().unwrap() { - return (min, max); + min, + max, } - let (min, max) = tantivy_bitpacker::minmax(self.iter()).unwrap_or((0u64, 0u64)); - *self.min_max_opt.lock().unwrap() = Some((min, max)); - (min, max) } } impl<'a, C: Column> Column for MultivalueStartIndex<'a, C> { - fn get_val(&self, idx: u64) -> u64 { - let mut random_seeker_lock = self.random_seeker.lock().unwrap(); - if random_seeker_lock.seek_next_id > idx { - *random_seeker_lock = - MultivalueStartIndexRandomSeeker::new(self.column, self.doc_id_map); - } - let to_skip = idx - random_seeker_lock.seek_next_id; - random_seeker_lock.seek_next_id = idx + 1; - random_seeker_lock.seek_head.nth(to_skip as usize).unwrap() + fn get_val(&self, _idx: u64) -> u64 { + unimplemented!() } fn min_value(&self) -> u64 { - self.minmax().0 + self.min } fn max_value(&self) -> u64 { - self.minmax().1 + self.max } fn num_vals(&self) -> u64 { (self.doc_id_map.num_new_doc_ids() + 1) as u64 } - fn iter<'b>(&'b self) -> Box + 'b> { - Box::new(MultivalueStartIndexIter::new(self.column, self.doc_id_map)) - } -} - -struct MultivalueStartIndexIter<'a, C: Column> { - pub column: &'a C, - pub doc_id_map: &'a DocIdMapping, - pub new_doc_id: usize, - pub offset: u64, -} - -impl<'a, C: Column> MultivalueStartIndexIter<'a, C> { - fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self { - Self { - column, - doc_id_map, - new_doc_id: 0, - offset: 0, - } + fn iter(&self) -> Box + '_> { + Box::new(iter_remapped_multivalue_index( + self.doc_id_map, + &self.column, + )) } } -impl<'a, C: Column> Iterator for MultivalueStartIndexIter<'a, C> { - type Item = u64; - - fn next(&mut self) -> Option { - if self.new_doc_id > self.doc_id_map.num_new_doc_ids() { - return None; - } - let new_doc_id = self.new_doc_id; - self.new_doc_id += 1; - let start_offset = self.offset; - if new_doc_id < self.doc_id_map.num_new_doc_ids() { - let old_doc = self.doc_id_map.get_old_doc_id(new_doc_id as u32) as u64; - let num_vals_for_doc = self.column.get_val(old_doc + 1) - self.column.get_val(old_doc); - self.offset += num_vals_for_doc; - } - Some(start_offset) - } +fn iter_remapped_multivalue_index<'a, C: Column>( + doc_id_map: &'a DocIdMapping, + column: &'a C, +) -> impl Iterator + 'a { + let mut offset = 0; + doc_id_map + .iter_old_doc_ids() + .chain(std::iter::once(u32::MAX)) + .map(move |old_doc| { + if old_doc == u32::MAX { + // sentinel value for last offset + return offset; + } + let num_vals_for_doc = + column.get_val(old_doc as u64 + 1) - column.get_val(old_doc as u64); + let start_offset = offset; + offset += num_vals_for_doc; + start_offset + }) } #[cfg(test)] @@ -344,11 +299,5 @@ mod tests { vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55] ); assert_eq!(multivalue_start_index.num_vals(), 11); - assert_eq!(multivalue_start_index.get_val(3), 2); - assert_eq!(multivalue_start_index.get_val(5), 5); - assert_eq!(multivalue_start_index.get_val(8), 21); - assert_eq!(multivalue_start_index.get_val(4), 3); - assert_eq!(multivalue_start_index.get_val(0), 0); - assert_eq!(multivalue_start_index.get_val(10), 55); } } diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs index 8e55c1b8b4..5d1a0810e4 100644 --- a/src/fastfield/writer.rs +++ b/src/fastfield/writer.rs @@ -391,15 +391,8 @@ impl<'map, 'bitp> Column for WriterFastFieldAccessProvider<'map, 'bitp> { /// # Panics /// /// May panic if `doc` is greater than the index. - fn get_val(&self, doc: u64) -> u64 { - if let Some(doc_id_map) = self.doc_id_map { - self.vals - .get(doc_id_map.get_old_doc_id(doc as u32) as usize) // consider extra - // FastFieldReader wrapper for - // non doc_id_map - } else { - self.vals.get(doc as usize) - } + fn get_val(&self, _doc: u64) -> u64 { + unimplemented!() } fn iter(&self) -> Box + '_> { diff --git a/src/indexer/doc_id_mapping.rs b/src/indexer/doc_id_mapping.rs index 6c60fbf457..1f25af1ad0 100644 --- a/src/indexer/doc_id_mapping.rs +++ b/src/indexer/doc_id_mapping.rs @@ -34,10 +34,6 @@ impl SegmentDocIdMapping { self.new_doc_id_to_old_doc_addr.len() } - pub(crate) fn get_old_doc_addr(&self, new_doc_id: DocId) -> DocAddress { - self.new_doc_id_to_old_doc_addr[new_doc_id as usize] - } - /// This flags means the segments are simply stacked in the order of their ordinal. /// e.g. [(0, 1), .. (n, 1), (0, 2)..., (m, 2)] /// diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 210584e8cb..ff73bb95c1 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -14,8 +14,8 @@ use crate::fastfield::{ }; use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter}; use crate::indexer::doc_id_mapping::{expect_field_id_for_sort_field, SegmentDocIdMapping}; -use crate::indexer::sorted_doc_id_column::SortedDocIdColumn; -use crate::indexer::sorted_doc_id_multivalue_column::SortedDocIdMultiValueColumn; +use crate::indexer::sorted_doc_id_column::RemappedDocIdColumn; +use crate::indexer::sorted_doc_id_multivalue_column::RemappedDocIdMultiValueColumn; use crate::indexer::SegmentSerializer; use crate::postings::{InvertedIndexSerializer, Postings, SegmentPostings}; use crate::schema::{Cardinality, Field, FieldType, Schema}; @@ -310,7 +310,7 @@ impl IndexMerger { fast_field_serializer: &mut CompositeFastFieldSerializer, doc_id_mapping: &SegmentDocIdMapping, ) -> crate::Result<()> { - let fast_field_accessor = SortedDocIdColumn::new(&self.readers, doc_id_mapping, field); + let fast_field_accessor = RemappedDocIdColumn::new(&self.readers, doc_id_mapping, field); fast_field_serializer.create_auto_detect_u64_fast_field(field, fast_field_accessor)?; Ok(()) @@ -428,14 +428,8 @@ impl IndexMerger { fast_field_serializer: &mut CompositeFastFieldSerializer, doc_id_mapping: &SegmentDocIdMapping, reader_and_field_accessors: &[(&SegmentReader, T)], - ) -> crate::Result> { - // We can now create our `idx` serializer, and in a second pass, - // can effectively push the different indexes. - - // copying into a temp vec is not ideal, but the fast field codec api requires random - // access, which is used in the estimation. It's possible to 1. calculate random - // access on the fly or 2. change the codec api to make random access optional, but - // they both have also major drawbacks. + ) -> crate::Result<()> { + // TODO Use `Column` implementation instead let mut offsets = Vec::with_capacity(doc_id_mapping.len()); let mut offset = 0; @@ -449,7 +443,7 @@ impl IndexMerger { let fastfield_accessor = VecColumn::from(&offsets[..]); fast_field_serializer.create_auto_detect_u64_fast_field(field, fastfield_accessor)?; - Ok(offsets) + Ok(()) } /// Returns the fastfield index (index for the data, not the data). fn write_multi_value_fast_field_idx( @@ -457,7 +451,7 @@ impl IndexMerger { field: Field, fast_field_serializer: &mut CompositeFastFieldSerializer, doc_id_mapping: &SegmentDocIdMapping, - ) -> crate::Result> { + ) -> crate::Result<()> { let reader_ordinal_and_field_accessors = self .readers .iter() @@ -561,16 +555,16 @@ impl IndexMerger { fast_field_serializer: &mut CompositeFastFieldSerializer, doc_id_mapping: &SegmentDocIdMapping, ) -> crate::Result<()> { - // Multifastfield consists in 2 fastfields. + // Multifastfield consists of 2 fastfields. // The first serves as an index into the second one and is strictly increasing. // The second contains the actual values. // First we merge the idx fast field. - let offsets = - self.write_multi_value_fast_field_idx(field, fast_field_serializer, doc_id_mapping)?; + + self.write_multi_value_fast_field_idx(field, fast_field_serializer, doc_id_mapping)?; let fastfield_accessor = - SortedDocIdMultiValueColumn::new(&self.readers, doc_id_mapping, &offsets, field); + RemappedDocIdMultiValueColumn::new(&self.readers, doc_id_mapping, field); fast_field_serializer.create_auto_detect_u64_fast_field_with_idx( field, fastfield_accessor, diff --git a/src/indexer/sorted_doc_id_column.rs b/src/indexer/sorted_doc_id_column.rs index 1f84c20ace..c02fe1f166 100644 --- a/src/indexer/sorted_doc_id_column.rs +++ b/src/indexer/sorted_doc_id_column.rs @@ -5,9 +5,9 @@ use itertools::Itertools; use crate::indexer::doc_id_mapping::SegmentDocIdMapping; use crate::schema::Field; -use crate::{DocAddress, SegmentReader}; +use crate::SegmentReader; -pub(crate) struct SortedDocIdColumn<'a> { +pub(crate) struct RemappedDocIdColumn<'a> { doc_id_mapping: &'a SegmentDocIdMapping, fast_field_readers: Vec>>, min_value: u64, @@ -37,7 +37,7 @@ fn compute_min_max_val( .into_option() } -impl<'a> SortedDocIdColumn<'a> { +impl<'a> RemappedDocIdColumn<'a> { pub(crate) fn new( readers: &'a [SegmentReader], doc_id_mapping: &'a SegmentDocIdMapping, @@ -68,7 +68,7 @@ impl<'a> SortedDocIdColumn<'a> { }) .collect::>(); - SortedDocIdColumn { + RemappedDocIdColumn { doc_id_mapping, fast_field_readers, min_value, @@ -78,13 +78,9 @@ impl<'a> SortedDocIdColumn<'a> { } } -impl<'a> Column for SortedDocIdColumn<'a> { - fn get_val(&self, doc: u64) -> u64 { - let DocAddress { - doc_id, - segment_ord, - } = self.doc_id_mapping.get_old_doc_addr(doc as u32); - self.fast_field_readers[segment_ord as usize].get_val(doc_id as u64) +impl<'a> Column for RemappedDocIdColumn<'a> { + fn get_val(&self, _doc: u64) -> u64 { + unimplemented!() } fn iter(&self) -> Box + '_> { diff --git a/src/indexer/sorted_doc_id_multivalue_column.rs b/src/indexer/sorted_doc_id_multivalue_column.rs index 95328571fb..1e126af815 100644 --- a/src/indexer/sorted_doc_id_multivalue_column.rs +++ b/src/indexer/sorted_doc_id_multivalue_column.rs @@ -2,26 +2,23 @@ use std::cmp; use fastfield_codecs::Column; -use crate::fastfield::{MultiValueLength, MultiValuedFastFieldReader}; +use crate::fastfield::MultiValuedFastFieldReader; use crate::indexer::doc_id_mapping::SegmentDocIdMapping; use crate::schema::Field; -use crate::{DocId, SegmentReader}; +use crate::SegmentReader; -// We can now initialize our serializer, and push it the different values -pub(crate) struct SortedDocIdMultiValueColumn<'a> { +pub(crate) struct RemappedDocIdMultiValueColumn<'a> { doc_id_mapping: &'a SegmentDocIdMapping, fast_field_readers: Vec>, - offsets: &'a [u64], min_value: u64, max_value: u64, num_vals: u64, } -impl<'a> SortedDocIdMultiValueColumn<'a> { +impl<'a> RemappedDocIdMultiValueColumn<'a> { pub(crate) fn new( readers: &'a [SegmentReader], doc_id_mapping: &'a SegmentDocIdMapping, - offsets: &'a [u64], field: Field, ) -> Self { // Our values are bitpacked and we need to know what should be @@ -58,10 +55,9 @@ impl<'a> SortedDocIdMultiValueColumn<'a> { min_value = 0; max_value = 0; } - SortedDocIdMultiValueColumn { + RemappedDocIdMultiValueColumn { doc_id_mapping, fast_field_readers, - offsets, min_value, max_value, num_vals: num_vals as u64, @@ -69,26 +65,9 @@ impl<'a> SortedDocIdMultiValueColumn<'a> { } } -impl<'a> Column for SortedDocIdMultiValueColumn<'a> { - fn get_val(&self, pos: u64) -> u64 { - // use the offsets index to find the doc_id which will contain the position. - // the offsets are strictly increasing so we can do a binary search on it. - - let new_doc_id: DocId = self.offsets.partition_point(|&offset| offset <= pos) as DocId - 1; // Offsets start at 0, so -1 is safe - - // now we need to find the position of `pos` in the multivalued bucket - let num_pos_covered_until_now = self.offsets[new_doc_id as usize]; - let pos_in_values = pos - num_pos_covered_until_now; - - let old_doc_addr = self.doc_id_mapping.get_old_doc_addr(new_doc_id); - let num_vals = - self.fast_field_readers[old_doc_addr.segment_ord as usize].get_len(old_doc_addr.doc_id); - assert!(num_vals >= pos_in_values); - let mut vals = Vec::new(); - self.fast_field_readers[old_doc_addr.segment_ord as usize] - .get_vals(old_doc_addr.doc_id, &mut vals); - - vals[pos_in_values as usize] +impl<'a> Column for RemappedDocIdMultiValueColumn<'a> { + fn get_val(&self, _pos: u64) -> u64 { + unimplemented!() } fn iter(&self) -> Box + '_> { From 0f5cff762fc918fa203c01b900bb2dc3c6e0c4db Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 4 Oct 2022 12:30:19 +0800 Subject: [PATCH 2/2] move enumerate and remove computation --- fastfield_codecs/src/linear.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/fastfield_codecs/src/linear.rs b/fastfield_codecs/src/linear.rs index 2926f45fca..6ed2d84b6c 100644 --- a/fastfield_codecs/src/linear.rs +++ b/fastfield_codecs/src/linear.rs @@ -131,9 +131,8 @@ impl FastFieldCodec for LinearCodec { let num_samples = 100; let step_size = (limit_num_vals / num_samples).max(1); // 20 samples let mut sample_positions_and_values: Vec<_> = Vec::new(); - for (idx, val) in column.iter().step_by(step_size as usize).enumerate() { - let pos = idx as u64 * step_size; - sample_positions_and_values.push((pos, val)); + for (pos, val) in column.iter().enumerate().step_by(step_size as usize) { + sample_positions_and_values.push((pos as u64, val)); } let line = Line::estimate(&sample_positions_and_values);