Skip to content

Commit

Permalink
Merge pull request #1587 from quickwit-oss/no_get_val_in_serialize
Browse files Browse the repository at this point in the history
remove get_val in serialization
  • Loading branch information
PSeitz authored Oct 4, 2022
2 parents 0f4a478 + 0f5cff7 commit 4cf911d
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 179 deletions.
2 changes: 1 addition & 1 deletion fastfield_codecs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ mod tests {

#[test]
fn estimation_test_bad_interpolation_case_monotonically_increasing() {
let mut data: Vec<u64> = (200..=20000_u64).collect();
let mut data: Vec<u64> = (201..=20000_u64).collect();
data.push(1_000_000);
let data: VecColumn = data.as_slice().into();

Expand Down
38 changes: 25 additions & 13 deletions fastfield_codecs/src/line.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,29 +68,37 @@ impl Line {
}

// Same as train, but the intercept is only estimated from provided sample positions
pub fn estimate(ys: &dyn Column, sample_positions: &[u64]) -> Self {
pub fn estimate(sample_positions_and_values: &[(u64, u64)]) -> Self {
let first_val = sample_positions_and_values[0].1;
let last_val = sample_positions_and_values[sample_positions_and_values.len() - 1].1;
let num_vals = sample_positions_and_values[sample_positions_and_values.len() - 1].0 + 1;
Self::train_from(
ys,
sample_positions
.iter()
.cloned()
.map(|pos| (pos, ys.get_val(pos))),
first_val,
last_val,
num_vals,
sample_positions_and_values.iter().cloned(),
)
}

// Intercept is only computed from provided positions
fn train_from(ys: &dyn Column, positions_and_values: impl Iterator<Item = (u64, u64)>) -> Self {
let num_vals = if let Some(num_vals) = NonZeroU64::new(ys.num_vals() - 1) {
num_vals
fn train_from(
first_val: u64,
last_val: u64,
num_vals: u64,
positions_and_values: impl Iterator<Item = (u64, u64)>,
) -> Self {
// TODO replace with let else
let idx_last_val = if let Some(idx_last_val) = NonZeroU64::new(num_vals - 1) {
idx_last_val
} else {
return Line::default();
};

let y0 = ys.get_val(0);
let y1 = ys.get_val(num_vals.get());
let y0 = first_val;
let y1 = last_val;

// We first independently pick our slope.
let slope = compute_slope(y0, y1, num_vals);
let slope = compute_slope(y0, y1, idx_last_val);

// We picked our slope. Note that it does not have to be perfect.
// Now we need to compute the best intercept.
Expand Down Expand Up @@ -138,8 +146,12 @@ impl Line {
/// This function is only invariable by translation if all of the
/// `ys` are packaged into half of the space. (See heuristic below)
pub fn train(ys: &dyn Column) -> Self {
let first_val = ys.iter().next().unwrap();
let last_val = ys.iter().nth(ys.num_vals() as usize - 1).unwrap();
Self::train_from(
ys,
first_val,
last_val,
ys.num_vals(),
ys.iter().enumerate().map(|(pos, val)| (pos as u64, val)),
)
}
Expand Down
21 changes: 12 additions & 9 deletions fastfield_codecs/src/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,20 @@ impl FastFieldCodec for LinearCodec {
return None; // disable compressor for this case
}

// let's sample at 0%, 5%, 10% .. 95%, 100%
let num_vals = column.num_vals() as f32 / 100.0;
let sample_positions = (0..20)
.map(|pos| (num_vals * pos as f32 * 5.0) as u64)
.collect::<Vec<_>>();
let limit_num_vals = column.num_vals().min(100_000);

let line = Line::estimate(column, &sample_positions);
let num_samples = 100;
let step_size = (limit_num_vals / num_samples).max(1); // 20 samples
let mut sample_positions_and_values: Vec<_> = Vec::new();
for (pos, val) in column.iter().enumerate().step_by(step_size as usize) {
sample_positions_and_values.push((pos as u64, val));
}

let line = Line::estimate(&sample_positions_and_values);

let estimated_bit_width = sample_positions
let estimated_bit_width = sample_positions_and_values
.into_iter()
.map(|pos| {
let actual_value = column.get_val(pos);
.map(|(pos, actual_value)| {
let interpolated_val = line.eval(pos as u64);
actual_value.wrapping_sub(interpolated_val)
})
Expand All @@ -146,6 +148,7 @@ impl FastFieldCodec for LinearCodec {
.max()
.unwrap_or(0);

// Extrapolate to whole column
let num_bits = (estimated_bit_width as u64 * column.num_vals() as u64) + 64;
let num_bits_uncompressed = 64 * column.num_vals();
Some(num_bits as f32 / num_bits_uncompressed as f32)
Expand Down
121 changes: 35 additions & 86 deletions src/fastfield/multivalued/writer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::io;
use std::sync::Mutex;

use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn};
use fnv::FnvHashMap;
Expand Down Expand Up @@ -204,112 +203,68 @@ impl MultiValuedFastFieldWriter {
pub(crate) struct MultivalueStartIndex<'a, C: Column> {
column: &'a C,
doc_id_map: &'a DocIdMapping,
min_max_opt: Mutex<Option<(u64, u64)>>,
random_seeker: Mutex<MultivalueStartIndexRandomSeeker<'a, C>>,
}

struct MultivalueStartIndexRandomSeeker<'a, C: Column> {
seek_head: MultivalueStartIndexIter<'a, C>,
seek_next_id: u64,
}
impl<'a, C: Column> MultivalueStartIndexRandomSeeker<'a, C> {
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
Self {
seek_head: MultivalueStartIndexIter {
column,
doc_id_map,
new_doc_id: 0,
offset: 0u64,
},
seek_next_id: 0u64,
}
}
min: u64,
max: u64,
}

impl<'a, C: Column> MultivalueStartIndex<'a, C> {
pub fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
assert_eq!(column.num_vals(), doc_id_map.num_old_doc_ids() as u64 + 1);
let (min, max) =
tantivy_bitpacker::minmax(iter_remapped_multivalue_index(doc_id_map, column))
.unwrap_or((0u64, 0u64));
MultivalueStartIndex {
column,
doc_id_map,
min_max_opt: Mutex::default(),
random_seeker: Mutex::new(MultivalueStartIndexRandomSeeker::new(column, doc_id_map)),
}
}

fn minmax(&self) -> (u64, u64) {
if let Some((min, max)) = *self.min_max_opt.lock().unwrap() {
return (min, max);
min,
max,
}
let (min, max) = tantivy_bitpacker::minmax(self.iter()).unwrap_or((0u64, 0u64));
*self.min_max_opt.lock().unwrap() = Some((min, max));
(min, max)
}
}
impl<'a, C: Column> Column for MultivalueStartIndex<'a, C> {
fn get_val(&self, idx: u64) -> u64 {
let mut random_seeker_lock = self.random_seeker.lock().unwrap();
if random_seeker_lock.seek_next_id > idx {
*random_seeker_lock =
MultivalueStartIndexRandomSeeker::new(self.column, self.doc_id_map);
}
let to_skip = idx - random_seeker_lock.seek_next_id;
random_seeker_lock.seek_next_id = idx + 1;
random_seeker_lock.seek_head.nth(to_skip as usize).unwrap()
fn get_val(&self, _idx: u64) -> u64 {
unimplemented!()
}

fn min_value(&self) -> u64 {
self.minmax().0
self.min
}

fn max_value(&self) -> u64 {
self.minmax().1
self.max
}

fn num_vals(&self) -> u64 {
(self.doc_id_map.num_new_doc_ids() + 1) as u64
}

fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
Box::new(MultivalueStartIndexIter::new(self.column, self.doc_id_map))
}
}

struct MultivalueStartIndexIter<'a, C: Column> {
pub column: &'a C,
pub doc_id_map: &'a DocIdMapping,
pub new_doc_id: usize,
pub offset: u64,
}

impl<'a, C: Column> MultivalueStartIndexIter<'a, C> {
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
Self {
column,
doc_id_map,
new_doc_id: 0,
offset: 0,
}
fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
Box::new(iter_remapped_multivalue_index(
self.doc_id_map,
&self.column,
))
}
}

impl<'a, C: Column> Iterator for MultivalueStartIndexIter<'a, C> {
type Item = u64;

fn next(&mut self) -> Option<Self::Item> {
if self.new_doc_id > self.doc_id_map.num_new_doc_ids() {
return None;
}
let new_doc_id = self.new_doc_id;
self.new_doc_id += 1;
let start_offset = self.offset;
if new_doc_id < self.doc_id_map.num_new_doc_ids() {
let old_doc = self.doc_id_map.get_old_doc_id(new_doc_id as u32) as u64;
let num_vals_for_doc = self.column.get_val(old_doc + 1) - self.column.get_val(old_doc);
self.offset += num_vals_for_doc;
}
Some(start_offset)
}
fn iter_remapped_multivalue_index<'a, C: Column>(
doc_id_map: &'a DocIdMapping,
column: &'a C,
) -> impl Iterator<Item = u64> + 'a {
let mut offset = 0;
doc_id_map
.iter_old_doc_ids()
.chain(std::iter::once(u32::MAX))
.map(move |old_doc| {
if old_doc == u32::MAX {
// sentinel value for last offset
return offset;
}
let num_vals_for_doc =
column.get_val(old_doc as u64 + 1) - column.get_val(old_doc as u64);
let start_offset = offset;
offset += num_vals_for_doc;
start_offset
})
}

#[cfg(test)]
Expand Down Expand Up @@ -344,11 +299,5 @@ mod tests {
vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
);
assert_eq!(multivalue_start_index.num_vals(), 11);
assert_eq!(multivalue_start_index.get_val(3), 2);
assert_eq!(multivalue_start_index.get_val(5), 5);
assert_eq!(multivalue_start_index.get_val(8), 21);
assert_eq!(multivalue_start_index.get_val(4), 3);
assert_eq!(multivalue_start_index.get_val(0), 0);
assert_eq!(multivalue_start_index.get_val(10), 55);
}
}
11 changes: 2 additions & 9 deletions src/fastfield/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,15 +391,8 @@ impl<'map, 'bitp> Column for WriterFastFieldAccessProvider<'map, 'bitp> {
/// # Panics
///
/// May panic if `doc` is greater than the index.
fn get_val(&self, doc: u64) -> u64 {
if let Some(doc_id_map) = self.doc_id_map {
self.vals
.get(doc_id_map.get_old_doc_id(doc as u32) as usize) // consider extra
// FastFieldReader wrapper for
// non doc_id_map
} else {
self.vals.get(doc as usize)
}
fn get_val(&self, _doc: u64) -> u64 {
unimplemented!()
}

fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
Expand Down
4 changes: 0 additions & 4 deletions src/indexer/doc_id_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ impl SegmentDocIdMapping {
self.new_doc_id_to_old_doc_addr.len()
}

pub(crate) fn get_old_doc_addr(&self, new_doc_id: DocId) -> DocAddress {
self.new_doc_id_to_old_doc_addr[new_doc_id as usize]
}

/// This flags means the segments are simply stacked in the order of their ordinal.
/// e.g. [(0, 1), .. (n, 1), (0, 2)..., (m, 2)]
///
Expand Down
28 changes: 11 additions & 17 deletions src/indexer/merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use crate::fastfield::{
};
use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
use crate::indexer::doc_id_mapping::{expect_field_id_for_sort_field, SegmentDocIdMapping};
use crate::indexer::sorted_doc_id_column::SortedDocIdColumn;
use crate::indexer::sorted_doc_id_multivalue_column::SortedDocIdMultiValueColumn;
use crate::indexer::sorted_doc_id_column::RemappedDocIdColumn;
use crate::indexer::sorted_doc_id_multivalue_column::RemappedDocIdMultiValueColumn;
use crate::indexer::SegmentSerializer;
use crate::postings::{InvertedIndexSerializer, Postings, SegmentPostings};
use crate::schema::{Cardinality, Field, FieldType, Schema};
Expand Down Expand Up @@ -310,7 +310,7 @@ impl IndexMerger {
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
let fast_field_accessor = SortedDocIdColumn::new(&self.readers, doc_id_mapping, field);
let fast_field_accessor = RemappedDocIdColumn::new(&self.readers, doc_id_mapping, field);
fast_field_serializer.create_auto_detect_u64_fast_field(field, fast_field_accessor)?;

Ok(())
Expand Down Expand Up @@ -428,14 +428,8 @@ impl IndexMerger {
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
reader_and_field_accessors: &[(&SegmentReader, T)],
) -> crate::Result<Vec<u64>> {
// We can now create our `idx` serializer, and in a second pass,
// can effectively push the different indexes.

// copying into a temp vec is not ideal, but the fast field codec api requires random
// access, which is used in the estimation. It's possible to 1. calculate random
// access on the fly or 2. change the codec api to make random access optional, but
// they both have also major drawbacks.
) -> crate::Result<()> {
// TODO Use `Column` implementation instead

let mut offsets = Vec::with_capacity(doc_id_mapping.len());
let mut offset = 0;
Expand All @@ -449,15 +443,15 @@ impl IndexMerger {
let fastfield_accessor = VecColumn::from(&offsets[..]);

fast_field_serializer.create_auto_detect_u64_fast_field(field, fastfield_accessor)?;
Ok(offsets)
Ok(())
}
/// Returns the fastfield index (index for the data, not the data).
fn write_multi_value_fast_field_idx(
&self,
field: Field,
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<Vec<u64>> {
) -> crate::Result<()> {
let reader_ordinal_and_field_accessors = self
.readers
.iter()
Expand Down Expand Up @@ -561,16 +555,16 @@ impl IndexMerger {
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
// Multifastfield consists in 2 fastfields.
// Multifastfield consists of 2 fastfields.
// The first serves as an index into the second one and is strictly increasing.
// The second contains the actual values.

// First we merge the idx fast field.
let offsets =
self.write_multi_value_fast_field_idx(field, fast_field_serializer, doc_id_mapping)?;

self.write_multi_value_fast_field_idx(field, fast_field_serializer, doc_id_mapping)?;

let fastfield_accessor =
SortedDocIdMultiValueColumn::new(&self.readers, doc_id_mapping, &offsets, field);
RemappedDocIdMultiValueColumn::new(&self.readers, doc_id_mapping, field);
fast_field_serializer.create_auto_detect_u64_fast_field_with_idx(
field,
fastfield_accessor,
Expand Down
Loading

0 comments on commit 4cf911d

Please sign in to comment.