Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disable linear codec for multivalue values #1591

Merged
merged 3 commits into from
Oct 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/fastfield/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub use self::alive_bitset::{intersect_alive_bitsets, write_alive_bitset, AliveB
pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
pub use self::error::{FastFieldNotAvailableError, Result};
pub use self::facet_reader::FacetReader;
pub(crate) use self::multivalued::MultivalueStartIndex;
pub(crate) use self::multivalued::{get_fastfield_codecs_for_multivalue, MultivalueStartIndex};
pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
pub use self::readers::FastFieldReaders;
pub(crate) use self::readers::{type_and_cardinality, FastType};
Expand Down
12 changes: 12 additions & 0 deletions src/fastfield/multivalued/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
mod reader;
mod writer;

use fastfield_codecs::FastFieldCodecType;

pub use self::reader::MultiValuedFastFieldReader;
pub use self::writer::MultiValuedFastFieldWriter;
pub(crate) use self::writer::MultivalueStartIndex;

/// The valid codecs for multivalue values excludes the linear interpolation codec.
///
/// This limitation is only valid for the values, not the offset index of the multivalue index.
pub(crate) fn get_fastfield_codecs_for_multivalue() -> [FastFieldCodecType; 2] {
[
FastFieldCodecType::Bitpacked,
FastFieldCodecType::BlockwiseLinear,
]
}

#[cfg(test)]
mod tests {
use proptest::strategy::Strategy;
Expand Down
27 changes: 12 additions & 15 deletions src/fastfield/multivalued/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::io;
use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn};
use fnv::FnvHashMap;

use super::get_fastfield_codecs_for_multivalue;
use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType};
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::UnorderedTermId;
Expand Down Expand Up @@ -194,7 +195,12 @@ impl MultiValuedFastFieldWriter {
}
}
let col = VecColumn::from(&values[..]);
serializer.create_auto_detect_u64_fast_field_with_idx(self.field, col, 1)?;
serializer.create_auto_detect_u64_fast_field_with_idx_and_codecs(
self.field,
col,
1,
&get_fastfield_codecs_for_multivalue(),
)?;
}
Ok(())
}
Expand Down Expand Up @@ -251,20 +257,11 @@ fn iter_remapped_multivalue_index<'a, C: Column>(
column: &'a C,
) -> impl Iterator<Item = u64> + 'a {
let mut offset = 0;
doc_id_map
.iter_old_doc_ids()
.chain(std::iter::once(u32::MAX))
.map(move |old_doc| {
if old_doc == u32::MAX {
// sentinel value for last offset
return offset;
}
let num_vals_for_doc =
column.get_val(old_doc as u64 + 1) - column.get_val(old_doc as u64);
let start_offset = offset;
offset += num_vals_for_doc;
start_offset
})
std::iter::once(0).chain(doc_id_map.iter_old_doc_ids().map(move |old_doc| {
let num_vals_for_doc = column.get_val(old_doc as u64 + 1) - column.get_val(old_doc as u64);
offset += num_vals_for_doc;
offset as u64
}))
}

#[cfg(test)]
Expand Down
14 changes: 14 additions & 0 deletions src/fastfield/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,20 @@ impl CompositeFastFieldSerializer {
Ok(())
}

/// Serialize data into a new u64 fast field. The best compression codec of the the provided
/// will be chosen.
pub fn create_auto_detect_u64_fast_field_with_idx_and_codecs<T: MonotonicallyMappableToU64>(
&mut self,
field: Field,
fastfield_accessor: impl Column<T>,
idx: usize,
codec_types: &[FastFieldCodecType],
) -> io::Result<()> {
let field_write = self.composite_write.for_field_with_idx(field, idx);
fastfield_codecs::serialize(fastfield_accessor, field_write, codec_types)?;
Ok(())
}

/// Start serializing a new [u8] fast field. Use the returned writer to write data into the
/// bytes field. To associate the bytes with documents a seperate index must be created on
/// index 0. See bytes/writer.rs::serialize for an example.
Expand Down
46 changes: 22 additions & 24 deletions src/indexer/merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ use fastfield_codecs::VecColumn;
use itertools::Itertools;
use measure_time::debug_time;

use super::sorted_doc_id_multivalue_column::RemappedDocIdMultiValueIndexColumn;
use crate::core::{Segment, SegmentReader};
use crate::docset::{DocSet, TERMINATED};
use crate::error::DataCorruption;
use crate::fastfield::{
AliveBitSet, Column, CompositeFastFieldSerializer, MultiValueLength, MultiValuedFastFieldReader,
get_fastfield_codecs_for_multivalue, AliveBitSet, Column, CompositeFastFieldSerializer,
MultiValueLength, MultiValuedFastFieldReader,
};
use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
use crate::indexer::doc_id_mapping::{expect_field_id_for_sort_field, SegmentDocIdMapping};
Expand Down Expand Up @@ -423,26 +425,16 @@ impl IndexMerger {
// Creating the index file to point into the data, generic over `BytesFastFieldReader` and
// `MultiValuedFastFieldReader`
//
fn write_1_n_fast_field_idx_generic<T: MultiValueLength>(
fn write_1_n_fast_field_idx_generic<T: MultiValueLength + Send + Sync>(
field: Field,
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
reader_and_field_accessors: &[(&SegmentReader, T)],
segment_and_ff_readers: &[(&SegmentReader, T)],
) -> crate::Result<()> {
// TODO Use `Column` implementation instead
let column =
RemappedDocIdMultiValueIndexColumn::new(segment_and_ff_readers, doc_id_mapping);

let mut offsets = Vec::with_capacity(doc_id_mapping.len());
let mut offset = 0;
for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
let reader = &reader_and_field_accessors[old_doc_addr.segment_ord as usize].1;
offsets.push(offset);
offset += reader.get_len(old_doc_addr.doc_id) as u64;
}
offsets.push(offset);

let fastfield_accessor = VecColumn::from(&offsets[..]);

fast_field_serializer.create_auto_detect_u64_fast_field(field, fastfield_accessor)?;
fast_field_serializer.create_auto_detect_u64_fast_field(field, column)?;
Ok(())
}
/// Returns the fastfield index (index for the data, not the data).
Expand All @@ -452,7 +444,7 @@ impl IndexMerger {
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
let reader_ordinal_and_field_accessors = self
let segment_and_ff_readers = self
.readers
.iter()
.map(|reader| {
Expand All @@ -471,7 +463,7 @@ impl IndexMerger {
field,
fast_field_serializer,
doc_id_mapping,
&reader_ordinal_and_field_accessors,
&segment_and_ff_readers,
)
}

Expand Down Expand Up @@ -520,7 +512,12 @@ impl IndexMerger {
}

let col = VecColumn::from(&vals[..]);
fast_field_serializer.create_auto_detect_u64_fast_field_with_idx(field, col, 1)?;
fast_field_serializer.create_auto_detect_u64_fast_field_with_idx_and_codecs(
field,
col,
1,
&get_fastfield_codecs_for_multivalue(),
)?;
}
Ok(())
}
Expand Down Expand Up @@ -565,10 +562,11 @@ impl IndexMerger {

let fastfield_accessor =
RemappedDocIdMultiValueColumn::new(&self.readers, doc_id_mapping, field);
fast_field_serializer.create_auto_detect_u64_fast_field_with_idx(
fast_field_serializer.create_auto_detect_u64_fast_field_with_idx_and_codecs(
field,
fastfield_accessor,
1,
&get_fastfield_codecs_for_multivalue(),
)?;

Ok(())
Expand All @@ -580,7 +578,7 @@ impl IndexMerger {
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
let reader_and_field_accessors = self
let segment_and_ff_readers = self
.readers
.iter()
.map(|reader| {
Expand All @@ -591,17 +589,17 @@ impl IndexMerger {
(reader, bytes_reader)
})
.collect::<Vec<_>>();

Self::write_1_n_fast_field_idx_generic(
field,
fast_field_serializer,
doc_id_mapping,
&reader_and_field_accessors,
&segment_and_ff_readers,
)?;

let mut serialize_vals = fast_field_serializer.new_bytes_fast_field(field);

for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
let bytes_reader = &reader_and_field_accessors[old_doc_addr.segment_ord as usize].1;
let bytes_reader = &segment_and_ff_readers[old_doc_addr.segment_ord as usize].1;
let val = bytes_reader.get_bytes(old_doc_addr.doc_id);
serialize_vals.write_all(val)?;
}
Expand Down
75 changes: 74 additions & 1 deletion src/indexer/sorted_doc_id_multivalue_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::cmp;
use fastfield_codecs::Column;

use super::flat_map_with_buffer::FlatMapWithBufferIter;
use crate::fastfield::MultiValuedFastFieldReader;
use crate::fastfield::{MultiValueLength, MultiValuedFastFieldReader};
use crate::indexer::doc_id_mapping::SegmentDocIdMapping;
use crate::schema::Field;
use crate::{DocAddress, SegmentReader};
Expand Down Expand Up @@ -93,3 +93,76 @@ impl<'a> Column for RemappedDocIdMultiValueColumn<'a> {
self.num_vals
}
}

pub(crate) struct RemappedDocIdMultiValueIndexColumn<'a, T: MultiValueLength> {
doc_id_mapping: &'a SegmentDocIdMapping,
multi_value_length_readers: Vec<&'a T>,
min_value: u64,
max_value: u64,
num_vals: u64,
}

impl<'a, T: MultiValueLength> RemappedDocIdMultiValueIndexColumn<'a, T> {
pub(crate) fn new(
segment_and_ff_readers: &'a [(&'a SegmentReader, T)],
doc_id_mapping: &'a SegmentDocIdMapping,
) -> Self {
// We go through a complete first pass to compute the minimum and the
// maximum value and initialize our Column.
let mut num_vals = 0;
let min_value = 0;
let mut max_value = 0;
let mut multi_value_length_readers = Vec::with_capacity(segment_and_ff_readers.len());
for segment_and_ff_reader in segment_and_ff_readers {
let segment_reader = segment_and_ff_reader.0;
let multi_value_length_reader = &segment_and_ff_reader.1;
if !segment_reader.has_deletes() {
max_value += multi_value_length_reader.get_total_len();
} else {
for doc in segment_reader.doc_ids_alive() {
max_value += multi_value_length_reader.get_len(doc);
}
}
num_vals += segment_reader.num_docs() as u64;
multi_value_length_readers.push(multi_value_length_reader);
}
Self {
doc_id_mapping,
multi_value_length_readers,
min_value,
max_value,
num_vals,
}
}
}

impl<'a, T: MultiValueLength + Send + Sync> Column for RemappedDocIdMultiValueIndexColumn<'a, T> {
fn get_val(&self, _pos: u64) -> u64 {
unimplemented!()
}

fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
let mut offset = 0;
Box::new(
std::iter::once(0).chain(self.doc_id_mapping.iter_old_doc_addrs().map(
move |old_doc_addr| {
let ff_reader =
&self.multi_value_length_readers[old_doc_addr.segment_ord as usize];
offset += ff_reader.get_len(old_doc_addr.doc_id);
offset
},
)),
)
}
fn min_value(&self) -> u64 {
self.min_value
}

fn max_value(&self) -> u64 {
self.max_value
}

fn num_vals(&self) -> u64 {
self.num_vals
}
}