Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove get_val in serialization #1587

Merged
merged 2 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fastfield_codecs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ mod tests {

#[test]
fn estimation_test_bad_interpolation_case_monotonically_increasing() {
let mut data: Vec<u64> = (200..=20000_u64).collect();
let mut data: Vec<u64> = (201..=20000_u64).collect();
data.push(1_000_000);
let data: VecColumn = data.as_slice().into();

Expand Down
38 changes: 25 additions & 13 deletions fastfield_codecs/src/line.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,29 +68,37 @@ impl Line {
}

// Same as train, but the intercept is only estimated from provided sample positions
pub fn estimate(ys: &dyn Column, sample_positions: &[u64]) -> Self {
pub fn estimate(sample_positions_and_values: &[(u64, u64)]) -> Self {
let first_val = sample_positions_and_values[0].1;
let last_val = sample_positions_and_values[sample_positions_and_values.len() - 1].1;
let num_vals = sample_positions_and_values[sample_positions_and_values.len() - 1].0 + 1;
Self::train_from(
ys,
sample_positions
.iter()
.cloned()
.map(|pos| (pos, ys.get_val(pos))),
first_val,
last_val,
num_vals,
sample_positions_and_values.iter().cloned(),
)
}

// Intercept is only computed from provided positions
fn train_from(ys: &dyn Column, positions_and_values: impl Iterator<Item = (u64, u64)>) -> Self {
let num_vals = if let Some(num_vals) = NonZeroU64::new(ys.num_vals() - 1) {
num_vals
fn train_from(
first_val: u64,
last_val: u64,
num_vals: u64,
positions_and_values: impl Iterator<Item = (u64, u64)>,
) -> Self {
// TODO replace with let else
let idx_last_val = if let Some(idx_last_val) = NonZeroU64::new(num_vals - 1) {
idx_last_val
} else {
return Line::default();
};

let y0 = ys.get_val(0);
let y1 = ys.get_val(num_vals.get());
let y0 = first_val;
let y1 = last_val;

// We first independently pick our slope.
let slope = compute_slope(y0, y1, num_vals);
let slope = compute_slope(y0, y1, idx_last_val);

// We picked our slope. Note that it does not have to be perfect.
// Now we need to compute the best intercept.
Expand Down Expand Up @@ -138,8 +146,12 @@ impl Line {
/// This function is only invariable by translation if all of the
/// `ys` are packaged into half of the space. (See heuristic below)
pub fn train(ys: &dyn Column) -> Self {
let first_val = ys.iter().next().unwrap();
let last_val = ys.iter().nth(ys.num_vals() as usize - 1).unwrap();
Self::train_from(
ys,
first_val,
last_val,
ys.num_vals(),
ys.iter().enumerate().map(|(pos, val)| (pos as u64, val)),
)
}
Expand Down
21 changes: 12 additions & 9 deletions fastfield_codecs/src/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,20 @@ impl FastFieldCodec for LinearCodec {
return None; // disable compressor for this case
}

// let's sample at 0%, 5%, 10% .. 95%, 100%
let num_vals = column.num_vals() as f32 / 100.0;
let sample_positions = (0..20)
.map(|pos| (num_vals * pos as f32 * 5.0) as u64)
.collect::<Vec<_>>();
let limit_num_vals = column.num_vals().min(100_000);

let line = Line::estimate(column, &sample_positions);
let num_samples = 100;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the step_by solution.

Can you change the column.num_vals() limit above to 100 too, to make proofreading even easier.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand, 100 instead of 100_000?

let step_size = (limit_num_vals / num_samples).max(1); // 20 samples
let mut sample_positions_and_values: Vec<_> = Vec::new();
for (pos, val) in column.iter().enumerate().step_by(step_size as usize) {
sample_positions_and_values.push((pos as u64, val));
}

let line = Line::estimate(&sample_positions_and_values);

let estimated_bit_width = sample_positions
let estimated_bit_width = sample_positions_and_values
.into_iter()
.map(|pos| {
let actual_value = column.get_val(pos);
.map(|(pos, actual_value)| {
let interpolated_val = line.eval(pos as u64);
actual_value.wrapping_sub(interpolated_val)
})
Expand All @@ -146,6 +148,7 @@ impl FastFieldCodec for LinearCodec {
.max()
.unwrap_or(0);

// Extrapolate to whole column
let num_bits = (estimated_bit_width as u64 * column.num_vals() as u64) + 64;
let num_bits_uncompressed = 64 * column.num_vals();
Some(num_bits as f32 / num_bits_uncompressed as f32)
Expand Down
121 changes: 35 additions & 86 deletions src/fastfield/multivalued/writer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::io;
use std::sync::Mutex;

use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn};
use fnv::FnvHashMap;
Expand Down Expand Up @@ -204,112 +203,68 @@ impl MultiValuedFastFieldWriter {
pub(crate) struct MultivalueStartIndex<'a, C: Column> {
column: &'a C,
doc_id_map: &'a DocIdMapping,
min_max_opt: Mutex<Option<(u64, u64)>>,
random_seeker: Mutex<MultivalueStartIndexRandomSeeker<'a, C>>,
}

struct MultivalueStartIndexRandomSeeker<'a, C: Column> {
seek_head: MultivalueStartIndexIter<'a, C>,
seek_next_id: u64,
}
impl<'a, C: Column> MultivalueStartIndexRandomSeeker<'a, C> {
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
Self {
seek_head: MultivalueStartIndexIter {
column,
doc_id_map,
new_doc_id: 0,
offset: 0u64,
},
seek_next_id: 0u64,
}
}
min: u64,
max: u64,
}

impl<'a, C: Column> MultivalueStartIndex<'a, C> {
pub fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
assert_eq!(column.num_vals(), doc_id_map.num_old_doc_ids() as u64 + 1);
let (min, max) =
tantivy_bitpacker::minmax(iter_remapped_multivalue_index(doc_id_map, column))
.unwrap_or((0u64, 0u64));
MultivalueStartIndex {
column,
doc_id_map,
min_max_opt: Mutex::default(),
random_seeker: Mutex::new(MultivalueStartIndexRandomSeeker::new(column, doc_id_map)),
}
}

fn minmax(&self) -> (u64, u64) {
if let Some((min, max)) = *self.min_max_opt.lock().unwrap() {
return (min, max);
min,
max,
}
let (min, max) = tantivy_bitpacker::minmax(self.iter()).unwrap_or((0u64, 0u64));
*self.min_max_opt.lock().unwrap() = Some((min, max));
(min, max)
}
}
impl<'a, C: Column> Column for MultivalueStartIndex<'a, C> {
fn get_val(&self, idx: u64) -> u64 {
let mut random_seeker_lock = self.random_seeker.lock().unwrap();
if random_seeker_lock.seek_next_id > idx {
*random_seeker_lock =
MultivalueStartIndexRandomSeeker::new(self.column, self.doc_id_map);
}
let to_skip = idx - random_seeker_lock.seek_next_id;
random_seeker_lock.seek_next_id = idx + 1;
random_seeker_lock.seek_head.nth(to_skip as usize).unwrap()
fn get_val(&self, _idx: u64) -> u64 {
unimplemented!()
}

fn min_value(&self) -> u64 {
self.minmax().0
self.min
}

fn max_value(&self) -> u64 {
self.minmax().1
self.max
}

fn num_vals(&self) -> u64 {
(self.doc_id_map.num_new_doc_ids() + 1) as u64
}

fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
Box::new(MultivalueStartIndexIter::new(self.column, self.doc_id_map))
}
}

struct MultivalueStartIndexIter<'a, C: Column> {
pub column: &'a C,
pub doc_id_map: &'a DocIdMapping,
pub new_doc_id: usize,
pub offset: u64,
}

impl<'a, C: Column> MultivalueStartIndexIter<'a, C> {
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
Self {
column,
doc_id_map,
new_doc_id: 0,
offset: 0,
}
fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
Box::new(iter_remapped_multivalue_index(
self.doc_id_map,
&self.column,
))
}
}

impl<'a, C: Column> Iterator for MultivalueStartIndexIter<'a, C> {
type Item = u64;

fn next(&mut self) -> Option<Self::Item> {
if self.new_doc_id > self.doc_id_map.num_new_doc_ids() {
return None;
}
let new_doc_id = self.new_doc_id;
self.new_doc_id += 1;
let start_offset = self.offset;
if new_doc_id < self.doc_id_map.num_new_doc_ids() {
let old_doc = self.doc_id_map.get_old_doc_id(new_doc_id as u32) as u64;
let num_vals_for_doc = self.column.get_val(old_doc + 1) - self.column.get_val(old_doc);
self.offset += num_vals_for_doc;
}
Some(start_offset)
}
fn iter_remapped_multivalue_index<'a, C: Column>(
doc_id_map: &'a DocIdMapping,
column: &'a C,
) -> impl Iterator<Item = u64> + 'a {
let mut offset = 0;
doc_id_map
.iter_old_doc_ids()
.chain(std::iter::once(u32::MAX))
.map(move |old_doc| {
if old_doc == u32::MAX {
// sentinel value for last offset
return offset;
}
Comment on lines +258 to +261
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't doing the chain below do the same thing?

 doc_id_map
        .iter_old_doc_ids()
        .map(move |old_doc| { ... })
        .chain(iter::once(offset))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would, but offset is already owned by the closure

let num_vals_for_doc =
column.get_val(old_doc as u64 + 1) - column.get_val(old_doc as u64);
let start_offset = offset;
offset += num_vals_for_doc;
start_offset
})
}

#[cfg(test)]
Expand Down Expand Up @@ -344,11 +299,5 @@ mod tests {
vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
);
assert_eq!(multivalue_start_index.num_vals(), 11);
assert_eq!(multivalue_start_index.get_val(3), 2);
assert_eq!(multivalue_start_index.get_val(5), 5);
assert_eq!(multivalue_start_index.get_val(8), 21);
assert_eq!(multivalue_start_index.get_val(4), 3);
assert_eq!(multivalue_start_index.get_val(0), 0);
assert_eq!(multivalue_start_index.get_val(10), 55);
}
}
11 changes: 2 additions & 9 deletions src/fastfield/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,15 +391,8 @@ impl<'map, 'bitp> Column for WriterFastFieldAccessProvider<'map, 'bitp> {
/// # Panics
///
/// May panic if `doc` is greater than the index.
fn get_val(&self, doc: u64) -> u64 {
if let Some(doc_id_map) = self.doc_id_map {
self.vals
.get(doc_id_map.get_old_doc_id(doc as u32) as usize) // consider extra
// FastFieldReader wrapper for
// non doc_id_map
} else {
self.vals.get(doc as usize)
}
fn get_val(&self, _doc: u64) -> u64 {
unimplemented!()
}

fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
Expand Down
4 changes: 0 additions & 4 deletions src/indexer/doc_id_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ impl SegmentDocIdMapping {
self.new_doc_id_to_old_doc_addr.len()
}

pub(crate) fn get_old_doc_addr(&self, new_doc_id: DocId) -> DocAddress {
self.new_doc_id_to_old_doc_addr[new_doc_id as usize]
}

/// This flags means the segments are simply stacked in the order of their ordinal.
/// e.g. [(0, 1), .. (n, 1), (0, 2)..., (m, 2)]
///
Expand Down
28 changes: 11 additions & 17 deletions src/indexer/merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use crate::fastfield::{
};
use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
use crate::indexer::doc_id_mapping::{expect_field_id_for_sort_field, SegmentDocIdMapping};
use crate::indexer::sorted_doc_id_column::SortedDocIdColumn;
use crate::indexer::sorted_doc_id_multivalue_column::SortedDocIdMultiValueColumn;
use crate::indexer::sorted_doc_id_column::RemappedDocIdColumn;
use crate::indexer::sorted_doc_id_multivalue_column::RemappedDocIdMultiValueColumn;
use crate::indexer::SegmentSerializer;
use crate::postings::{InvertedIndexSerializer, Postings, SegmentPostings};
use crate::schema::{Cardinality, Field, FieldType, Schema};
Expand Down Expand Up @@ -310,7 +310,7 @@ impl IndexMerger {
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
let fast_field_accessor = SortedDocIdColumn::new(&self.readers, doc_id_mapping, field);
let fast_field_accessor = RemappedDocIdColumn::new(&self.readers, doc_id_mapping, field);
fast_field_serializer.create_auto_detect_u64_fast_field(field, fast_field_accessor)?;

Ok(())
Expand Down Expand Up @@ -428,14 +428,8 @@ impl IndexMerger {
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
reader_and_field_accessors: &[(&SegmentReader, T)],
) -> crate::Result<Vec<u64>> {
// We can now create our `idx` serializer, and in a second pass,
// can effectively push the different indexes.

// copying into a temp vec is not ideal, but the fast field codec api requires random
// access, which is used in the estimation. It's possible to 1. calculate random
// access on the fly or 2. change the codec api to make random access optional, but
// they both have also major drawbacks.
) -> crate::Result<()> {
// TODO Use `Column` implementation instead

let mut offsets = Vec::with_capacity(doc_id_mapping.len());
let mut offset = 0;
Expand All @@ -449,15 +443,15 @@ impl IndexMerger {
let fastfield_accessor = VecColumn::from(&offsets[..]);

fast_field_serializer.create_auto_detect_u64_fast_field(field, fastfield_accessor)?;
Ok(offsets)
Ok(())
}
/// Returns the fastfield index (index for the data, not the data).
fn write_multi_value_fast_field_idx(
&self,
field: Field,
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<Vec<u64>> {
) -> crate::Result<()> {
let reader_ordinal_and_field_accessors = self
.readers
.iter()
Expand Down Expand Up @@ -561,16 +555,16 @@ impl IndexMerger {
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
// Multifastfield consists in 2 fastfields.
// Multifastfield consists of 2 fastfields.
// The first serves as an index into the second one and is strictly increasing.
// The second contains the actual values.

// First we merge the idx fast field.
let offsets =
self.write_multi_value_fast_field_idx(field, fast_field_serializer, doc_id_mapping)?;

self.write_multi_value_fast_field_idx(field, fast_field_serializer, doc_id_mapping)?;

let fastfield_accessor =
SortedDocIdMultiValueColumn::new(&self.readers, doc_id_mapping, &offsets, field);
RemappedDocIdMultiValueColumn::new(&self.readers, doc_id_mapping, field);
fast_field_serializer.create_auto_detect_u64_fast_field_with_idx(
field,
fastfield_accessor,
Expand Down
Loading