From 97299b30d62f6c40cdfb5d3260448f0480383344 Mon Sep 17 00:00:00 2001
From: trinity-1686a
Date: Tue, 14 Nov 2023 10:09:34 +0100
Subject: [PATCH 01/18] read path for new fst based index

---
 sstable/Cargo.toml           |   1 +
 sstable/README.md            |  78 +++++---
 sstable/src/dictionary.rs    |   5 +-
 sstable/src/lib.rs           |   3 +-
 sstable/src/sstable_index.rs | 370 +++++++++++++++++++++++++++--------
 5 files changed, 348 insertions(+), 109 deletions(-)

diff --git a/sstable/Cargo.toml b/sstable/Cargo.toml
index 10e94f75c7..6d8b10b665 100644
--- a/sstable/Cargo.toml
+++ b/sstable/Cargo.toml
@@ -10,6 +10,7 @@ categories = ["database-implementations", "data-structures", "compression"]
 description = "sstables for tantivy"
 
 [dependencies]
+byteorder = "1.4.3"
 common = {version= "0.6", path="../common", package="tantivy-common"}
 tantivy-fst = "0.4"
 # experimental gives us access to Decompressor::upper_bound
diff --git a/sstable/README.md b/sstable/README.md
index bec6d70f92..a9980b75ea 100644
--- a/sstable/README.md
+++ b/sstable/README.md
@@ -89,33 +89,63 @@ Note: as the SSTable does not support redundant keys, there is no ambiguity betw
 ### SSTFooter
 ```
-+-------+-------+-----+-------------+---------+---------+
-| Block | Block | ... | IndexOffset | NumTerm | Version |
-+-------+-------+-----+-------------+---------+---------+
-|----( # of blocks)---|
++-----+----------------+-------------+-------------+---------+---------+
+| Fst | BlockAddrStore | StoreOffset | IndexOffset | NumTerm | Version |
++-----+----------------+-------------+-------------+---------+---------+
 ```
 
-- Block(SSTBlock): uses IndexValue for its Values format
+- Fst(Fst): finite state transducer mapping keys to a block number
+- BlockAddrStore(BlockAddrStore): store mapping a block number to its BlockAddr
+- StoreOffset(u64): length of the Fst, which is also the offset of the BlockAddrStore within the SSTFooter
 - IndexOffset(u64): Offset to the start of the SSTFooter
 - NumTerm(u64): number of terms in the sstable
-- Version(u32): Currently equal to 2
+- Version(u32): Currently equal to 3
 
-### IndexValue
-```
-+------------+----------+-------+-------+-----+
-| EntryCount | StartPos | Entry | Entry | ... |
-+------------+----------+-------+-------+-----+
- |---( # of entries)---|
-```
-
-- EntryCount(VInt): number of entries
-- StartPos(VInt): the start pos of the first (data) block referenced by this (index) block
-- Entry (IndexEntry)
-
-### Entry
-```
-+----------+--------------+
-| BlockLen | FirstOrdinal |
-+----------+--------------+
-```
-- BlockLen(VInt): length of the block
-- FirstOrdinal(VInt): ordinal of the first element in the given block
+### Fst
+
+Fst is in the format of tantivy\_fst
+
+### BlockAddrStore
+
++---------+-----------+-----------+-----+-----------+-----------+-----+
+| MetaLen | BlockMeta | BlockMeta | ... | BlockData | BlockData | ... |
++---------+-----------+-----------+-----+-----------+-----------+-----+
+ |---------(N blocks)----------|---------(N blocks)----------|
+
+- MetaLen(u64): length of the BlockMeta section
+- BlockMeta(BlockAddrBlockMetadata): metadata to seek through BlockData
+- BlockData(CompactedBlockAddr): bitpacked per-block metadata
+
+### BlockAddrBlockMetadata
+
++--------+--------------+-------------------+-----------------+---------------+
+| Offset | RefBlockAddr | FirstOrdinalNBits | RangeStartNBits | RangeLenNBits |
++--------+--------------+-------------------+-----------------+---------------+
+
+- Offset(u64): offset of the corresponding BlockData in the datastream
+- RefBlockAddr(BlockAddr): reference block for the compacted block data
+- FirstOrdinalNBits(u8): number of bits per ordinal in datastream
+- RangeStartNBits(u8): number of bits per range start in datastream
+- RangeLenNBits(u8): number of bits per range length in datastream
+
+### BlockAddr
+
++--------------+------------+----------+
+| FirstOrdinal | RangeStart | RangeEnd |
++--------------+------------+----------+
+
+- FirstOrdinal(u64): the first ordinal of this block
+- RangeStart(u64): the start position of the corresponding block in the sstable
+- RangeEnd(u64): the end position of the corresponding block in the sstable
+
+### BlockData
+
++-------------------+-----------------+----------+
+| FirstOrdinalDelta | RangeStartDelta | RangeLen |
++-------------------+-----------------+----------+
+|---------------(255 repetitions)----------------|
+
+- FirstOrdinalDelta(var): FirstOrdinalNBits *bits* of little endian number. Delta between the first
+ordinal of this block and that of the reference block
+- RangeStartDelta(var): RangeStartNBits *bits* of little endian number. Delta between the range
+start of this block and that of the reference block
+- RangeLen(var): RangeLenNBits *bits* of little endian number. Length of the range of this block
diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs
index 0eb5822d9e..3aafaa10aa 100644
--- a/sstable/src/dictionary.rs
+++ b/sstable/src/dictionary.rs
@@ -178,9 +178,10 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
     /// Opens a `TermDictionary`.
     pub fn open(term_dictionary_file: FileSlice) -> io::Result<Self> {
-        let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(20);
+        let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(28);
         let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?;
+        let store_offset = u64::deserialize(&mut footer_len_bytes)?;
         let index_offset = u64::deserialize(&mut footer_len_bytes)?;
         let num_terms = u64::deserialize(&mut footer_len_bytes)?;
         let version = u32::deserialize(&mut footer_len_bytes)?;
@@ -196,7 +197,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
         let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
         let sstable_index_bytes = index_slice.read_bytes()?;
 
-        let sstable_index = SSTableIndex::load(sstable_index_bytes)
+        let sstable_index = SSTableIndex::load(sstable_index_bytes, store_offset)
             .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?;
         Ok(Dictionary {
             sstable_slice,
diff --git a/sstable/src/lib.rs b/sstable/src/lib.rs
index 09014b7946..928fad0ec8 100644
--- a/sstable/src/lib.rs
+++ b/sstable/src/lib.rs
@@ -304,7 +304,8 @@ where
 
         let offset = wrt.written_bytes();
 
-        self.index_builder.serialize(&mut wrt)?;
+        let fst_len: u64 = self.index_builder.serialize(&mut wrt)?;
+        wrt.write_all(&fst_len.to_le_bytes())?;
         wrt.write_all(&offset.to_le_bytes())?;
         wrt.write_all(&self.num_terms.to_le_bytes())?;
 
diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs
index 1ce85305cf..db805a1f40 100644
--- a/sstable/src/sstable_index.rs
+++ b/sstable/src/sstable_index.rs
@@ -1,56 +1,50 @@
-use std::io::{self, Write};
+use std::io::{self, Read, Write};
 use std::ops::Range;
+use std::sync::Arc;
 
-use common::OwnedBytes;
+use common::{BinarySerializable, FixedSize, OwnedBytes};
+use tantivy_fst::raw::Fst;
+use tantivy_fst::{IntoStreamer, Map, MapBuilder, Streamer};
 
-use crate::{common_prefix_len, SSTable, SSTableDataCorruption, TermOrdinal};
+use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal};
 
-#[derive(Default, Debug, Clone)]
+#[derive(Debug, Clone)]
 pub struct SSTableIndex {
-    blocks: Vec<BlockMeta>,
+    fst_index: Arc<Map<OwnedBytes>>,
+    block_addr_store: BlockAddrStore,
 }
 
 impl SSTableIndex {
     /// Load an index from its binary representation
-    pub fn load(data: OwnedBytes) -> Result<SSTableIndex, SSTableDataCorruption> {
-        let mut reader = IndexSSTable::reader(data);
-        let mut blocks = Vec::new();
-
-        while reader.advance().map_err(|_| SSTableDataCorruption)? {
-            blocks.push(BlockMeta {
-                last_key_or_greater: reader.key().to_vec(),
-                block_addr: reader.value().clone(),
-            });
-        }
-
-        Ok(SSTableIndex { blocks })
+    pub fn load(data: OwnedBytes, fst_length: u64) -> Result<SSTableIndex, SSTableDataCorruption> {
+        let (fst_slice, block_addr_store_slice) = data.split(fst_length as usize);
+        let fst_index = Fst::new(fst_slice)
+            .map_err(|_| SSTableDataCorruption)?
+            .into();
+        let block_addr_store =
+            BlockAddrStore::open(block_addr_store_slice).map_err(|_| SSTableDataCorruption)?;
+
+        Ok(SSTableIndex {
+            fst_index: Arc::new(fst_index),
+            block_addr_store,
+        })
     }
 
     /// Get the [`BlockAddr`] of the requested block.
-    pub(crate) fn get_block(&self, block_id: usize) -> Option<BlockAddr> {
-        self.blocks
-            .get(block_id)
-            .map(|block_meta| block_meta.block_addr.clone())
+    pub(crate) fn get_block(&self, block_id: u64) -> Option<BlockAddr> {
+        self.block_addr_store.get(block_id)
     }
 
     /// Get the block id of the block that would contain `key`.
     ///
     /// Returns None if `key` is lexicographically after the last key recorded.
-    pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<usize> {
-        let pos = self
-            .blocks
-            .binary_search_by_key(&key, |block| &block.last_key_or_greater);
-        match pos {
-            Ok(pos) => Some(pos),
-            Err(pos) => {
-                if pos < self.blocks.len() {
-                    Some(pos)
-                } else {
-                    // after end of last block: no block matches
-                    None
-                }
-            }
-        }
-    }
+    pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<u64> {
+        self.fst_index
+            .range()
+            .ge(key)
+            .into_stream()
+            .next()
+            .map(|(_key, id)| id)
+    }
 
     /// Get the [`BlockAddr`] of the block that would contain `key`.
@@ -60,29 +54,20 @@ impl SSTableIndex {
         self.locate_with_key(key).and_then(|id| self.get_block(id))
     }
 
-    pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> usize {
-        let pos = self
-            .blocks
-            .binary_search_by_key(&ord, |block| block.block_addr.first_ordinal);
-
-        match pos {
-            Ok(pos) => pos,
-            // Err(0) can't happen as the sstable starts with ordinal zero
-            Err(pos) => pos - 1,
-        }
+    pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> u64 {
+        self.block_addr_store.binary_search_ord(ord).0
     }
 
     /// Get the [`BlockAddr`] of the block containing the `ord`-th term.
     pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
-        // locate_with_ord always returns an index within range
-        self.get_block(self.locate_with_ord(ord)).unwrap()
+        self.block_addr_store.binary_search_ord(ord).1
     }
 }
 
 #[derive(Clone, Eq, PartialEq, Debug)]
 pub struct BlockAddr {
-    pub byte_range: Range<usize>,
     pub first_ordinal: u64,
+    pub byte_range: Range<usize>,
 }
 
 #[derive(Debug, Clone)]
@@ -94,9 +79,38 @@ pub(crate) struct BlockMeta {
     pub block_addr: BlockAddr,
 }
 
+impl BinarySerializable for BlockAddr {
+    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        self.first_ordinal.serialize(writer)?;
+        let start = self.byte_range.start as u64;
+        start.serialize(writer)?;
+        let end = self.byte_range.end as u64;
+        end.serialize(writer)
+    }
+
+    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
+        let first_ordinal = u64::deserialize(reader)?;
+        let start = u64::deserialize(reader)? as usize;
+        let end = u64::deserialize(reader)? as usize;
+        Ok(BlockAddr {
+            first_ordinal,
+            byte_range: start..end,
+        })
+    }
+
+    // Provided method
+    fn num_bytes(&self) -> u64 {
+        BlockAddr::SIZE_IN_BYTES as u64
+    }
+}
+
+impl FixedSize for BlockAddr {
+    const SIZE_IN_BYTES: usize = 3 * u64::SIZE_IN_BYTES;
+}
+
 #[derive(Default)]
 pub struct SSTableIndexBuilder {
-    index: SSTableIndex,
+    blocks: Vec<BlockMeta>,
 }
 
 /// Given that left < right,
 /// mutates `left` into a shorter byte string `left'` that
 /// matches `left <= left' < right`.
@@ -124,13 +138,13 @@ impl SSTableIndexBuilder {
     /// In order to make the index as light as possible, we
     /// try to find a shorter alternative to the last key of the last block
     /// that is still smaller than the next key.
     pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
-        if let Some(last_block) = self.index.blocks.last_mut() {
+        if let Some(last_block) = self.blocks.last_mut() {
             find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
         }
     }
 
     pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
-        self.index.blocks.push(BlockMeta {
+        self.blocks.push(BlockMeta {
             last_key_or_greater: last_key.to_vec(),
             block_addr: BlockAddr {
                 byte_range,
                 first_ordinal,
             },
         })
     }
 
-    pub fn serialize<W: io::Write>(&self, wrt: W) -> io::Result<()> {
-        // we can't use a plain writer as it would generate an index
-        let mut sstable_writer = IndexSSTable::delta_writer(wrt);
+    pub fn serialize<W: io::Write>(&self, wrt: W) -> io::Result<u64> {
+        // TODO handle errors
+        let counting_writer = common::CountingWriter::wrap(wrt);
+        let mut map_builder = MapBuilder::new(counting_writer).unwrap();
+        for (i, block) in self.blocks.iter().enumerate() {
+            map_builder
+                .insert(&block.last_key_or_greater, i as u64)
+                .unwrap();
+        }
+        let counting_writer = map_builder.into_inner().unwrap();
+        let written_bytes = counting_writer.written_bytes();
+        let _writer = counting_writer.finish();
+
+        // TODO write a BlockAddrStore
+        Ok(written_bytes)
+    }
+}
+
+const STORE_BLOCK_LEN: usize = 256;
+
+#[derive(Debug)]
+struct BlockAddrBlockMetadata {
+    offset: u64,
+    ref_block_addr: BlockAddr,
+    first_ordinal_nbits: u8,
+    range_start_nbits: u8,
+    range_len_nbits: u8,
+}
 
-        // in tests, set a smaller block size to stress-test
-        #[cfg(test)]
-        sstable_writer.set_block_len(16);
+impl BlockAddrBlockMetadata {
+    fn num_bits(&self) -> u8 {
+        self.first_ordinal_nbits + self.range_start_nbits + self.range_len_nbits
+    }
 
-        let mut previous_key = Vec::with_capacity(crate::DEFAULT_KEY_CAPACITY);
-        for block in self.index.blocks.iter() {
-            let keep_len = common_prefix_len(&previous_key, &block.last_key_or_greater);
+    fn deserialize_block_addr(&self, data: &[u8], inner_offset: usize) -> Option<BlockAddr> {
+        assert!(inner_offset < STORE_BLOCK_LEN - 1);
+        let num_bits = self.num_bits() as usize;
 
-            sstable_writer.write_suffix(keep_len, &block.last_key_or_greater[keep_len..]);
-            sstable_writer.write_value(&block.block_addr);
-            sstable_writer.flush_block_if_required()?;
+        let ordinal_addr = num_bits * inner_offset;
+        let range_start_addr = ordinal_addr + self.first_ordinal_nbits as usize;
+        let range_len_addr = range_start_addr + self.first_ordinal_nbits as usize;
 
-            previous_key.clear();
-            previous_key.extend_from_slice(&block.last_key_or_greater);
+        if (range_len_addr + self.range_len_nbits as usize + 7) / 8 > data.len() {
+            return None;
         }
-        sstable_writer.flush_block()?;
-        sstable_writer.finish().write_all(&0u32.to_le_bytes())?;
+
+        let first_ordinal = self.ref_block_addr.first_ordinal
+            + extract_bits(data, ordinal_addr, self.first_ordinal_nbits);
+        let range_start = self.ref_block_addr.byte_range.start
+            + extract_bits(data, range_start_addr, self.range_start_nbits) as usize;
+        let range_len = extract_bits(data, range_len_addr, self.range_len_nbits) as usize;
+
+        let range_end = range_start + range_len;
+        Some(BlockAddr {
+            first_ordinal,
+            byte_range: range_start..range_end,
+        })
+    }
+
+    // /!\ contrary to deserialize_block_addr() for which inner_offset goes from 0 to
+    // STORE_BLOCK_LEN - 2, this function goes from 1 to STORE_BLOCK_LEN - 1, and 0 marks
+    // we should use ref_block_addr
+    fn bissect_for_ord(&self, data: &[u8], target_ord: TermOrdinal) -> (u64, BlockAddr) {
+        let max = (STORE_BLOCK_LEN - 1).min(data.len() * 8 / self.num_bits() as usize);
+
+        let inner_target_ord = target_ord - self.ref_block_addr.first_ordinal;
+        let num_bits = self.num_bits() as usize;
+        let get_ord =
+            |index| extract_bits(data, num_bits * index as usize, self.first_ordinal_nbits);
+
+        let inner_offset =
+            match binary_search(max as u64, |index| inner_target_ord.cmp(&get_ord(index))) {
+                Ok(inner_offset) => inner_offset + 1,
+                Err(inner_offset) => inner_offset,
+            };
+        if inner_offset == 0 {
+            (0, self.ref_block_addr.clone())
+        } else {
+            (
+                inner_offset,
+                self.deserialize_block_addr(data, inner_offset as usize - 1)
+                    .unwrap(),
+            )
+        }
+    }
+}
+
+// TODO move this function to tantivy_common?
+fn extract_bits(data: &[u8], addr_bits: usize, num_bits: u8) -> u64 {
+    use byteorder::{ByteOrder, LittleEndian};
+    assert!(num_bits <= 56);
+    let addr_byte = addr_bits / 8;
+    let bit_shift = (addr_bits % 8) as u64;
+    let val_unshifted_unmasked: u64 = if data.len() >= addr_byte + 8 {
+        LittleEndian::read_u64(&data[addr_byte..][..8])
+    } else {
+        // the buffer is not large enough.
+        // Let's copy the few remaining bytes to a 8 byte buffer
+        // padded with 0s.
+        let mut buf = [0u8; 8];
+        let data_to_copy = &data[addr_byte..];
+        let nbytes = data_to_copy.len();
+        buf[..nbytes].copy_from_slice(data_to_copy);
+        LittleEndian::read_u64(&buf)
+    };
+    let val_shifted_unmasked = val_unshifted_unmasked >> bit_shift;
+    let mask = (1u64 << u64::from(num_bits)) - 1;
+    val_shifted_unmasked & mask
+}
+
+impl BinarySerializable for BlockAddrBlockMetadata {
+    fn serialize<W: Write>(&self, write: &mut W) -> io::Result<()> {
+        self.offset.serialize(write)?;
+        self.ref_block_addr.serialize(write)?;
+        write.write_all(&[
+            self.first_ordinal_nbits,
+            self.range_start_nbits,
+            self.range_len_nbits,
+        ])?;
+        Ok(())
+    }
+
+    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
+        let offset = u64::deserialize(reader)?;
+        let ref_block_addr = BlockAddr::deserialize(reader)?;
+        let mut buffer = [0u8; 3];
+        reader.read_exact(&mut buffer)?;
+        Ok(BlockAddrBlockMetadata {
+            offset,
+            ref_block_addr,
+            first_ordinal_nbits: buffer[0],
+            range_start_nbits: buffer[1],
+            range_len_nbits: buffer[2],
+        })
+    }
+}
+
+impl FixedSize for BlockAddrBlockMetadata {
+    const SIZE_IN_BYTES: usize =
+        u64::SIZE_IN_BYTES + BlockAddr::SIZE_IN_BYTES + 3 * u8::SIZE_IN_BYTES;
+}
+
+#[derive(Debug, Clone)]
+struct BlockAddrStore {
+    block_meta_bytes: OwnedBytes,
+    addr_bytes: OwnedBytes,
+}
+
+impl BlockAddrStore {
+    fn open(term_info_store_file: OwnedBytes) -> io::Result<BlockAddrStore> {
+        let (mut len_slice, main_slice) = term_info_store_file.split(8);
+        let len = u64::deserialize(&mut len_slice)? as usize;
+        let (block_meta_bytes, addr_bytes) = main_slice.split(len);
+        Ok(BlockAddrStore {
+            block_meta_bytes,
+            addr_bytes,
+        })
+    }
+
+    fn get_block_meta(&self, store_block_id: usize) -> Option<BlockAddrBlockMetadata> {
+        let mut block_data: &[u8] = &self
+            .block_meta_bytes
+            .get(store_block_id * BlockAddrBlockMetadata::SIZE_IN_BYTES..)?;
+        BlockAddrBlockMetadata::deserialize(&mut block_data).ok()
+    }
+
+    fn get(&self, block_id: u64) -> Option<BlockAddr> {
+        let store_block_id = (block_id as usize) / STORE_BLOCK_LEN;
+        let inner_offset = (block_id as usize) % STORE_BLOCK_LEN;
+        let block_addr_block_data = self.get_block_meta(store_block_id)?;
+        if inner_offset == 0 {
+            return Some(block_addr_block_data.ref_block_addr);
+        }
+        block_addr_block_data.deserialize_block_addr(
+            &self.addr_bytes[block_addr_block_data.offset as usize..],
+            inner_offset - 1,
+        )
+    }
+
+    fn binary_search_ord(&self, ord: TermOrdinal) -> (u64, BlockAddr) {
+        let max_block =
+            (self.block_meta_bytes.len() / BlockAddrBlockMetadata::SIZE_IN_BYTES) as u64;
+        let get_first_ordinal = |block_id| {
+            self.get(block_id * STORE_BLOCK_LEN as u64)
+                .unwrap()
+                .first_ordinal
+        };
+        let store_block_id =
+            binary_search(max_block, |block_id| ord.cmp(&get_first_ordinal(block_id)));
+        let store_block_id = match store_block_id {
+            Ok(store_block_id) => {
+                let block_id = store_block_id * STORE_BLOCK_LEN as u64;
+                return (block_id, self.get(block_id).unwrap());
+            }
+            Err(store_block_id) => store_block_id - 1,
+        };
+
+        let block_addr_block_data = self.get_block_meta(store_block_id as usize).unwrap();
+        let (inner_offset, block_addr) = block_addr_block_data.bissect_for_ord(
+            &self.addr_bytes[block_addr_block_data.offset as usize..],
+            ord,
+        );
+        (
+            store_block_id * STORE_BLOCK_LEN as u64 + inner_offset,
+            block_addr,
+        )
+    }
+}
+
+fn binary_search(max: u64, cmp_fn: impl Fn(u64) -> std::cmp::Ordering) -> Result<u64, u64> {
+    use std::cmp::Ordering::*;
+    let mut size = max;
+    let mut left = 0;
+    let mut right = size;
+    while left < right {
+        let mid = left + size / 2;
+
+        let cmp = cmp_fn(mid);
+
+        if cmp == Less {
+            left = mid + 1;
+        } else if cmp == Greater {
+            right = mid;
+        } else {
+            return Ok(mid);
+        }
+
+        size = right - left;
+    }
+    Err(left)
+}
 
 #[cfg(test)]
 mod tests {
     use common::OwnedBytes;
@@ -194,9 +400,9 @@ mod tests {
         sstable_builder.add_block(b"ccc", 30..40, 10u64);
         sstable_builder.add_block(b"dddd", 40..50, 15u64);
         let mut buffer: Vec<u8> = Vec::new();
-        sstable_builder.serialize(&mut buffer).unwrap();
+        let fst_len = sstable_builder.serialize(&mut buffer).unwrap();
         let buffer = OwnedBytes::new(buffer);
-        let sstable_index = SSTableIndex::load(buffer).unwrap();
+        let sstable_index = SSTableIndex::load(buffer, fst_len).unwrap();
         assert_eq!(
             sstable_index.get_block_with_key(b"bbbde"),
             Some(BlockAddr {
@@ -226,10 +432,10 @@ mod tests {
         sstable_builder.add_block(b"ccc", 30..40, 10u64);
         sstable_builder.add_block(b"dddd", 40..50, 15u64);
         let mut buffer: Vec<u8> = Vec::new();
-        sstable_builder.serialize(&mut buffer).unwrap();
+        let fst_len = sstable_builder.serialize(&mut buffer).unwrap();
        buffer[2] = 9u8;
         let buffer = OwnedBytes::new(buffer);
-        let data_corruption_err = SSTableIndex::load(buffer).err().unwrap();
+        let data_corruption_err = SSTableIndex::load(buffer, fst_len).err().unwrap();
         assert!(matches!(data_corruption_err, SSTableDataCorruption));
     }
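As a standalone illustration of the bit-addressed decoding that commit 1's `extract_bits` performs (this sketch is an editor's aside, not part of the patch): fields of arbitrary bit width are laid out back to back with no byte alignment, and a reader pulls one out by byte offset, shift, and mask. The `extract_bits` below is a simplified mirror of the patch's version (no `byteorder` dependency, no 56-bit guard), and all packed values are invented for the example.

```
// Simplified stand-in for the patch's extract_bits: read `num_bits` bits
// starting at absolute bit offset `addr_bits`, little endian.
fn extract_bits(data: &[u8], addr_bits: usize, num_bits: u8) -> u64 {
    let addr_byte = addr_bits / 8;
    let mut buf = [0u8; 8];
    let end = (addr_byte + 8).min(data.len());
    buf[..end - addr_byte].copy_from_slice(&data[addr_byte..end]);
    (u64::from_le_bytes(buf) >> (addr_bits % 8)) & ((1u64 << num_bits) - 1)
}

fn main() {
    // Two (ordinal_delta: 5 bits, range_delta: 3 bits) entries, bitpacked:
    // entry 0 = (19, 5) occupies bits 0..8, entry 1 = (7, 2) bits 8..16.
    let packed: u64 = 19 | (5 << 5) | (7 << 8) | (2 << 13);
    let data = packed.to_le_bytes();
    let num_bits = 5 + 3; // total bits per entry
    for i in 0..2 {
        let base = i * num_bits;
        let ordinal_delta = extract_bits(&data, base, 5);
        let range_delta = extract_bits(&data, base + 5, 3);
        println!("entry {i}: ordinal_delta={ordinal_delta}, range_delta={range_delta}");
        // prints (19, 5) then (7, 2)
    }
}
```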
From fbecabaf6475e9c195c146672c211e4ab7c38b43 Mon Sep 17 00:00:00 2001
From: trinity-1686a
Date: Wed, 15 Nov 2023 17:32:51 +0100
Subject: [PATCH 02/18] implement BlockAddrStoreWriter

---
 sstable/Cargo.toml             |   1 +
 sstable/benches/ord_to_term.rs |  45 +++++++++++
 sstable/src/lib.rs             |   1 +
 sstable/src/sstable_index.rs   | 144 ++++++++++++++++++++++++++------
 4 files changed, 161 insertions(+), 30 deletions(-)

diff --git a/sstable/Cargo.toml b/sstable/Cargo.toml
index 6d8b10b665..a621cbd23b 100644
--- a/sstable/Cargo.toml
+++ b/sstable/Cargo.toml
@@ -12,6 +12,7 @@ description = "sstables for tantivy"
 [dependencies]
 byteorder = "1.4.3"
 common = {version= "0.6", path="../common", package="tantivy-common"}
+tantivy-bitpacker = { version= "0.5", path="../bitpacker" }
 tantivy-fst = "0.4"
 # experimental gives us access to Decompressor::upper_bound
 zstd = { version = "0.13", features = ["experimental"] }
diff --git a/sstable/benches/ord_to_term.rs b/sstable/benches/ord_to_term.rs
index 04c8835fb6..afdcb8fed5 100644
--- a/sstable/benches/ord_to_term.rs
+++ b/sstable/benches/ord_to_term.rs
@@ -15,6 +15,7 @@ fn make_test_sstable(suffix: &str) -> FileSlice {
     }
 
     let table = builder.finish().unwrap();
+    eprintln!("len={}", table.len());
     let table = Arc::new(OwnedBytes::new(table));
     let slice = common::file_slice::FileSlice::new(table.clone());
@@ -40,6 +41,31 @@ pub fn criterion_benchmark(c: &mut Criterion) {
                 assert!(dict.ord_to_term(19_000_000, &mut res).unwrap());
             })
         });
+        c.bench_function("term_ord_suffix", |b| {
+            b.iter(|| {
+                assert_eq!(
+                    dict.term_ord(b"prefix.00186A0.suffix").unwrap().unwrap(),
+                    100_000
+                );
+                assert_eq!(
+                    dict.term_ord(b"prefix.121EAC0.suffix").unwrap().unwrap(),
+                    19_000_000
+                );
+            })
+        });
+        c.bench_function("open_and_term_ord_suffix", |b| {
+            b.iter(|| {
+                let dict = Dictionary::<MonotonicU64SSTable>::open(slice.clone()).unwrap();
+                assert_eq!(
+                    dict.term_ord(b"prefix.00186A0.suffix").unwrap().unwrap(),
+                    100_000
+                );
+                assert_eq!(
+                    dict.term_ord(b"prefix.121EAC0.suffix").unwrap().unwrap(),
+                    19_000_000
+                );
+            })
+        });
     }
     {
         let slice = make_test_sstable("");
@@ -59,6 +85,25 @@ pub fn criterion_benchmark(c: &mut Criterion) {
                 assert!(dict.ord_to_term(19_000_000, &mut res).unwrap());
             })
         });
+        c.bench_function("term_ord", |b| {
+            b.iter(|| {
+                assert_eq!(dict.term_ord(b"prefix.00186A0").unwrap().unwrap(), 100_000);
+                assert_eq!(
+                    dict.term_ord(b"prefix.121EAC0").unwrap().unwrap(),
+                    19_000_000
+                );
+            })
+        });
+        c.bench_function("open_and_term_ord", |b| {
+            b.iter(|| {
+                let dict = Dictionary::<MonotonicU64SSTable>::open(slice.clone()).unwrap();
+                assert_eq!(dict.term_ord(b"prefix.00186A0").unwrap().unwrap(), 100_000);
+                assert_eq!(
+                    dict.term_ord(b"prefix.121EAC0").unwrap().unwrap(),
+                    19_000_000
+                );
+            })
+        });
     }
 }
diff --git a/sstable/src/lib.rs b/sstable/src/lib.rs
index 928fad0ec8..d1eefa39c7 100644
--- a/sstable/src/lib.rs
+++ b/sstable/src/lib.rs
@@ -304,6 +304,7 @@ where
 
         let offset = wrt.written_bytes();
 
+        eprintln!("index_offset={offset}");
         let fst_len: u64 = self.index_builder.serialize(&mut wrt)?;
         wrt.write_all(&fst_len.to_le_bytes())?;
         wrt.write_all(&offset.to_le_bytes())?;
diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs
index db805a1f40..c4a37c2803 100644
--- a/sstable/src/sstable_index.rs
+++ b/sstable/src/sstable_index.rs
@@ -3,6 +3,7 @@ use std::ops::Range;
 use std::sync::Arc;
 
 use common::{BinarySerializable, FixedSize, OwnedBytes};
+use tantivy_bitpacker::{compute_num_bits, BitPacker};
 use tantivy_fst::raw::Fst;
 use tantivy_fst::{IntoStreamer, Map, MapBuilder, Streamer};
 
@@ -108,11 +109,6 @@ impl FixedSize for BlockAddr {
     const SIZE_IN_BYTES: usize = 3 * u64::SIZE_IN_BYTES;
 }
 
-#[derive(Default)]
-pub struct SSTableIndexBuilder {
-    blocks: Vec<BlockMeta>,
-}
-
 /// Given that left < right,
 /// mutates `left` into a shorter byte string `left'` that
 /// matches `left <= left' < right`.
@@ -133,6 +129,11 @@ fn find_shorter_str_in_between(left: &mut Vec<u8>, right: &[u8]) {
     }
 }
 
+#[derive(Default)]
+pub struct SSTableIndexBuilder {
+    blocks: Vec<BlockMeta>,
+}
+
 impl SSTableIndexBuilder {
     /// In order to make the index as light as possible, we
     /// try to find a shorter alternative to the last key of the last block
     /// that is still smaller than the next key.
@@ -164,9 +165,15 @@ impl SSTableIndexBuilder {
         }
         let counting_writer = map_builder.into_inner().unwrap();
         let written_bytes = counting_writer.written_bytes();
-        let _writer = counting_writer.finish();
+        let mut wrt = counting_writer.finish();
+
+        let mut block_store_writer = BlockAddrStoreWriter::new();
+        for block in &self.blocks {
+            block_store_writer.write_block_meta(block.block_addr.clone())?;
+        }
+        block_store_writer.serialize(&mut wrt)?;
+        eprintln!("fst len={written_bytes}");
 
-        // TODO write a BlockAddrStore
         Ok(written_bytes)
     }
 }
@@ -177,35 +184,34 @@ const STORE_BLOCK_LEN: usize = 256;
 #[derive(Debug)]
 struct BlockAddrBlockMetadata {
     offset: u64,
     ref_block_addr: BlockAddr,
-    first_ordinal_nbits: u8,
     range_start_nbits: u8,
-    range_len_nbits: u8,
+    first_ordinal_nbits: u8,
 }
 
 impl BlockAddrBlockMetadata {
     fn num_bits(&self) -> u8 {
-        self.first_ordinal_nbits + self.range_start_nbits + self.range_len_nbits
+        self.first_ordinal_nbits + self.range_start_nbits
     }
 
     fn deserialize_block_addr(&self, data: &[u8], inner_offset: usize) -> Option<BlockAddr> {
         assert!(inner_offset < STORE_BLOCK_LEN - 1);
         let num_bits = self.num_bits() as usize;
 
-        let ordinal_addr = num_bits * inner_offset;
-        let range_start_addr = ordinal_addr + self.first_ordinal_nbits as usize;
-        let range_len_addr = range_start_addr + self.first_ordinal_nbits as usize;
+        let range_start_addr = num_bits * inner_offset;
+        let ordinal_addr = range_start_addr + self.range_start_nbits as usize;
+        let range_end_addr = range_start_addr + self.num_bits() as usize;
 
-        if (range_len_addr + self.range_len_nbits as usize + 7) / 8 > data.len() {
+        if (range_end_addr + self.range_start_nbits as usize + 7) / 8 > data.len() {
             return None;
         }
 
+        let range_start = self.ref_block_addr.byte_range.end
+            + extract_bits(data, range_start_addr, self.range_start_nbits) as usize;
         let first_ordinal = self.ref_block_addr.first_ordinal
             + extract_bits(data, ordinal_addr, self.first_ordinal_nbits);
-        let range_start = self.ref_block_addr.byte_range.start
-            + extract_bits(data, range_start_addr, self.range_start_nbits) as usize;
-        let range_len = extract_bits(data, range_len_addr, self.range_len_nbits) as usize;
+        let range_end = self.ref_block_addr.byte_range.end
+            + extract_bits(data, range_end_addr, self.range_start_nbits) as usize;
 
-        let range_end = range_start + range_len;
         Some(BlockAddr {
             first_ordinal,
             byte_range: range_start..range_end,
         })
     }
 
     // /!\ contrary to deserialize_block_addr() for which inner_offset goes from 0 to
     // STORE_BLOCK_LEN - 2, this function goes from 1 to STORE_BLOCK_LEN - 1, and 0 marks
     // we should use ref_block_addr
     fn bissect_for_ord(&self, data: &[u8], target_ord: TermOrdinal) -> (u64, BlockAddr) {
+        // TODO can panic if block has header only
         let max = (STORE_BLOCK_LEN - 1).min(data.len() * 8 / self.num_bits() as usize);
 
         let inner_target_ord = target_ord - self.ref_block_addr.first_ordinal;
         let num_bits = self.num_bits() as usize;
+        let range_start_nbits = self.range_start_nbits as usize;
-        let get_ord =
-            |index| extract_bits(data, num_bits * index as usize, self.first_ordinal_nbits);
+        let get_ord = |index| {
+            extract_bits(
+                data,
+                num_bits * index as usize + range_start_nbits,
+                self.first_ordinal_nbits,
+            )
+        };
 
         let inner_offset =
-            match binary_search(max as u64, |index| inner_target_ord.cmp(&get_ord(index))) {
+            match binary_search(max as u64, |index| get_ord(index).cmp(&inner_target_ord)) {
                 Ok(inner_offset) => inner_offset + 1,
                 Err(inner_offset) => inner_offset,
             };
@@ -267,32 +280,27 @@ impl BinarySerializable for BlockAddrBlockMetadata {
     fn serialize<W: Write>(&self, write: &mut W) -> io::Result<()> {
         self.offset.serialize(write)?;
         self.ref_block_addr.serialize(write)?;
-        write.write_all(&[
-            self.first_ordinal_nbits,
-            self.range_start_nbits,
-            self.range_len_nbits,
-        ])?;
+        write.write_all(&[self.first_ordinal_nbits, self.range_start_nbits])?;
         Ok(())
     }
 
     fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
         let offset = u64::deserialize(reader)?;
         let ref_block_addr = BlockAddr::deserialize(reader)?;
-        let mut buffer = [0u8; 3];
+        let mut buffer = [0u8; 2];
         reader.read_exact(&mut buffer)?;
         Ok(BlockAddrBlockMetadata {
             offset,
             ref_block_addr,
             first_ordinal_nbits: buffer[0],
             range_start_nbits: buffer[1],
-            range_len_nbits: buffer[2],
         })
     }
 }
 
 impl FixedSize for BlockAddrBlockMetadata {
     const SIZE_IN_BYTES: usize =
-        u64::SIZE_IN_BYTES + BlockAddr::SIZE_IN_BYTES + 3 * u8::SIZE_IN_BYTES;
+        u64::SIZE_IN_BYTES + BlockAddr::SIZE_IN_BYTES + 2 * u8::SIZE_IN_BYTES;
 }
 
 #[derive(Debug, Clone)]
@@ -341,7 +349,7 @@ impl BlockAddrStore {
                 .first_ordinal
         };
         let store_block_id =
-            binary_search(max_block, |block_id| ord.cmp(&get_first_ordinal(block_id)));
+            binary_search(max_block, |block_id| get_first_ordinal(block_id).cmp(&ord));
         let store_block_id = match store_block_id {
             Ok(store_block_id) => {
                 let block_id = store_block_id * STORE_BLOCK_LEN as u64;
@@ -385,6 +393,82 @@ fn binary_search(max: u64, cmp_fn: impl Fn(u64) -> std::cmp::Ordering) -> Result
     Err(left)
 }
 
+struct BlockAddrStoreWriter {
+    buffer_block_metas: Vec<u8>,
+    buffer_addrs: Vec<u8>,
+    block_addrs: Vec<BlockAddr>,
+}
+
+impl BlockAddrStoreWriter {
+    fn new() -> Self {
+        BlockAddrStoreWriter {
+            buffer_block_metas: Vec::new(),
+            buffer_addrs: Vec::new(),
+            block_addrs: Vec::with_capacity(STORE_BLOCK_LEN),
+        }
+    }
+
+    fn flush_block(&mut self) -> io::Result<()> {
+        let ref_block_addr = self.block_addrs[0].clone();
+
+        let last_block_addr = self.block_addrs.last().unwrap().clone();
+
+        let ordinal_offset = last_block_addr.first_ordinal - ref_block_addr.first_ordinal;
+        let range_offset = (last_block_addr.byte_range.end - ref_block_addr.byte_range.end) as u64;
+
+        let max_ordinal_nbits = compute_num_bits(ordinal_offset);
+        let max_range_start_nbits = compute_num_bits(range_offset);
+
+        let block_addr_block_meta = BlockAddrBlockMetadata {
+            offset: self.buffer_addrs.len() as u64,
+            ref_block_addr: ref_block_addr.clone(),
+            range_start_nbits: max_range_start_nbits,
+            first_ordinal_nbits: max_ordinal_nbits,
+        };
+        block_addr_block_meta.serialize(&mut self.buffer_block_metas)?;
+
+        let mut bit_packer = BitPacker::new();
+
+        for block_addr in &self.block_addrs[1..] {
+            bit_packer.write(
+                (block_addr.byte_range.start - ref_block_addr.byte_range.end) as u64,
+                max_range_start_nbits,
+                &mut self.buffer_addrs,
+            )?;
+            bit_packer.write(
+                (block_addr.first_ordinal - ref_block_addr.first_ordinal) as u64,
+                max_ordinal_nbits,
+                &mut self.buffer_addrs,
+            )?;
+        }
+
+        bit_packer.write(range_offset, max_range_start_nbits, &mut self.buffer_addrs)?;
+        bit_packer.flush(&mut self.buffer_addrs)?;
+
+        self.block_addrs.clear();
+        Ok(())
+    }
+
+    fn write_block_meta(&mut self, block_addr: BlockAddr) -> io::Result<()> {
+        self.block_addrs.push(block_addr);
+        if self.block_addrs.len() >= STORE_BLOCK_LEN {
+            self.flush_block()?;
+        }
+        Ok(())
+    }
+
+    fn serialize<W: io::Write>(&mut self, wrt: &mut W) -> io::Result<()> {
+        if !self.block_addrs.is_empty() {
+            self.flush_block()?;
+        }
+        let len = self.buffer_block_metas.len() as u64;
+        len.serialize(wrt)?;
+        wrt.write_all(&self.buffer_block_metas)?;
+        wrt.write_all(&self.buffer_addrs)?;
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use common::OwnedBytes;
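A quick aside on the sizing logic this commit's `BlockAddrStoreWriter::flush_block` relies on (an editor's sketch, not part of the patch): each group of 256 blocks is stored as deltas against the group's first (reference) block, and the bit width for the whole group is derived from the largest delta. The `num_bits` helper below is a simplified stand-in for `tantivy_bitpacker::compute_num_bits`, and the ordinals are invented for the example.

```
// Minimal bit width needed to represent `val` (stand-in for compute_num_bits).
fn num_bits(val: u64) -> u8 {
    (64 - val.leading_zeros()) as u8
}

fn main() {
    // First ordinals of four consecutive blocks; index 0 is the reference.
    let first_ordinals = [0u64, 1_017, 2_033, 3_060];
    let max_delta = first_ordinals
        .iter()
        .map(|ord| ord - first_ordinals[0])
        .max()
        .unwrap();
    // 3_060 fits in 12 bits, so every first-ordinal delta in this group costs
    // 12 bits in the datastream instead of a raw 64-bit integer.
    assert_eq!(num_bits(max_delta), 12);
    println!("first-ordinal deltas packed in {} bits each", num_bits(max_delta));
}
```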
From 50fe7330fc3bf69196ef2842a52707f14809e08a Mon Sep 17 00:00:00 2001
From: trinity-1686a
Date: Fri, 17 Nov 2023 19:19:51 +0100
Subject: [PATCH 03/18] new format with linear approx

---
 sstable/src/lib.rs           |  37 ++++++++++
 sstable/src/sstable_index.rs | 130 ++++++++++++++++++++++++++---------
 2 files changed, 135 insertions(+), 32 deletions(-)

diff --git a/sstable/src/lib.rs b/sstable/src/lib.rs
index d1eefa39c7..44c03e1d7f 100644
--- a/sstable/src/lib.rs
+++ b/sstable/src/lib.rs
@@ -548,4 +548,41 @@ mod test {
         assert!(btree_set_range.next().is_none());
     }
+
+    #[test]
+    fn tmp() {
+        let words: std::collections::BTreeSet<String> =
+            ["", "a", "aa", "aaaa", "aba", "acaaaa", "b"]
+                .into_iter()
+                .map(ToString::to_string)
+                .collect();
+        let (lower_bound, upper_bound) =
+            (Bound::Excluded("b".to_string()), Bound::<String>::Unbounded);
+        let mut builder = Dictionary::<VoidSSTable>::builder(Vec::new()).unwrap();
+        builder.set_block_len(16);
+        for word in &words {
+            builder.insert(word.as_bytes(), &()).unwrap();
+        }
+        let buffer: Vec<u8> = builder.finish().unwrap();
+        let dictionary: Dictionary<VoidSSTable> =
+            Dictionary::open(FileSlice::from(buffer)).unwrap();
+        let mut range_builder = dictionary.range();
+        range_builder = match lower_bound.as_ref() {
+            Bound::Included(key) => range_builder.ge(key.as_bytes()),
+            Bound::Excluded(key) => range_builder.gt(key.as_bytes()),
+            Bound::Unbounded => range_builder,
+        };
+        range_builder = match upper_bound.as_ref() {
+            Bound::Included(key) => range_builder.le(key.as_bytes()),
+            Bound::Excluded(key) => range_builder.lt(key.as_bytes()),
+            Bound::Unbounded => range_builder,
+        };
+        let mut stream = range_builder.into_stream().unwrap();
+        let mut btree_set_range = words.range((lower_bound, upper_bound));
+        while stream.advance() {
+            let val = btree_set_range.next().unwrap();
+            assert_eq!(val.as_bytes(), stream.key());
+        }
+        assert!(btree_set_range.next().is_none());
+    }
 }
diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs
index c4a37c2803..22eb5c2184 100644
--- a/sstable/src/sstable_index.rs
+++ b/sstable/src/sstable_index.rs
@@ -165,7 +165,7 @@ impl SSTableIndexBuilder {
         }
         let counting_writer = map_builder.into_inner().unwrap();
         let written_bytes = counting_writer.written_bytes();
-        let mut wrt = counting_writer.finish();
+        let mut wrt = counting_writer;
 
         let mut block_store_writer = BlockAddrStoreWriter::new();
         for block in &self.blocks {
             block_store_writer.write_block_meta(block.block_addr.clone())?;
         }
         block_store_writer.serialize(&mut wrt)?;
         eprintln!("fst len={written_bytes}");
+        eprintln!("store len={}", wrt.written_bytes() - written_bytes);
 
         Ok(written_bytes)
     }
@@ -184,8 +185,11 @@ const STORE_BLOCK_LEN: usize = 256;
 struct BlockAddrBlockMetadata {
     offset: u64,
     ref_block_addr: BlockAddr,
+    range_start_slop: u32,
+    first_ordinal_slop: u32,
     range_start_nbits: u8,
     first_ordinal_nbits: u8,
+    block_len: u16,
 }
 
 impl BlockAddrBlockMetadata {
@@ -198,23 +202,28 @@ impl BlockAddrBlockMetadata {
     }
 
     fn deserialize_block_addr(&self, data: &[u8], inner_offset: usize) -> Option<BlockAddr> {
-        assert!(inner_offset < STORE_BLOCK_LEN - 1);
+        if inner_offset >= self.block_len as usize {
+            return None;
+        }
         let num_bits = self.num_bits() as usize;
 
         let range_start_addr = num_bits * inner_offset;
         let ordinal_addr = range_start_addr + self.range_start_nbits as usize;
-        let range_end_addr = range_start_addr + self.num_bits() as usize;
+        let range_end_addr = range_start_addr + num_bits as usize;
 
         if (range_end_addr + self.range_start_nbits as usize + 7) / 8 > data.len() {
             return None;
         }
 
-        let range_start = self.ref_block_addr.byte_range.end
-            + extract_bits(data, range_start_addr, self.range_start_nbits) as usize;
+        let range_start = self.ref_block_addr.byte_range.start
+            + extract_bits(data, range_start_addr, self.range_start_nbits) as usize
+            + self.range_start_slop as usize * (inner_offset + 1);
         let first_ordinal = self.ref_block_addr.first_ordinal
-            + extract_bits(data, ordinal_addr, self.first_ordinal_nbits);
-        let range_end = self.ref_block_addr.byte_range.end
-            + extract_bits(data, range_end_addr, self.range_start_nbits) as usize;
+            + extract_bits(data, ordinal_addr, self.first_ordinal_nbits)
+            + self.first_ordinal_slop as u64 * (inner_offset + 1) as u64;
+        let range_end = self.ref_block_addr.byte_range.start
+            + extract_bits(data, range_end_addr, self.range_start_nbits) as usize
+            + self.range_start_slop as usize * (inner_offset + 2);
 
         Some(BlockAddr {
             first_ordinal,
             byte_range: range_start..range_end,
         })
     }
 
     // /!\ contrary to deserialize_block_addr() for which inner_offset goes from 0 to
     // STORE_BLOCK_LEN - 2, this function goes from 1 to STORE_BLOCK_LEN - 1, and 0 marks
     // we should use ref_block_addr
     fn bissect_for_ord(&self, data: &[u8], target_ord: TermOrdinal) -> (u64, BlockAddr) {
         // TODO can panic if block has header only
-        let max = (STORE_BLOCK_LEN - 1).min(data.len() * 8 / self.num_bits() as usize);
-
         let inner_target_ord = target_ord - self.ref_block_addr.first_ordinal;
         let num_bits = self.num_bits() as usize;
         let range_start_nbits = self.range_start_nbits as usize;
         let get_ord = |index| {
             extract_bits(
                 data,
                 num_bits * index as usize + range_start_nbits,
                 self.first_ordinal_nbits,
-            )
+            ) + self.first_ordinal_slop as u64 * (index + 1) as u64
         };
 
-        let inner_offset =
-            match binary_search(max as u64, |index| get_ord(index).cmp(&inner_target_ord)) {
-                Ok(inner_offset) => inner_offset + 1,
-                Err(inner_offset) => inner_offset,
-            };
+        let inner_offset = match binary_search(self.block_len as u64, |index| {
+            get_ord(index).cmp(&inner_target_ord)
+        }) {
+            Ok(inner_offset) => inner_offset + 1,
+            Err(inner_offset) => inner_offset,
+        };
         if inner_offset == 0 {
             (0, self.ref_block_addr.clone())
         } else {
@@ -288,27 +312,39 @@ impl BinarySerializable for BlockAddrBlockMetadata {
     fn serialize<W: Write>(&self, write: &mut W) -> io::Result<()> {
         self.offset.serialize(write)?;
         self.ref_block_addr.serialize(write)?;
+        self.range_start_slop.serialize(write)?;
+        self.first_ordinal_slop.serialize(write)?;
         write.write_all(&[self.first_ordinal_nbits, self.range_start_nbits])?;
+        self.block_len.serialize(write)?;
         Ok(())
     }
 
     fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
         let offset = u64::deserialize(reader)?;
         let ref_block_addr = BlockAddr::deserialize(reader)?;
+        let range_start_slop = u32::deserialize(reader)?;
+        let first_ordinal_slop = u32::deserialize(reader)?;
         let mut buffer = [0u8; 2];
         reader.read_exact(&mut buffer)?;
+        let block_len = u16::deserialize(reader)?;
         Ok(BlockAddrBlockMetadata {
             offset,
             ref_block_addr,
+            range_start_slop,
+            first_ordinal_slop,
             range_start_nbits: buffer[1],
             first_ordinal_nbits: buffer[0],
+            block_len,
         })
     }
 }
 
 impl FixedSize for BlockAddrBlockMetadata {
-    const SIZE_IN_BYTES: usize =
-        u64::SIZE_IN_BYTES + BlockAddr::SIZE_IN_BYTES + 2 * u8::SIZE_IN_BYTES;
+    const SIZE_IN_BYTES: usize = u64::SIZE_IN_BYTES
+        + BlockAddr::SIZE_IN_BYTES
+        + 2 * u32::SIZE_IN_BYTES
+        + 2 * u8::SIZE_IN_BYTES
+        + u16::SIZE_IN_BYTES;
 }
 
 #[derive(Debug, Clone)]
@@ -411,38 +431,84 @@ impl BlockAddrStoreWriter {
     fn flush_block(&mut self) -> io::Result<()> {
         let ref_block_addr = self.block_addrs[0].clone();
 
-        let last_block_addr = self.block_addrs.last().unwrap().clone();
+        for block_addr in &mut self.block_addrs {
+            block_addr.byte_range.start -= ref_block_addr.byte_range.start;
+            block_addr.first_ordinal -= ref_block_addr.first_ordinal;
+        }
 
-        let ordinal_offset = last_block_addr.first_ordinal - ref_block_addr.first_ordinal;
-        let range_offset = (last_block_addr.byte_range.end - ref_block_addr.byte_range.end) as u64;
+        let mut last_block_addr = self.block_addrs.last().unwrap().clone();
+        last_block_addr.byte_range.end -= ref_block_addr.byte_range.start;
+
+        let (range_start_slop, first_ordinal_slop) =
+            self.block_addrs.iter().enumerate().skip(1).fold(
+                (u64::MAX, u64::MAX),
+                |(range_slop, ordinal_slop), (index, block)| {
+                    (
+                        range_slop.min((block.byte_range.start / index) as u64),
+                        ordinal_slop.min(block.first_ordinal / index as u64),
+                    )
+                },
+            );
+        let range_start_slop =
+            range_start_slop.min((last_block_addr.byte_range.end / self.block_addrs.len()) as u64);
+
+        // we need correction to be at least 1 otherwise we may fail to assess the number of
+        // elements in a block if each block is exactly the same size and the same number of
+        // ordinal
+        let (range_max_correction, ordinal_max_correction) =
+            self.block_addrs.iter().enumerate().skip(1).fold(
+                (1, 0),
+                |(range_max_correction, ordinal_max_correction), (index, block)| {
+                    let range_correction =
+                        block.byte_range.start - range_start_slop as usize * index;
+                    let ordinal_correction =
+                        block.first_ordinal - first_ordinal_slop * index as u64;
+                    (
+                        range_max_correction.max(range_correction),
+                        ordinal_max_correction.max(ordinal_correction),
+                    )
+                },
+            );
+
+        let range_max_correction = range_max_correction.max(
+            last_block_addr.byte_range.end - range_start_slop as usize * self.block_addrs.len(),
+        );
 
-        let max_ordinal_nbits = compute_num_bits(ordinal_offset);
-        let max_range_start_nbits = compute_num_bits(range_offset);
+        let range_start_nbits = compute_num_bits(range_max_correction as u64);
+        let first_ordinal_nbits = compute_num_bits(ordinal_max_correction);
 
         let block_addr_block_meta = BlockAddrBlockMetadata {
             offset: self.buffer_addrs.len() as u64,
             ref_block_addr: ref_block_addr.clone(),
-            range_start_nbits: max_range_start_nbits,
-            first_ordinal_nbits: max_ordinal_nbits,
+            range_start_slop: range_start_slop as u32,
+            first_ordinal_slop: first_ordinal_slop as u32,
+            range_start_nbits,
+            first_ordinal_nbits,
+            block_len: self.block_addrs.len() as u16 - 1,
         };
         block_addr_block_meta.serialize(&mut self.buffer_block_metas)?;
 
         let mut bit_packer = BitPacker::new();
 
-        for block_addr in &self.block_addrs[1..] {
+        for (i, block_addr) in self.block_addrs.iter().enumerate().skip(1) {
             bit_packer.write(
-                (block_addr.byte_range.start - ref_block_addr.byte_range.end) as u64,
-                max_range_start_nbits,
+                (block_addr.byte_range.start - range_start_slop as usize * i) as u64,
+                range_start_nbits,
                 &mut self.buffer_addrs,
             )?;
             bit_packer.write(
-                (block_addr.first_ordinal - ref_block_addr.first_ordinal) as u64,
-                max_ordinal_nbits,
+                block_addr.first_ordinal - first_ordinal_slop * i as u64,
+                first_ordinal_nbits,
                 &mut self.buffer_addrs,
             )?;
         }
 
-        bit_packer.write(range_offset, max_range_start_nbits, &mut self.buffer_addrs)?;
+        bit_packer.write(
+            (last_block_addr.byte_range.end - range_start_slop as usize * self.block_addrs.len())
+                as u64,
+            range_start_nbits,
+            &mut self.buffer_addrs,
+        )?;
         bit_packer.flush(&mut self.buffer_addrs)?;
 
         self.block_addrs.clear();
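To make the linear approximation this commit introduces concrete, here is an editor's sketch (not part of the patch): instead of storing each block's position as a delta against the reference block alone, the writer fits a per-group slope ("slop"), predicts each value as `slop * index` past the reference, and bitpacks only the leftover correction, which is much smaller when blocks have similar sizes. All numbers below are invented for the example.

```
fn main() {
    let ref_start = 10_000usize; // byte_range.start of the reference block
    let range_start_slop = 1_000usize; // predicted growth per block in the group
    // corrections as they would come out of the bitpacked datastream,
    // one per non-reference block (inner_offset 0, 1, 2)
    let corrections = [12usize, 3, 25];
    for (inner_offset, correction) in corrections.iter().enumerate() {
        // mirrors the patch's rule:
        // range_start = ref + extract_bits(..) + slop * (inner_offset + 1)
        let range_start = ref_start + correction + range_start_slop * (inner_offset + 1);
        println!("block {}: range_start = {}", inner_offset + 1, range_start);
        // prints 11_012, 12_003, 13_025 — each correction fits in a handful of bits
    }
}
```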
From e6f106253db1265037d1d52564c14dde0c86fdae Mon Sep 17 00:00:00 2001
From: trinity-1686a
Date: Mon, 20 Nov 2023 11:12:20 +0100
Subject: [PATCH 04/18] omit initial block range end

save 8byte per 256 block
---
 sstable/src/sstable_index.rs | 83 ++++++++++++++++++++++--------------
 1 file changed, 52 insertions(+), 31 deletions(-)

diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs
index 22eb5c2184..e3ba50383d 100644
--- a/sstable/src/sstable_index.rs
+++ b/sstable/src/sstable_index.rs
@@ -71,6 +71,30 @@ pub struct BlockAddr {
     pub byte_range: Range<usize>,
 }
 
+impl BlockAddr {
+    fn to_block_start(&self) -> BlockStartAddr {
+        BlockStartAddr {
+            first_ordinal: self.first_ordinal,
+            byte_range_start: self.byte_range.start,
+        }
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+struct BlockStartAddr {
+    first_ordinal: u64,
+    byte_range_start: usize,
+}
+
+impl BlockStartAddr {
+    fn to_block_addr(&self, byte_range_end: usize) -> BlockAddr {
+        BlockAddr {
+            first_ordinal: self.first_ordinal,
+            byte_range: self.byte_range_start..byte_range_end,
+        }
+    }
+}
+
 #[derive(Debug, Clone)]
 pub(crate) struct BlockMeta {
     /// Any byte string that is lexicographically greater or equal to
@@ -104,30 +128,27 @@ pub(crate) struct BlockMeta {
     pub block_addr: BlockAddr,
 }
 
-impl BinarySerializable for BlockAddr {
+impl BinarySerializable for BlockStartAddr {
     fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
         self.first_ordinal.serialize(writer)?;
-        let start = self.byte_range.start as u64;
-        start.serialize(writer)?;
-        let end = self.byte_range.end as u64;
-        end.serialize(writer)
+        let start = self.byte_range_start as u64;
+        start.serialize(writer)
     }
 
     fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
         let first_ordinal = u64::deserialize(reader)?;
-        let start = u64::deserialize(reader)? as usize;
-        let end = u64::deserialize(reader)? as usize;
-        Ok(BlockAddr {
+        let byte_range_start = u64::deserialize(reader)? as usize;
+        Ok(BlockStartAddr {
             first_ordinal,
-            byte_range: start..end,
+            byte_range_start,
         })
     }
 
     // Provided method
     fn num_bytes(&self) -> u64 {
-        BlockAddr::SIZE_IN_BYTES as u64
+        BlockStartAddr::SIZE_IN_BYTES as u64
     }
 }
 
-impl FixedSize for BlockAddr {
-    const SIZE_IN_BYTES: usize = 3 * u64::SIZE_IN_BYTES;
+impl FixedSize for BlockStartAddr {
+    const SIZE_IN_BYTES: usize = 2 * u64::SIZE_IN_BYTES;
 }
 
@@ -205,7 +226,7 @@ const STORE_BLOCK_LEN: usize = 256;
 #[derive(Debug)]
 struct BlockAddrBlockMetadata {
     offset: u64,
-    ref_block_addr: BlockAddr,
+    ref_block_addr: BlockStartAddr,
     range_start_slop: u32,
     first_ordinal_slop: u32,
     range_start_nbits: u8,
@@ -219,6 +240,13 @@ impl BlockAddrBlockMetadata {
     }
 
     fn deserialize_block_addr(&self, data: &[u8], inner_offset: usize) -> Option<BlockAddr> {
+        if inner_offset == 0 {
+            let range_end = self.ref_block_addr.byte_range_start
+                + extract_bits(data, 0, self.range_start_nbits) as usize
+                + self.range_start_slop as usize;
+            return Some(self.ref_block_addr.to_block_addr(range_end));
+        }
+        let inner_offset = inner_offset - 1;
         if inner_offset >= self.block_len as usize {
             return None;
         }
@@ -241,13 +269,13 @@ impl BlockAddrBlockMetadata {
-        let range_start = self.ref_block_addr.byte_range.start
+        let range_start = self.ref_block_addr.byte_range_start
             + extract_bits(data, range_start_addr, self.range_start_nbits) as usize
             + self.range_start_slop as usize * (inner_offset + 1);
         let first_ordinal = self.ref_block_addr.first_ordinal
             + extract_bits(data, ordinal_addr, self.first_ordinal_nbits)
             + self.first_ordinal_slop as u64 * (inner_offset + 1) as u64;
-        let range_end = self.ref_block_addr.byte_range.start
+        let range_end = self.ref_block_addr.byte_range_start
             + extract_bits(data, range_end_addr, self.range_start_nbits) as usize
             + self.range_start_slop as usize * (inner_offset + 2);
 
@@ -277,15 +305,11 @@ impl BlockAddrBlockMetadata {
             Ok(inner_offset) => inner_offset + 1,
             Err(inner_offset) => inner_offset,
         };
-        if inner_offset == 0 {
-            (0, self.ref_block_addr.clone())
-        } else {
-            (
-                inner_offset,
-                self.deserialize_block_addr(data, inner_offset as usize - 1)
-                    .unwrap(),
-            )
-        }
+        (
+            inner_offset,
+            self.deserialize_block_addr(data, inner_offset as usize)
+                .unwrap(),
+        )
     }
 }
 
@@ -321,7 +345,7 @@ impl BinarySerializable for BlockAddrBlockMetadata {
     fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
         let offset = u64::deserialize(reader)?;
-        let ref_block_addr = BlockAddr::deserialize(reader)?;
+        let ref_block_addr = BlockStartAddr::deserialize(reader)?;
         let range_start_slop = u32::deserialize(reader)?;
         let first_ordinal_slop = u32::deserialize(reader)?;
         let mut buffer = [0u8; 2];
@@ -341,7 +365,7 @@ impl FixedSize for BlockAddrBlockMetadata {
     const SIZE_IN_BYTES: usize = u64::SIZE_IN_BYTES
-        + BlockAddr::SIZE_IN_BYTES
+        + BlockStartAddr::SIZE_IN_BYTES
         + 2 * u32::SIZE_IN_BYTES
         + 2 * u8::SIZE_IN_BYTES
         + u16::SIZE_IN_BYTES;
@@ -351,12 +375,9 @@ impl BlockAddrStore {
         let store_block_id = (block_id as usize) / STORE_BLOCK_LEN;
         let inner_offset = (block_id as usize) % STORE_BLOCK_LEN;
         let block_addr_block_data = self.get_block_meta(store_block_id)?;
-        if inner_offset == 0 {
-            return Some(block_addr_block_data.ref_block_addr);
-        }
         block_addr_block_data.deserialize_block_addr(
             &self.addr_bytes[block_addr_block_data.offset as usize..],
-            inner_offset - 1,
+            inner_offset,
         )
     }
 
@@ -479,7 +500,7 @@ impl BlockAddrStoreWriter {
         let block_addr_block_meta = BlockAddrBlockMetadata {
             offset: self.buffer_addrs.len() as u64,
-            ref_block_addr: ref_block_addr.clone(),
+            ref_block_addr: ref_block_addr.to_block_start(),
             range_start_slop: range_start_slop as u32,
             first_ordinal_slop: first_ordinal_slop as u32,
             range_start_nbits,
             first_ordinal_nbits,
From b5c878ab8319e50ce6af44fe8bb250d9230a7b15 Mon Sep 17 00:00:00 2001
From: trinity-1686a
Date: Mon, 20 Nov 2023 12:00:34 +0100
Subject: [PATCH 05/18] extract slop/derivation computation

---
 sstable/src/sstable_index.rs | 87 +++++++++++++++++++-----------------
 1 file changed, 46 insertions(+), 41 deletions(-)

diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs
index e3ba50383d..7aca8fd0a0 100644
--- a/sstable/src/sstable_index.rs
+++ b/sstable/src/sstable_index.rs
@@ -233,7 +233,7 @@ impl BlockAddrBlockMetadata {
         let range_start_addr = num_bits * inner_offset;
         let ordinal_addr = range_start_addr + self.range_start_nbits as usize;
-        let range_end_addr = range_start_addr + num_bits as usize;
+        let range_end_addr = range_start_addr + num_bits;
 
         if (range_end_addr + self.range_start_nbits as usize + 7) / 8 > data.len() {
             return None;
         }
@@ -268,7 +268,7 @@ impl BlockAddrBlockMetadata {
                 data,
                 num_bits * index as usize + range_start_nbits,
                 self.first_ordinal_nbits,
-            ) + self.first_ordinal_slop as u64 * (index + 1) as u64
+            ) + self.first_ordinal_slop as u64 * (index + 1)
         };
 
         let inner_offset = match binary_search(self.block_len as u64, |index| {
@@ -365,7 +365,7 @@ impl BlockAddrStore {
     fn get_block_meta(&self, store_block_id: usize) -> Option<BlockAddrBlockMetadata> {
-        let mut block_data: &[u8] = &self
+        let mut block_data: &[u8] = self
             .block_meta_bytes
             .get(store_block_id * BlockAddrBlockMetadata::SIZE_IN_BYTES..)?;
         BlockAddrBlockMetadata::deserialize(&mut block_data).ok()
     }
@@ -460,49 +460,37 @@ impl BlockAddrStoreWriter {
         let mut last_block_addr = self.block_addrs.last().unwrap().clone();
         last_block_addr.byte_range.end -= ref_block_addr.byte_range.start;
 
-        let (range_start_slop, first_ordinal_slop) =
-            self.block_addrs.iter().enumerate().skip(1).fold(
-                (u64::MAX, u64::MAX),
-                |(range_slop, ordinal_slop), (index, block)| {
-                    (
-                        range_slop.min((block.byte_range.start / index) as u64),
-                        ordinal_slop.min(block.first_ordinal / index as u64),
-                    )
-                },
-            );
-        let range_start_slop =
-            range_start_slop.min((last_block_addr.byte_range.end / self.block_addrs.len()) as u64);
-
-        // we need correction to be at least 1 otherwise we may fail to assess the number of
-        // elements in a block if each block is exactly the same size and the same number of
-        // ordinal
-        let (range_max_correction, ordinal_max_correction) =
-            self.block_addrs.iter().enumerate().skip(1).fold(
-                (1, 0),
-                |(range_max_correction, ordinal_max_correction), (index, block)| {
-                    let range_correction =
-                        block.byte_range.start - range_start_slop as usize * index;
-                    let ordinal_correction =
-                        block.first_ordinal - first_ordinal_slop * index as u64;
-                    (
-                        range_max_correction.max(range_correction),
-                        ordinal_max_correction.max(ordinal_correction),
-                    )
-                },
-            );
-
-        let range_max_correction = range_max_correction.max(
-            last_block_addr.byte_range.end - range_start_slop as usize * self.block_addrs.len(),
-        );
+        let (range_start_slop, range_max_derivation) = find_best_slop(
+            self.block_addrs
+                .iter()
+                .map(|block| block.byte_range.start as u64)
+                .chain(std::iter::once(last_block_addr.byte_range.end as u64))
+                .enumerate()
+                .skip(1),
+        );
 
+        let (first_ordinal_slop, ordinal_max_derivation) = find_best_slop(
+            self.block_addrs
+                .iter()
+                .map(|block| block.first_ordinal)
+                .enumerate()
+                .skip(1),
+        );
 
-        let range_start_nbits = compute_num_bits(range_max_correction as u64);
-        let first_ordinal_nbits = compute_num_bits(ordinal_max_correction);
+        // we need derivation to be at least 1 otherwise we may fail to assess the number of
+        // elements in a block if each element is exactly the same size and the same number of
+        // ordinal
+        let mut range_start_nbits = compute_num_bits(range_max_derivation);
+        let first_ordinal_nbits = compute_num_bits(ordinal_max_derivation);
+        if range_start_nbits + first_ordinal_nbits == 0 {
+            range_start_nbits = 1;
+        }
 
         let block_addr_block_meta = BlockAddrBlockMetadata {
             offset: self.buffer_addrs.len() as u64,
             ref_block_addr: ref_block_addr.to_block_start(),
-            range_start_slop: range_start_slop as u32,
-            first_ordinal_slop: first_ordinal_slop as u32,
+            range_start_slop,
+            first_ordinal_slop,
             range_start_nbits,
             first_ordinal_nbits,
             block_len: self.block_addrs.len() as u16 - 1,
         };
@@ -506,7 +494,7 @@ impl BlockAddrStoreWriter {
             bit_packer.write(
-                block_addr.first_ordinal - first_ordinal_slop * i as u64,
+                block_addr.first_ordinal - first_ordinal_slop as u64 * i as u64,
                 first_ordinal_nbits,
                 &mut self.buffer_addrs,
             )?;
@@ -544,6 +532,23 @@ impl BlockAddrStoreWriter {
     }
 }
 
+fn find_best_slop(elements: impl Iterator<Item = (usize, u64)> + Clone) -> (u32, u64) {
+    let slop_iterator = elements.clone();
+    let derivation_iterator = elements;
+
+    let final_slop = slop_iterator.fold(u32::MAX, |slop, (index, value)| {
+        slop.min((value / index as u64) as u32)
+    });
+
+    let max_derivation = derivation_iterator.fold(1, |max_derivation, (index, value)| {
+        let derivation = value - final_slop as u64 * index as u64;
+
+        max_derivation.max(derivation)
+    });
+
+    (final_slop, max_derivation)
+}
+
 #[cfg(test)]
 mod tests {
     use common::OwnedBytes;
From 142cdebdf5b880ad490869d5aee727f3248ddffe Mon Sep 17 00:00:00 2001
From: trinity-1686a
Date: Tue, 21 Nov 2023 14:40:49 +0100
Subject: [PATCH 06/18] use better linear approximator and allow negative
 correction to approximator

---
 sstable/src/sstable_index.rs | 104 ++++++++++++++++++++++++++---------
 1 file changed, 77 insertions(+), 27 deletions(-)

diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs
index 7aca8fd0a0..b0d3bf5bd9 100644
--- a/sstable/src/sstable_index.rs
+++ b/sstable/src/sstable_index.rs
@@ -211,6 +211,9 @@ struct BlockAddrBlockMetadata {
     range_start_nbits: u8,
     first_ordinal_nbits: u8,
     block_len: u16,
+    // these fields are computed on deserialization, and not stored
+    range_shift: i64,
+    ordinal_shift: i64,
 }
 
 impl BlockAddrBlockMetadata {
@@ -222,7 +225,8 @@ impl BlockAddrBlockMetadata {
         if inner_offset == 0 {
             let range_end = self.ref_block_addr.byte_range_start
                 + extract_bits(data, 0, self.range_start_nbits) as usize
-                + self.range_start_slop as usize;
+                + self.range_start_slop as usize
+                - self.range_shift as usize;
             return Some(self.ref_block_addr.to_block_addr(range_end));
         }
         let inner_offset = inner_offset - 1;
@@ -241,13 +245,16 @@ impl BlockAddrBlockMetadata {
         let range_start = self.ref_block_addr.byte_range_start
             + extract_bits(data, range_start_addr, self.range_start_nbits) as usize
-            + self.range_start_slop as usize * (inner_offset + 1);
+            + self.range_start_slop as usize * (inner_offset + 1)
+            - self.range_shift as usize;
         let first_ordinal = self.ref_block_addr.first_ordinal
             + extract_bits(data, ordinal_addr, self.first_ordinal_nbits)
-            + self.first_ordinal_slop as u64 * (inner_offset + 1) as u64;
+            + self.first_ordinal_slop as u64 * (inner_offset + 1) as u64
+            - self.ordinal_shift as u64;
         let range_end = self.ref_block_addr.byte_range_start
             + extract_bits(data, range_end_addr, self.range_start_nbits) as usize
-            + self.range_start_slop as usize * (inner_offset + 2);
+            + self.range_start_slop as usize * (inner_offset + 2)
+            - self.range_shift as usize;
 
         Some(BlockAddr {
             first_ordinal,
@@ -269,6 +276,7 @@ impl BlockAddrBlockMetadata {
                 num_bits * index as usize + range_start_nbits,
                 self.first_ordinal_nbits,
             ) + self.first_ordinal_slop as u64 * (index + 1)
+                - self.ordinal_shift as u64
         };
 
         let inner_offset = match binary_search(self.block_len as u64, |index| {
@@ -326,15 +334,19 @@ impl BinarySerializable for BlockAddrBlockMetadata {
         let first_ordinal_slop = u32::deserialize(reader)?;
         let mut buffer = [0u8; 2];
         reader.read_exact(&mut buffer)?;
+        let first_ordinal_nbits = buffer[0];
+        let range_start_nbits = buffer[1];
         let block_len = u16::deserialize(reader)?;
         Ok(BlockAddrBlockMetadata {
             offset,
             ref_block_addr,
             range_start_slop,
             first_ordinal_slop,
-            range_start_nbits: buffer[1],
-            first_ordinal_nbits: buffer[0],
+            range_start_nbits,
+            first_ordinal_nbits,
             block_len,
+            range_shift: 1 << (range_start_nbits - 1),
+            ordinal_shift: 1 << (first_ordinal_nbits - 1),
         })
     }
 }
@@ -460,7 +472,7 @@ impl BlockAddrStoreWriter {
         let mut last_block_addr = self.block_addrs.last().unwrap().clone();
         last_block_addr.byte_range.end -= ref_block_addr.byte_range.start;
 
-        let (range_start_slop, range_max_derivation) = find_best_slop(
+        let (range_start_slop, range_start_nbits) = find_best_slop(
             self.block_addrs
                 .iter()
                 .map(|block| block.byte_range.start as u64)
                 .chain(std::iter::once(last_block_addr.byte_range.end as u64))
                 .enumerate()
                 .skip(1),
         );
 
-        let (first_ordinal_slop, ordinal_max_derivation) = find_best_slop(
+        let (first_ordinal_slop, first_ordinal_nbits) = find_best_slop(
             self.block_addrs
                 .iter()
                 .map(|block| block.first_ordinal)
                 .enumerate()
                 .skip(1),
         );
 
-        // we need derivation to be at least 1 otherwise we may fail to assess the number of
-        // elements in a block if each element is exactly the same size and the same number of
-        // ordinal
-        let mut range_start_nbits = compute_num_bits(range_max_derivation);
-        let first_ordinal_nbits = compute_num_bits(ordinal_max_derivation);
-        if range_start_nbits + first_ordinal_nbits == 0 {
-            range_start_nbits = 1;
-        }
+        let range_shift = 1 << (range_start_nbits - 1);
+        let ordinal_shift = 1 << (first_ordinal_nbits - 1);
 
         let block_addr_block_meta = BlockAddrBlockMetadata {
             offset: self.buffer_addrs.len() as u64,
             ref_block_addr: ref_block_addr.to_block_start(),
             range_start_slop,
             first_ordinal_slop,
             range_start_nbits,
             first_ordinal_nbits,
             block_len: self.block_addrs.len() as u16 - 1,
+            range_shift,
+            ordinal_shift,
         };
         block_addr_block_meta.serialize(&mut self.buffer_block_metas)?;
 
         let mut bit_packer = BitPacker::new();
 
         for (i, block_addr) in self.block_addrs.iter().enumerate().skip(1) {
+            let range_pred = (range_start_slop as usize * i) as i64;
             bit_packer.write(
-                (block_addr.byte_range.start - range_start_slop as usize * i) as u64,
+                (block_addr.byte_range.start as i64 - range_pred + range_shift) as u64,
                 range_start_nbits,
                 &mut self.buffer_addrs,
             )?;
+            let first_ordinal_pred = (first_ordinal_slop as u64 * i as u64) as i64;
             bit_packer.write(
-                block_addr.first_ordinal - first_ordinal_slop as u64 * i as u64,
+                (block_addr.first_ordinal as i64 - first_ordinal_pred + ordinal_shift) as u64,
                 first_ordinal_nbits,
                 &mut self.buffer_addrs,
             )?;
         }
 
+        let range_pred = (range_start_slop as usize * self.block_addrs.len()) as i64;
         bit_packer.write(
-            (last_block_addr.byte_range.end - range_start_slop as usize * self.block_addrs.len())
usize * self.block_addrs.len()) - as u64, + (last_block_addr.byte_range.end as i64 - range_pred + range_shift) as u64, range_start_nbits, &mut self.buffer_addrs, )?; @@ -544,21 +554,61 @@ impl BlockAddrStoreWriter { } } -fn find_best_slop(elements: impl Iterator + Clone) -> (u32, u64) { +fn find_best_slop(elements: impl Iterator + Clone) -> (u32, u8) { let slop_iterator = elements.clone(); let derivation_iterator = elements; - let final_slop = slop_iterator.fold(u32::MAX, |slop, (index, value)| { - slop.min((value / index as u64) as u32) - }); + let mut min_slop_idx = 1; + let mut min_slop_val = 0; + let mut min_slop = u32::MAX; + let mut max_slop_idx = 1; + let mut max_slop_val = 0; + let mut max_slop = 0; + for (index, value) in slop_iterator { + let slop = (value / index as u64) as u32; + if slop <= min_slop { + min_slop = slop; + min_slop_idx = index; + min_slop_val = value; + } + if slop >= max_slop { + max_slop = slop; + max_slop_idx = index; + max_slop_val = value; + } + } - let max_derivation = derivation_iterator.fold(1, |max_derivation, (index, value)| { - let derivation = value - final_slop as u64 * index as u64; + // above is an heristic giving the "highest" and "lowest" point. It's imperfect in that in that + // a point that appear earlier might have a high slop derivation, but a smaller absolute + // derivation than a latter point. + // The actual best values can be obtained by using the symplex method, but the improvement is + // likely minimal, and computation is way more complexe. + // + // Assuming these point are the furthest up and down, we find the slop that would cause the same + // positive derivation for the highest as negative derivation for the lowest. + // A is the optimal slop. B is the derivation to the guess + // + // 0 = min_slop_val - min_slop_idx * A - B + // 0 = max_slop_val - max_slop_idx * A + B + // + // 0 = min_slop_val + max_slop_val - (min_slop_idx + max_slop_idx) * A + // (min_slop_val + max_slop_val) / (min_slop_idx + max_slop_idx) = A + // + // we actually add some correcting factor to have proper rounding, not truncation. + + let denominator = (min_slop_idx + max_slop_idx) as u64; + let final_slop = ((min_slop_val + max_slop_val + denominator / 2) / denominator) as u32; + + // we don't solve for B because our choice of point is suboptimal, so it's actually a lower + // bound and we need to iterate to find the actual worst value. 
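To make the heuristic above concrete, here is a self-contained sketch of the same two-point fit (a hypothetical `fit_slope` helper, not the patch's code), with a worked example:

```
fn fit_slope(points: &[(u64, u64)]) -> (u32, u64) {
    // Track the points with the lowest and highest value/index ratio,
    // as (index, value, slope) triples. Assumes every index is non-zero.
    let mut lo = (1u64, 0u64, u32::MAX);
    let mut hi = (1u64, 0u64, 0u32);
    for &(index, value) in points {
        let slope = (value / index) as u32;
        if slope <= lo.2 {
            lo = (index, value, slope);
        }
        if slope >= hi.2 {
            hi = (index, value, slope);
        }
    }
    // Balance the slope so both extreme points deviate by the same amount;
    // `denom / 2` rounds instead of truncating.
    let denom = lo.0 + hi.0;
    let slope = ((lo.1 + hi.1 + denom / 2) / denom) as u32;
    // The worst absolute deviation determines how many bits each delta needs.
    let max_dev = points
        .iter()
        .map(|&(i, v)| (v as i64 - slope as i64 * i as i64).unsigned_abs())
        .max()
        .unwrap_or(0);
    (slope, max_dev)
}

// For (1, 10), (2, 19), (3, 31): lo = (2, 19), hi = (3, 31),
// slope = (19 + 31 + 2) / 5 = 10, worst deviation = 1,
// so each delta fits in compute_num_bits(1) + 1 = 2 bits (one for the sign).
```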
+ + let max_derivation = derivation_iterator.fold(0, |max_derivation, (index, value)| { + let derivation = (value as i64 - final_slop as i64 * index as i64).unsigned_abs(); max_derivation.max(derivation) }); - (final_slop, max_derivation) + (final_slop, compute_num_bits(max_derivation) + 1) } #[cfg(test)] From bf07fedefefa9353dfecc43bb155369d2b3d7334 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Tue, 21 Nov 2023 15:25:24 +0100 Subject: [PATCH 07/18] document format and reorder some fields --- sstable/README.md | 47 +++++++++++++++++------------------- sstable/src/sstable_index.rs | 7 +++--- 2 files changed, 26 insertions(+), 28 deletions(-) diff --git a/sstable/README.md b/sstable/README.md index a9980b75ea..567ce20a32 100644 --- a/sstable/README.md +++ b/sstable/README.md @@ -117,35 +117,32 @@ Fst is in the format of tantivy\_fst ### BlockAddrBlockMetadata -+--------+--------------+-------------------+-----------------+---------------+ -| Offset | RefBlockAddr | FirstOrdinalNBits | RangeStartNBits | RangeLenNBits | -+--------+--------------+-------------------+-----------------+---------------+ ++--------+------------+---------------+-----------+-------------+-------------------+-----------------+----------+ +| Offset | RangeStart | FirstOrdianal | RangeSlop | OrdinalSlop | FirstOrdinalNBits | RangeStartNBits | BlockLen | ++--------+------------+---------------+-----------+-------------+-------------------+-----------------+----------+ - Offset(u64): offset of the corresponding BlockData in the datastream -- RefBlockAddr(BlockAddr): reference block for the compacted block data -- FirstOrdinalNBits(u8): number of bits per ordinal in datastream -- RangeStartNBits(u8): number of bits per range start in datastream -- RangeLenNBits(u8): number of bits per range lenght in datastream +- RangeStart(u64): the start position of the first block +- FirstOrdinal(u64): the first ordinal of the first block +- RangeSlop(u32): slop predicted for start range evolution (see computation in BlockData) +- OrdinalSlop(u64): slop predicted for first ordinal evolution (see computation in BlockData) +- FirstOrdinalNBits(u8): number of bits per ordinal in datastream (see computation in BlockData) +- RangeStartNBits(u8): number of bits per range start in datastream (see computation in BlockData) -### BlockAddr - -+--------------+------------+----------+ -| FirstOrdinal | RangeStart | RangeEnd | -+--------------+------------+----------+ +### BlockData -- FirstOrdinal(u64): the first ordinal of this block -- RangeStart(u64): the start position of the corresponding block in the sstable -- RangeEnd(u64): the end position of the corresponding block in the sstable ++-----------------+-------------------+---------------+ +| RangeStartDelta | FirstOrdinalDelta | FinalRangeEnd | ++-----------------+-------------------+---------------+ +|------(BlockLen repetitions)---------| -### BlockData +- RangeStartDelta(var): RangeStartNBits *bits* of little endian number. See below for decoding +- FirstOrdinalDelta(var): FirstOrdinalNBits *bits* of little endian number. See below for decoding +- FinalRangeEnd(var): RangeStartNBits *bits* of integer. 
See below for decoding -+-------------------+-----------------+----------+ -| FirstOrdinalDelta | RangeStartDelta | RangeEnd | -+-------------------+-----------------+----------+ -|---------------(255 repetitions)----------------| +converting a BlockData of index Index and a BlockAddrBlockMetadata to an actual block address is done as follow: +range\_prediction := RangeStart + Index * RangeSlop; +range\_derivation := RangeStartDelta - (1 << (RangeStartNBits-1)); +range\_start := range\_prediction + range\_derivation -- FirstOrdinalDelta(var): FirstOrdinalNBits *bits* of little endian number. Delta between the first -ordinal for this block, and the reference block -- RangeStartDelta(var): RangeStartNBits *bits* of little endian number. Delta between the range -start for this block, and it of the reference block -- RangeEnd(var): RangeEndNBits *bits* of little endian number. Lenght of the range of this block +the same computation can be done for ordinal diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs index b0d3bf5bd9..f2b5b41697 100644 --- a/sstable/src/sstable_index.rs +++ b/sstable/src/sstable_index.rs @@ -106,14 +106,14 @@ pub(crate) struct BlockMeta { impl BinarySerializable for BlockStartAddr { fn serialize(&self, writer: &mut W) -> io::Result<()> { - self.first_ordinal.serialize(writer)?; let start = self.byte_range_start as u64; - start.serialize(writer) + start.serialize(writer)?; + self.first_ordinal.serialize(writer) } fn deserialize(reader: &mut R) -> io::Result { - let first_ordinal = u64::deserialize(reader)?; let byte_range_start = u64::deserialize(reader)? as usize; + let first_ordinal = u64::deserialize(reader)?; Ok(BlockStartAddr { first_ordinal, byte_range_start, @@ -324,6 +324,7 @@ impl BinarySerializable for BlockAddrBlockMetadata { self.first_ordinal_slop.serialize(write)?; write.write_all(&[self.first_ordinal_nbits, self.range_start_nbits])?; self.block_len.serialize(write)?; + self.num_bits(); Ok(()) } From 2e4ff9e5445a1bfc1c115b4cb54b1133452ca448 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Tue, 21 Nov 2023 16:25:20 +0100 Subject: [PATCH 08/18] optimize single block sstable size --- src/fastfield/mod.rs | 16 ++--- sstable/README.md | 11 +++- sstable/src/dictionary.rs | 16 ++++- sstable/src/lib.rs | 11 ++-- sstable/src/sstable_index.rs | 120 +++++++++++++++++++++++++++++++++-- 5 files changed, 148 insertions(+), 26 deletions(-) diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 1c9b3d5afe..783f1d7f60 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -131,7 +131,7 @@ mod tests { } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 93); + assert_eq!(file.len(), 80); let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap(); let column = fast_field_readers .u64("field") @@ -181,7 +181,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 121); + assert_eq!(file.len(), 108); let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap(); let col = fast_field_readers .u64("field") @@ -214,7 +214,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 94); + assert_eq!(file.len(), 81); let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap(); let fast_field_reader = fast_field_readers .u64("field") @@ -246,7 +246,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - 
assert_eq!(file.len(), 4489); + assert_eq!(file.len(), 4476); { let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap(); let col = fast_field_readers @@ -279,7 +279,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 265); + assert_eq!(file.len(), 252); { let fast_field_readers = FastFieldReaders::open(file, schema).unwrap(); @@ -773,7 +773,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 102); + assert_eq!(file.len(), 84); let fast_field_readers = FastFieldReaders::open(file, schema).unwrap(); let bool_col = fast_field_readers.bool("field_bool").unwrap(); assert_eq!(bool_col.first(0), Some(true)); @@ -805,7 +805,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 114); + assert_eq!(file.len(), 96); let readers = FastFieldReaders::open(file, schema).unwrap(); let bool_col = readers.bool("field_bool").unwrap(); for i in 0..25 { @@ -830,7 +830,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 104); + assert_eq!(file.len(), 86); let fastfield_readers = FastFieldReaders::open(file, schema).unwrap(); let col = fastfield_readers.bool("field_bool").unwrap(); assert_eq!(col.first(0), None); diff --git a/sstable/README.md b/sstable/README.md index 567ce20a32..e6b2d2e3eb 100644 --- a/sstable/README.md +++ b/sstable/README.md @@ -95,7 +95,7 @@ Note: as the SSTable does not support redundant keys, there is no ambiguity betw ``` - Fst(Fst): finit state transducer mapping keys to a block number - BlockAddrStore(BlockAddrStore): store mapping a block number to its BlockAddr -- FstLen(u64): Lenght of the Fst +- StoreOffset(u64): Offset to start of the BlockAddrStore. If zero, see the SingleBlockSStable section - IndexOffset(u64): Offset to the start of the SSTFooter - NumTerm(u64): number of terms in the sstable - Version(u32): Currently equal to 3 @@ -146,3 +146,12 @@ range\_derivation := RangeStartDelta - (1 << (RangeStartNBits-1)); range\_start := range\_prediction + range\_derivation the same computation can be done for ordinal + + +## SingleBlockSStable + +The format used for the index is meant to be compact, however it has a constant cost of arround 70 +bytes, which isn't negligible for a table containing very few keys. +To limit the impact of that constant cost, single block sstable omit the Fst and BlockAddrStore from +their index. Instead a block with first ordinal of 0, range start of 0 and range end of IndexOffset +is implicitly used for every operations. diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs index 3aafaa10aa..95f0ab4092 100644 --- a/sstable/src/dictionary.rs +++ b/sstable/src/dictionary.rs @@ -9,8 +9,11 @@ use common::{BinarySerializable, OwnedBytes}; use tantivy_fst::automaton::AlwaysMatch; use tantivy_fst::Automaton; +use crate::sstable_index::SSTableIndexV3Empty; use crate::streamer::{Streamer, StreamerBuilder}; -use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal, VoidSSTable}; +use crate::{ + BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, SSTableIndexV3, TermOrdinal, VoidSSTable, +}; /// An SSTable is a sorted map that associates sorted `&[u8]` keys /// to any kind of typed values. 
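For orientation, the read path that this patch wires together in the next hunks can be sketched as follows (hedged: the method names are the crate-internal ones introduced below, the flow is paraphrased):

```
// Resolving a key to a data block with the new index:
// the FST maps the key to a block ordinal, the BlockAddrStore maps that
// ordinal to a BlockAddr, and the BlockAddr's byte range is finally
// sliced out of the sstable and decompressed.
fn resolve(index: &SSTableIndex, key: &[u8]) -> Option<BlockAddr> {
    let block_id = index.locate_with_key(key)?; // FST lookup
    index.get_block(block_id)                   // BlockAddrStore lookup
}
```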
@@ -197,8 +200,15 @@ impl Dictionary { let (sstable_slice, index_slice) = main_slice.split(index_offset as usize); let sstable_index_bytes = index_slice.read_bytes()?; - let sstable_index = SSTableIndex::load(sstable_index_bytes, store_offset) - .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?; + let sstable_index = if store_offset != 0 { + SSTableIndex::V3( + SSTableIndexV3::load(sstable_index_bytes, store_offset).map_err(|_| { + io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption") + })?, + ) + } else { + SSTableIndex::V3Empty(SSTableIndexV3Empty::load(index_offset as usize)) + }; Ok(Dictionary { sstable_slice, sstable_index, diff --git a/sstable/src/lib.rs b/sstable/src/lib.rs index 44c03e1d7f..9df68edbb5 100644 --- a/sstable/src/lib.rs +++ b/sstable/src/lib.rs @@ -11,7 +11,7 @@ mod streamer; pub mod value; mod sstable_index; -pub use sstable_index::{BlockAddr, SSTableIndex, SSTableIndexBuilder}; +pub use sstable_index::{BlockAddr, SSTableIndex, SSTableIndexBuilder, SSTableIndexV3}; pub(crate) mod vint; pub use dictionary::Dictionary; pub use streamer::{Streamer, StreamerBuilder}; @@ -28,7 +28,7 @@ use crate::value::{RangeValueReader, RangeValueWriter}; pub type TermOrdinal = u64; const DEFAULT_KEY_CAPACITY: usize = 50; -const SSTABLE_VERSION: u32 = 2; +const SSTABLE_VERSION: u32 = 3; /// Given two byte string returns the length of /// the longest common prefix. @@ -387,13 +387,10 @@ mod test { 16, 17, 33, 18, 19, 17, 20, // data block 0, 0, 0, 0, // no more block // index - 8, 0, 0, 0, // size of index block - 0, // compression - 1, 0, 12, 0, 32, 17, 20, // index block - 0, 0, 0, 0, // no more index block + 0, 0, 0, 0, 0, 0, 0, 0, // fst lenght 16, 0, 0, 0, 0, 0, 0, 0, // index start offset 3, 0, 0, 0, 0, 0, 0, 0, // num term - 2, 0, 0, 0, // version + 3, 0, 0, 0, // version ] ); let buffer = OwnedBytes::new(buffer); diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs index f2b5b41697..0da384c35b 100644 --- a/sstable/src/sstable_index.rs +++ b/sstable/src/sstable_index.rs @@ -10,14 +10,74 @@ use tantivy_fst::{IntoStreamer, Map, MapBuilder, Streamer}; use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal}; #[derive(Debug, Clone)] -pub struct SSTableIndex { +pub enum SSTableIndex { + V2, + V3(SSTableIndexV3), + V3Empty(SSTableIndexV3Empty), +} + +impl SSTableIndex { + /// Get the [`BlockAddr`] of the requested block. + pub(crate) fn get_block(&self, block_id: u64) -> Option { + match self { + SSTableIndex::V2 => todo!(), + SSTableIndex::V3(v3_index) => v3_index.get_block(block_id), + SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block(block_id), + } + } + + /// Get the block id of the block that would contain `key`. + /// + /// Returns None if `key` is lexicographically after the last key recorded. + pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option { + match self { + SSTableIndex::V2 => todo!(), + SSTableIndex::V3(v3_index) => v3_index.locate_with_key(key), + SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_key(key), + } + } + + /// Get the [`BlockAddr`] of the block that would contain `key`. + /// + /// Returns None if `key` is lexicographically after the last key recorded. 
+ pub fn get_block_with_key(&self, key: &[u8]) -> Option { + match self { + SSTableIndex::V2 => todo!(), + SSTableIndex::V3(v3_index) => v3_index.get_block_with_key(key), + SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_key(key), + } + } + + pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> u64 { + match self { + SSTableIndex::V2 => todo!(), + SSTableIndex::V3(v3_index) => v3_index.locate_with_ord(ord), + SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_ord(ord), + } + } + + /// Get the [`BlockAddr`] of the block containing the `ord`-th term. + pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr { + match self { + SSTableIndex::V2 => todo!(), + SSTableIndex::V3(v3_index) => v3_index.get_block_with_ord(ord), + SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord), + } + } +} + +#[derive(Debug, Clone)] +pub struct SSTableIndexV3 { fst_index: Arc>, block_addr_store: BlockAddrStore, } -impl SSTableIndex { +impl SSTableIndexV3 { /// Load an index from its binary representation - pub fn load(data: OwnedBytes, fst_length: u64) -> Result { + pub fn load( + data: OwnedBytes, + fst_length: u64, + ) -> Result { let (fst_slice, block_addr_store_slice) = data.split(fst_length as usize); let fst_index = Fst::new(fst_slice) .map_err(|_| SSTableDataCorruption)? @@ -25,7 +85,7 @@ impl SSTableIndex { let block_addr_store = BlockAddrStore::open(block_addr_store_slice).map_err(|_| SSTableDataCorruption)?; - Ok(SSTableIndex { + Ok(SSTableIndexV3 { fst_index: Arc::new(fst_index), block_addr_store, }) @@ -65,6 +125,49 @@ impl SSTableIndex { } } +#[derive(Debug, Clone)] +pub struct SSTableIndexV3Empty { + block_addr: BlockAddr, +} + +impl SSTableIndexV3Empty { + pub fn load(index_start_pos: usize) -> SSTableIndexV3Empty { + SSTableIndexV3Empty { + block_addr: BlockAddr { + first_ordinal: 0, + byte_range: 0..index_start_pos, + }, + } + } + + /// Get the [`BlockAddr`] of the requested block. + pub(crate) fn get_block(&self, _block_id: u64) -> Option { + Some(self.block_addr.clone()) + } + + /// Get the block id of the block that would contain `key`. + /// + /// Returns None if `key` is lexicographically after the last key recorded. + pub(crate) fn locate_with_key(&self, _key: &[u8]) -> Option { + Some(0) + } + + /// Get the [`BlockAddr`] of the block that would contain `key`. + /// + /// Returns None if `key` is lexicographically after the last key recorded. + pub fn get_block_with_key(&self, _key: &[u8]) -> Option { + Some(self.block_addr.clone()) + } + + pub(crate) fn locate_with_ord(&self, _ord: TermOrdinal) -> u64 { + 0 + } + + /// Get the [`BlockAddr`] of the block containing the `ord`-th term. 
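The pseudo-index is easiest to understand through its observable behaviour; an illustrative sketch (assuming crate-internal visibility):

```
// Every lookup against SSTableIndexV3Empty resolves to the one implicit
// block: ordinal 0, spanning bytes 0..index_start_pos.
let index = SSTableIndexV3Empty::load(1024);
assert_eq!(index.locate_with_key(b"anything"), Some(0));
assert_eq!(
    index.get_block_with_key(b"anything"),
    Some(BlockAddr { first_ordinal: 0, byte_range: 0..1024 })
);
assert_eq!(index.locate_with_ord(42), 0);
```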
+ pub(crate) fn get_block_with_ord(&self, _ord: TermOrdinal) -> BlockAddr { + self.block_addr.clone() + } +} #[derive(Clone, Eq, PartialEq, Debug)] pub struct BlockAddr { pub first_ordinal: u64, @@ -176,6 +279,9 @@ impl SSTableIndexBuilder { } pub fn serialize(&self, wrt: W) -> io::Result { + if self.blocks.len() <= 1 { + return Ok(0); + } // TODO handle errors let counting_writer = common::CountingWriter::wrap(wrt); let mut map_builder = MapBuilder::new(counting_writer).unwrap(); @@ -616,7 +722,7 @@ fn find_best_slop(elements: impl Iterator + Clone) -> (u32, mod tests { use common::OwnedBytes; - use super::{BlockAddr, SSTableIndex, SSTableIndexBuilder}; + use super::{BlockAddr, SSTableIndexBuilder, SSTableIndexV3}; use crate::SSTableDataCorruption; #[test] @@ -629,7 +735,7 @@ mod tests { let mut buffer: Vec = Vec::new(); let fst_len = sstable_builder.serialize(&mut buffer).unwrap(); let buffer = OwnedBytes::new(buffer); - let sstable_index = SSTableIndex::load(buffer, fst_len).unwrap(); + let sstable_index = SSTableIndexV3::load(buffer, fst_len).unwrap(); assert_eq!( sstable_index.get_block_with_key(b"bbbde"), Some(BlockAddr { @@ -662,7 +768,7 @@ mod tests { let fst_len = sstable_builder.serialize(&mut buffer).unwrap(); buffer[2] = 9u8; let buffer = OwnedBytes::new(buffer); - let data_corruption_err = SSTableIndex::load(buffer, fst_len).err().unwrap(); + let data_corruption_err = SSTableIndexV3::load(buffer, fst_len).err().unwrap(); assert!(matches!(data_corruption_err, SSTableDataCorruption)); } From fd441da32d9201f040dd455d3c335281680aefba Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Tue, 21 Nov 2023 16:49:53 +0100 Subject: [PATCH 09/18] plug backward compat --- sstable/src/dictionary.rs | 48 +++++++++------ sstable/src/lib.rs | 1 + sstable/src/sstable_index.rs | 12 ++-- sstable/src/sstable_index_v2.rs | 101 ++++++++++++++++++++++++++++++++ 4 files changed, 137 insertions(+), 25 deletions(-) create mode 100644 sstable/src/sstable_index_v2.rs diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs index 95f0ab4092..54c28dcc43 100644 --- a/sstable/src/dictionary.rs +++ b/sstable/src/dictionary.rs @@ -181,34 +181,44 @@ impl Dictionary { /// Opens a `TermDictionary`. 
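Before reading `open` below, it helps to picture the fixed-size tail it parses. A hedged sketch of just the footer decoding (hypothetical free function; little-endian, as `BinarySerializable` writes integers; assumes the file is at least 20 bytes long):

```
// v3 file tail: [ ... | fst | block-addr store | store_offset: u64 ]
// followed by the 20-byte footer:
// [ index_offset: u64 | num_terms: u64 | version: u32 ]
fn read_footer(file: &[u8]) -> (u64, u64, u32) {
    let n = file.len();
    let index_offset = u64::from_le_bytes(file[n - 20..n - 12].try_into().unwrap());
    let num_terms = u64::from_le_bytes(file[n - 12..n - 4].try_into().unwrap());
    let version = u32::from_le_bytes(file[n - 4..].try_into().unwrap());
    (index_offset, num_terms, version)
}
```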
pub fn open(term_dictionary_file: FileSlice) -> io::Result { - let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(28); + let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(20); let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?; - - let store_offset = u64::deserialize(&mut footer_len_bytes)?; let index_offset = u64::deserialize(&mut footer_len_bytes)?; let num_terms = u64::deserialize(&mut footer_len_bytes)?; let version = u32::deserialize(&mut footer_len_bytes)?; - if version != crate::SSTABLE_VERSION { - return Err(io::Error::new( - io::ErrorKind::Other, - format!( - "Unsuported sstable version, expected {version}, found {}", - crate::SSTABLE_VERSION, - ), - )); - } - let (sstable_slice, index_slice) = main_slice.split(index_offset as usize); let sstable_index_bytes = index_slice.read_bytes()?; - let sstable_index = if store_offset != 0 { - SSTableIndex::V3( - SSTableIndexV3::load(sstable_index_bytes, store_offset).map_err(|_| { + + let sstable_index = match version { + 2 => SSTableIndex::V2( + crate::sstable_index_v2::SSTableIndex::load(sstable_index_bytes).map_err(|_| { io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption") })?, - ) - } else { - SSTableIndex::V3Empty(SSTableIndexV3Empty::load(index_offset as usize)) + ), + crate::SSTABLE_VERSION => { + let (sstable_index_bytes, mut footerv3_len_bytes) = sstable_index_bytes.rsplit(8); + let store_offset = u64::deserialize(&mut footerv3_len_bytes)?; + if store_offset != 0 { + SSTableIndex::V3( + SSTableIndexV3::load(sstable_index_bytes, store_offset).map_err(|_| { + io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption") + })?, + ) + } else { + SSTableIndex::V3Empty(SSTableIndexV3Empty::load(index_offset as usize)) + } + } + _ => { + return Err(io::Error::new( + io::ErrorKind::Other, + format!( + "Unsuported sstable version, expected {}, found {version}", + crate::SSTABLE_VERSION, + ), + )) + } }; + Ok(Dictionary { sstable_slice, sstable_index, diff --git a/sstable/src/lib.rs b/sstable/src/lib.rs index 9df68edbb5..e374f89837 100644 --- a/sstable/src/lib.rs +++ b/sstable/src/lib.rs @@ -12,6 +12,7 @@ pub mod value; mod sstable_index; pub use sstable_index::{BlockAddr, SSTableIndex, SSTableIndexBuilder, SSTableIndexV3}; +mod sstable_index_v2; pub(crate) mod vint; pub use dictionary::Dictionary; pub use streamer::{Streamer, StreamerBuilder}; diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs index 0da384c35b..c98af26eec 100644 --- a/sstable/src/sstable_index.rs +++ b/sstable/src/sstable_index.rs @@ -11,7 +11,7 @@ use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal}; #[derive(Debug, Clone)] pub enum SSTableIndex { - V2, + V2(crate::sstable_index_v2::SSTableIndex), V3(SSTableIndexV3), V3Empty(SSTableIndexV3Empty), } @@ -20,7 +20,7 @@ impl SSTableIndex { /// Get the [`BlockAddr`] of the requested block. pub(crate) fn get_block(&self, block_id: u64) -> Option { match self { - SSTableIndex::V2 => todo!(), + SSTableIndex::V2(v2_index) => v2_index.get_block(block_id as usize), SSTableIndex::V3(v3_index) => v3_index.get_block(block_id), SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block(block_id), } @@ -31,7 +31,7 @@ impl SSTableIndex { /// Returns None if `key` is lexicographically after the last key recorded. 
pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option { match self { - SSTableIndex::V2 => todo!(), + SSTableIndex::V2(v2_index) => v2_index.locate_with_key(key).map(|i| i as u64), SSTableIndex::V3(v3_index) => v3_index.locate_with_key(key), SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_key(key), } @@ -42,7 +42,7 @@ impl SSTableIndex { /// Returns None if `key` is lexicographically after the last key recorded. pub fn get_block_with_key(&self, key: &[u8]) -> Option { match self { - SSTableIndex::V2 => todo!(), + SSTableIndex::V2(v2_index) => v2_index.get_block_with_key(key), SSTableIndex::V3(v3_index) => v3_index.get_block_with_key(key), SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_key(key), } @@ -50,7 +50,7 @@ impl SSTableIndex { pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> u64 { match self { - SSTableIndex::V2 => todo!(), + SSTableIndex::V2(v2_index) => v2_index.locate_with_ord(ord) as u64, SSTableIndex::V3(v3_index) => v3_index.locate_with_ord(ord), SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_ord(ord), } @@ -59,7 +59,7 @@ impl SSTableIndex { /// Get the [`BlockAddr`] of the block containing the `ord`-th term. pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr { match self { - SSTableIndex::V2 => todo!(), + SSTableIndex::V2(v2_index) => v2_index.get_block_with_ord(ord), SSTableIndex::V3(v3_index) => v3_index.get_block_with_ord(ord), SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord), } diff --git a/sstable/src/sstable_index_v2.rs b/sstable/src/sstable_index_v2.rs new file mode 100644 index 0000000000..d7c97c13a7 --- /dev/null +++ b/sstable/src/sstable_index_v2.rs @@ -0,0 +1,101 @@ +use common::OwnedBytes; + +use crate::{BlockAddr, SSTable, SSTableDataCorruption, TermOrdinal}; + +#[derive(Default, Debug, Clone)] +pub struct SSTableIndex { + blocks: Vec, +} + +impl SSTableIndex { + /// Load an index from its binary representation + pub fn load(data: OwnedBytes) -> Result { + let mut reader = IndexSSTable::reader(data); + let mut blocks = Vec::new(); + + while reader.advance().map_err(|_| SSTableDataCorruption)? { + blocks.push(BlockMeta { + last_key_or_greater: reader.key().to_vec(), + block_addr: reader.value().clone(), + }); + } + + Ok(SSTableIndex { blocks }) + } + + /// Get the [`BlockAddr`] of the requested block. + pub(crate) fn get_block(&self, block_id: usize) -> Option { + self.blocks + .get(block_id) + .map(|block_meta| block_meta.block_addr.clone()) + } + + /// Get the block id of the block that would contain `key`. + /// + /// Returns None if `key` is lexicographically after the last key recorded. + pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option { + let pos = self + .blocks + .binary_search_by_key(&key, |block| &block.last_key_or_greater); + match pos { + Ok(pos) => Some(pos), + Err(pos) => { + if pos < self.blocks.len() { + Some(pos) + } else { + // after end of last block: no block matches + None + } + } + } + } + + /// Get the [`BlockAddr`] of the block that would contain `key`. + /// + /// Returns None if `key` is lexicographically after the last key recorded. 
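The `Ok`/`Err` handling above deserves a worked example (standalone sketch of the same `binary_search` convention):

```
// Blocks are keyed by `last_key_or_greater`, so the first boundary that
// is >= the probe key identifies the candidate block.
let boundaries: Vec<&[u8]> = vec![b"bbb", b"ddd"];
// Exact boundary match: Ok(0) -> block 0 holds keys up to "bbb".
assert_eq!(boundaries.binary_search(&&b"bbb"[..]), Ok(0));
// Between boundaries: Err(1) -> block 1 may contain "ccc".
assert_eq!(boundaries.binary_search(&&b"ccc"[..]), Err(1));
// Past the last boundary: Err(2) == len -> no block, the key is absent.
assert_eq!(boundaries.binary_search(&&b"eee"[..]), Err(2));
```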
+ pub fn get_block_with_key(&self, key: &[u8]) -> Option { + self.locate_with_key(key).and_then(|id| self.get_block(id)) + } + + pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> usize { + let pos = self + .blocks + .binary_search_by_key(&ord, |block| block.block_addr.first_ordinal); + + match pos { + Ok(pos) => pos, + // Err(0) can't happen as the sstable starts with ordinal zero + Err(pos) => pos - 1, + } + } + + /// Get the [`BlockAddr`] of the block containing the `ord`-th term. + pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr { + // locate_with_ord always returns an index within range + self.get_block(self.locate_with_ord(ord)).unwrap() + } +} + +#[derive(Debug, Clone)] +pub(crate) struct BlockMeta { + /// Any byte string that is lexicographically greater or equal to + /// the last key in the block, + /// and yet strictly smaller than the first key in the next block. + pub last_key_or_greater: Vec, + pub block_addr: BlockAddr, +} + +/// SSTable representing an index +/// +/// `last_key_or_greater` is used as the key, the value contains the +/// length and first ordinal of each block. The start offset is implicitly +/// obtained from lengths. +struct IndexSSTable; + +impl SSTable for IndexSSTable { + type Value = BlockAddr; + + type ValueReader = crate::value::index::IndexValueReader; + + type ValueWriter = crate::value::index::IndexValueWriter; +} From 21181b49d07dfbd7795c272931d0b2fd1b47bf12 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 22 Nov 2023 16:29:21 +0100 Subject: [PATCH 10/18] remove debug prints --- sstable/benches/ord_to_term.rs | 1 - sstable/src/lib.rs | 38 ---------------------------------- sstable/src/sstable_index.rs | 4 +--- 3 files changed, 1 insertion(+), 42 deletions(-) diff --git a/sstable/benches/ord_to_term.rs b/sstable/benches/ord_to_term.rs index afdcb8fed5..f29b719b34 100644 --- a/sstable/benches/ord_to_term.rs +++ b/sstable/benches/ord_to_term.rs @@ -15,7 +15,6 @@ fn make_test_sstable(suffix: &str) -> FileSlice { } let table = builder.finish().unwrap(); - eprintln!("len={}", table.len()); let table = Arc::new(OwnedBytes::new(table)); let slice = common::file_slice::FileSlice::new(table.clone()); diff --git a/sstable/src/lib.rs b/sstable/src/lib.rs index e374f89837..3409600b71 100644 --- a/sstable/src/lib.rs +++ b/sstable/src/lib.rs @@ -305,7 +305,6 @@ where let offset = wrt.written_bytes(); - eprintln!("index_offset={offset}"); let fst_len: u64 = self.index_builder.serialize(&mut wrt)?; wrt.write_all(&fst_len.to_le_bytes())?; wrt.write_all(&offset.to_le_bytes())?; @@ -546,41 +545,4 @@ mod test { assert!(btree_set_range.next().is_none()); } } - - #[test] - fn tmp() { - let words: std::collections::BTreeSet = - ["", "a", "aa", "aaaa", "aba", "acaaaa", "b"] - .into_iter() - .map(ToString::to_string) - .collect(); - let (lower_bound, upper_bound) = - (Bound::Excluded("b".to_string()), Bound::::Unbounded); - let mut builder = Dictionary::::builder(Vec::new()).unwrap(); - builder.set_block_len(16); - for word in &words { - builder.insert(word.as_bytes(), &()).unwrap(); - } - let buffer: Vec = builder.finish().unwrap(); - let dictionary: Dictionary = - Dictionary::open(FileSlice::from(buffer)).unwrap(); - let mut range_builder = dictionary.range(); - range_builder = match lower_bound.as_ref() { - Bound::Included(key) => range_builder.ge(key.as_bytes()), - Bound::Excluded(key) => range_builder.gt(key.as_bytes()), - Bound::Unbounded => range_builder, - }; - range_builder = match upper_bound.as_ref() { - Bound::Included(key) 
=> range_builder.le(key.as_bytes()), - Bound::Excluded(key) => range_builder.lt(key.as_bytes()), - Bound::Unbounded => range_builder, - }; - let mut stream = range_builder.into_stream().unwrap(); - let mut btree_set_range = words.range((lower_bound, upper_bound)); - while stream.advance() { - let val = btree_set_range.next().unwrap(); - assert_eq!(val.as_bytes(), stream.key()); - } - assert!(btree_set_range.next().is_none()); - } } diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs index c98af26eec..cc4f5dab89 100644 --- a/sstable/src/sstable_index.rs +++ b/sstable/src/sstable_index.rs @@ -299,14 +299,12 @@ impl SSTableIndexBuilder { block_store_writer.write_block_meta(block.block_addr.clone())?; } block_store_writer.serialize(&mut wrt)?; - eprintln!("fst len={written_bytes}"); - eprintln!("store len={}", wrt.written_bytes() - written_bytes); Ok(written_bytes) } } -const STORE_BLOCK_LEN: usize = 256; +const STORE_BLOCK_LEN: usize = 128; #[derive(Debug)] struct BlockAddrBlockMetadata { From b670a6e9312909e76ce881c2addb186cca28a445 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 22 Nov 2023 18:49:15 +0100 Subject: [PATCH 11/18] handle errors and comment unwraps --- sstable/src/sstable_index.rs | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs index cc4f5dab89..bdc56d48ed 100644 --- a/sstable/src/sstable_index.rs +++ b/sstable/src/sstable_index.rs @@ -282,17 +282,16 @@ impl SSTableIndexBuilder { if self.blocks.len() <= 1 { return Ok(0); } - // TODO handle errors let counting_writer = common::CountingWriter::wrap(wrt); - let mut map_builder = MapBuilder::new(counting_writer).unwrap(); + let mut map_builder = MapBuilder::new(counting_writer).map_err(fst_error_to_io_error)?; for (i, block) in self.blocks.iter().enumerate() { map_builder .insert(&block.last_key_or_greater, i as u64) - .unwrap(); + .map_err(fst_error_to_io_error)?; } - let counting_writer = map_builder.into_inner().unwrap(); + let counting_writer = map_builder.into_inner().map_err(fst_error_to_io_error)?; let written_bytes = counting_writer.written_bytes(); - let mut wrt = counting_writer; + let mut wrt = counting_writer.finish(); let mut block_store_writer = BlockAddrStoreWriter::new(); for block in &self.blocks { @@ -304,6 +303,13 @@ impl SSTableIndexBuilder { } } +fn fst_error_to_io_error(error: tantivy_fst::Error) -> io::Error { + match error { + tantivy_fst::Error::Fst(fst_error) => io::Error::new(io::ErrorKind::Other, fst_error), + tantivy_fst::Error::Io(ioerror) => ioerror, + } +} + const STORE_BLOCK_LEN: usize = 128; #[derive(Debug)] @@ -366,11 +372,7 @@ impl BlockAddrBlockMetadata { }) } - // /!\ countrary to deserialize_block_addr() for which inner_offset goes from 0 to - // STORE_BLOCK_LEN - 2, this function goes from 1 to STORE_BLOCK_LEN - 1, and 0 marks - // we should use ref_block_addr fn bissect_for_ord(&self, data: &[u8], target_ord: TermOrdinal) -> (u64, BlockAddr) { - // TODO can panic if block has header only let inner_target_ord = target_ord - self.ref_block_addr.first_ordinal; let num_bits = self.num_bits() as usize; let range_start_nbits = self.range_start_nbits as usize; @@ -389,6 +391,7 @@ impl BlockAddrBlockMetadata { Ok(inner_offset) => inner_offset + 1, Err(inner_offset) => inner_offset, }; + // we can unwrap because inner_offset <= self.block_len ( inner_offset, self.deserialize_block_addr(data, inner_offset as usize) @@ -502,6 +505,7 @@ impl BlockAddrStore { let max_block 
= (self.block_meta_bytes.len() / BlockAddrBlockMetadata::SIZE_IN_BYTES) as u64; let get_first_ordinal = |block_id| { + // we can unwrap because block_id < max_block self.get(block_id * STORE_BLOCK_LEN as u64) .unwrap() .first_ordinal @@ -511,11 +515,13 @@ impl BlockAddrStore { let store_block_id = match store_block_id { Ok(store_block_id) => { let block_id = store_block_id * STORE_BLOCK_LEN as u64; + // we can unwrap because store_block_id < max_block return (block_id, self.get(block_id).unwrap()); } Err(store_block_id) => store_block_id - 1, }; + // we can unwrap because store_block_id < max_block let block_addr_block_data = self.get_block_meta(store_block_id as usize).unwrap(); let (inner_offset, block_addr) = block_addr_block_data.bissect_for_ord( &self.addr_bytes[block_addr_block_data.offset as usize..], @@ -574,6 +580,7 @@ impl BlockAddrStoreWriter { block_addr.first_ordinal -= ref_block_addr.first_ordinal; } + // we are only called if block_addrs is not empty let mut last_block_addr = self.block_addrs.last().unwrap().clone(); last_block_addr.byte_range.end -= ref_block_addr.byte_range.start; From 915d60b4fa199effb7c9e5986df1dc1ad5d6f4d4 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 22 Nov 2023 19:49:41 +0100 Subject: [PATCH 12/18] fix sstable size in columnar tests --- columnar/src/tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/columnar/src/tests.rs b/columnar/src/tests.rs index 2d45080b91..5e5c50f556 100644 --- a/columnar/src/tests.rs +++ b/columnar/src/tests.rs @@ -26,7 +26,7 @@ fn test_dataframe_writer_str() { assert_eq!(columnar.num_columns(), 1); let cols: Vec = columnar.read_columns("my_string").unwrap(); assert_eq!(cols.len(), 1); - assert_eq!(cols[0].num_bytes(), 87); + assert_eq!(cols[0].num_bytes(), 73); } #[test] @@ -40,7 +40,7 @@ fn test_dataframe_writer_bytes() { assert_eq!(columnar.num_columns(), 1); let cols: Vec = columnar.read_columns("my_string").unwrap(); assert_eq!(cols.len(), 1); - assert_eq!(cols[0].num_bytes(), 87); + assert_eq!(cols[0].num_bytes(), 73); } #[test] From 850944a9e90ab0e8cab0d532f63cb36cfd83810b Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Fri, 24 Nov 2023 15:16:44 +0100 Subject: [PATCH 13/18] typos --- sstable/README.md | 14 ++--- sstable/src/dictionary.rs | 9 ++- sstable/src/sstable_index.rs | 106 +++++++++++++++++------------------ 3 files changed, 64 insertions(+), 65 deletions(-) diff --git a/sstable/README.md b/sstable/README.md index e6b2d2e3eb..a13db426a8 100644 --- a/sstable/README.md +++ b/sstable/README.md @@ -93,7 +93,7 @@ Note: as the SSTable does not support redundant keys, there is no ambiguity betw | Fst | BlockAddrStore | StoreOffset | IndexOffset | NumTerm | Version | +-----+----------------+-------------+-------------+---------+---------+ ``` -- Fst(Fst): finit state transducer mapping keys to a block number +- Fst(Fst): finite state transducer mapping keys to a block number - BlockAddrStore(BlockAddrStore): store mapping a block number to its BlockAddr - StoreOffset(u64): Offset to start of the BlockAddrStore. 
If zero, see the SingleBlockSStable section - IndexOffset(u64): Offset to the start of the SSTFooter @@ -111,21 +111,21 @@ Fst is in the format of tantivy\_fst +---------+-----------+-----------+-----+-----------+-----------+-----+ |---------(N blocks)----------|---------(N blocks)----------| -- MetaLen(u64): lenght of the BlockMeta section +- MetaLen(u64): length of the BlockMeta section - BlockMeta(BlockAddrBlockMetadata): metadata to seek through BlockData - BlockData(CompactedBlockAddr): bitpacked per block metadata ### BlockAddrBlockMetadata -+--------+------------+---------------+-----------+-------------+-------------------+-----------------+----------+ -| Offset | RangeStart | FirstOrdianal | RangeSlop | OrdinalSlop | FirstOrdinalNBits | RangeStartNBits | BlockLen | -+--------+------------+---------------+-----------+-------------+-------------------+-----------------+----------+ ++--------+------------+--------------+------------+--------------+-------------------+-----------------+----------+ +| Offset | RangeStart | FirstOrdinal | RangeSlope | OrdinalSlope | FirstOrdinalNBits | RangeStartNBits | BlockLen | ++--------+------------+--------------+------------+--------------+-------------------+-----------------+----------+ - Offset(u64): offset of the corresponding BlockData in the datastream - RangeStart(u64): the start position of the first block - FirstOrdinal(u64): the first ordinal of the first block -- RangeSlop(u32): slop predicted for start range evolution (see computation in BlockData) -- OrdinalSlop(u64): slop predicted for first ordinal evolution (see computation in BlockData) +- RangeSlope(u32): slope predicted for start range evolution (see computation in BlockData) +- OrdinalSlope(u64): slope predicted for first ordinal evolution (see computation in BlockData) - FirstOrdinalNBits(u8): number of bits per ordinal in datastream (see computation in BlockData) - RangeStartNBits(u8): number of bits per range start in datastream (see computation in BlockData) diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs index 54c28dcc43..f5cd231dac 100644 --- a/sstable/src/dictionary.rs +++ b/sstable/src/dictionary.rs @@ -195,7 +195,7 @@ impl Dictionary { io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption") })?, ), - crate::SSTABLE_VERSION => { + 3 => { let (sstable_index_bytes, mut footerv3_len_bytes) = sstable_index_bytes.rsplit(8); let store_offset = u64::deserialize(&mut footerv3_len_bytes)?; if store_offset != 0 { @@ -205,16 +205,15 @@ impl Dictionary { })?, ) } else { + // if store_offset is zero, there is no index, so we build a pseudo-index + // assuming a single block of sstable covering everything. 
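Spelled out as a tiny helper (a sketch mirroring `SSTableIndexV3Empty::load`, not new behaviour), the implicit block this comment describes is:

```
// With store_offset == 0, the whole sstable payload acts as one block,
// starting at ordinal 0 and ending where the index/footer begins.
fn implicit_block(index_offset: usize) -> BlockAddr {
    BlockAddr {
        first_ordinal: 0,
        byte_range: 0..index_offset,
    }
}
```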
SSTableIndex::V3Empty(SSTableIndexV3Empty::load(index_offset as usize))
                 }
             }
             _ => {
                 return Err(io::Error::new(
                     io::ErrorKind::Other,
-                    format!(
-                        "Unsuported sstable version, expected {}, found {version}",
-                        crate::SSTABLE_VERSION,
-                    ),
+                    format!("Unsupported sstable version, expected one of [2, 3], found {version}"),
                 ))
             }
         };
diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs
index bdc56d48ed..60ceed3b59 100644
--- a/sstable/src/sstable_index.rs
+++ b/sstable/src/sstable_index.rs
@@ -316,8 +316,8 @@ const STORE_BLOCK_LEN: usize = 128;
 struct BlockAddrBlockMetadata {
     offset: u64,
     ref_block_addr: BlockStartAddr,
-    range_start_slop: u32,
-    first_ordinal_slop: u32,
+    range_start_slope: u32,
+    first_ordinal_slope: u32,
     range_start_nbits: u8,
     first_ordinal_nbits: u8,
     block_len: u16,
@@ -335,7 +335,7 @@ impl BlockAddrBlockMetadata {
         if inner_offset == 0 {
             let range_end = self.ref_block_addr.byte_range_start
                 + extract_bits(data, 0, self.range_start_nbits) as usize
-                + self.range_start_slop as usize
+                + self.range_start_slope as usize
                 - self.range_shift as usize;
             return Some(self.ref_block_addr.to_block_addr(range_end));
         }
@@ -355,15 +355,15 @@ impl BlockAddrBlockMetadata {
         let range_start = self.ref_block_addr.byte_range_start
             + extract_bits(data, range_start_addr, self.range_start_nbits) as usize
-            + self.range_start_slop as usize * (inner_offset + 1)
+            + self.range_start_slope as usize * (inner_offset + 1)
             - self.range_shift as usize;
         let first_ordinal = self.ref_block_addr.first_ordinal
             + extract_bits(data, ordinal_addr, self.first_ordinal_nbits)
-            + self.first_ordinal_slop as u64 * (inner_offset + 1) as u64
+            + self.first_ordinal_slope as u64 * (inner_offset + 1) as u64
             - self.ordinal_shift as u64;
         let range_end = self.ref_block_addr.byte_range_start
             + extract_bits(data, range_end_addr, self.range_start_nbits) as usize
-            + self.range_start_slop as usize * (inner_offset + 2)
+            + self.range_start_slope as usize * (inner_offset + 2)
             - self.range_shift as usize;
 
         Some(BlockAddr {
             first_ordinal,
@@ -372,7 +372,7 @@ impl BlockAddrBlockMetadata {
         })
     }
 
-    fn bissect_for_ord(&self, data: &[u8], target_ord: TermOrdinal) -> (u64, BlockAddr) {
+    fn bisect_for_ord(&self, data: &[u8], target_ord: TermOrdinal) -> (u64, BlockAddr) {
         let inner_target_ord = target_ord - self.ref_block_addr.first_ordinal;
         let num_bits = self.num_bits() as usize;
         let range_start_nbits = self.range_start_nbits as usize;
@@ -381,7 +381,7 @@ impl BlockAddrBlockMetadata {
                 data,
                 num_bits * index as usize + range_start_nbits,
                 self.first_ordinal_nbits,
-            ) + self.first_ordinal_slop as u64 * (index + 1)
+            ) + self.first_ordinal_slope as u64 * (index + 1)
                 - self.ordinal_shift as u64
         };
 
@@ -427,8 +427,8 @@ impl BinarySerializable for BlockAddrBlockMetadata {
     fn serialize(&self, write: &mut W) -> io::Result<()> {
         self.offset.serialize(write)?;
         self.ref_block_addr.serialize(write)?;
-        self.range_start_slop.serialize(write)?;
-        self.first_ordinal_slop.serialize(write)?;
+        self.range_start_slope.serialize(write)?;
+        self.first_ordinal_slope.serialize(write)?;
         write.write_all(&[self.first_ordinal_nbits, self.range_start_nbits])?;
         self.block_len.serialize(write)?;
         self.num_bits();
         Ok(())
     }
 
     fn deserialize(reader: &mut R) -> io::Result {
         let offset = u64::deserialize(reader)?;
         let ref_block_addr = BlockStartAddr::deserialize(reader)?;
-        let range_start_slop = u32::deserialize(reader)?;
-        let first_ordinal_slop = u32::deserialize(reader)?;
+        let range_start_slope =
u32::deserialize(reader)?; + let first_ordinal_slope = u32::deserialize(reader)?; let mut buffer = [0u8; 2]; reader.read_exact(&mut buffer)?; let first_ordinal_nbits = buffer[0]; @@ -448,8 +448,8 @@ impl BinarySerializable for BlockAddrBlockMetadata { Ok(BlockAddrBlockMetadata { offset, ref_block_addr, - range_start_slop, - first_ordinal_slop, + range_start_slope, + first_ordinal_slope, range_start_nbits, first_ordinal_nbits, block_len, @@ -523,7 +523,7 @@ impl BlockAddrStore { // we can unwrap because store_block_id < max_block let block_addr_block_data = self.get_block_meta(store_block_id as usize).unwrap(); - let (inner_offset, block_addr) = block_addr_block_data.bissect_for_ord( + let (inner_offset, block_addr) = block_addr_block_data.bisect_for_ord( &self.addr_bytes[block_addr_block_data.offset as usize..], ord, ); @@ -584,7 +584,7 @@ impl BlockAddrStoreWriter { let mut last_block_addr = self.block_addrs.last().unwrap().clone(); last_block_addr.byte_range.end -= ref_block_addr.byte_range.start; - let (range_start_slop, range_start_nbits) = find_best_slop( + let (range_start_slope, range_start_nbits) = find_best_slope( self.block_addrs .iter() .map(|block| block.byte_range.start as u64) @@ -593,7 +593,7 @@ impl BlockAddrStoreWriter { .skip(1), ); - let (first_ordinal_slop, first_ordinal_nbits) = find_best_slop( + let (first_ordinal_slope, first_ordinal_nbits) = find_best_slope( self.block_addrs .iter() .map(|block| block.first_ordinal) @@ -607,8 +607,8 @@ impl BlockAddrStoreWriter { let block_addr_block_meta = BlockAddrBlockMetadata { offset: self.buffer_addrs.len() as u64, ref_block_addr: ref_block_addr.to_block_start(), - range_start_slop, - first_ordinal_slop, + range_start_slope, + first_ordinal_slope, range_start_nbits, first_ordinal_nbits, block_len: self.block_addrs.len() as u16 - 1, @@ -620,13 +620,13 @@ impl BlockAddrStoreWriter { let mut bit_packer = BitPacker::new(); for (i, block_addr) in self.block_addrs.iter().enumerate().skip(1) { - let range_pred = (range_start_slop as usize * i) as i64; + let range_pred = (range_start_slope as usize * i) as i64; bit_packer.write( (block_addr.byte_range.start as i64 - range_pred + range_shift) as u64, range_start_nbits, &mut self.buffer_addrs, )?; - let first_ordinal_pred = (first_ordinal_slop as u64 * i as u64) as i64; + let first_ordinal_pred = (first_ordinal_slope as u64 * i as u64) as i64; bit_packer.write( (block_addr.first_ordinal as i64 - first_ordinal_pred + ordinal_shift) as u64, first_ordinal_nbits, @@ -634,7 +634,7 @@ impl BlockAddrStoreWriter { )?; } - let range_pred = (range_start_slop as usize * self.block_addrs.len()) as i64; + let range_pred = (range_start_slope as usize * self.block_addrs.len()) as i64; bit_packer.write( (last_block_addr.byte_range.end as i64 - range_pred + range_shift) as u64, range_start_nbits, @@ -666,61 +666,61 @@ impl BlockAddrStoreWriter { } } -fn find_best_slop(elements: impl Iterator + Clone) -> (u32, u8) { - let slop_iterator = elements.clone(); +fn find_best_slope(elements: impl Iterator + Clone) -> (u32, u8) { + let slope_iterator = elements.clone(); let derivation_iterator = elements; - let mut min_slop_idx = 1; - let mut min_slop_val = 0; - let mut min_slop = u32::MAX; - let mut max_slop_idx = 1; - let mut max_slop_val = 0; - let mut max_slop = 0; - for (index, value) in slop_iterator { - let slop = (value / index as u64) as u32; - if slop <= min_slop { - min_slop = slop; - min_slop_idx = index; - min_slop_val = value; + let mut min_slope_idx = 1; + let mut min_slope_val = 0; + let mut 
min_slope = u32::MAX;
+    let mut max_slope_idx = 1;
+    let mut max_slope_val = 0;
+    let mut max_slope = 0;
+    for (index, value) in slope_iterator {
+        let slope = (value / index as u64) as u32;
+        if slope <= min_slope {
+            min_slope = slope;
+            min_slope_idx = index;
+            min_slope_val = value;
         }
-        if slop >= max_slop {
-            max_slop = slop;
-            max_slop_idx = index;
-            max_slop_val = value;
+        if slope >= max_slope {
+            max_slope = slope;
+            max_slope_idx = index;
+            max_slope_val = value;
         }
     }
 
-    // above is an heristic giving the "highest" and "lowest" point. It's imperfect in that in that
-    // a point that appear earlier might have a high slop derivation, but a smaller absolute
-    // derivation than a latter point.
-    // The actual best values can be obtained by using the symplex method, but the improvement is
-    // likely minimal, and computation is way more complexe.
+    // above is a heuristic giving the "highest" and "lowest" point. It's imperfect in that
+    // a point that appears earlier might have a high slope derivation, but a smaller absolute
+    // derivation than a later point.
+    // The actual best values can be obtained by using the simplex method, but the improvement is
+    // likely minimal, and computation is way more complex.
     //
-    // Assuming these point are the furthest up and down, we find the slop that would cause the same
-    // positive derivation for the highest as negative derivation for the lowest.
-    // A is the optimal slop. B is the derivation to the guess
+    // Assuming these points are the furthest up and down, we find the slope that would cause the
+    // same positive derivation for the highest as negative derivation for the lowest.
+    // A is the optimal slope. B is the derivation to the guess
     //
-    // 0 = min_slop_val - min_slop_idx * A - B
-    // 0 = max_slop_val - max_slop_idx * A + B
+    // 0 = min_slope_val - min_slope_idx * A - B
+    // 0 = max_slope_val - max_slope_idx * A + B
     //
-    // 0 = min_slop_val + max_slop_val - (min_slop_idx + max_slop_idx) * A
-    // (min_slop_val + max_slop_val) / (min_slop_idx + max_slop_idx) = A
+    // 0 = min_slope_val + max_slope_val - (min_slope_idx + max_slope_idx) * A
+    // (min_slope_val + max_slope_val) / (min_slope_idx + max_slope_idx) = A
     //
     // we actually add some correcting factor to have proper rounding, not truncation.
 
-    let denominator = (min_slop_idx + max_slop_idx) as u64;
-    let final_slop = ((min_slop_val + max_slop_val + denominator / 2) / denominator) as u32;
+    let denominator = (min_slope_idx + max_slope_idx) as u64;
+    let final_slope = ((min_slope_val + max_slope_val + denominator / 2) / denominator) as u32;
 
     // we don't solve for B because our choice of point is suboptimal, so it's actually a lower
     // bound and we need to iterate to find the actual worst value.
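The extra `+ 1` bit returned below, together with the `range_shift`/`ordinal_shift` fields introduced earlier, are two halves of a plain bias encoding for the signed deltas. A minimal sketch, assuming `1 <= nbits <= 56`:

```
// With n bits, adding the bias 1 << (n - 1) maps signed deltas in
// [-2^(n-1), 2^(n-1)) onto [0, 2^n), which the bitpacker can store.
fn encode_delta(delta: i64, nbits: u8) -> u64 {
    (delta + (1i64 << (nbits - 1))) as u64
}

fn decode_delta(raw: u64, nbits: u8) -> i64 {
    raw as i64 - (1i64 << (nbits - 1))
}

// encode_delta(-1, 4) == 7, and decode_delta(7, 4) == -1.
```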
let max_derivation = derivation_iterator.fold(0, |max_derivation, (index, value)| {
-        let derivation = (value as i64 - final_slop as i64 * index as i64).unsigned_abs();
+        let derivation = (value as i64 - final_slope as i64 * index as i64).unsigned_abs();
 
         max_derivation.max(derivation)
     });
 
-    (final_slop, compute_num_bits(max_derivation) + 1)
+    (final_slope, compute_num_bits(max_derivation) + 1)
 }
 
 #[cfg(test)]

From 3a2f2f62c3f3b4751da1cddbbb5c8c47f219fb21 Mon Sep 17 00:00:00 2001
From: trinity-1686a
Date: Fri, 24 Nov 2023 15:54:55 +0100
Subject: [PATCH 14/18] document invariants around find_best_slope

---
 sstable/src/sstable_index.rs | 23 ++++++++++++++++++++---
 1 file changed, 20 insertions(+), 3 deletions(-)

diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs
index 60ceed3b59..a14f9fccfb 100644
--- a/sstable/src/sstable_index.rs
+++ b/sstable/src/sstable_index.rs
@@ -401,6 +401,7 @@ impl BlockAddrBlockMetadata {
 }
 
 // TODO move this function to tantivy_common?
+#[inline(always)]
 fn extract_bits(data: &[u8], addr_bits: usize, num_bits: u8) -> u64 {
     use byteorder::{ByteOrder, LittleEndian};
     assert!(num_bits <= 56);
@@ -573,6 +574,9 @@ impl BlockAddrStoreWriter {
     }
 
     fn flush_block(&mut self) -> io::Result<()> {
+        if self.block_addrs.is_empty() {
+            return Ok(());
+        }
         let ref_block_addr = self.block_addrs[0].clone();
 
         for block_addr in &mut self.block_addrs {
@@ -655,9 +661,7 @@ impl BlockAddrStoreWriter {
     }
 
     fn serialize(&mut self, wrt: &mut W) -> io::Result<()> {
-        if !self.block_addrs.is_empty() {
-            self.flush_block()?;
-        }
+        self.flush_block()?;
         let len = self.buffer_block_metas.len() as u64;
         len.serialize(wrt)?;
         wrt.write_all(&self.buffer_block_metas)?;
@@ -666,6 +670,10 @@ impl BlockAddrStoreWriter {
     }
 }
 
+/// Given an iterator over (index, value), returns the slope, and number of bits needed to
+/// represent the error to a prediction made by this slope.
+///
+/// The iterator may be empty, but all indexes in it must be non-zero.
fn find_best_slope(elements: impl Iterator + Clone) -> (u32, u8) { let slope_iterator = elements.clone(); let derivation_iterator = elements; @@ -807,4 +815,13 @@ mod tests { } } } + + #[test] + fn test_find_best_slop() { + assert_eq!(super::find_best_slope(std::iter::empty()), (0, 1)); + assert_eq!( + super::find_best_slope(std::iter::once((1, 12345))), + (12345, 1) + ); + } } From 1f4f8959721ae034a6edfbcfa1029eff1e359211 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Fri, 24 Nov 2023 15:58:25 +0100 Subject: [PATCH 15/18] use map.max instead of fold --- sstable/src/sstable_index.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs index a14f9fccfb..be3e1aa497 100644 --- a/sstable/src/sstable_index.rs +++ b/sstable/src/sstable_index.rs @@ -722,11 +722,10 @@ fn find_best_slope(elements: impl Iterator + Clone) -> (u32 // we don't solve for B because our choice of point is suboptimal, so it's actually a lower // bound and we need to iterate to find the actual worst value. - let max_derivation = derivation_iterator.fold(0, |max_derivation, (index, value)| { - let derivation = (value as i64 - final_slope as i64 * index as i64).unsigned_abs(); - - max_derivation.max(derivation) - }); + let max_derivation: u64 = derivation_iterator + .map(|(index, value)| (value as i64 - final_slope as i64 * index as i64).unsigned_abs()) + .max() + .unwrap_or(0); (final_slope, compute_num_bits(max_derivation) + 1) } From fbecf03332864a7a520959a4c3b87ee0a39be8bc Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Fri, 24 Nov 2023 16:03:06 +0100 Subject: [PATCH 16/18] move sstable_index to versioned file name --- sstable/src/dictionary.rs | 2 +- sstable/src/lib.rs | 4 ++-- sstable/src/{sstable_index.rs => sstable_index_v3.rs} | 0 3 files changed, 3 insertions(+), 3 deletions(-) rename sstable/src/{sstable_index.rs => sstable_index_v3.rs} (100%) diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs index f5cd231dac..b4821fe24a 100644 --- a/sstable/src/dictionary.rs +++ b/sstable/src/dictionary.rs @@ -9,7 +9,7 @@ use common::{BinarySerializable, OwnedBytes}; use tantivy_fst::automaton::AlwaysMatch; use tantivy_fst::Automaton; -use crate::sstable_index::SSTableIndexV3Empty; +use crate::sstable_index_v3::SSTableIndexV3Empty; use crate::streamer::{Streamer, StreamerBuilder}; use crate::{ BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, SSTableIndexV3, TermOrdinal, VoidSSTable, diff --git a/sstable/src/lib.rs b/sstable/src/lib.rs index 3409600b71..dadf43489a 100644 --- a/sstable/src/lib.rs +++ b/sstable/src/lib.rs @@ -10,8 +10,8 @@ pub mod merge; mod streamer; pub mod value; -mod sstable_index; -pub use sstable_index::{BlockAddr, SSTableIndex, SSTableIndexBuilder, SSTableIndexV3}; +mod sstable_index_v3; +pub use sstable_index_v3::{BlockAddr, SSTableIndex, SSTableIndexBuilder, SSTableIndexV3}; mod sstable_index_v2; pub(crate) mod vint; pub use dictionary::Dictionary; diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index_v3.rs similarity index 100% rename from sstable/src/sstable_index.rs rename to sstable/src/sstable_index_v3.rs From a036eb768836d3a43e921837ec418204d3c68ea0 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Fri, 24 Nov 2023 16:10:13 +0100 Subject: [PATCH 17/18] remove byteorder dep --- sstable/Cargo.toml | 1 - sstable/src/sstable_index_v3.rs | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sstable/Cargo.toml b/sstable/Cargo.toml index 343a20e6c1..7cce076961 
100644
--- a/sstable/Cargo.toml
+++ b/sstable/Cargo.toml
@@ -10,7 +10,6 @@ categories = ["database-implementations", "data-structures", "compression"]
 description = "sstables for tantivy"
 
 [dependencies]
-byteorder = "1.4.3"
 common = {version= "0.6", path="../common", package="tantivy-common"}
 tantivy-bitpacker = { version= "0.5", path="../bitpacker" }
 tantivy-fst = "0.5"
diff --git a/sstable/src/sstable_index_v3.rs b/sstable/src/sstable_index_v3.rs
index be3e1aa497..e2a6fdfc79 100644
--- a/sstable/src/sstable_index_v3.rs
+++ b/sstable/src/sstable_index_v3.rs
@@ -403,12 +403,12 @@ impl BlockAddrBlockMetadata {
 // TODO move this function to tantivy_common?
 #[inline(always)]
 fn extract_bits(data: &[u8], addr_bits: usize, num_bits: u8) -> u64 {
-    use byteorder::{ByteOrder, LittleEndian};
     assert!(num_bits <= 56);
     let addr_byte = addr_bits / 8;
     let bit_shift = (addr_bits % 8) as u64;
     let val_unshifted_unmasked: u64 = if data.len() >= addr_byte + 8 {
-        LittleEndian::read_u64(&data[addr_byte..][..8])
+        let b = data[addr_byte..addr_byte + 8].try_into().unwrap();
+        u64::from_le_bytes(b)
     } else {
         // the buffer is not large enough.
         // Let's copy the few remaining bytes to a 8 byte buffer
@@ -417,7 +417,7 @@ fn extract_bits(data: &[u8], addr_bits: usize, num_bits: u8) -> u64 {
         let data_to_copy = &data[addr_byte..];
         let nbytes = data_to_copy.len();
         buf[..nbytes].copy_from_slice(data_to_copy);
-        LittleEndian::read_u64(&buf)
+        u64::from_le_bytes(buf)
     };
     let val_shifted_unmasked = val_unshifted_unmasked >> bit_shift;
     let mask = (1u64 << u64::from(num_bits)) - 1;

From 0e2de874b07e55233d8ae7fea21bae0f0677a8e6 Mon Sep 17 00:00:00 2001
From: trinity-1686a
Date: Thu, 30 Nov 2023 16:52:57 +0100
Subject: [PATCH 18/18] typo

---
 sstable/README.md | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/sstable/README.md b/sstable/README.md
index a13db426a8..e5e3f5a492 100644
--- a/sstable/README.md
+++ b/sstable/README.md
@@ -144,15 +144,17 @@
-converting a BlockData of index Index and a BlockAddrBlockMetadata to an actual block address is done as follow:
-range\_prediction := RangeStart + Index * RangeSlop;
+converting a BlockData of index Index and a BlockAddrBlockMetadata to an actual block address is done as follows:
+range\_prediction := RangeStart + Index * RangeSlope;
 range\_derivation := RangeStartDelta - (1 << (RangeStartNBits-1));
 range\_start := range\_prediction + range\_derivation
 
-the same computation can be done for ordinal
+The same computation can be done for ordinal.
+
+Note that `range_derivation` can take negative values. `RangeStartDelta` is just that value translated to a positive range.
 
 
 ## SingleBlockSStable
 
-The format used for the index is meant to be compact, however it has a constant cost of arround 70
+The format used for the index is meant to be compact, however it has a constant cost of around 70
 bytes, which isn't negligible for a table containing very few keys.
-To limit the impact of that constant cost, single block sstable omit the Fst and BlockAddrStore from
+To limit the impact of that constant cost, single block sstables omit the Fst and BlockAddrStore from
 their index. Instead a block with first ordinal of 0, range start of 0 and range end of IndexOffset
-is implicitly used for every operations.
+is implicitly used for every operation.
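As a closing illustration of the byteorder-free `extract_bits` above, a worked example of its little-endian bit addressing (standalone sketch):

```
// Reading 8 bits at bit address 4 of [0xAA, 0x0F]: the bytes read as the
// little-endian value 0x0FAA, shifting right by 4 leaves 0xFA, and the
// 8-bit mask keeps exactly those bits.
let data = [0xAAu8, 0x0F];
let mut buf = [0u8; 8];
buf[..2].copy_from_slice(&data); // same copy the slow path performs
let val = (u64::from_le_bytes(buf) >> 4) & 0xFF;
assert_eq!(val, 0xFA);
```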