From 8b42c4c1263ad585fcbb2079431f5419c09e87f1 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Wed, 5 Oct 2022 12:05:13 +0800 Subject: [PATCH 1/3] disable linear codec for multivalue value index don't materialize index column on merge use simpler chain() variant --- src/fastfield/mod.rs | 4 +- src/fastfield/multivalued/mod.rs | 12 +++ src/fastfield/multivalued/writer.rs | 27 +++---- src/fastfield/serializer/mod.rs | 14 ++++ src/indexer/merger.rs | 49 ++++++------ .../sorted_doc_id_multivalue_column.rs | 76 ++++++++++++++++++- 6 files changed, 141 insertions(+), 41 deletions(-) diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 40bb750561..5d36f98604 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -27,7 +27,9 @@ pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter}; pub use self::error::{FastFieldNotAvailableError, Result}; pub use self::facet_reader::FacetReader; pub(crate) use self::multivalued::MultivalueStartIndex; -pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter}; +pub use self::multivalued::{ + get_fastfield_codecs_for_multivalue, MultiValuedFastFieldReader, MultiValuedFastFieldWriter, +}; pub use self::readers::FastFieldReaders; pub(crate) use self::readers::{type_and_cardinality, FastType}; pub use self::serializer::{Column, CompositeFastFieldSerializer}; diff --git a/src/fastfield/multivalued/mod.rs b/src/fastfield/multivalued/mod.rs index 4eded3bdca..de3f6d1a1f 100644 --- a/src/fastfield/multivalued/mod.rs +++ b/src/fastfield/multivalued/mod.rs @@ -1,10 +1,22 @@ mod reader; mod writer; +use fastfield_codecs::FastFieldCodecType; + pub use self::reader::MultiValuedFastFieldReader; pub use self::writer::MultiValuedFastFieldWriter; pub(crate) use self::writer::MultivalueStartIndex; +/// The valid codecs for multivalue values excludes the linear interpolation codec. +/// +/// This limitation is only valid for the values, not the offset index of the multivalue index. 
+pub fn get_fastfield_codecs_for_multivalue() -> [FastFieldCodecType; 2] { + [ + FastFieldCodecType::Bitpacked, + FastFieldCodecType::BlockwiseLinear, + ] +} + #[cfg(test)] mod tests { use proptest::strategy::Strategy; diff --git a/src/fastfield/multivalued/writer.rs b/src/fastfield/multivalued/writer.rs index 957c002f95..0fb30caf63 100644 --- a/src/fastfield/multivalued/writer.rs +++ b/src/fastfield/multivalued/writer.rs @@ -3,6 +3,7 @@ use std::io; use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn}; use fnv::FnvHashMap; +use super::get_fastfield_codecs_for_multivalue; use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType}; use crate::indexer::doc_id_mapping::DocIdMapping; use crate::postings::UnorderedTermId; @@ -194,7 +195,12 @@ impl MultiValuedFastFieldWriter { } } let col = VecColumn::from(&values[..]); - serializer.create_auto_detect_u64_fast_field_with_idx(self.field, col, 1)?; + serializer.create_auto_detect_u64_fast_field_with_idx_and_codecs( + self.field, + col, + 1, + &get_fastfield_codecs_for_multivalue(), + )?; } Ok(()) } @@ -251,20 +257,11 @@ fn iter_remapped_multivalue_index<'a, C: Column>( column: &'a C, ) -> impl Iterator + 'a { let mut offset = 0; - doc_id_map - .iter_old_doc_ids() - .chain(std::iter::once(u32::MAX)) - .map(move |old_doc| { - if old_doc == u32::MAX { - // sentinel value for last offset - return offset; - } - let num_vals_for_doc = - column.get_val(old_doc as u64 + 1) - column.get_val(old_doc as u64); - let start_offset = offset; - offset += num_vals_for_doc; - start_offset - }) + std::iter::once(0).chain(doc_id_map.iter_old_doc_ids().map(move |old_doc| { + let num_vals_for_doc = column.get_val(old_doc as u64 + 1) - column.get_val(old_doc as u64); + offset += num_vals_for_doc; + offset as u64 + })) } #[cfg(test)] diff --git a/src/fastfield/serializer/mod.rs b/src/fastfield/serializer/mod.rs index 6efe3e28e9..6ca2929317 100644 --- a/src/fastfield/serializer/mod.rs +++ 
b/src/fastfield/serializer/mod.rs @@ -70,6 +70,20 @@ impl CompositeFastFieldSerializer { Ok(()) } + /// Serialize data into a new u64 fast field. The best compression codec among the provided + /// codec types will be chosen. + pub fn create_auto_detect_u64_fast_field_with_idx_and_codecs( + &mut self, + field: Field, + fastfield_accessor: impl Column, + idx: usize, + codec_types: &[FastFieldCodecType], + ) -> io::Result<()> { + let field_write = self.composite_write.for_field_with_idx(field, idx); + fastfield_codecs::serialize(fastfield_accessor, field_write, codec_types)?; + Ok(()) + } + /// Start serializing a new [u8] fast field. Use the returned writer to write data into the /// bytes field. To associate the bytes with documents a seperate index must be created on /// index 0. See bytes/writer.rs::serialize for an example. diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index ff73bb95c1..36a750f3e6 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -6,11 +6,13 @@ use fastfield_codecs::VecColumn; use itertools::Itertools; use measure_time::debug_time; +use super::sorted_doc_id_multivalue_column::RemappedDocIdMultiValueIndexColumn; use crate::core::{Segment, SegmentReader}; use crate::docset::{DocSet, TERMINATED}; use crate::error::DataCorruption; use crate::fastfield::{ - AliveBitSet, Column, CompositeFastFieldSerializer, MultiValueLength, MultiValuedFastFieldReader, + get_fastfield_codecs_for_multivalue, AliveBitSet, Column, CompositeFastFieldSerializer, + MultiValueLength, MultiValuedFastFieldReader, }; use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter}; use crate::indexer::doc_id_mapping::{expect_field_id_for_sort_field, SegmentDocIdMapping}; @@ -423,26 +425,18 @@ impl IndexMerger { // Creating the index file to point into the data, generic over `BytesFastFieldReader` and // `MultiValuedFastFieldReader` // - fn write_1_n_fast_field_idx_generic( + fn write_1_n_fast_field_idx_generic( field: Field, 
fast_field_serializer: &mut CompositeFastFieldSerializer, doc_id_mapping: &SegmentDocIdMapping, - reader_and_field_accessors: &[(&SegmentReader, T)], + segment_readers_and_field_accessor: &[(&SegmentReader, T)], ) -> crate::Result<()> { - // TODO Use `Column` implementation instead - - let mut offsets = Vec::with_capacity(doc_id_mapping.len()); - let mut offset = 0; - for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() { - let reader = &reader_and_field_accessors[old_doc_addr.segment_ord as usize].1; - offsets.push(offset); - offset += reader.get_len(old_doc_addr.doc_id) as u64; - } - offsets.push(offset); - - let fastfield_accessor = VecColumn::from(&offsets[..]); + let column = RemappedDocIdMultiValueIndexColumn::new( + segment_readers_and_field_accessor, + doc_id_mapping, + ); - fast_field_serializer.create_auto_detect_u64_fast_field(field, fastfield_accessor)?; + fast_field_serializer.create_auto_detect_u64_fast_field(field, column)?; Ok(()) } /// Returns the fastfield index (index for the data, not the data). 
@@ -452,7 +446,7 @@ impl IndexMerger { fast_field_serializer: &mut CompositeFastFieldSerializer, doc_id_mapping: &SegmentDocIdMapping, ) -> crate::Result<()> { - let reader_ordinal_and_field_accessors = self + let segment_reader_and_field_accessors = self .readers .iter() .map(|reader| { @@ -471,7 +465,7 @@ impl IndexMerger { field, fast_field_serializer, doc_id_mapping, - &reader_ordinal_and_field_accessors, + &segment_reader_and_field_accessors, ) } @@ -520,7 +514,12 @@ impl IndexMerger { } let col = VecColumn::from(&vals[..]); - fast_field_serializer.create_auto_detect_u64_fast_field_with_idx(field, col, 1)?; + fast_field_serializer.create_auto_detect_u64_fast_field_with_idx_and_codecs( + field, + col, + 1, + &get_fastfield_codecs_for_multivalue(), + )?; } Ok(()) } @@ -565,10 +564,11 @@ impl IndexMerger { let fastfield_accessor = RemappedDocIdMultiValueColumn::new(&self.readers, doc_id_mapping, field); - fast_field_serializer.create_auto_detect_u64_fast_field_with_idx( + fast_field_serializer.create_auto_detect_u64_fast_field_with_idx_and_codecs( field, fastfield_accessor, 1, + &get_fastfield_codecs_for_multivalue(), )?; Ok(()) @@ -580,7 +580,7 @@ impl IndexMerger { fast_field_serializer: &mut CompositeFastFieldSerializer, doc_id_mapping: &SegmentDocIdMapping, ) -> crate::Result<()> { - let reader_and_field_accessors = self + let segment_reader_and_field_accessors = self .readers .iter() .map(|reader| { @@ -591,17 +591,18 @@ impl IndexMerger { (reader, bytes_reader) }) .collect::>(); - Self::write_1_n_fast_field_idx_generic( field, fast_field_serializer, doc_id_mapping, - &reader_and_field_accessors, + &segment_reader_and_field_accessors, )?; + let mut serialize_vals = fast_field_serializer.new_bytes_fast_field(field); for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() { - let bytes_reader = &reader_and_field_accessors[old_doc_addr.segment_ord as usize].1; + let bytes_reader = + &segment_reader_and_field_accessors[old_doc_addr.segment_ord as usize].1; let 
val = bytes_reader.get_bytes(old_doc_addr.doc_id); serialize_vals.write_all(val)?; } diff --git a/src/indexer/sorted_doc_id_multivalue_column.rs b/src/indexer/sorted_doc_id_multivalue_column.rs index 45df329f54..43265c42b8 100644 --- a/src/indexer/sorted_doc_id_multivalue_column.rs +++ b/src/indexer/sorted_doc_id_multivalue_column.rs @@ -3,7 +3,7 @@ use std::cmp; use fastfield_codecs::Column; use super::flat_map_with_buffer::FlatMapWithBufferIter; -use crate::fastfield::MultiValuedFastFieldReader; +use crate::fastfield::{MultiValueLength, MultiValuedFastFieldReader}; use crate::indexer::doc_id_mapping::SegmentDocIdMapping; use crate::schema::Field; use crate::{DocAddress, SegmentReader}; @@ -93,3 +93,77 @@ impl<'a> Column for RemappedDocIdMultiValueColumn<'a> { self.num_vals } } + +pub(crate) struct RemappedDocIdMultiValueIndexColumn<'a, T: MultiValueLength> { + doc_id_mapping: &'a SegmentDocIdMapping, + multi_value_length_readers: Vec<&'a T>, + min_value: u64, + max_value: u64, + num_vals: u64, +} + +impl<'a, T: MultiValueLength> RemappedDocIdMultiValueIndexColumn<'a, T> { + pub(crate) fn new( + segment_and_multi_value_length_readers: &'a [(&'a SegmentReader, T)], + doc_id_mapping: &'a SegmentDocIdMapping, + ) -> Self { + // We go through a complete first pass to compute the minimum and the + // maximum value and initialize our Column. 
+ let mut num_vals = 0; + let min_value = 0; + let mut max_value = 0; + let mut multi_value_length_readers = + Vec::with_capacity(segment_and_multi_value_length_readers.len()); + for reader_and_multi_value_length_reader in segment_and_multi_value_length_readers { + let segment_reader = reader_and_multi_value_length_reader.0; + let multi_value_length_reader = &reader_and_multi_value_length_reader.1; + if !segment_reader.has_deletes() { + max_value += multi_value_length_reader.get_total_len(); + } else { + for doc in segment_reader.doc_ids_alive() { + max_value += multi_value_length_reader.get_len(doc); + } + } + num_vals += segment_reader.num_docs() as u64; + multi_value_length_readers.push(multi_value_length_reader); + } + Self { + doc_id_mapping, + multi_value_length_readers, + min_value, + max_value, + num_vals, + } + } +} + +impl<'a, T: MultiValueLength + Send + Sync> Column for RemappedDocIdMultiValueIndexColumn<'a, T> { + fn get_val(&self, _pos: u64) -> u64 { + unimplemented!() + } + + fn iter(&self) -> Box + '_> { + let mut offset = 0; + Box::new( + std::iter::once(0).chain(self.doc_id_mapping.iter_old_doc_addrs().map( + move |old_doc_addr| { + let ff_reader = + &self.multi_value_length_readers[old_doc_addr.segment_ord as usize]; + offset += ff_reader.get_len(old_doc_addr.doc_id); + offset + }, + )), + ) + } + fn min_value(&self) -> u64 { + self.min_value + } + + fn max_value(&self) -> u64 { + self.max_value + } + + fn num_vals(&self) -> u64 { + self.num_vals + } +} From b9f06bc28735052f6c3cb6b3e9deb0e238191496 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Wed, 5 Oct 2022 13:07:23 +0200 Subject: [PATCH 2/3] Update src/fastfield/multivalued/mod.rs Co-authored-by: Paul Masurel --- src/fastfield/multivalued/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fastfield/multivalued/mod.rs b/src/fastfield/multivalued/mod.rs index de3f6d1a1f..26b49abd7b 100644 --- a/src/fastfield/multivalued/mod.rs +++ b/src/fastfield/multivalued/mod.rs @@ 
-10,7 +10,7 @@ pub(crate) use self::writer::MultivalueStartIndex; /// The valid codecs for multivalue values excludes the linear interpolation codec. /// /// This limitation is only valid for the values, not the offset index of the multivalue index. -pub fn get_fastfield_codecs_for_multivalue() -> [FastFieldCodecType; 2] { +pub(crate) fn get_fastfield_codecs_for_multivalue() -> [FastFieldCodecType; 2] { [ FastFieldCodecType::Bitpacked, FastFieldCodecType::BlockwiseLinear, From d742275048d0158f1da8ed0b4c98cd65055a6523 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Wed, 5 Oct 2022 19:07:58 +0800 Subject: [PATCH 3/3] renames --- src/fastfield/mod.rs | 6 ++---- src/indexer/merger.rs | 19 ++++++++----------- .../sorted_doc_id_multivalue_column.rs | 11 +++++------ 3 files changed, 15 insertions(+), 21 deletions(-) diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 5d36f98604..3fca75fce4 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -26,10 +26,8 @@ pub use self::alive_bitset::{intersect_alive_bitsets, write_alive_bitset, AliveB pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter}; pub use self::error::{FastFieldNotAvailableError, Result}; pub use self::facet_reader::FacetReader; -pub(crate) use self::multivalued::MultivalueStartIndex; -pub use self::multivalued::{ - get_fastfield_codecs_for_multivalue, MultiValuedFastFieldReader, MultiValuedFastFieldWriter, -}; +pub(crate) use self::multivalued::{get_fastfield_codecs_for_multivalue, MultivalueStartIndex}; +pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter}; pub use self::readers::FastFieldReaders; pub(crate) use self::readers::{type_and_cardinality, FastType}; pub use self::serializer::{Column, CompositeFastFieldSerializer}; diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 36a750f3e6..0ed47a9156 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -429,12 +429,10 @@ impl IndexMerger { field: Field, 
fast_field_serializer: &mut CompositeFastFieldSerializer, doc_id_mapping: &SegmentDocIdMapping, - segment_readers_and_field_accessor: &[(&SegmentReader, T)], + segment_and_ff_readers: &[(&SegmentReader, T)], ) -> crate::Result<()> { - let column = RemappedDocIdMultiValueIndexColumn::new( - segment_readers_and_field_accessor, - doc_id_mapping, - ); + let column = + RemappedDocIdMultiValueIndexColumn::new(segment_and_ff_readers, doc_id_mapping); fast_field_serializer.create_auto_detect_u64_fast_field(field, column)?; Ok(()) @@ -446,7 +444,7 @@ impl IndexMerger { fast_field_serializer: &mut CompositeFastFieldSerializer, doc_id_mapping: &SegmentDocIdMapping, ) -> crate::Result<()> { - let segment_reader_and_field_accessors = self + let segment_and_ff_readers = self .readers .iter() .map(|reader| { @@ -465,7 +463,7 @@ impl IndexMerger { field, fast_field_serializer, doc_id_mapping, - &segment_reader_and_field_accessors, + &segment_and_ff_readers, ) } @@ -580,7 +578,7 @@ impl IndexMerger { fast_field_serializer: &mut CompositeFastFieldSerializer, doc_id_mapping: &SegmentDocIdMapping, ) -> crate::Result<()> { - let segment_reader_and_field_accessors = self + let segment_and_ff_readers = self .readers .iter() .map(|reader| { @@ -595,14 +593,13 @@ impl IndexMerger { field, fast_field_serializer, doc_id_mapping, - &segment_reader_and_field_accessors, + &segment_and_ff_readers, )?; let mut serialize_vals = fast_field_serializer.new_bytes_fast_field(field); for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() { - let bytes_reader = - &segment_reader_and_field_accessors[old_doc_addr.segment_ord as usize].1; + let bytes_reader = &segment_and_ff_readers[old_doc_addr.segment_ord as usize].1; let val = bytes_reader.get_bytes(old_doc_addr.doc_id); serialize_vals.write_all(val)?; } diff --git a/src/indexer/sorted_doc_id_multivalue_column.rs b/src/indexer/sorted_doc_id_multivalue_column.rs index 43265c42b8..30e1beabaf 100644 --- a/src/indexer/sorted_doc_id_multivalue_column.rs +++ 
b/src/indexer/sorted_doc_id_multivalue_column.rs @@ -104,7 +104,7 @@ pub(crate) struct RemappedDocIdMultiValueIndexColumn<'a, T: MultiValueLength> { impl<'a, T: MultiValueLength> RemappedDocIdMultiValueIndexColumn<'a, T> { pub(crate) fn new( - segment_and_multi_value_length_readers: &'a [(&'a SegmentReader, T)], + segment_and_ff_readers: &'a [(&'a SegmentReader, T)], doc_id_mapping: &'a SegmentDocIdMapping, ) -> Self { // We go through a complete first pass to compute the minimum and the @@ -112,11 +112,10 @@ impl<'a, T: MultiValueLength> RemappedDocIdMultiValueIndexColumn<'a, T> { let mut num_vals = 0; let min_value = 0; let mut max_value = 0; - let mut multi_value_length_readers = - Vec::with_capacity(segment_and_multi_value_length_readers.len()); - for reader_and_multi_value_length_reader in segment_and_multi_value_length_readers { - let segment_reader = reader_and_multi_value_length_reader.0; - let multi_value_length_reader = &reader_and_multi_value_length_reader.1; + let mut multi_value_length_readers = Vec::with_capacity(segment_and_ff_readers.len()); + for segment_and_ff_reader in segment_and_ff_readers { + let segment_reader = segment_and_ff_reader.0; + let multi_value_length_reader = &segment_and_ff_reader.1; if !segment_reader.has_deletes() { max_value += multi_value_length_reader.get_total_len(); } else {