Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(compaction): Don't use cache for table compaction #1467

Merged
merged 26 commits into from
Aug 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
1291ce8
Revert "Revert "Compress/Encrypt Blocks in the background (#1227)" (#…
manishrjain Aug 11, 2020
f2e9b48
Revert "Revert "Buffer pool for decompression (#1308)" (#1408)"
manishrjain Aug 11, 2020
707323d
Revert "Revert "fix: Fix race condition in block.incRef (#1337)" (#14…
manishrjain Aug 11, 2020
54aa796
Compression, Decompression and some Encryption is using Calloc.
manishrjain Aug 11, 2020
b8e2016
Add Calloc functions
manishrjain Aug 11, 2020
f0b712c
Rename to Calloc. And also add a debug for number of blocks
manishrjain Aug 11, 2020
f70c560
The allocations and deallocations match up
manishrjain Aug 11, 2020
ea70a96
A bit more debugging
manishrjain Aug 11, 2020
3e660bb
Switch table builder to use Calloc.
manishrjain Aug 12, 2020
e254ecc
Switch levels.go compaction to only build 5 tables at a time.
manishrjain Aug 12, 2020
f416452
Have a dedicated compactor for L0 and L1.
manishrjain Aug 12, 2020
878d066
Add pool/calloc benchmark
Aug 12, 2020
f698845
Move Calloc and Free to y
Aug 14, 2020
1c1e17c
Fix numCompactor error
Aug 14, 2020
233be8e
Handle nil table in compaction and test fixes
Aug 14, 2020
378491d
Revert "Revert "add assert to check integer overflow for table size (…
Aug 14, 2020
84df817
Make level multiplier 15
Aug 14, 2020
915b0a5
Don't use Rand, instead use local rand instance.
manishrjain Aug 14, 2020
d5527e7
Pre-allocate 1.2x of Block Size for decompression.
manishrjain Aug 14, 2020
37b8b9e
Bring in latest Ristretto. Deal with rejected blocks.
manishrjain Aug 17, 2020
b73c7a4
Deal with when Logger is nil
manishrjain Aug 17, 2020
7aad718
Bring master in
manishrjain Aug 17, 2020
3965470
Avoid setting cache during compactions.
manishrjain Aug 17, 2020
ca289dc
Merge branch 'master' into mrjn/compaction-nocache
Aug 20, 2020
f1dd358
Fix test and cleanup
Aug 20, 2020
5314fed
cleanup
Aug 20, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions level_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (s *levelHandler) get(key []byte) (y.ValueStruct, error) {
continue
}

it := th.NewIterator(false)
it := th.NewIterator(0)
defer it.Close()

y.NumLSMGets.Add(s.strLevel, 1)
Expand All @@ -291,6 +291,10 @@ func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions)
s.RLock()
defer s.RUnlock()

var topt int
if opt.Reverse {
topt = table.REVERSED
}
if s.level == 0 {
// Remember to add in reverse order!
// The newer table at the end of s.tables should be added first as it takes precedence.
Expand All @@ -301,14 +305,14 @@ func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions)
out = append(out, t)
}
}
return appendIteratorsReversed(iters, out, opt.Reverse)
return appendIteratorsReversed(iters, out, topt)
}

tables := opt.pickTables(s.tables)
if len(tables) == 0 {
return iters
}
return append(iters, table.NewConcatIterator(tables, opt.Reverse))
return append(iters, table.NewConcatIterator(tables, topt))
}

type levelHandlerRLocked struct{}
Expand Down
12 changes: 6 additions & 6 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,10 +517,10 @@ func (s *levelsController) compactBuildTables(
var iters []y.Iterator
switch {
case lev == 0:
iters = appendIteratorsReversed(iters, topTables, false)
iters = appendIteratorsReversed(iters, topTables, table.NOCACHE)
case len(topTables) > 0:
y.AssertTrue(len(topTables) == 1)
iters = []y.Iterator{topTables[0].NewIterator(false)}
iters = []y.Iterator{topTables[0].NewIterator(table.NOCACHE)}
}

// Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap.
Expand All @@ -541,7 +541,7 @@ nextTable:
}
valid = append(valid, table)
}
iters = append(iters, table.NewConcatIterator(valid, false))
iters = append(iters, table.NewConcatIterator(valid, table.NOCACHE))
it := table.NewMergeIterator(iters, false)
defer it.Close() // Important to close the iterator to do ref counting.

Expand Down Expand Up @@ -1110,10 +1110,10 @@ func (s *levelsController) get(key []byte, maxVs *y.ValueStruct, startLevel int)
return y.ValueStruct{}, nil
}

func appendIteratorsReversed(out []y.Iterator, th []*table.Table, reversed bool) []y.Iterator {
func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.Iterator {
for i := len(th) - 1; i >= 0; i-- {
// This will increment the reference of the table handler.
out = append(out, th[i].NewIterator(reversed))
out = append(out, th[i].NewIterator(opt))
}
return out
}
Expand Down Expand Up @@ -1146,7 +1146,7 @@ func (s *levelsController) getTableInfo(withKeysCount bool) (result []TableInfo)
for _, t := range l.tables {
var count uint64
if withKeysCount {
it := t.NewIterator(false)
it := t.NewIterator(table.NOCACHE)
for it.Rewind(); it.Valid(); it.Next() {
count++
}
Expand Down
4 changes: 2 additions & 2 deletions table/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestBloomfilter(t *testing.T) {
require.NoError(t, err)
require.Equal(t, withBlooms, tab.hasBloomFilter)
// Forward iteration
it := tab.NewIterator(false)
it := tab.NewIterator(0)
c := 0
for it.Rewind(); it.Valid(); it.Next() {
c++
Expand All @@ -236,7 +236,7 @@ func TestBloomfilter(t *testing.T) {
require.Equal(t, keyCount, c)

// Backward iteration
it = tab.NewIterator(true)
it = tab.NewIterator(REVERSED)
c = 0
for it.Rewind(); it.Valid(); it.Next() {
c++
Expand Down
59 changes: 34 additions & 25 deletions table/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,13 @@ type Iterator struct {

// Internally, Iterator is bidirectional. However, we only expose the
// unidirectional functionality for now.
reversed bool
opt int // Valid options are REVERSED and NOCACHE.
}

// NewIterator returns a new iterator of the Table
func (t *Table) NewIterator(reversed bool) *Iterator {
func (t *Table) NewIterator(opt int) *Iterator {
t.IncrRef() // Important.
ti := &Iterator{t: t, reversed: reversed}
ti := &Iterator{t: t, opt: opt}
ti.next()
return ti
}
Expand All @@ -194,14 +194,18 @@ func (itr *Iterator) Valid() bool {
return itr.err == nil
}

func (itr *Iterator) useCache() bool {
return itr.opt&NOCACHE == 0
}

func (itr *Iterator) seekToFirst() {
numBlocks := itr.t.noOfBlocks
if numBlocks == 0 {
itr.err = io.EOF
return
}
itr.bpos = 0
block, err := itr.t.block(itr.bpos)
block, err := itr.t.block(itr.bpos, itr.useCache())
if err != nil {
itr.err = err
return
Expand All @@ -218,7 +222,7 @@ func (itr *Iterator) seekToLast() {
return
}
itr.bpos = numBlocks - 1
block, err := itr.t.block(itr.bpos)
block, err := itr.t.block(itr.bpos, itr.useCache())
if err != nil {
itr.err = err
return
Expand All @@ -230,7 +234,7 @@ func (itr *Iterator) seekToLast() {

func (itr *Iterator) seekHelper(blockIdx int, key []byte) {
itr.bpos = blockIdx
block, err := itr.t.block(blockIdx)
block, err := itr.t.block(blockIdx, itr.useCache())
if err != nil {
itr.err = err
return
Expand Down Expand Up @@ -303,7 +307,7 @@ func (itr *Iterator) next() {
}

if len(itr.bi.data) == 0 {
block, err := itr.t.block(itr.bpos)
block, err := itr.t.block(itr.bpos, itr.useCache())
if err != nil {
itr.err = err
return
Expand Down Expand Up @@ -331,7 +335,7 @@ func (itr *Iterator) prev() {
}

if len(itr.bi.data) == 0 {
block, err := itr.t.block(itr.bpos)
block, err := itr.t.block(itr.bpos, itr.useCache())
if err != nil {
itr.err = err
return
Expand Down Expand Up @@ -373,7 +377,7 @@ func (itr *Iterator) ValueCopy() (ret y.ValueStruct) {

// Next follows the y.Iterator interface
func (itr *Iterator) Next() {
if !itr.reversed {
if itr.opt&REVERSED == 0 {
itr.next()
} else {
itr.prev()
Expand All @@ -382,7 +386,7 @@ func (itr *Iterator) Next() {

// Rewind follows the y.Iterator interface
func (itr *Iterator) Rewind() {
if !itr.reversed {
if itr.opt&REVERSED == 0 {
itr.seekToFirst()
} else {
itr.seekToLast()
Expand All @@ -391,25 +395,30 @@ func (itr *Iterator) Rewind() {

// Seek follows the y.Iterator interface
func (itr *Iterator) Seek(key []byte) {
if !itr.reversed {
if itr.opt&REVERSED == 0 {
itr.seek(key)
} else {
itr.seekForPrev(key)
}
}

var (
REVERSED int = 2
NOCACHE int = 4
)

// ConcatIterator concatenates the sequences defined by several iterators. (It only works with
// TableIterators, probably just because it's faster to not be so generic.)
type ConcatIterator struct {
idx int // Which iterator is active now.
cur *Iterator
iters []*Iterator // Corresponds to tables.
tables []*Table // Disregarding reversed, this is in ascending order.
reversed bool
idx int // Which iterator is active now.
cur *Iterator
iters []*Iterator // Corresponds to tables.
tables []*Table // Disregarding reversed, this is in ascending order.
options int // Valid options are REVERSED and NOCACHE.
}

// NewConcatIterator creates a new concatenated iterator
func NewConcatIterator(tbls []*Table, reversed bool) *ConcatIterator {
func NewConcatIterator(tbls []*Table, opt int) *ConcatIterator {
iters := make([]*Iterator, len(tbls))
for i := 0; i < len(tbls); i++ {
// Increment the reference count. Since, we're not creating the iterator right now.
Expand All @@ -420,10 +429,10 @@ func NewConcatIterator(tbls []*Table, reversed bool) *ConcatIterator {
// iters[i] = tbls[i].NewIterator(reversed)
}
return &ConcatIterator{
reversed: reversed,
iters: iters,
tables: tbls,
idx: -1, // Not really necessary because s.it.Valid()=false, but good to have.
options: opt,
iters: iters,
tables: tbls,
idx: -1, // Not really necessary because s.it.Valid()=false, but good to have.
}
}

Expand All @@ -434,7 +443,7 @@ func (s *ConcatIterator) setIdx(idx int) {
return
}
if s.iters[idx] == nil {
s.iters[idx] = s.tables[idx].NewIterator(s.reversed)
s.iters[idx] = s.tables[idx].NewIterator(s.options)
}
s.cur = s.iters[s.idx]
}
Expand All @@ -444,7 +453,7 @@ func (s *ConcatIterator) Rewind() {
if len(s.iters) == 0 {
return
}
if !s.reversed {
if s.options&REVERSED == 0 {
s.setIdx(0)
} else {
s.setIdx(len(s.iters) - 1)
Expand All @@ -470,7 +479,7 @@ func (s *ConcatIterator) Value() y.ValueStruct {
// Seek brings us to element >= key if reversed is false. Otherwise, <= key.
func (s *ConcatIterator) Seek(key []byte) {
var idx int
if !s.reversed {
if s.options&REVERSED == 0 {
idx = sort.Search(len(s.tables), func(i int) bool {
return y.CompareKeys(s.tables[i].Biggest(), key) >= 0
})
Expand Down Expand Up @@ -498,7 +507,7 @@ func (s *ConcatIterator) Next() {
return
}
for { // In case there are empty tables.
if !s.reversed {
if s.options&REVERSED == 0 {
s.setIdx(s.idx + 1)
} else {
s.setIdx(s.idx - 1)
Expand Down
8 changes: 4 additions & 4 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (t *Table) initBiggestAndSmallest() error {

t.smallest = ko.Key

it2 := t.NewIterator(true)
it2 := t.NewIterator(REVERSED | NOCACHE)
defer it2.Close()
it2.Rewind()
if !it2.Valid() {
Expand Down Expand Up @@ -515,7 +515,7 @@ func calculateOffsetsSize(offsets []*pb.BlockOffset) int64 {
// block function return a new block. Each block holds a ref and the byte
// slice stored in the block will be reused when the ref becomes zero. The
// caller should release the block by calling block.decrRef() on it.
func (t *Table) block(idx int) (*block, error) {
func (t *Table) block(idx int, useCache bool) (*block, error) {
y.AssertTruef(idx >= 0, "idx=%d", idx)
if idx >= t.noOfBlocks {
return nil, errors.New("block out of index")
Expand Down Expand Up @@ -597,7 +597,7 @@ func (t *Table) block(idx int) (*block, error) {
}

blk.incrRef()
if t.opt.Cache != nil && t.opt.KeepBlocksInCache {
if useCache && t.opt.Cache != nil && t.opt.KeepBlocksInCache {
key := t.blockCacheKey(idx)
// incrRef should never return false here because we're calling it on a
// new block with ref=1.
Expand Down Expand Up @@ -725,7 +725,7 @@ func (t *Table) readTableIndex() *pb.TableIndex {
// OpenTable() function. This function is also called inside levelsController.VerifyChecksum().
func (t *Table) VerifyChecksum() error {
for i, os := range t.blockOffsets() {
b, err := t.block(i)
b, err := t.block(i, true)
if err != nil {
return y.Wrapf(err, "checksum validation failed for table: %s, block: %d, offset:%d",
t.Filename(), i, os.Offset)
Expand Down
Loading