diff --git a/columnar/src/tests.rs b/columnar/src/tests.rs
index 2d45080b91..5e5c50f556 100644
--- a/columnar/src/tests.rs
+++ b/columnar/src/tests.rs
@@ -26,7 +26,7 @@ fn test_dataframe_writer_str() {
     assert_eq!(columnar.num_columns(), 1);
     let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
     assert_eq!(cols.len(), 1);
-    assert_eq!(cols[0].num_bytes(), 87);
+    assert_eq!(cols[0].num_bytes(), 73);
 }
 
 #[test]
@@ -40,7 +40,7 @@ fn test_dataframe_writer_bytes() {
     assert_eq!(columnar.num_columns(), 1);
     let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
     assert_eq!(cols.len(), 1);
-    assert_eq!(cols[0].num_bytes(), 87);
+    assert_eq!(cols[0].num_bytes(), 73);
 }
 
 #[test]
diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs
index 65e4c03bf6..44b01ad78a 100644
--- a/src/fastfield/mod.rs
+++ b/src/fastfield/mod.rs
@@ -131,7 +131,7 @@ mod tests {
         }
 
         let file = directory.open_read(path).unwrap();
-        assert_eq!(file.len(), 93);
+        assert_eq!(file.len(), 80);
         let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
         let column = fast_field_readers
             .u64("field")
@@ -181,7 +181,7 @@ mod tests {
            write.terminate().unwrap();
        }
        let file = directory.open_read(path).unwrap();
-        assert_eq!(file.len(), 121);
+        assert_eq!(file.len(), 108);
        let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
        let col = fast_field_readers
            .u64("field")
@@ -214,7 +214,7 @@ mod tests {
            write.terminate().unwrap();
        }
        let file = directory.open_read(path).unwrap();
-        assert_eq!(file.len(), 94);
+        assert_eq!(file.len(), 81);
        let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
        let fast_field_reader = fast_field_readers
            .u64("field")
@@ -246,7 +246,7 @@ mod tests {
            write.terminate().unwrap();
        }
        let file = directory.open_read(path).unwrap();
-        assert_eq!(file.len(), 4489);
+        assert_eq!(file.len(), 4476);
        {
            let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
            let col = fast_field_readers
@@ -279,7 +279,7 @@ mod tests {
            write.terminate().unwrap();
        }
        let file = directory.open_read(path).unwrap();
-        assert_eq!(file.len(), 265);
+        assert_eq!(file.len(), 252);
 
        {
            let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
@@ -773,7 +773,7 @@ mod tests {
            write.terminate().unwrap();
        }
        let file = directory.open_read(path).unwrap();
-        assert_eq!(file.len(), 102);
+        assert_eq!(file.len(), 84);
        let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
        let bool_col = fast_field_readers.bool("field_bool").unwrap();
        assert_eq!(bool_col.first(0), Some(true));
@@ -805,7 +805,7 @@ mod tests {
            write.terminate().unwrap();
        }
        let file = directory.open_read(path).unwrap();
-        assert_eq!(file.len(), 114);
+        assert_eq!(file.len(), 96);
        let readers = FastFieldReaders::open(file, schema).unwrap();
        let bool_col = readers.bool("field_bool").unwrap();
        for i in 0..25 {
@@ -830,7 +830,7 @@ mod tests {
            write.terminate().unwrap();
        }
        let file = directory.open_read(path).unwrap();
-        assert_eq!(file.len(), 104);
+        assert_eq!(file.len(), 86);
        let fastfield_readers = FastFieldReaders::open(file, schema).unwrap();
        let col = fastfield_readers.bool("field_bool").unwrap();
        assert_eq!(col.first(0), None);
diff --git a/sstable/Cargo.toml b/sstable/Cargo.toml
index 643d6b9766..7cce076961 100644
--- a/sstable/Cargo.toml
+++ b/sstable/Cargo.toml
@@ -11,6 +11,7 @@ description = "sstables for tantivy"
 
 [dependencies]
 common = {version= "0.6", path="../common", package="tantivy-common"}
+tantivy-bitpacker = { version= "0.5", path="../bitpacker" }
 tantivy-fst = "0.5"
 # experimental gives us access to Decompressor::upper_bound
 zstd = { version = "0.13", features = ["experimental"] }
diff --git a/sstable/README.md b/sstable/README.md
index bec6d70f92..e5e3f5a492 100644
--- a/sstable/README.md
+++ b/sstable/README.md
@@ -89,33 +89,71 @@ Note: as the SSTable does not support redundant keys, there is no ambiguity betw
 
 ### SSTFooter
 ```
-+-------+-------+-----+-------------+---------+---------+
-| Block | Block | ... | IndexOffset | NumTerm | Version |
-+-------+-------+-----+-------------+---------+---------+
-|----( # of blocks)---|
++-----+----------------+-------------+-------------+---------+---------+
+| Fst | BlockAddrStore | StoreOffset | IndexOffset | NumTerm | Version |
++-----+----------------+-------------+-------------+---------+---------+
 ```
 
-- Block(SSTBlock): uses IndexValue for its Values format
+- Fst(Fst): finite state transducer mapping keys to a block number
+- BlockAddrStore(BlockAddrStore): store mapping a block number to its BlockAddr
+- StoreOffset(u64): Offset to start of the BlockAddrStore. If zero, see the SingleBlockSStable section
 - IndexOffset(u64): Offset to the start of the SSTFooter
 - NumTerm(u64): number of terms in the sstable
-- Version(u32): Currently equal to 2
+- Version(u32): Currently equal to 3
 
-### IndexValue
-```
-+------------+----------+-------+-------+-----+
-| EntryCount | StartPos | Entry | Entry | ... |
-+------------+----------+-------+-------+-----+
- |---( # of entries)---|
-```
-
-- EntryCount(VInt): number of entries
-- StartPos(VInt): the start pos of the first (data) block referenced by this (index) block
-- Entry (IndexEntry)
-
-### Entry
-```
-+----------+--------------+
-| BlockLen | FirstOrdinal |
-+----------+--------------+
-```
-- BlockLen(VInt): length of the block
-- FirstOrdinal(VInt): ordinal of the first element in the given block
+### Fst
+
+Fst is in the format of tantivy\_fst
+
+### BlockAddrStore
+
++---------+-----------+-----------+-----+-----------+-----------+-----+
+| MetaLen | BlockMeta | BlockMeta | ... | BlockData | BlockData | ... |
++---------+-----------+-----------+-----+-----------+-----------+-----+
+ |---------(N blocks)----------|---------(N blocks)----------|
+
+- MetaLen(u64): length of the BlockMeta section
+- BlockMeta(BlockAddrBlockMetadata): metadata to seek through BlockData
+- BlockData(CompactedBlockAddr): bitpacked per-block metadata
+
+### BlockAddrBlockMetadata
+
++--------+------------+--------------+------------+--------------+-------------------+-----------------+----------+
+| Offset | RangeStart | FirstOrdinal | RangeSlope | OrdinalSlope | FirstOrdinalNBits | RangeStartNBits | BlockLen |
++--------+------------+--------------+------------+--------------+-------------------+-----------------+----------+
+
+- Offset(u64): offset of the corresponding BlockData in the datastream
+- RangeStart(u64): the start position of the first block
+- FirstOrdinal(u64): the first ordinal of the first block
+- RangeSlope(u32): slope predicted for start range evolution (see computation in BlockData)
+- OrdinalSlope(u32): slope predicted for first ordinal evolution (see computation in BlockData)
+- FirstOrdinalNBits(u8): number of bits per ordinal in the datastream (see computation in BlockData)
+- RangeStartNBits(u8): number of bits per range start in the datastream (see computation in BlockData)
+- BlockLen(u16): number of (RangeStartDelta, FirstOrdinalDelta) entries in the corresponding BlockData
+
+### BlockData
+
++-----------------+-------------------+---------------+
+| RangeStartDelta | FirstOrdinalDelta | FinalRangeEnd |
++-----------------+-------------------+---------------+
+|------(BlockLen repetitions)---------|
+
+- RangeStartDelta(var): RangeStartNBits *bits* of a little endian number. See below for decoding
+- FirstOrdinalDelta(var): FirstOrdinalNBits *bits* of a little endian number. See below for decoding
+- FinalRangeEnd(var): RangeStartNBits *bits* of a little endian number. See below for decoding
+
+Converting a BlockData of index Index and a BlockAddrBlockMetadata to an actual block address is done as follows:
+range\_prediction := RangeStart + Index * RangeSlope;
+range\_derivation := RangeStartDelta - (1 << (RangeStartNBits-1));
+range\_start := range\_prediction + range\_derivation
+
+The same computation can be done for the first ordinal.
+
+Note that `range_derivation` can take negative values; `RangeStartDelta` is just its translation to a positive range.
+
+
+## SingleBlockSStable
+
+The format used for the index is meant to be compact; however, it has a constant cost of around 70
+bytes, which isn't negligible for a table containing very few keys.
+To limit the impact of that constant cost, single-block sstables omit the Fst and BlockAddrStore from
+their index. Instead, a block with a first ordinal of 0, a range start of 0 and a range end of IndexOffset
+is implicitly used for every operation.
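
To make the decoding rule above concrete, here is a minimal Rust sketch of the range-start computation, using the README's field names. `decode_range_start` is purely illustrative and not part of this patch:

```rust
/// Illustrative sketch of the BlockData decoding described above; not part of
/// the patch. Parameter names mirror the README fields.
fn decode_range_start(
    range_start: u64,       // RangeStart from BlockAddrBlockMetadata
    range_slope: u64,       // RangeSlope
    range_start_nbits: u8,  // RangeStartNBits
    range_start_delta: u64, // RangeStartDelta, extracted from the bitpacked stream
    index: u64,             // Index of the entry within the block
) -> u64 {
    let range_prediction = range_start + index * range_slope;
    // The delta is stored shifted by half of its representable range, so that
    // a negative derivation still fits in an unsigned bitpacked field.
    let range_derivation = range_start_delta as i64 - (1i64 << (range_start_nbits - 1));
    (range_prediction as i64 + range_derivation) as u64
}
```

The same sketch applies to the first ordinal, substituting OrdinalSlope and FirstOrdinalNBits.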
diff --git a/sstable/benches/ord_to_term.rs b/sstable/benches/ord_to_term.rs
index 04c8835fb6..f29b719b34 100644
--- a/sstable/benches/ord_to_term.rs
+++ b/sstable/benches/ord_to_term.rs
@@ -40,6 +40,31 @@ pub fn criterion_benchmark(c: &mut Criterion) {
                 assert!(dict.ord_to_term(19_000_000, &mut res).unwrap());
             })
         });
+        c.bench_function("term_ord_suffix", |b| {
+            b.iter(|| {
+                assert_eq!(
+                    dict.term_ord(b"prefix.00186A0.suffix").unwrap().unwrap(),
+                    100_000
+                );
+                assert_eq!(
+                    dict.term_ord(b"prefix.121EAC0.suffix").unwrap().unwrap(),
+                    19_000_000
+                );
+            })
+        });
+        c.bench_function("open_and_term_ord_suffix", |b| {
+            b.iter(|| {
+                let dict = Dictionary::::open(slice.clone()).unwrap();
+                assert_eq!(
+                    dict.term_ord(b"prefix.00186A0.suffix").unwrap().unwrap(),
+                    100_000
+                );
+                assert_eq!(
+                    dict.term_ord(b"prefix.121EAC0.suffix").unwrap().unwrap(),
+                    19_000_000
+                );
+            })
+        });
     }
     {
         let slice = make_test_sstable("");
@@ -59,6 +84,25 @@ pub fn criterion_benchmark(c: &mut Criterion) {
                 assert!(dict.ord_to_term(19_000_000, &mut res).unwrap());
             })
         });
+        c.bench_function("term_ord", |b| {
+            b.iter(|| {
+                assert_eq!(dict.term_ord(b"prefix.00186A0").unwrap().unwrap(), 100_000);
+                assert_eq!(
+                    dict.term_ord(b"prefix.121EAC0").unwrap().unwrap(),
+                    19_000_000
+                );
+            })
+        });
+        c.bench_function("open_and_term_ord", |b| {
+            b.iter(|| {
+                let dict = Dictionary::::open(slice.clone()).unwrap();
+                assert_eq!(dict.term_ord(b"prefix.00186A0").unwrap().unwrap(), 100_000);
+                assert_eq!(
+                    dict.term_ord(b"prefix.121EAC0").unwrap().unwrap(),
+                    19_000_000
+                );
+            })
+        });
     }
 }
diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs
index 0eb5822d9e..b4821fe24a 100644
--- a/sstable/src/dictionary.rs
+++ b/sstable/src/dictionary.rs
@@ -9,8 +9,11 @@ use common::{BinarySerializable, OwnedBytes};
 use tantivy_fst::automaton::AlwaysMatch;
 use tantivy_fst::Automaton;
 
+use crate::sstable_index_v3::SSTableIndexV3Empty;
 use crate::streamer::{Streamer, StreamerBuilder};
-use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal, VoidSSTable};
+use crate::{
+    BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, SSTableIndexV3, TermOrdinal, VoidSSTable,
+};
 
 /// An SSTable is a sorted map that associates sorted `&[u8]` keys
 /// to any kind of typed values.
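
Before the `Dictionary::open` hunk below, it may help to see the fixed-size footer it starts from. A minimal sketch, assuming the 20-byte footer layout (IndexOffset, NumTerm, Version) documented in the README; `read_footer` is a hypothetical helper, not code from this patch:

```rust
use std::io;

use common::{BinarySerializable, OwnedBytes};

/// Hypothetical helper (not in this patch): reads the 20-byte sstable footer,
/// i.e. IndexOffset(u64), NumTerm(u64) and Version(u32), all little endian.
fn read_footer(mut footer_bytes: OwnedBytes) -> io::Result<(u64, u64, u32)> {
    let index_offset = u64::deserialize(&mut footer_bytes)?;
    let num_terms = u64::deserialize(&mut footer_bytes)?;
    let version = u32::deserialize(&mut footer_bytes)?;
    Ok((index_offset, num_terms, version))
}
```

For version 3, the hunk below additionally reads a trailing u64 from the index slice (the StoreOffset, which is also the Fst length) before deciding between `SSTableIndexV3` and `SSTableIndexV3Empty`.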
@@ -180,24 +183,41 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
     pub fn open(term_dictionary_file: FileSlice) -> io::Result<Dictionary<TSSTable>> {
         let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(20);
         let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?;
-
         let index_offset = u64::deserialize(&mut footer_len_bytes)?;
         let num_terms = u64::deserialize(&mut footer_len_bytes)?;
         let version = u32::deserialize(&mut footer_len_bytes)?;
-        if version != crate::SSTABLE_VERSION {
-            return Err(io::Error::new(
-                io::ErrorKind::Other,
-                format!(
-                    "Unsuported sstable version, expected {version}, found {}",
-                    crate::SSTABLE_VERSION,
-                ),
-            ));
-        }
-
         let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
         let sstable_index_bytes = index_slice.read_bytes()?;
-        let sstable_index = SSTableIndex::load(sstable_index_bytes)
-            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?;
+
+        let sstable_index = match version {
+            2 => SSTableIndex::V2(
+                crate::sstable_index_v2::SSTableIndex::load(sstable_index_bytes).map_err(|_| {
+                    io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
+                })?,
+            ),
+            3 => {
+                let (sstable_index_bytes, mut footerv3_len_bytes) = sstable_index_bytes.rsplit(8);
+                let store_offset = u64::deserialize(&mut footerv3_len_bytes)?;
+                if store_offset != 0 {
+                    SSTableIndex::V3(
+                        SSTableIndexV3::load(sstable_index_bytes, store_offset).map_err(|_| {
+                            io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
+                        })?,
+                    )
+                } else {
+                    // if store_offset is zero, there is no index, so we build a pseudo-index
+                    // assuming a single sstable block covering everything.
+                    SSTableIndex::V3Empty(SSTableIndexV3Empty::load(index_offset as usize))
+                }
+            }
+            _ => {
+                return Err(io::Error::new(
+                    io::ErrorKind::Other,
+                    format!("Unsupported sstable version, expected one of [2, 3], found {version}"),
+                ))
+            }
+        };
+
         Ok(Dictionary {
             sstable_slice,
             sstable_index,
diff --git a/sstable/src/lib.rs b/sstable/src/lib.rs
index 09014b7946..dadf43489a 100644
--- a/sstable/src/lib.rs
+++ b/sstable/src/lib.rs
@@ -10,8 +10,9 @@ pub mod merge;
 mod streamer;
 pub mod value;
 
-mod sstable_index;
-pub use sstable_index::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
+mod sstable_index_v3;
+pub use sstable_index_v3::{BlockAddr, SSTableIndex, SSTableIndexBuilder, SSTableIndexV3};
+mod sstable_index_v2;
 pub(crate) mod vint;
 pub use dictionary::Dictionary;
 pub use streamer::{Streamer, StreamerBuilder};
@@ -28,7 +29,7 @@ use crate::value::{RangeValueReader, RangeValueWriter};
 pub type TermOrdinal = u64;
 
 const DEFAULT_KEY_CAPACITY: usize = 50;
-const SSTABLE_VERSION: u32 = 2;
+const SSTABLE_VERSION: u32 = 3;
 
 /// Given two byte string returns the length of
 /// the longest common prefix.
@@ -304,7 +305,8 @@ where
 
         let offset = wrt.written_bytes();
 
-        self.index_builder.serialize(&mut wrt)?;
+        let fst_len: u64 = self.index_builder.serialize(&mut wrt)?;
+        wrt.write_all(&fst_len.to_le_bytes())?;
 
         wrt.write_all(&offset.to_le_bytes())?;
         wrt.write_all(&self.num_terms.to_le_bytes())?;
@@ -385,13 +387,10 @@ mod test {
             16, 17, 33, 18, 19, 17, 20, // data block
             0, 0, 0, 0, // no more block
             // index
-            8, 0, 0, 0, // size of index block
-            0, // compression
-            1, 0, 12, 0, 32, 17, 20, // index block
-            0, 0, 0, 0, // no more index block
+            0, 0, 0, 0, 0, 0, 0, 0, // fst length
             16, 0, 0, 0, 0, 0, 0, 0, // index start offset
             3, 0, 0, 0, 0, 0, 0, 0, // num term
-            2, 0, 0, 0, // version
+            3, 0, 0, 0, // version
         ]
     );
     let buffer = OwnedBytes::new(buffer);
diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs
deleted file mode 100644
index 1ce85305cf..0000000000
--- a/sstable/src/sstable_index.rs
+++ /dev/null
@@ -1,266 +0,0 @@
-use std::io::{self, Write};
-use std::ops::Range;
-
-use common::OwnedBytes;
-
-use crate::{common_prefix_len, SSTable, SSTableDataCorruption, TermOrdinal};
-
-#[derive(Default, Debug, Clone)]
-pub struct SSTableIndex {
-    blocks: Vec<BlockMeta>,
-}
-
-impl SSTableIndex {
-    /// Load an index from its binary representation
-    pub fn load(data: OwnedBytes) -> Result<SSTableIndex, SSTableDataCorruption> {
-        let mut reader = IndexSSTable::reader(data);
-        let mut blocks = Vec::new();
-
-        while reader.advance().map_err(|_| SSTableDataCorruption)? {
-            blocks.push(BlockMeta {
-                last_key_or_greater: reader.key().to_vec(),
-                block_addr: reader.value().clone(),
-            });
-        }
-
-        Ok(SSTableIndex { blocks })
-    }
-
-    /// Get the [`BlockAddr`] of the requested block.
-    pub(crate) fn get_block(&self, block_id: usize) -> Option<BlockAddr> {
-        self.blocks
-            .get(block_id)
-            .map(|block_meta| block_meta.block_addr.clone())
-    }
-
-    /// Get the block id of the block that would contain `key`.
-    ///
-    /// Returns None if `key` is lexicographically after the last key recorded.
-    pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<usize> {
-        let pos = self
-            .blocks
-            .binary_search_by_key(&key, |block| &block.last_key_or_greater);
-        match pos {
-            Ok(pos) => Some(pos),
-            Err(pos) => {
-                if pos < self.blocks.len() {
-                    Some(pos)
-                } else {
-                    // after end of last block: no block matches
-                    None
-                }
-            }
-        }
-    }
-
-    /// Get the [`BlockAddr`] of the block that would contain `key`.
-    ///
-    /// Returns None if `key` is lexicographically after the last key recorded.
-    pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
-        self.locate_with_key(key).and_then(|id| self.get_block(id))
-    }
-
-    pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> usize {
-        let pos = self
-            .blocks
-            .binary_search_by_key(&ord, |block| block.block_addr.first_ordinal);
-
-        match pos {
-            Ok(pos) => pos,
-            // Err(0) can't happen as the sstable starts with ordinal zero
-            Err(pos) => pos - 1,
-        }
-    }
-
-    /// Get the [`BlockAddr`] of the block containing the `ord`-th term.
-    pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
-        // locate_with_ord always returns an index within range
-        self.get_block(self.locate_with_ord(ord)).unwrap()
-    }
-}
-
-#[derive(Clone, Eq, PartialEq, Debug)]
-pub struct BlockAddr {
-    pub byte_range: Range<usize>,
-    pub first_ordinal: u64,
-}
-
-#[derive(Debug, Clone)]
-pub(crate) struct BlockMeta {
-    /// Any byte string that is lexicographically greater or equal to
-    /// the last key in the block,
-    /// and yet strictly smaller than the first key in the next block.
-    pub last_key_or_greater: Vec<u8>,
-    pub block_addr: BlockAddr,
-}
-
-#[derive(Default)]
-pub struct SSTableIndexBuilder {
-    index: SSTableIndex,
-}
-
-/// Given that left < right,
-/// mutates `left into a shorter byte string left'` that
-/// matches `left <= left' < right`.
-fn find_shorter_str_in_between(left: &mut Vec<u8>, right: &[u8]) {
-    assert!(&left[..] < right);
-    let common_len = common_prefix_len(left, right);
-    if left.len() == common_len {
-        return;
-    }
-    // It is possible to do one character shorter in some case,
-    // but it is not worth the extra complexity
-    for pos in (common_len + 1)..left.len() {
-        if left[pos] != u8::MAX {
-            left[pos] += 1;
-            left.truncate(pos + 1);
-            return;
-        }
-    }
-}
-
-impl SSTableIndexBuilder {
-    /// In order to make the index as light as possible, we
-    /// try to find a shorter alternative to the last key of the last block
-    /// that is still smaller than the next key.
-    pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
-        if let Some(last_block) = self.index.blocks.last_mut() {
-            find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
-        }
-    }
-
-    pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
-        self.index.blocks.push(BlockMeta {
-            last_key_or_greater: last_key.to_vec(),
-            block_addr: BlockAddr {
-                byte_range,
-                first_ordinal,
-            },
-        })
-    }
-
-    pub fn serialize<W: io::Write>(&self, wrt: W) -> io::Result<()> {
-        // we can't use a plain writer as it would generate an index
-        let mut sstable_writer = IndexSSTable::delta_writer(wrt);
-
-        // in tests, set a smaller block size to stress-test
-        #[cfg(test)]
-        sstable_writer.set_block_len(16);
-
-        let mut previous_key = Vec::with_capacity(crate::DEFAULT_KEY_CAPACITY);
-        for block in self.index.blocks.iter() {
-            let keep_len = common_prefix_len(&previous_key, &block.last_key_or_greater);
-
-            sstable_writer.write_suffix(keep_len, &block.last_key_or_greater[keep_len..]);
-            sstable_writer.write_value(&block.block_addr);
-            sstable_writer.flush_block_if_required()?;
-
-            previous_key.clear();
-            previous_key.extend_from_slice(&block.last_key_or_greater);
-        }
-        sstable_writer.flush_block()?;
-        sstable_writer.finish().write_all(&0u32.to_le_bytes())?;
-        Ok(())
-    }
-}
-
-/// SSTable representing an index
-///
-/// `last_key_or_greater` is used as the key, the value contains the
-/// length and first ordinal of each block. The start offset is implicitly
-/// obtained from lengths.
-struct IndexSSTable;
-
-impl SSTable for IndexSSTable {
-    type Value = BlockAddr;
-
-    type ValueReader = crate::value::index::IndexValueReader;
-
-    type ValueWriter = crate::value::index::IndexValueWriter;
-}
-
-#[cfg(test)]
-mod tests {
-    use common::OwnedBytes;
-
-    use super::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
-    use crate::SSTableDataCorruption;
-
-    #[test]
-    fn test_sstable_index() {
-        let mut sstable_builder = SSTableIndexBuilder::default();
-        sstable_builder.add_block(b"aaa", 10..20, 0u64);
-        sstable_builder.add_block(b"bbbbbbb", 20..30, 5u64);
-        sstable_builder.add_block(b"ccc", 30..40, 10u64);
-        sstable_builder.add_block(b"dddd", 40..50, 15u64);
-        let mut buffer: Vec<u8> = Vec::new();
-        sstable_builder.serialize(&mut buffer).unwrap();
-        let buffer = OwnedBytes::new(buffer);
-        let sstable_index = SSTableIndex::load(buffer).unwrap();
-        assert_eq!(
-            sstable_index.get_block_with_key(b"bbbde"),
-            Some(BlockAddr {
-                first_ordinal: 10u64,
-                byte_range: 30..40
-            })
-        );
-
-        assert_eq!(sstable_index.locate_with_key(b"aa").unwrap(), 0);
-        assert_eq!(sstable_index.locate_with_key(b"aaa").unwrap(), 0);
-        assert_eq!(sstable_index.locate_with_key(b"aab").unwrap(), 1);
-        assert_eq!(sstable_index.locate_with_key(b"ccc").unwrap(), 2);
-        assert!(sstable_index.locate_with_key(b"e").is_none());
-
-        assert_eq!(sstable_index.locate_with_ord(0), 0);
-        assert_eq!(sstable_index.locate_with_ord(1), 0);
-        assert_eq!(sstable_index.locate_with_ord(4), 0);
-        assert_eq!(sstable_index.locate_with_ord(5), 1);
-        assert_eq!(sstable_index.locate_with_ord(100), 3);
-    }
-
-    #[test]
-    fn test_sstable_with_corrupted_data() {
-        let mut sstable_builder = SSTableIndexBuilder::default();
-        sstable_builder.add_block(b"aaa", 10..20, 0u64);
-        sstable_builder.add_block(b"bbbbbbb", 20..30, 5u64);
-        sstable_builder.add_block(b"ccc", 30..40, 10u64);
-        sstable_builder.add_block(b"dddd", 40..50, 15u64);
-        let mut buffer: Vec<u8> = Vec::new();
-        sstable_builder.serialize(&mut buffer).unwrap();
-        buffer[2] = 9u8;
-        let buffer = OwnedBytes::new(buffer);
-        let data_corruption_err = SSTableIndex::load(buffer).err().unwrap();
-        assert!(matches!(data_corruption_err, SSTableDataCorruption));
-    }
-
-    #[track_caller]
-    fn test_find_shorter_str_in_between_aux(left: &[u8], right: &[u8]) {
-        let mut left_buf = left.to_vec();
-        super::find_shorter_str_in_between(&mut left_buf, right);
-        assert!(left_buf.len() <= left.len());
-        assert!(left <= &left_buf);
-        assert!(&left_buf[..] < right);
-    }
-
-    #[test]
-    fn test_find_shorter_str_in_between() {
-        test_find_shorter_str_in_between_aux(b"", b"hello");
-        test_find_shorter_str_in_between_aux(b"abc", b"abcd");
-        test_find_shorter_str_in_between_aux(b"abcd", b"abd");
-        test_find_shorter_str_in_between_aux(&[0, 0, 0], &[1]);
-        test_find_shorter_str_in_between_aux(&[0, 0, 0], &[0, 0, 1]);
-        test_find_shorter_str_in_between_aux(&[0, 0, 255, 255, 255, 0u8], &[0, 1]);
-    }
-
-    use proptest::prelude::*;
-
-    proptest! {
-        #![proptest_config(ProptestConfig::with_cases(100))]
-        #[test]
-        fn test_proptest_find_shorter_str(left in any::<Vec<u8>>(), right in any::<Vec<u8>>()) {
-            if left < right {
-                test_find_shorter_str_in_between_aux(&left, &right);
-            }
-        }
-    }
-}
diff --git a/sstable/src/sstable_index_v2.rs b/sstable/src/sstable_index_v2.rs
new file mode 100644
index 0000000000..d7c97c13a7
--- /dev/null
+++ b/sstable/src/sstable_index_v2.rs
@@ -0,0 +1,101 @@
+use common::OwnedBytes;
+
+use crate::{BlockAddr, SSTable, SSTableDataCorruption, TermOrdinal};
+
+#[derive(Default, Debug, Clone)]
+pub struct SSTableIndex {
+    blocks: Vec<BlockMeta>,
+}
+
+impl SSTableIndex {
+    /// Load an index from its binary representation
+    pub fn load(data: OwnedBytes) -> Result<SSTableIndex, SSTableDataCorruption> {
+        let mut reader = IndexSSTable::reader(data);
+        let mut blocks = Vec::new();
+
+        while reader.advance().map_err(|_| SSTableDataCorruption)? {
+            blocks.push(BlockMeta {
+                last_key_or_greater: reader.key().to_vec(),
+                block_addr: reader.value().clone(),
+            });
+        }
+
+        Ok(SSTableIndex { blocks })
+    }
+
+    /// Get the [`BlockAddr`] of the requested block.
+    pub(crate) fn get_block(&self, block_id: usize) -> Option<BlockAddr> {
+        self.blocks
+            .get(block_id)
+            .map(|block_meta| block_meta.block_addr.clone())
+    }
+
+    /// Get the block id of the block that would contain `key`.
+    ///
+    /// Returns None if `key` is lexicographically after the last key recorded.
+    pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<usize> {
+        let pos = self
+            .blocks
+            .binary_search_by_key(&key, |block| &block.last_key_or_greater);
+        match pos {
+            Ok(pos) => Some(pos),
+            Err(pos) => {
+                if pos < self.blocks.len() {
+                    Some(pos)
+                } else {
+                    // after end of last block: no block matches
+                    None
+                }
+            }
+        }
+    }
+
+    /// Get the [`BlockAddr`] of the block that would contain `key`.
+    ///
+    /// Returns None if `key` is lexicographically after the last key recorded.
+    pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
+        self.locate_with_key(key).and_then(|id| self.get_block(id))
+    }
+
+    pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> usize {
+        let pos = self
+            .blocks
+            .binary_search_by_key(&ord, |block| block.block_addr.first_ordinal);
+
+        match pos {
+            Ok(pos) => pos,
+            // Err(0) can't happen as the sstable starts with ordinal zero
+            Err(pos) => pos - 1,
+        }
+    }
+
+    /// Get the [`BlockAddr`] of the block containing the `ord`-th term.
+    pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
+        // locate_with_ord always returns an index within range
+        self.get_block(self.locate_with_ord(ord)).unwrap()
+    }
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct BlockMeta {
+    /// Any byte string that is lexicographically greater or equal to
+    /// the last key in the block,
+    /// and yet strictly smaller than the first key in the next block.
+    pub last_key_or_greater: Vec<u8>,
+    pub block_addr: BlockAddr,
+}
+
+/// SSTable representing an index
+///
+/// `last_key_or_greater` is used as the key, the value contains the
+/// length and first ordinal of each block. The start offset is implicitly
+/// obtained from lengths.
+struct IndexSSTable;
+
+impl SSTable for IndexSSTable {
+    type Value = BlockAddr;
+
+    type ValueReader = crate::value::index::IndexValueReader;
+
+    type ValueWriter = crate::value::index::IndexValueWriter;
+}
diff --git a/sstable/src/sstable_index_v3.rs b/sstable/src/sstable_index_v3.rs
new file mode 100644
index 0000000000..e2a6fdfc79
--- /dev/null
+++ b/sstable/src/sstable_index_v3.rs
@@ -0,0 +1,826 @@
+use std::io::{self, Read, Write};
+use std::ops::Range;
+use std::sync::Arc;
+
+use common::{BinarySerializable, FixedSize, OwnedBytes};
+use tantivy_bitpacker::{compute_num_bits, BitPacker};
+use tantivy_fst::raw::Fst;
+use tantivy_fst::{IntoStreamer, Map, MapBuilder, Streamer};
+
+use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal};
+
+#[derive(Debug, Clone)]
+pub enum SSTableIndex {
+    V2(crate::sstable_index_v2::SSTableIndex),
+    V3(SSTableIndexV3),
+    V3Empty(SSTableIndexV3Empty),
+}
+
+impl SSTableIndex {
+    /// Get the [`BlockAddr`] of the requested block.
+    pub(crate) fn get_block(&self, block_id: u64) -> Option<BlockAddr> {
+        match self {
+            SSTableIndex::V2(v2_index) => v2_index.get_block(block_id as usize),
+            SSTableIndex::V3(v3_index) => v3_index.get_block(block_id),
+            SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block(block_id),
+        }
+    }
+
+    /// Get the block id of the block that would contain `key`.
+    ///
+    /// Returns None if `key` is lexicographically after the last key recorded.
+    pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<u64> {
+        match self {
+            SSTableIndex::V2(v2_index) => v2_index.locate_with_key(key).map(|i| i as u64),
+            SSTableIndex::V3(v3_index) => v3_index.locate_with_key(key),
+            SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_key(key),
+        }
+    }
+
+    /// Get the [`BlockAddr`] of the block that would contain `key`.
+    ///
+    /// Returns None if `key` is lexicographically after the last key recorded.
+    pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
+        match self {
+            SSTableIndex::V2(v2_index) => v2_index.get_block_with_key(key),
+            SSTableIndex::V3(v3_index) => v3_index.get_block_with_key(key),
+            SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_key(key),
+        }
+    }
+
+    pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> u64 {
+        match self {
+            SSTableIndex::V2(v2_index) => v2_index.locate_with_ord(ord) as u64,
+            SSTableIndex::V3(v3_index) => v3_index.locate_with_ord(ord),
+            SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_ord(ord),
+        }
+    }
+
+    /// Get the [`BlockAddr`] of the block containing the `ord`-th term.
+    pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
+        match self {
+            SSTableIndex::V2(v2_index) => v2_index.get_block_with_ord(ord),
+            SSTableIndex::V3(v3_index) => v3_index.get_block_with_ord(ord),
+            SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord),
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct SSTableIndexV3 {
+    fst_index: Arc<Map<OwnedBytes>>,
+    block_addr_store: BlockAddrStore,
+}
+
+impl SSTableIndexV3 {
+    /// Load an index from its binary representation
+    pub fn load(
+        data: OwnedBytes,
+        fst_length: u64,
+    ) -> Result<SSTableIndexV3, SSTableDataCorruption> {
+        let (fst_slice, block_addr_store_slice) = data.split(fst_length as usize);
+        let fst_index = Fst::new(fst_slice)
+            .map_err(|_| SSTableDataCorruption)?
+            .into();
+        let block_addr_store =
+            BlockAddrStore::open(block_addr_store_slice).map_err(|_| SSTableDataCorruption)?;
+
+        Ok(SSTableIndexV3 {
+            fst_index: Arc::new(fst_index),
+            block_addr_store,
+        })
+    }
+
+    /// Get the [`BlockAddr`] of the requested block.
+    pub(crate) fn get_block(&self, block_id: u64) -> Option<BlockAddr> {
+        self.block_addr_store.get(block_id)
+    }
+
+    /// Get the block id of the block that would contain `key`.
+    ///
+    /// Returns None if `key` is lexicographically after the last key recorded.
+    pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<u64> {
+        self.fst_index
+            .range()
+            .ge(key)
+            .into_stream()
+            .next()
+            .map(|(_key, id)| id)
+    }
+
+    /// Get the [`BlockAddr`] of the block that would contain `key`.
+    ///
+    /// Returns None if `key` is lexicographically after the last key recorded.
+    pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
+        self.locate_with_key(key).and_then(|id| self.get_block(id))
+    }
+
+    pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> u64 {
+        self.block_addr_store.binary_search_ord(ord).0
+    }
+
+    /// Get the [`BlockAddr`] of the block containing the `ord`-th term.
+    pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
+        self.block_addr_store.binary_search_ord(ord).1
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct SSTableIndexV3Empty {
+    block_addr: BlockAddr,
+}
+
+impl SSTableIndexV3Empty {
+    pub fn load(index_start_pos: usize) -> SSTableIndexV3Empty {
+        SSTableIndexV3Empty {
+            block_addr: BlockAddr {
+                first_ordinal: 0,
+                byte_range: 0..index_start_pos,
+            },
+        }
+    }
+
+    /// Get the [`BlockAddr`] of the requested block.
+    pub(crate) fn get_block(&self, _block_id: u64) -> Option<BlockAddr> {
+        Some(self.block_addr.clone())
+    }
+
+    /// Get the block id of the block that would contain `key`.
+    ///
+    /// Returns None if `key` is lexicographically after the last key recorded.
+    pub(crate) fn locate_with_key(&self, _key: &[u8]) -> Option<u64> {
+        Some(0)
+    }
+
+    /// Get the [`BlockAddr`] of the block that would contain `key`.
+    ///
+    /// Returns None if `key` is lexicographically after the last key recorded.
+    pub fn get_block_with_key(&self, _key: &[u8]) -> Option<BlockAddr> {
+        Some(self.block_addr.clone())
+    }
+
+    pub(crate) fn locate_with_ord(&self, _ord: TermOrdinal) -> u64 {
+        0
+    }
+
+    /// Get the [`BlockAddr`] of the block containing the `ord`-th term.
+    pub(crate) fn get_block_with_ord(&self, _ord: TermOrdinal) -> BlockAddr {
+        self.block_addr.clone()
+    }
+}
+
+#[derive(Clone, Eq, PartialEq, Debug)]
+pub struct BlockAddr {
+    pub first_ordinal: u64,
+    pub byte_range: Range<usize>,
+}
+
+impl BlockAddr {
+    fn to_block_start(&self) -> BlockStartAddr {
+        BlockStartAddr {
+            first_ordinal: self.first_ordinal,
+            byte_range_start: self.byte_range.start,
+        }
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+struct BlockStartAddr {
+    first_ordinal: u64,
+    byte_range_start: usize,
+}
+
+impl BlockStartAddr {
+    fn to_block_addr(&self, byte_range_end: usize) -> BlockAddr {
+        BlockAddr {
+            first_ordinal: self.first_ordinal,
+            byte_range: self.byte_range_start..byte_range_end,
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct BlockMeta {
+    /// Any byte string that is lexicographically greater or equal to
+    /// the last key in the block,
+    /// and yet strictly smaller than the first key in the next block.
+    pub last_key_or_greater: Vec<u8>,
+    pub block_addr: BlockAddr,
+}
+
+impl BinarySerializable for BlockStartAddr {
+    fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
+        let start = self.byte_range_start as u64;
+        start.serialize(writer)?;
+        self.first_ordinal.serialize(writer)
+    }
+
+    fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
+        let byte_range_start = u64::deserialize(reader)? as usize;
+        let first_ordinal = u64::deserialize(reader)?;
+        Ok(BlockStartAddr {
+            first_ordinal,
+            byte_range_start,
+        })
+    }
+
+    // Provided method
+    fn num_bytes(&self) -> u64 {
+        BlockStartAddr::SIZE_IN_BYTES as u64
+    }
+}
+
+impl FixedSize for BlockStartAddr {
+    const SIZE_IN_BYTES: usize = 2 * u64::SIZE_IN_BYTES;
+}
+
+/// Given that left < right,
+/// mutates `left` into a shorter byte string `left'` that
+/// matches `left <= left' < right`.
+fn find_shorter_str_in_between(left: &mut Vec<u8>, right: &[u8]) {
+    assert!(&left[..] < right);
+    let common_len = common_prefix_len(left, right);
+    if left.len() == common_len {
+        return;
+    }
+    // It is possible to do one character shorter in some cases,
+    // but it is not worth the extra complexity
+    for pos in (common_len + 1)..left.len() {
+        if left[pos] != u8::MAX {
+            left[pos] += 1;
+            left.truncate(pos + 1);
+            return;
+        }
+    }
+}
+
+#[derive(Default)]
+pub struct SSTableIndexBuilder {
+    blocks: Vec<BlockMeta>,
+}
+
+impl SSTableIndexBuilder {
+    /// In order to make the index as light as possible, we
+    /// try to find a shorter alternative to the last key of the last block
+    /// that is still smaller than the next key.
+    pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
+        if let Some(last_block) = self.blocks.last_mut() {
+            find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
+        }
+    }
+
+    pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
+        self.blocks.push(BlockMeta {
+            last_key_or_greater: last_key.to_vec(),
+            block_addr: BlockAddr {
+                byte_range,
+                first_ordinal,
+            },
+        })
+    }
+
+    pub fn serialize<W: io::Write>(&self, wrt: W) -> io::Result<u64> {
+        if self.blocks.len() <= 1 {
+            return Ok(0);
+        }
+        let counting_writer = common::CountingWriter::wrap(wrt);
+        let mut map_builder = MapBuilder::new(counting_writer).map_err(fst_error_to_io_error)?;
+        for (i, block) in self.blocks.iter().enumerate() {
+            map_builder
+                .insert(&block.last_key_or_greater, i as u64)
+                .map_err(fst_error_to_io_error)?;
+        }
+        let counting_writer = map_builder.into_inner().map_err(fst_error_to_io_error)?;
+        let written_bytes = counting_writer.written_bytes();
+        let mut wrt = counting_writer.finish();
+
+        let mut block_store_writer = BlockAddrStoreWriter::new();
+        for block in &self.blocks {
+            block_store_writer.write_block_meta(block.block_addr.clone())?;
+        }
+        block_store_writer.serialize(&mut wrt)?;
+
+        Ok(written_bytes)
+    }
+}
+
+fn fst_error_to_io_error(error: tantivy_fst::Error) -> io::Error {
+    match error {
+        tantivy_fst::Error::Fst(fst_error) => io::Error::new(io::ErrorKind::Other, fst_error),
+        tantivy_fst::Error::Io(ioerror) => ioerror,
+    }
+}
+
+const STORE_BLOCK_LEN: usize = 128;
+
+#[derive(Debug)]
+struct BlockAddrBlockMetadata {
+    offset: u64,
+    ref_block_addr: BlockStartAddr,
+    range_start_slope: u32,
+    first_ordinal_slope: u32,
+    range_start_nbits: u8,
+    first_ordinal_nbits: u8,
+    block_len: u16,
+    // these fields are computed on deserialization, and not stored
+    range_shift: i64,
+    ordinal_shift: i64,
+}
+
+impl BlockAddrBlockMetadata {
+    fn num_bits(&self) -> u8 {
+        self.first_ordinal_nbits + self.range_start_nbits
+    }
+
+    fn deserialize_block_addr(&self, data: &[u8], inner_offset: usize) -> Option<BlockAddr> {
+        if inner_offset == 0 {
+            let range_end = self.ref_block_addr.byte_range_start
+                + extract_bits(data, 0, self.range_start_nbits) as usize
+                + self.range_start_slope as usize
+                - self.range_shift as usize;
+            return Some(self.ref_block_addr.to_block_addr(range_end));
+        }
+        let inner_offset = inner_offset - 1;
+        if inner_offset >= self.block_len as usize {
+            return None;
+        }
+        let num_bits = self.num_bits() as usize;
+
+        let range_start_addr = num_bits * inner_offset;
+        let ordinal_addr = range_start_addr + self.range_start_nbits as usize;
+        let range_end_addr = range_start_addr + num_bits;
+
+        if (range_end_addr + self.range_start_nbits as usize + 7) / 8 > data.len() {
+            return None;
+        }
+
+        let range_start = self.ref_block_addr.byte_range_start
+            + extract_bits(data, range_start_addr, self.range_start_nbits) as usize
+            + self.range_start_slope as usize * (inner_offset + 1)
+            - self.range_shift as usize;
+        let first_ordinal = self.ref_block_addr.first_ordinal
+            + extract_bits(data, ordinal_addr, self.first_ordinal_nbits)
+            + self.first_ordinal_slope as u64 * (inner_offset + 1) as u64
+            - self.ordinal_shift as u64;
+        let range_end = self.ref_block_addr.byte_range_start
+            + extract_bits(data, range_end_addr, self.range_start_nbits) as usize
+            + self.range_start_slope as usize * (inner_offset + 2)
+            - self.range_shift as usize;
+
+        Some(BlockAddr {
+            first_ordinal,
+            byte_range: range_start..range_end,
+        })
+    }
+
+    fn bisect_for_ord(&self, data: &[u8], target_ord: TermOrdinal) -> (u64, BlockAddr) {
+        let inner_target_ord = target_ord - self.ref_block_addr.first_ordinal;
+        let num_bits = self.num_bits() as usize;
+        let range_start_nbits = self.range_start_nbits as usize;
+        let get_ord = |index| {
+            extract_bits(
+                data,
+                num_bits * index as usize + range_start_nbits,
+                self.first_ordinal_nbits,
+            ) + self.first_ordinal_slope as u64 * (index + 1)
+                - self.ordinal_shift as u64
+        };
+
+        let inner_offset = match binary_search(self.block_len as u64, |index| {
+            get_ord(index).cmp(&inner_target_ord)
+        }) {
+            Ok(inner_offset) => inner_offset + 1,
+            Err(inner_offset) => inner_offset,
+        };
+        // we can unwrap because inner_offset <= self.block_len
+        (
+            inner_offset,
+            self.deserialize_block_addr(data, inner_offset as usize)
+                .unwrap(),
+        )
+    }
+}
+
+// TODO move this function to tantivy_common?
+#[inline(always)]
+fn extract_bits(data: &[u8], addr_bits: usize, num_bits: u8) -> u64 {
+    assert!(num_bits <= 56);
+    let addr_byte = addr_bits / 8;
+    let bit_shift = (addr_bits % 8) as u64;
+    let val_unshifted_unmasked: u64 = if data.len() >= addr_byte + 8 {
+        let b = data[addr_byte..addr_byte + 8].try_into().unwrap();
+        u64::from_le_bytes(b)
+    } else {
+        // the buffer is not large enough.
+        // Let's copy the few remaining bytes to a 8 byte buffer
+        // padded with 0s.
+        let mut buf = [0u8; 8];
+        let data_to_copy = &data[addr_byte..];
+        let nbytes = data_to_copy.len();
+        buf[..nbytes].copy_from_slice(data_to_copy);
+        u64::from_le_bytes(buf)
+    };
+    let val_shifted_unmasked = val_unshifted_unmasked >> bit_shift;
+    let mask = (1u64 << u64::from(num_bits)) - 1;
+    val_shifted_unmasked & mask
+}
+
+impl BinarySerializable for BlockAddrBlockMetadata {
+    fn serialize<W: io::Write>(&self, write: &mut W) -> io::Result<()> {
+        self.offset.serialize(write)?;
+        self.ref_block_addr.serialize(write)?;
+        self.range_start_slope.serialize(write)?;
+        self.first_ordinal_slope.serialize(write)?;
+        write.write_all(&[self.first_ordinal_nbits, self.range_start_nbits])?;
+        self.block_len.serialize(write)?;
+        self.num_bits();
+        Ok(())
+    }
+
+    fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
+        let offset = u64::deserialize(reader)?;
+        let ref_block_addr = BlockStartAddr::deserialize(reader)?;
+        let range_start_slope = u32::deserialize(reader)?;
+        let first_ordinal_slope = u32::deserialize(reader)?;
+        let mut buffer = [0u8; 2];
+        reader.read_exact(&mut buffer)?;
+        let first_ordinal_nbits = buffer[0];
+        let range_start_nbits = buffer[1];
+        let block_len = u16::deserialize(reader)?;
+        Ok(BlockAddrBlockMetadata {
+            offset,
+            ref_block_addr,
+            range_start_slope,
+            first_ordinal_slope,
+            range_start_nbits,
+            first_ordinal_nbits,
+            block_len,
+            range_shift: 1 << (range_start_nbits - 1),
+            ordinal_shift: 1 << (first_ordinal_nbits - 1),
+        })
+    }
+}
+
+impl FixedSize for BlockAddrBlockMetadata {
+    const SIZE_IN_BYTES: usize = u64::SIZE_IN_BYTES
+        + BlockStartAddr::SIZE_IN_BYTES
+        + 2 * u32::SIZE_IN_BYTES
+        + 2 * u8::SIZE_IN_BYTES
+        + u16::SIZE_IN_BYTES;
+}
+
+#[derive(Debug, Clone)]
+struct BlockAddrStore {
+    block_meta_bytes: OwnedBytes,
+    addr_bytes: OwnedBytes,
+}
+
+impl BlockAddrStore {
+    fn open(term_info_store_file: OwnedBytes) -> io::Result<BlockAddrStore> {
+        let (mut len_slice, main_slice) = term_info_store_file.split(8);
+        let len = u64::deserialize(&mut len_slice)? as usize;
+        let (block_meta_bytes, addr_bytes) = main_slice.split(len);
+        Ok(BlockAddrStore {
+            block_meta_bytes,
+            addr_bytes,
+        })
+    }
+
+    fn get_block_meta(&self, store_block_id: usize) -> Option<BlockAddrBlockMetadata> {
+        let mut block_data: &[u8] = self
+            .block_meta_bytes
+            .get(store_block_id * BlockAddrBlockMetadata::SIZE_IN_BYTES..)?;
+        BlockAddrBlockMetadata::deserialize(&mut block_data).ok()
+    }
+
+    fn get(&self, block_id: u64) -> Option<BlockAddr> {
+        let store_block_id = (block_id as usize) / STORE_BLOCK_LEN;
+        let inner_offset = (block_id as usize) % STORE_BLOCK_LEN;
+        let block_addr_block_data = self.get_block_meta(store_block_id)?;
+        block_addr_block_data.deserialize_block_addr(
+            &self.addr_bytes[block_addr_block_data.offset as usize..],
+            inner_offset,
+        )
+    }
+
+    fn binary_search_ord(&self, ord: TermOrdinal) -> (u64, BlockAddr) {
+        let max_block =
+            (self.block_meta_bytes.len() / BlockAddrBlockMetadata::SIZE_IN_BYTES) as u64;
+        let get_first_ordinal = |block_id| {
+            // we can unwrap because block_id < max_block
+            self.get(block_id * STORE_BLOCK_LEN as u64)
+                .unwrap()
+                .first_ordinal
+        };
+        let store_block_id =
+            binary_search(max_block, |block_id| get_first_ordinal(block_id).cmp(&ord));
+        let store_block_id = match store_block_id {
+            Ok(store_block_id) => {
+                let block_id = store_block_id * STORE_BLOCK_LEN as u64;
+                // we can unwrap because store_block_id < max_block
+                return (block_id, self.get(block_id).unwrap());
+            }
+            Err(store_block_id) => store_block_id - 1,
+        };
+
+        // we can unwrap because store_block_id < max_block
+        let block_addr_block_data = self.get_block_meta(store_block_id as usize).unwrap();
+        let (inner_offset, block_addr) = block_addr_block_data.bisect_for_ord(
+            &self.addr_bytes[block_addr_block_data.offset as usize..],
+            ord,
+        );
+        (
+            store_block_id * STORE_BLOCK_LEN as u64 + inner_offset,
+            block_addr,
+        )
+    }
+}
+
+fn binary_search(max: u64, cmp_fn: impl Fn(u64) -> std::cmp::Ordering) -> Result<u64, u64> {
+    use std::cmp::Ordering::*;
+    let mut size = max;
+    let mut left = 0;
+    let mut right = size;
+    while left < right {
+        let mid = left + size / 2;
+
+        let cmp = cmp_fn(mid);
+
+        if cmp == Less {
+            left = mid + 1;
+        } else if cmp == Greater {
+            right = mid;
+        } else {
+            return Ok(mid);
+        }
+
+        size = right - left;
+    }
+    Err(left)
+}
+
+struct BlockAddrStoreWriter {
+    buffer_block_metas: Vec<u8>,
+    buffer_addrs: Vec<u8>,
+    block_addrs: Vec<BlockAddr>,
+}
+
+impl BlockAddrStoreWriter {
+    fn new() -> Self {
+        BlockAddrStoreWriter {
+            buffer_block_metas: Vec::new(),
+            buffer_addrs: Vec::new(),
+            block_addrs: Vec::with_capacity(STORE_BLOCK_LEN),
+        }
+    }
+
+    fn flush_block(&mut self) -> io::Result<()> {
+        if self.block_addrs.is_empty() {
+            return Ok(());
+        }
+        let ref_block_addr = self.block_addrs[0].clone();
+
+        for block_addr in &mut self.block_addrs {
+            block_addr.byte_range.start -= ref_block_addr.byte_range.start;
+            block_addr.first_ordinal -= ref_block_addr.first_ordinal;
+        }
+
+        // we are only called if block_addrs is not empty
+        let mut last_block_addr = self.block_addrs.last().unwrap().clone();
+        last_block_addr.byte_range.end -= ref_block_addr.byte_range.start;
+
+        // we skip(1), so we never give an index of 0 to find_best_slope
+        let (range_start_slope, range_start_nbits) = find_best_slope(
+            self.block_addrs
+                .iter()
+                .map(|block| block.byte_range.start as u64)
+                .chain(std::iter::once(last_block_addr.byte_range.end as u64))
+                .enumerate()
+                .skip(1),
+        );
+
+        // we skip(1), so we never give an index of 0 to find_best_slope
+        let (first_ordinal_slope, first_ordinal_nbits) = find_best_slope(
+            self.block_addrs
+                .iter()
+                .map(|block| block.first_ordinal)
+                .enumerate()
+                .skip(1),
+        );
+
+        let range_shift = 1 << (range_start_nbits - 1);
+        let ordinal_shift = 1 << (first_ordinal_nbits - 1);
+
+        let block_addr_block_meta = BlockAddrBlockMetadata {
+            offset: self.buffer_addrs.len() as u64,
+            ref_block_addr: ref_block_addr.to_block_start(),
+            range_start_slope,
+            first_ordinal_slope,
+            range_start_nbits,
+            first_ordinal_nbits,
+            block_len: self.block_addrs.len() as u16 - 1,
+            range_shift,
+            ordinal_shift,
+        };
+        block_addr_block_meta.serialize(&mut self.buffer_block_metas)?;
+
+        let mut bit_packer = BitPacker::new();
+
+        for (i, block_addr) in self.block_addrs.iter().enumerate().skip(1) {
+            let range_pred = (range_start_slope as usize * i) as i64;
+            bit_packer.write(
+                (block_addr.byte_range.start as i64 - range_pred + range_shift) as u64,
+                range_start_nbits,
+                &mut self.buffer_addrs,
+            )?;
+            let first_ordinal_pred = (first_ordinal_slope as u64 * i as u64) as i64;
+            bit_packer.write(
+                (block_addr.first_ordinal as i64 - first_ordinal_pred + ordinal_shift) as u64,
+                first_ordinal_nbits,
+                &mut self.buffer_addrs,
+            )?;
+        }
+
+        let range_pred = (range_start_slope as usize * self.block_addrs.len()) as i64;
+        bit_packer.write(
+            (last_block_addr.byte_range.end as i64 - range_pred + range_shift) as u64,
+            range_start_nbits,
+            &mut self.buffer_addrs,
+        )?;
+        bit_packer.flush(&mut self.buffer_addrs)?;
+
+        self.block_addrs.clear();
+        Ok(())
+    }
+
+    fn write_block_meta(&mut self, block_addr: BlockAddr) -> io::Result<()> {
+        self.block_addrs.push(block_addr);
+        if self.block_addrs.len() >= STORE_BLOCK_LEN {
+            self.flush_block()?;
+        }
+        Ok(())
+    }
+
+    fn serialize<W: io::Write>(&mut self, wrt: &mut W) -> io::Result<()> {
+        self.flush_block()?;
+        let len = self.buffer_block_metas.len() as u64;
+        len.serialize(wrt)?;
+        wrt.write_all(&self.buffer_block_metas)?;
+        wrt.write_all(&self.buffer_addrs)?;
+        Ok(())
+    }
+}
+
+/// Given an iterator over (index, value), returns the slope, and the number of bits needed to
+/// represent the error of a prediction made by this slope.
+///
+/// The iterator may be empty, but all indexes in it must be non-zero.
+fn find_best_slope(elements: impl Iterator<Item = (usize, u64)> + Clone) -> (u32, u8) {
+    let slope_iterator = elements.clone();
+    let derivation_iterator = elements;
+
+    let mut min_slope_idx = 1;
+    let mut min_slope_val = 0;
+    let mut min_slope = u32::MAX;
+    let mut max_slope_idx = 1;
+    let mut max_slope_val = 0;
+    let mut max_slope = 0;
+    for (index, value) in slope_iterator {
+        let slope = (value / index as u64) as u32;
+        if slope <= min_slope {
+            min_slope = slope;
+            min_slope_idx = index;
+            min_slope_val = value;
+        }
+        if slope >= max_slope {
+            max_slope = slope;
+            max_slope_idx = index;
+            max_slope_val = value;
+        }
+    }
+
+    // The above is a heuristic giving the "highest" and "lowest" points. It's imperfect in that
+    // a point that appears earlier might have a high slope derivation, but a smaller absolute
+    // derivation than a later point.
+    // The actual best values can be obtained by using the simplex method, but the improvement is
+    // likely minimal, and the computation is way more complex.
+    //
+    // Assuming these points are the furthest up and down, we find the slope that would cause the
+    // same positive derivation for the highest as negative derivation for the lowest.
+    // A is the optimal slope. B is the derivation from the guess:
+    //
+    // 0 = min_slope_val - min_slope_idx * A - B
+    // 0 = max_slope_val - max_slope_idx * A + B
+    //
+    // 0 = min_slope_val + max_slope_val - (min_slope_idx + max_slope_idx) * A
+    // (min_slope_val + max_slope_val) / (min_slope_idx + max_slope_idx) = A
+    //
+    // we actually add some correcting factor to have proper rounding, not truncation.
+
+    let denominator = (min_slope_idx + max_slope_idx) as u64;
+    let final_slope = ((min_slope_val + max_slope_val + denominator / 2) / denominator) as u32;
+
+    // we don't solve for B because our choice of point is suboptimal, so it's actually a lower
+    // bound and we need to iterate to find the actual worst value.
+
+    let max_derivation: u64 = derivation_iterator
+        .map(|(index, value)| (value as i64 - final_slope as i64 * index as i64).unsigned_abs())
+        .max()
+        .unwrap_or(0);
+
+    (final_slope, compute_num_bits(max_derivation) + 1)
+}
+
+#[cfg(test)]
+mod tests {
+    use common::OwnedBytes;
+
+    use super::{BlockAddr, SSTableIndexBuilder, SSTableIndexV3};
+    use crate::SSTableDataCorruption;
+
+    #[test]
+    fn test_sstable_index() {
+        let mut sstable_builder = SSTableIndexBuilder::default();
+        sstable_builder.add_block(b"aaa", 10..20, 0u64);
+        sstable_builder.add_block(b"bbbbbbb", 20..30, 5u64);
+        sstable_builder.add_block(b"ccc", 30..40, 10u64);
+        sstable_builder.add_block(b"dddd", 40..50, 15u64);
+        let mut buffer: Vec<u8> = Vec::new();
+        let fst_len = sstable_builder.serialize(&mut buffer).unwrap();
+        let buffer = OwnedBytes::new(buffer);
+        let sstable_index = SSTableIndexV3::load(buffer, fst_len).unwrap();
+        assert_eq!(
+            sstable_index.get_block_with_key(b"bbbde"),
+            Some(BlockAddr {
+                first_ordinal: 10u64,
+                byte_range: 30..40
+            })
+        );
+
+        assert_eq!(sstable_index.locate_with_key(b"aa").unwrap(), 0);
+        assert_eq!(sstable_index.locate_with_key(b"aaa").unwrap(), 0);
+        assert_eq!(sstable_index.locate_with_key(b"aab").unwrap(), 1);
+        assert_eq!(sstable_index.locate_with_key(b"ccc").unwrap(), 2);
+        assert!(sstable_index.locate_with_key(b"e").is_none());
+
+        assert_eq!(sstable_index.locate_with_ord(0), 0);
+        assert_eq!(sstable_index.locate_with_ord(1), 0);
+        assert_eq!(sstable_index.locate_with_ord(4), 0);
+        assert_eq!(sstable_index.locate_with_ord(5), 1);
+        assert_eq!(sstable_index.locate_with_ord(100), 3);
+    }
+
+    #[test]
+    fn test_sstable_with_corrupted_data() {
+        let mut sstable_builder = SSTableIndexBuilder::default();
+        sstable_builder.add_block(b"aaa", 10..20, 0u64);
+        sstable_builder.add_block(b"bbbbbbb", 20..30, 5u64);
+        sstable_builder.add_block(b"ccc", 30..40, 10u64);
+        sstable_builder.add_block(b"dddd", 40..50, 15u64);
+        let mut buffer: Vec<u8> = Vec::new();
+        let fst_len = sstable_builder.serialize(&mut buffer).unwrap();
+        buffer[2] = 9u8;
+        let buffer = OwnedBytes::new(buffer);
+        let data_corruption_err = SSTableIndexV3::load(buffer, fst_len).err().unwrap();
+        assert!(matches!(data_corruption_err, SSTableDataCorruption));
+    }
+
+    #[track_caller]
+    fn test_find_shorter_str_in_between_aux(left: &[u8], right: &[u8]) {
+        let mut left_buf = left.to_vec();
+        super::find_shorter_str_in_between(&mut left_buf, right);
+        assert!(left_buf.len() <= left.len());
+        assert!(left <= &left_buf);
+        assert!(&left_buf[..] < right);
+    }
+
+    #[test]
+    fn test_find_shorter_str_in_between() {
+        test_find_shorter_str_in_between_aux(b"", b"hello");
+        test_find_shorter_str_in_between_aux(b"abc", b"abcd");
+        test_find_shorter_str_in_between_aux(b"abcd", b"abd");
+        test_find_shorter_str_in_between_aux(&[0, 0, 0], &[1]);
+        test_find_shorter_str_in_between_aux(&[0, 0, 0], &[0, 0, 1]);
+        test_find_shorter_str_in_between_aux(&[0, 0, 255, 255, 255, 0u8], &[0, 1]);
+    }
+
+    use proptest::prelude::*;
+
+    proptest! {
+        #![proptest_config(ProptestConfig::with_cases(100))]
+        #[test]
+        fn test_proptest_find_shorter_str(left in any::<Vec<u8>>(), right in any::<Vec<u8>>()) {
+            if left < right {
+                test_find_shorter_str_in_between_aux(&left, &right);
+            }
+        }
+    }
+
+    #[test]
+    fn test_find_best_slope() {
+        assert_eq!(super::find_best_slope(std::iter::empty()), (0, 1));
+        assert_eq!(
+            super::find_best_slope(std::iter::once((1, 12345))),
+            (12345, 1)
+        );
+    }
+}
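
As a usage note for the slope fitting above: `find_best_slope` returns a `(slope, num_bits)` pair such that, for every `(index, value)` it saw, the residual against the linear prediction fits in `num_bits` bits once shifted by half the representable range. That is exactly the quantity `BlockAddrStoreWriter::flush_block` bitpacks. A minimal sketch of that invariant; `check_slope_fit` is illustrative, not code from this patch:

```rust
/// Illustrative check (not part of the patch): every residual against the
/// linear prediction `slope * index`, once shifted by `1 << (num_bits - 1)`,
/// must land in the unsigned range `0..(1 << num_bits)`.
fn check_slope_fit(elements: &[(usize, u64)], slope: u32, num_bits: u8) -> bool {
    elements.iter().all(|&(index, value)| {
        let residual = value as i64 - slope as i64 * index as i64;
        let shifted = residual + (1i64 << (num_bits - 1));
        (0..(1i64 << num_bits)).contains(&shifted)
    })
}
```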