Fix(builder): Too many small tables when compression is enabled (#1549)
This fixes the issue of too many SST files of very small sizes being created when compression is enabled.
Previously the capacity check was based on the size of the uncompressed block buffer, so a table was
considered full long before its compressed output came anywhere near the configured limit. We now
account for the actual sizes of blocks after compression and consider the table capacity reached once
the sum of the actual block sizes exceeds 90% of the table capacity.
ahsanbarkati authored Oct 3, 2020
1 parent 68fb85d commit 5d1bab4
Showing 3 changed files with 14 additions and 8 deletions.
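To make the effect of the change concrete, here is a small, self-contained Go sketch (not part of the commit; gzip stands in for Badger's block compression, and the 64 KiB cap and 0.9 factor are illustrative) showing how a capacity check based on the raw buffer flushes far earlier than one based on the actual compressed size:

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

func main() {
	const maxTableSize = 64 << 10 // hypothetical 64 KiB table cap
	threshold := int(float64(maxTableSize) * 0.9)

	// Highly compressible data, roughly 92 KiB uncompressed.
	raw := bytes.Repeat([]byte("badger key-value block "), 4096)

	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	zw.Write(raw)
	zw.Close()

	fmt.Printf("raw=%d bytes, compressed=%d bytes\n", len(raw), buf.Len())
	fmt.Println("flush based on raw size?       ", len(raw) > threshold)  // true: table looks "full"
	fmt.Println("flush based on compressed size?", buf.Len() > threshold) // false: plenty of room left
}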
2 changes: 1 addition & 1 deletion levels.go
@@ -586,7 +586,7 @@ nextTable:
}

if !y.SameKey(it.Key(), lastKey) {
- if builder.ReachedCapacity(s.kv.opt.MaxTableSize) {
+ if builder.ReachedCapacity(uint64(float64(s.kv.opt.MaxTableSize) * 0.9)) {
// Only break if we are on a different key, and have reached capacity. We want
// to ensure that all versions of the key are stored in the same sstable, and
// not divided across multiple tables at the same level.
2 changes: 1 addition & 1 deletion stream_writer.go
@@ -358,7 +358,7 @@ func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error {

sameKey := y.SameKey(key, w.lastKey)
// Same keys should go into the same SSTable.
- if !sameKey && w.builder.ReachedCapacity(w.db.opt.MaxTableSize) {
+ if !sameKey && w.builder.ReachedCapacity(uint64(float64(w.db.opt.MaxTableSize)*0.9)) {
if err := w.send(false); err != nil {
return err
}
18 changes: 12 additions & 6 deletions table/builder.go
@@ -21,6 +21,7 @@ import (
"math"
"runtime"
"sync"
+ "sync/atomic"
"unsafe"

"github.com/dgryski/go-farm"
@@ -74,9 +75,10 @@ type bblock struct {
// Builder is used in building a table.
type Builder struct {
// Typically tens or hundreds of meg. This is for one single file.
- buf []byte
- sz uint32
- bufLock sync.Mutex // This lock guards the buf. We acquire lock when we resize the buf.
+ buf []byte
+ sz uint32
+ bufLock sync.Mutex // This lock guards the buf. We acquire lock when we resize the buf.
+ actualSize uint32 // Used to store the sum of sizes of blocks after compression/encryption.

baseKey []byte // Base key for the current block.
baseOffset uint32 // Offset for the current block.
@@ -152,6 +154,9 @@ func (b *Builder) handleBlock() {
copy(b.buf[item.start:], blockBuf)
b.bufLock.Unlock()

+ // Add the actual size of current block.
+ atomic.AddUint32(&b.actualSize, uint32(len(blockBuf)))
+
// Fix the boundary of the block.
item.end = item.start + uint32(len(blockBuf))

@@ -273,6 +278,7 @@ func (b *Builder) finishBlock() {
// If compression/encryption is disabled, no need to send the block to the blockChan.
// There's nothing to be done.
if b.blockChan == nil {
+ atomic.StoreUint32(&b.actualSize, b.sz)
b.addBlockToIndex()
return
}
@@ -346,8 +352,8 @@ func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) {
// at the end. The diff can vary.

// ReachedCapacity returns true if we... roughly (?) reached capacity?
- func (b *Builder) ReachedCapacity(cap int64) bool {
- blocksSize := b.sz + // length of current buffer
+ func (b *Builder) ReachedCapacity(capacity uint64) bool {
+ blocksSize := atomic.LoadUint32(&b.actualSize) + // actual length of current buffer
uint32(len(b.entryOffsets)*4) + // all entry offsets size
4 + // count of all entry offsets
8 + // checksum bytes
@@ -356,7 +362,7 @@ func (b *Builder) ReachedCapacity(cap int64) bool {
4 + // Index length
5*(uint32(len(b.tableIndex.Offsets))) // approximate index size

- return int64(estimateSz) > cap
+ return uint64(estimateSz) > capacity
}

// Finish finishes the table by appending the index.
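Taken together, the two halves of the change behave roughly like the sketch below: the size estimate now starts from the actual (post-compression) block sizes, and callers pass ~90% of MaxTableSize as the capacity. This is an illustrative, self-contained approximation of the hunks above, not Badger's API; the function name, the fixed overhead constant, and the concrete numbers are made up.

package main

import "fmt"

// reachedCapacity approximates Builder.ReachedCapacity after this commit: the
// estimate is built on the compressed size of the finished blocks instead of
// the raw buffer length.
func reachedCapacity(actualSize, numEntries, numBlocks uint32, capacity uint64) bool {
	const fixedOverhead = 4 + 8 + 4 + 4 // offset-count, checksum and index-length fields (approximate)
	estimateSz := actualSize +
		numEntries*4 + // all entry offsets
		fixedOverhead +
		5*numBlocks // approximate index size
	return uint64(estimateSz) > capacity
}

func main() {
	const maxTableSize = 64 << 20 // hypothetical 64 MiB MaxTableSize
	// Callers (levels.go, stream_writer.go) now pass ~90% of the cap.
	threshold := uint64(float64(maxTableSize) * 0.9)

	// Made-up builder states.
	fmt.Println(reachedCapacity(61<<20, 500_000, 15_000, threshold)) // true: flush and start a new table
	fmt.Println(reachedCapacity(10<<20, 80_000, 2_500, threshold))   // false: keep adding keys
}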
