Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use fst for sstable index #2268

Merged
merged 19 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions columnar/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn test_dataframe_writer_str() {
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
assert_eq!(cols.len(), 1);
assert_eq!(cols[0].num_bytes(), 87);
assert_eq!(cols[0].num_bytes(), 73);
}

#[test]
Expand All @@ -40,7 +40,7 @@ fn test_dataframe_writer_bytes() {
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
assert_eq!(cols.len(), 1);
assert_eq!(cols[0].num_bytes(), 87);
assert_eq!(cols[0].num_bytes(), 73);
}

#[test]
Expand Down
16 changes: 8 additions & 8 deletions src/fastfield/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ mod tests {
}
let file = directory.open_read(path).unwrap();

assert_eq!(file.len(), 93);
assert_eq!(file.len(), 80);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let column = fast_field_readers
.u64("field")
Expand Down Expand Up @@ -181,7 +181,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 121);
assert_eq!(file.len(), 108);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let col = fast_field_readers
.u64("field")
Expand Down Expand Up @@ -214,7 +214,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 94);
assert_eq!(file.len(), 81);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let fast_field_reader = fast_field_readers
.u64("field")
Expand Down Expand Up @@ -246,7 +246,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 4489);
assert_eq!(file.len(), 4476);
{
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let col = fast_field_readers
Expand Down Expand Up @@ -279,7 +279,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 265);
assert_eq!(file.len(), 252);

{
let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
Expand Down Expand Up @@ -773,7 +773,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 102);
assert_eq!(file.len(), 84);
let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
let bool_col = fast_field_readers.bool("field_bool").unwrap();
assert_eq!(bool_col.first(0), Some(true));
Expand Down Expand Up @@ -805,7 +805,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 114);
assert_eq!(file.len(), 96);
let readers = FastFieldReaders::open(file, schema).unwrap();
let bool_col = readers.bool("field_bool").unwrap();
for i in 0..25 {
Expand All @@ -830,7 +830,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 104);
assert_eq!(file.len(), 86);
let fastfield_readers = FastFieldReaders::open(file, schema).unwrap();
let col = fastfield_readers.bool("field_bool").unwrap();
assert_eq!(col.first(0), None);
Expand Down
2 changes: 2 additions & 0 deletions sstable/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ categories = ["database-implementations", "data-structures", "compression"]
description = "sstables for tantivy"

[dependencies]
byteorder = "1.4.3"
common = {version= "0.6", path="../common", package="tantivy-common"}
tantivy-bitpacker = { version= "0.5", path="../bitpacker" }
tantivy-fst = "0.5"
# experimental gives us access to Decompressor::upper_bound
zstd = { version = "0.13", features = ["experimental"] }
Expand Down
84 changes: 60 additions & 24 deletions sstable/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,33 +89,69 @@ Note: as the SSTable does not support redundant keys, there is no ambiguity betw

### SSTFooter
```
+-------+-------+-----+-------------+---------+---------+
| Block | Block | ... | IndexOffset | NumTerm | Version |
+-------+-------+-----+-------------+---------+---------+
|----( # of blocks)---|
+-----+----------------+-------------+-------------+---------+---------+
| Fst | BlockAddrStore | StoreOffset | IndexOffset | NumTerm | Version |
+-----+----------------+-------------+-------------+---------+---------+
```
- Block(SSTBlock): uses IndexValue for its Values format
- Fst(Fst): finit state transducer mapping keys to a block number
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
- BlockAddrStore(BlockAddrStore): store mapping a block number to its BlockAddr
- StoreOffset(u64): Offset to start of the BlockAddrStore. If zero, see the SingleBlockSStable section
- IndexOffset(u64): Offset to the start of the SSTFooter
- NumTerm(u64): number of terms in the sstable
- Version(u32): Currently equal to 2
- Version(u32): Currently equal to 3

### IndexValue
```
+------------+----------+-------+-------+-----+
| EntryCount | StartPos | Entry | Entry | ... |
+------------+----------+-------+-------+-----+
|---( # of entries)---|
```
### Fst

- EntryCount(VInt): number of entries
- StartPos(VInt): the start pos of the first (data) block referenced by this (index) block
- Entry (IndexEntry)
Fst is in the format of tantivy\_fst

### Entry
```
+----------+--------------+
| BlockLen | FirstOrdinal |
+----------+--------------+
```
- BlockLen(VInt): length of the block
- FirstOrdinal(VInt): ordinal of the first element in the given block
### BlockAddrStore

+---------+-----------+-----------+-----+-----------+-----------+-----+
| MetaLen | BlockMeta | BlockMeta | ... | BlockData | BlockData | ... |
+---------+-----------+-----------+-----+-----------+-----------+-----+
|---------(N blocks)----------|---------(N blocks)----------|

- MetaLen(u64): lenght of the BlockMeta section
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
- BlockMeta(BlockAddrBlockMetadata): metadata to seek through BlockData
- BlockData(CompactedBlockAddr): bitpacked per block metadata

### BlockAddrBlockMetadata

+--------+------------+---------------+-----------+-------------+-------------------+-----------------+----------+
| Offset | RangeStart | FirstOrdianal | RangeSlop | OrdinalSlop | FirstOrdinalNBits | RangeStartNBits | BlockLen |
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
+--------+------------+---------------+-----------+-------------+-------------------+-----------------+----------+

- Offset(u64): offset of the corresponding BlockData in the datastream
- RangeStart(u64): the start position of the first block
- FirstOrdinal(u64): the first ordinal of the first block
- RangeSlop(u32): slop predicted for start range evolution (see computation in BlockData)
- OrdinalSlop(u64): slop predicted for first ordinal evolution (see computation in BlockData)
- FirstOrdinalNBits(u8): number of bits per ordinal in datastream (see computation in BlockData)
- RangeStartNBits(u8): number of bits per range start in datastream (see computation in BlockData)

### BlockData

+-----------------+-------------------+---------------+
| RangeStartDelta | FirstOrdinalDelta | FinalRangeEnd |
+-----------------+-------------------+---------------+
|------(BlockLen repetitions)---------|

- RangeStartDelta(var): RangeStartNBits *bits* of little endian number. See below for decoding
- FirstOrdinalDelta(var): FirstOrdinalNBits *bits* of little endian number. See below for decoding
- FinalRangeEnd(var): RangeStartNBits *bits* of integer. See below for decoding

converting a BlockData of index Index and a BlockAddrBlockMetadata to an actual block address is done as follow:
range\_prediction := RangeStart + Index * RangeSlop;
range\_derivation := RangeStartDelta - (1 << (RangeStartNBits-1));
range\_start := range\_prediction + range\_derivation

the same computation can be done for ordinal
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved


## SingleBlockSStable

The format used for the index is meant to be compact, however it has a constant cost of arround 70
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
bytes, which isn't negligible for a table containing very few keys.
To limit the impact of that constant cost, single block sstable omit the Fst and BlockAddrStore from
their index. Instead a block with first ordinal of 0, range start of 0 and range end of IndexOffset
is implicitly used for every operations.
44 changes: 44 additions & 0 deletions sstable/benches/ord_to_term.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,31 @@ pub fn criterion_benchmark(c: &mut Criterion) {
assert!(dict.ord_to_term(19_000_000, &mut res).unwrap());
})
});
c.bench_function("term_ord_suffix", |b| {
b.iter(|| {
assert_eq!(
dict.term_ord(b"prefix.00186A0.suffix").unwrap().unwrap(),
100_000
);
assert_eq!(
dict.term_ord(b"prefix.121EAC0.suffix").unwrap().unwrap(),
19_000_000
);
})
});
c.bench_function("open_and_term_ord_suffix", |b| {
b.iter(|| {
let dict = Dictionary::<MonotonicU64SSTable>::open(slice.clone()).unwrap();
assert_eq!(
dict.term_ord(b"prefix.00186A0.suffix").unwrap().unwrap(),
100_000
);
assert_eq!(
dict.term_ord(b"prefix.121EAC0.suffix").unwrap().unwrap(),
19_000_000
);
})
});
}
{
let slice = make_test_sstable("");
Expand All @@ -59,6 +84,25 @@ pub fn criterion_benchmark(c: &mut Criterion) {
assert!(dict.ord_to_term(19_000_000, &mut res).unwrap());
})
});
c.bench_function("term_ord", |b| {
b.iter(|| {
assert_eq!(dict.term_ord(b"prefix.00186A0").unwrap().unwrap(), 100_000);
assert_eq!(
dict.term_ord(b"prefix.121EAC0").unwrap().unwrap(),
19_000_000
);
})
});
c.bench_function("open_and_term_ord", |b| {
b.iter(|| {
let dict = Dictionary::<MonotonicU64SSTable>::open(slice.clone()).unwrap();
assert_eq!(dict.term_ord(b"prefix.00186A0").unwrap().unwrap(), 100_000);
assert_eq!(
dict.term_ord(b"prefix.121EAC0").unwrap().unwrap(),
19_000_000
);
})
});
}
}

Expand Down
49 changes: 35 additions & 14 deletions sstable/src/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ use common::{BinarySerializable, OwnedBytes};
use tantivy_fst::automaton::AlwaysMatch;
use tantivy_fst::Automaton;

use crate::sstable_index::SSTableIndexV3Empty;
use crate::streamer::{Streamer, StreamerBuilder};
use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal, VoidSSTable};
use crate::{
BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, SSTableIndexV3, TermOrdinal, VoidSSTable,
};

/// An SSTable is a sorted map that associates sorted `&[u8]` keys
/// to any kind of typed values.
Expand Down Expand Up @@ -180,24 +183,42 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
pub fn open(term_dictionary_file: FileSlice) -> io::Result<Self> {
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(20);
let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?;

let index_offset = u64::deserialize(&mut footer_len_bytes)?;
let num_terms = u64::deserialize(&mut footer_len_bytes)?;
let version = u32::deserialize(&mut footer_len_bytes)?;
if version != crate::SSTABLE_VERSION {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Unsuported sstable version, expected {version}, found {}",
crate::SSTABLE_VERSION,
),
));
}

let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
let sstable_index_bytes = index_slice.read_bytes()?;
let sstable_index = SSTableIndex::load(sstable_index_bytes)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?;

let sstable_index = match version {
2 => SSTableIndex::V2(
crate::sstable_index_v2::SSTableIndex::load(sstable_index_bytes).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
})?,
),
crate::SSTABLE_VERSION => {
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
let (sstable_index_bytes, mut footerv3_len_bytes) = sstable_index_bytes.rsplit(8);
let store_offset = u64::deserialize(&mut footerv3_len_bytes)?;
if store_offset != 0 {
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
SSTableIndex::V3(
SSTableIndexV3::load(sstable_index_bytes, store_offset).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
})?,
)
} else {
SSTableIndex::V3Empty(SSTableIndexV3Empty::load(index_offset as usize))
}
}
_ => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Unsuported sstable version, expected {}, found {version}",
crate::SSTABLE_VERSION,
),
))
}
};

Ok(Dictionary {
sstable_slice,
sstable_index,
Expand Down
15 changes: 7 additions & 8 deletions sstable/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ mod streamer;
pub mod value;

mod sstable_index;
pub use sstable_index::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
pub use sstable_index::{BlockAddr, SSTableIndex, SSTableIndexBuilder, SSTableIndexV3};
mod sstable_index_v2;
pub(crate) mod vint;
pub use dictionary::Dictionary;
pub use streamer::{Streamer, StreamerBuilder};
Expand All @@ -28,7 +29,7 @@ use crate::value::{RangeValueReader, RangeValueWriter};
pub type TermOrdinal = u64;

const DEFAULT_KEY_CAPACITY: usize = 50;
const SSTABLE_VERSION: u32 = 2;
const SSTABLE_VERSION: u32 = 3;

/// Given two byte string returns the length of
/// the longest common prefix.
Expand Down Expand Up @@ -304,7 +305,8 @@ where

let offset = wrt.written_bytes();

self.index_builder.serialize(&mut wrt)?;
let fst_len: u64 = self.index_builder.serialize(&mut wrt)?;
wrt.write_all(&fst_len.to_le_bytes())?;
wrt.write_all(&offset.to_le_bytes())?;
wrt.write_all(&self.num_terms.to_le_bytes())?;

Expand Down Expand Up @@ -385,13 +387,10 @@ mod test {
16, 17, 33, 18, 19, 17, 20, // data block
0, 0, 0, 0, // no more block
// index
8, 0, 0, 0, // size of index block
0, // compression
1, 0, 12, 0, 32, 17, 20, // index block
0, 0, 0, 0, // no more index block
0, 0, 0, 0, 0, 0, 0, 0, // fst lenght
16, 0, 0, 0, 0, 0, 0, 0, // index start offset
3, 0, 0, 0, 0, 0, 0, 0, // num term
2, 0, 0, 0, // version
3, 0, 0, 0, // version
]
);
let buffer = OwnedBytes::new(buffer);
Expand Down
Loading
Loading