opt(builder): Use z.Allocator for building tables #1576

Merged: 28 commits, Nov 3, 2020
Changes shown below are from 5 commits.

Commits (28)
434928c
Basic working z.buffer
ahsanbarkati Oct 28, 2020
1ecde72
Cleanup
ahsanbarkati Oct 28, 2020
43254e6
Cleanup
ahsanbarkati Oct 28, 2020
8381b8f
Add comment
ahsanbarkati Oct 28, 2020
0c96aae
Use stream reader to make test fast. Also switch to using allocator i…
manishrjain Oct 28, 2020
abf7c87
Make allocator changes
ahsanbarkati Oct 29, 2020
c7dc592
Fix tests and cleanup
ahsanbarkati Oct 29, 2020
66d21aa
Allocate curBlock if needed
ahsanbarkati Oct 29, 2020
7f29ec5
Fix table size
ahsanbarkati Oct 29, 2020
5ae1659
Use z.Buffer for final copy.
manishrjain Oct 29, 2020
0669741
Add a TODO around logic
manishrjain Oct 29, 2020
ac241c7
Fix race conditions and cleanup
ahsanbarkati Oct 29, 2020
ad03afe
Return data back to allocator when not needed.
manishrjain Oct 30, 2020
af2c607
Reuse allocators
manishrjain Oct 30, 2020
1507408
Avoid an extra copy of the table data.
manishrjain Nov 2, 2020
233559a
Merge branch 'master' into ahsan/mmap-builder
manishrjain Nov 2, 2020
d447fee
Bring in latest ristretto
manishrjain Nov 3, 2020
9eecf5c
Create exact size file as needed and copy build data over.
manishrjain Nov 3, 2020
dbc1147
Fix TestStreamWriter6 and the ReachedCapacity function
ahsanbarkati Nov 3, 2020
f7c08ab
Fix compilation in table test
Nov 3, 2020
4ad08ee
Fix compilation in badger tests
Nov 3, 2020
a4eba92
Remove builder.offsets
Nov 3, 2020
e7120fe
Don't close builder twice
Nov 3, 2020
b6a16c1
Merge branch 'master' into ahsan/mmap-builder
manishrjain Nov 3, 2020
fe74308
All tests pass
manishrjain Nov 3, 2020
f3c54bf
No need to reverse
manishrjain Nov 3, 2020
ae6e239
Bring test.sh down to only 2 funcs.
manishrjain Nov 3, 2020
9a54599
Add TODOs
manishrjain Nov 3, 2020
28 changes: 16 additions & 12 deletions db2_test.go
@@ -18,6 +18,7 @@ package badger

import (
"bytes"
"context"
"encoding/binary"
"flag"
"fmt"
@@ -987,27 +988,30 @@ func TestKeyCount(t *testing.T) {
defer db.Close()
writeSorted(db, N)
require.NoError(t, db.Close())
t.Logf("Writing DONE\n")

// Read the db
db2, err := Open(DefaultOptions(dir))
y.Check(err)
defer db.Close()
lastKey := -1
count := 0
db2.View(func(txn *Txn) error {
iopt := DefaultIteratorOptions
iopt.AllVersions = true
it := txn.NewIterator(iopt)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
count++
i := it.Item()
key := binary.BigEndian.Uint64(i.Key())

streams := make(map[uint32]int)
stream := db2.NewStream()
stream.Send = func(list *pb.KVList) error {
count += len(list.Kv)
for _, kv := range list.Kv {
last := streams[kv.StreamId]
key := binary.BigEndian.Uint64(kv.Key)
// The following should happen as we're writing sorted data.
require.Equalf(t, lastKey+1, int(key), "Expected key: %d, Found Key: %d", lastKey+1, int(key))
lastKey = int(key)
if last > 0 {
require.Equalf(t, last+1, int(key), "Expected key: %d, Found Key: %d", lastKey+1, int(key))
}
streams[kv.StreamId] = int(key)
}
return nil
})
}
require.NoError(t, stream.Orchestrate(context.Background()))
require.Equal(t, N, uint64(count))
}
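The TestKeyCount rewrite above swaps a single AllVersions iterator for badger's Stream framework, which splits the key space across goroutines and hands batches of key-values to a Send callback. A standalone sketch of that read pattern, assuming the badger v2 import paths used in this repository and that Orchestrate invokes Send serially, as badger's documentation describes:

```go
package main

import (
	"context"
	"log"

	badger "github.com/dgraph-io/badger/v2"
	"github.com/dgraph-io/badger/v2/pb"
)

// countKeys counts the keys delivered by the Stream framework.
func countKeys(db *badger.DB) (int, error) {
	count := 0
	stream := db.NewStream()
	stream.Send = func(list *pb.KVList) error {
		count += len(list.Kv) // Send is called serially, so no extra locking here
		return nil
	}
	if err := stream.Orchestrate(context.Background()); err != nil {
		return 0, err
	}
	return count, nil
}

func main() {
	db, err := badger.Open(badger.DefaultOptions("/tmp/badger-example")) // path is illustrative
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	n, err := countKeys(db)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("keys: %d", n)
}
```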
2 changes: 2 additions & 0 deletions db_test.go
@@ -38,6 +38,7 @@ import (
"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto/z"
)

// summary is produced when DB is closed. Currently it is used only for testing.
@@ -2109,6 +2110,7 @@ func TestVerifyChecksum(t *testing.T) {

func TestMain(m *testing.M) {
flag.Parse()
z.StatsPrint()
os.Exit(m.Run())
}

1 change: 1 addition & 0 deletions go.sum
@@ -64,6 +64,7 @@ github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb6
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
91 changes: 41 additions & 50 deletions table/builder.go
@@ -69,18 +69,23 @@ func (h *header) Decode(buf []byte) {

// bblock represents a block that is being compressed/encrypted in the background.
type bblock struct {
data []byte
start uint32 // Points to the starting offset of the block.
end uint32 // Points to the end offset of the block.
data []byte
end int // Points to the end offset of the block.
}

func (bb *bblock) Append(data []byte) {
n := copy(bb.data[bb.end:], data)
bb.end += n

Review comment: Initialize n using a composite literal to simplify the code, as specified in Effective Go.

}

// Builder is used in building a table.
type Builder struct {
// Typically tens or hundreds of meg. This is for one single file.
buf []byte
sz uint32
bufLock sync.Mutex // This lock guards the buf. We acquire lock when we resize the buf.
actualSize uint32 // Used to store the sum of sizes of blocks after compression/encryption.
alloc *z.Allocator
sz uint32
curBlock *bblock

actualSize uint32 // Used to store the sum of sizes of blocks after compression/encryption.

baseKey []byte // Base key for the current block.
baseOffset uint32 // Offset for the current block.
@@ -104,10 +109,13 @@ func NewTableBuilder(opts Options) *Builder {
// Additional 16 MB to store index (approximate).
// We trim the additional space in table.Finish().
// TODO: Switch this buf over to z.Buffer.
buf: make([]byte, int(opts.TableSize+16*MB)),
alloc: z.NewAllocator(16 * MB),
opt: &opts,
offsets: z.NewBuffer(1 << 20),
}
b.curBlock = &bblock{
data: b.alloc.Allocate(opts.BlockSize + padding),
}
b.opt.tableCapacity = uint64(float64(b.opt.TableSize) * 0.9)

// If encryption or compression is not enabled, do not start compression/encryption goroutines
@@ -153,13 +161,12 @@ func (b *Builder) handleBlock() {
y.AssertTruef(uint32(len(blockBuf)) <= allocatedSpace, "newend: %d oldend: %d padding: %d",
item.start+uint32(len(blockBuf)), item.end, padding)

// Acquire the buflock here. The builder.grow function might change
// the b.buf while this goroutine was running.
// Acquire the bufLock here. A z.Buffer allocation might change
// b.buf while this goroutine is running.
b.bufLock.Lock()
// Copy over compressed/encrypted data back to the main buffer.
copy(b.buf[item.start:], blockBuf)
copy(item.data[item.start:item.start+uint32(len(blockBuf))], blockBuf)
b.bufLock.Unlock()

// Add the actual size of current block.
atomic.AddUint32(&b.actualSize, uint32(len(blockBuf)))
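handleBlock is the background half of this pipeline: workers drain b.blockChan, compress or encrypt each block's bytes, copy the (never larger) result back into the block's own slice, and accumulate the real size. A minimal sketch of that worker shape with the compression step stubbed out; every name here is illustrative rather than badger's actual API:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
	"sync/atomic"
)

// block stands in for bblock: a padded slice plus the end of the written data.
type block struct {
	data []byte
	end  int
}

// processBlock stands in for compression/encryption; here it just trims
// trailing zero bytes so the processed form is never larger than the input.
func processBlock(in []byte) []byte {
	return bytes.TrimRight(in, "\x00")
}

func main() {
	blockCh := make(chan *block, 4)
	var actualSize uint32
	var wg sync.WaitGroup

	// Background workers, mirroring the shape of Builder.handleBlock.
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for blk := range blockCh {
				out := processBlock(blk.data[:blk.end])
				blk.end = copy(blk.data, out) // write the result back in place
				atomic.AddUint32(&actualSize, uint32(len(out)))
			}
		}()
	}

	// Producer side: finish a block and push it to the workers.
	blk := &block{data: make([]byte, 32)}
	blk.end = copy(blk.data, "some block payload")
	blockCh <- blk

	close(blockCh)
	wg.Wait()
	fmt.Println("actual size:", actualSize) // 18
}
```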

@@ -225,45 +232,21 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint32) {
b.append(h.Encode())
b.append(diffKey)

if uint32(len(b.buf)) < b.sz+v.EncodedSize() {
b.grow(v.EncodedSize())
}
b.sz += v.Encode(b.buf[b.sz:])

tmp := make([]byte, int(v.EncodedSize()))
v.Encode(tmp)
b.curBlock.Append(tmp)
// Size of KV on SST.
sstSz := uint32(headerSize) + uint32(len(diffKey)) + v.EncodedSize()
// Total estimated size = size on SST + size on vlog (length of value pointer).
b.estimatedSize += (sstSz + vpLen)
}
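The estimate above is straightforward arithmetic: each entry costs a fixed header plus the diffed key plus the encoded value on the SST, and the length of the value pointer is charged for the vlog. A worked example with made-up sizes, assuming the 4-byte entry header (uint16 key overlap plus uint16 key-diff length) that badger v2's table package uses:

```go
package main

import "fmt"

// headerSize is assumed to be 4 bytes: uint16 overlap + uint16 diff length.
const headerSize = 4

func estimate(diffKeyLen, encodedValueLen, vpLen uint32) uint32 {
	sstSz := uint32(headerSize) + diffKeyLen + encodedValueLen // size on the SST
	return sstSz + vpLen                                       // plus the value pointer kept for the vlog
}

func main() {
	// A 10-byte key diff, a 30-byte encoded value, and a 12-byte value pointer.
	fmt.Println(estimate(10, 30, 12)) // 4 + 10 + 30 + 12 = 56
}
```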

// grow increases the size of b.buf by atleast 50%.
func (b *Builder) grow(n uint32) {
l := uint32(len(b.buf))
if n < l/2 {
n = l / 2
}
newBuf := make([]byte, l+n)
y.AssertTrue(uint32(len(newBuf)) == l+n)

b.bufLock.Lock()
copy(newBuf, b.buf)
b.buf = newBuf
b.bufLock.Unlock()
}
func (b *Builder) append(data []byte) {
// Ensure we have enough space to store new data.
if uint32(len(b.buf)) < b.sz+uint32(len(data)) {
b.grow(uint32(len(data)))
}
copy(b.buf[b.sz:], data)
b.sz += uint32(len(data))
b.curBlock.Append(data)
}

// TODO: Remove this func.
func (b *Builder) addPadding(sz uint32) {
if uint32(len(b.buf)) < b.sz+sz {
b.grow(sz)
}
b.sz += sz
}

/*
@@ -285,7 +268,7 @@ func (b *Builder) finishBlock() {
b.append(y.U32SliceToBytes(b.entryOffsets))
b.append(y.U32ToBytes(uint32(len(b.entryOffsets))))

b.writeChecksum(b.buf[b.baseOffset:b.sz])
b.writeChecksum(b.buf.Bytes()[b.baseOffset:b.sz])

// If compression/encryption is disabled, no need to send the block to the blockChan.
// There's nothing to be done.
Expand All @@ -296,18 +279,19 @@ func (b *Builder) finishBlock() {
}

b.addPadding(padding)

// Block end is the actual end of the block ignoring the padding.
block := &bblock{start: b.baseOffset, end: uint32(b.sz - padding), data: b.buf}
block := &bblock{start: b.baseOffset, end: uint32(b.sz - padding), data: b.buf.Bytes()}

b.blockList = append(b.blockList, block)

b.addBlockToIndex()
// Push to the block handler.
b.blockChan <- block
}

func (b *Builder) addBlockToIndex() {
blockBuf := b.buf[b.baseOffset:b.sz]
// blockBuf := b.buf[b.baseOffset:b.sz]
blockBuf := b.buf.Bytes()[b.baseOffset:b.sz]
// Add key to the block index.
builder := fbs.NewBuilder(64)
off := builder.CreateByteVector(b.baseKey)
@@ -360,6 +344,11 @@ func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) {
y.AssertTrue(uint32(b.sz) < math.MaxUint32)
b.baseOffset = uint32((b.sz))
b.entryOffsets = b.entryOffsets[:0]

// Create a new block and start writing.
b.curBlock = &bblock{
data: b.alloc.Allocate(b.opt.BlockSize + padding),
}
}
b.addHelper(key, value, valueLen)
}
@@ -412,12 +401,13 @@ func (b *Builder) Finish(allocate bool) []byte {
// padding from the actual table size. len(blocklist) would be zero if
// there is no compression/encryption.
uncompressedSize := b.sz - uint32(padding*len(b.blockList))
dst := b.buf
dst := b.buf.Bytes()
// Fix block boundaries. This includes moving the blocks so that we
// don't have any interleaving space between them.
if len(b.blockList) > 0 {
i, dstLen := 0, uint32(0)
b.offsets.SliceIterate(func(slice []byte) error {

bl := b.blockList[i]
// Length of the block is end minus the start.
fbo := fb.GetRootAsBlockOffset(slice, 0)
Expand All @@ -426,7 +416,8 @@ func (b *Builder) Finish(allocate bool) []byte {
// which we have written data.
fbo.MutateOffset(dstLen)

copy(dst[dstLen:], b.buf[bl.start:bl.end])
// Copy over to z.Buffer here.
copy(dst[dstLen:], b.buf.Bytes()[bl.start:bl.end])

// New length is the start of the block plus its length.
dstLen = fbo.Offset() + fbo.Len()
@@ -457,9 +448,9 @@
b.writeChecksum(index)

if allocate {
return append([]byte{}, b.buf[:b.sz]...)
return append([]byte{}, b.buf.Bytes()[:b.sz]...)
}
return b.buf[:b.sz]
return b.buf.Bytes()[:b.sz]
}
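Finish has to undo the per-block padding: every block was written into a padded slot, so the blocks are copied down to be contiguous and their offsets in the flatbuffer index are rewritten to the packed positions. A sketch of that compaction step; the span type and compact function are illustrative, not the builder's API:

```go
package main

import "fmt"

// span marks where a block's real bytes sit inside the padded build buffer.
type span struct{ start, end int }

// compact copies each block's bytes to the front of buf, removing the
// inter-block padding, and returns the new offsets plus the packed length.
func compact(buf []byte, blocks []span) (offsets []int, n int) {
	for _, bl := range blocks {
		offsets = append(offsets, n)
		n += copy(buf[n:], buf[bl.start:bl.end]) // dst never overtakes src, so in-place copy is safe
	}
	return offsets, n
}

func main() {
	// Two 5-byte blocks, each written into an 8-byte padded slot.
	buf := []byte("AAAAA\x00\x00\x00BBBBB\x00\x00\x00")
	offsets, n := compact(buf, []span{{0, 5}, {8, 13}})
	fmt.Println(offsets, string(buf[:n])) // [0 5] AAAAABBBBB
}
```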

func (b *Builder) writeChecksum(data []byte) {