Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt(builder): Use z.Allocator for building tables #1576

Merged
merged 28 commits into from
Nov 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
434928c
Basic working z.buffer
ahsanbarkati Oct 28, 2020
1ecde72
Cleanup
ahsanbarkati Oct 28, 2020
43254e6
Cleanup
ahsanbarkati Oct 28, 2020
8381b8f
Add comment
ahsanbarkati Oct 28, 2020
0c96aae
Use stream reader to make test fast. Also switch to using allocator i…
manishrjain Oct 28, 2020
abf7c87
Make allocator changes
ahsanbarkati Oct 29, 2020
c7dc592
Fix tests and cleanup
ahsanbarkati Oct 29, 2020
66d21aa
Allocate curBlock if needed
ahsanbarkati Oct 29, 2020
7f29ec5
Fix table size
ahsanbarkati Oct 29, 2020
5ae1659
Use z.Buffer for final copy.
manishrjain Oct 29, 2020
0669741
Add a TODO around logic
manishrjain Oct 29, 2020
ac241c7
Fix race conditions and cleanup
ahsanbarkati Oct 29, 2020
ad03afe
Return data back to allocator when not needed.
manishrjain Oct 30, 2020
af2c607
Reuse allocators
manishrjain Oct 30, 2020
1507408
Avoid an extra copy of the table data.
manishrjain Nov 2, 2020
233559a
Merge branch 'master' into ahsan/mmap-builder
manishrjain Nov 2, 2020
d447fee
Bring in latest ristretto
manishrjain Nov 3, 2020
9eecf5c
Create exact size file as needed and copy build data over.
manishrjain Nov 3, 2020
dbc1147
Fix TestStreamWriter6 and the ReachedCapacity function
ahsanbarkati Nov 3, 2020
f7c08ab
Fix compilation in table test
Nov 3, 2020
4ad08ee
Fix compilation in badger tests
Nov 3, 2020
a4eba92
Remove builder.offsets
Nov 3, 2020
e7120fe
Don't close builder twice
Nov 3, 2020
b6a16c1
Merge branch 'master' into ahsan/mmap-builder
manishrjain Nov 3, 2020
fe74308
All tests pass
manishrjain Nov 3, 2020
f3c54bf
No need to reverse
manishrjain Nov 3, 2020
ae6e239
Bring test.sh down to only 2 funcs.
manishrjain Nov 3, 2020
9a54599
Add TODOs
manishrjain Nov 3, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion badger/cmd/write_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func reportStats(c *z.Closer, db *badger.DB) {
bytesRate := sz / uint64(dur.Seconds())
entriesRate := entries / uint64(dur.Seconds())
fmt.Printf("[WRITE] Time elapsed: %s, bytes written: %s, speed: %s/sec, "+
"entries written: %d, speed: %d/sec, Memory: %s\n",
"entries written: %d, speed: %d/sec, jemalloc: %s\n",
y.FixedDuration(time.Since(startTime)),
humanize.Bytes(sz), humanize.Bytes(bytesRate), entries, entriesRate,
humanize.IBytes(uint64(z.NumAllocBytes())))
Expand Down
1 change: 1 addition & 0 deletions badger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func main() {
z.Free(out)

cmd.Execute()
z.Done()
fmt.Printf("Num Allocated Bytes at program end: %s\n",
humanize.IBytes(uint64(z.NumAllocBytes())))
if z.NumAllocBytes() > 0 {
Expand Down
3 changes: 3 additions & 0 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,16 @@ func TestWriteBatch(t *testing.T) {
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
test(t, db)
})
t.Logf("Disk mode done\n")
})
t.Run("InMemory mode", func(t *testing.T) {
t.Skipf("TODO(ibrahim): Please fix this")
opt := getTestOptions("")
opt.InMemory = true
db, err := Open(opt)
require.NoError(t, err)
test(t, db)
t.Logf("Disk mode done\n")
require.NoError(t, db.Close())
})
}
Expand Down
29 changes: 12 additions & 17 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -957,12 +957,10 @@ func arenaSize(opt Options) int64 {
}

// buildL0Table builds a new table from the memtable.
func buildL0Table(ft flushTask, bopts table.Options) []byte {
func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
iter := ft.mt.sl.NewIterator()
defer iter.Close()
b := table.NewTableBuilder(bopts)
defer b.Close()

var vp valuePointer
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
Expand All @@ -974,7 +972,7 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte {
}
b.Add(iter.Key(), iter.Value(), vp.Len)
}
return b.Finish(true)
return b
}

type flushTask struct {
Expand All @@ -989,30 +987,26 @@ func (db *DB) handleFlushTask(ft flushTask) error {
return nil
}

dk, err := db.registry.LatestDataKey()
if err != nil {
return y.Wrapf(err, "failed to get datakey in db.handleFlushTask")
}
bopts := buildTableOptions(db.opt)
bopts.DataKey = dk
// Builder does not need cache but the same options are used for opening table.
bopts.BlockCache = db.blockCache
bopts.IndexCache = db.indexCache
tableData := buildL0Table(ft, bopts)
bopts := buildTableOptions(db)
builder := buildL0Table(ft, bopts)
defer builder.Close()

// buildL0Table can return nil if none of the items in the skiplist are
// added to the builder. This can happen when drop prefix is set and all
// the items are skipped.
if len(tableData) == 0 {
if builder.Empty() {
builder.Finish()
return nil
}

fileID := db.lc.reserveFileID()
var tbl *table.Table
var err error
if db.opt.InMemory {
tbl, err = table.OpenInMemoryTable(tableData, fileID, &bopts)
data := builder.Finish()
tbl, err = table.OpenInMemoryTable(data, fileID, &bopts)
} else {
tbl, err = table.CreateTable(table.NewFilename(fileID, db.opt.Dir), tableData, bopts)
tbl, err = table.CreateTable(table.NewFilename(fileID, db.opt.Dir), builder)
}
if err != nil {
return y.Wrap(err, "error while creating table")
Expand Down Expand Up @@ -1789,6 +1783,7 @@ func (db *DB) StreamDB(outOptions Options) error {
// Stream contents of DB to the output DB.
stream := db.NewStreamAt(math.MaxUint64)
stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)

stream.Send = func(kvs *pb.KVList) error {
return writer.Write(kvs)
}
Expand Down
32 changes: 18 additions & 14 deletions db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package badger

import (
"bytes"
"context"
"encoding/binary"
"flag"
"fmt"
Expand Down Expand Up @@ -504,7 +505,7 @@ func addToManifest(t *testing.T, db *DB, tab *table.Table, level uint32) {
// createTableWithRange function is used in TestCompactionFilePicking. It creates
// a table with key starting from start and ending with end.
func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table {
bopts := buildTableOptions(db.opt)
bopts := buildTableOptions(db)
b := table.NewTableBuilder(bopts)
defer b.Close()
nums := []int{start, end}
Expand All @@ -517,7 +518,7 @@ func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table {
}

fileID := db.lc.reserveFileID()
tab, err := table.CreateTable(table.NewFilename(fileID, db.opt.Dir), b.Finish(false), bopts)
tab, err := table.CreateTable(table.NewFilename(fileID, db.opt.Dir), b)
require.NoError(t, err)
return tab
}
Expand Down Expand Up @@ -987,27 +988,30 @@ func TestKeyCount(t *testing.T) {
defer db.Close()
writeSorted(db, N)
require.NoError(t, db.Close())
t.Logf("Writing DONE\n")

// Read the db
db2, err := Open(DefaultOptions(dir))
y.Check(err)
defer db.Close()
lastKey := -1
count := 0
db2.View(func(txn *Txn) error {
iopt := DefaultIteratorOptions
iopt.AllVersions = true
it := txn.NewIterator(iopt)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
count++
i := it.Item()
key := binary.BigEndian.Uint64(i.Key())

streams := make(map[uint32]int)
stream := db2.NewStream()
stream.Send = func(list *pb.KVList) error {
count += len(list.Kv)
for _, kv := range list.Kv {
last := streams[kv.StreamId]
key := binary.BigEndian.Uint64(kv.Key)
// The following should happen as we're writing sorted data.
require.Equalf(t, lastKey+1, int(key), "Expected key: %d, Found Key: %d", lastKey+1, int(key))
lastKey = int(key)
if last > 0 {
require.Equalf(t, last+1, int(key), "Expected key: %d, Found Key: %d", lastKey+1, int(key))
}
streams[kv.StreamId] = int(key)
}
return nil
})
}
require.NoError(t, stream.Orchestrate(context.Background()))
require.Equal(t, N, uint64(count))
}
26 changes: 15 additions & 11 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,19 @@ func TestForceCompactL0(t *testing.T) {
}

func TestStreamDB(t *testing.T) {
check := func(db *DB) {
for i := 0; i < 100; i++ {
key := []byte(fmt.Sprintf("key%d", i))
val := []byte(fmt.Sprintf("val%d", i))
txn := db.NewTransactionAt(1, false)
item, err := txn.Get(key)
require.NoError(t, err)
require.EqualValues(t, val, getItemValue(t, item))
require.Equal(t, byte(0x00), item.UserMeta())
txn.Discard()
}
}

dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
Expand All @@ -393,6 +406,7 @@ func TestStreamDB(t *testing.T) {
require.NoError(t, writer.SetEntryAt(NewEntry(key, val).WithMeta(0x00), 1))
}
require.NoError(t, writer.Flush())
check(db)

outDir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
Expand All @@ -404,17 +418,7 @@ func TestStreamDB(t *testing.T) {
defer func() {
require.NoError(t, outDB.Close())
}()

for i := 0; i < 100; i++ {
key := []byte(fmt.Sprintf("key%d", i))
val := []byte(fmt.Sprintf("val%d", i))
txn := outDB.NewTransactionAt(1, false)
item, err := txn.Get(key)
require.NoError(t, err)
require.EqualValues(t, val, getItemValue(t, item))
require.Equal(t, byte(0x00), item.UserMeta())
txn.Discard()
}
check(outDB)
}

func dirSize(path string) (int64, error) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ go 1.12
require (
github.com/DataDog/zstd v1.4.1
github.com/cespare/xxhash v1.1.0
github.com/dgraph-io/ristretto v0.0.4-0.20201023213945-72c2139ec27f
github.com/dgraph-io/ristretto v0.0.4-0.20201103012257-4dcfe40a6fc0
github.com/dustin/go-humanize v1.0.0
github.com/golang/protobuf v1.3.1
github.com/golang/snappy v0.0.1
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.0.4-0.20201023213945-72c2139ec27f h1:YPDUnM9Rkd0V41Ie43v/QoNgz5NNGcZv05UnYEnQgo4=
github.com/dgraph-io/ristretto v0.0.4-0.20201023213945-72c2139ec27f/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs=
github.com/dgraph-io/ristretto v0.0.4-0.20201103012257-4dcfe40a6fc0 h1:5ZtQ7aGng65gFPo1sdoZI0pTpYjJDU4t+rIFFoWUOpc=
github.com/dgraph-io/ristretto v0.0.4-0.20201103012257-4dcfe40a6fc0/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
Expand Down Expand Up @@ -64,6 +64,7 @@ github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb6
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
Expand Down
26 changes: 7 additions & 19 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,10 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
rerr = y.Wrapf(err, "Error while reading datakey")
return
}
topt := buildTableOptions(db.opt)
// Set compression from table manifest.
topt := buildTableOptions(db)
// Explicitly set Compression and DataKey based on how the table was generated.
topt.Compression = tf.Compression
topt.DataKey = dk
topt.BlockCache = db.blockCache
topt.IndexCache = db.indexCache

mf, err := z.OpenMmapFile(fname, db.opt.getFileFlags(), 0)
if err != nil {
Expand Down Expand Up @@ -758,17 +756,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
break
}

dk, err := s.kv.registry.LatestDataKey()
if err != nil {
inflightBuilders.Done(y.Wrapf(err, "Error while retrieving datakey in levelsController.compactBuildTables"))
return
}
bopts := buildTableOptions(s.kv.opt)
bopts.DataKey = dk
// Builder does not need cache but the same options are used for opening table.
bopts.BlockCache = s.kv.blockCache
bopts.IndexCache = s.kv.indexCache

bopts := buildTableOptions(s.kv)
// Set TableSize to the target file size for that level.
bopts.TableSize = uint64(cd.t.fileSz[cd.nextLevel.level])
builder := table.NewTableBuilder(bopts)
Expand All @@ -780,7 +768,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
// called Add() at least once, and builder is not Empty().
if builder.Empty() {
// Cleanup builder resources:
builder.Finish(false)
builder.Finish()
builder.Close()
continue
}
Expand All @@ -791,18 +779,18 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
break
}
go func(builder *table.Builder) {
var err error
defer builder.Close()
defer inflightBuilders.Done(err)

build := func(fileID uint64) (*table.Table, error) {
fname := table.NewFilename(fileID, s.kv.opt.Dir)
return table.CreateTable(fname, builder.Finish(false), bopts)
return table.CreateTable(fname, builder)
}

var tbl *table.Table
var err error
if s.kv.opt.InMemory {
tbl, err = table.OpenInMemoryTable(builder.Finish(true), fileID, &bopts)
tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
} else {
tbl, err = build(fileID)
}
Expand Down
4 changes: 2 additions & 2 deletions levels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func createAndOpen(db *DB, td []keyValVersion, level int) {
b.Add(key, val, 0)
}
fname := table.NewFilename(db.lc.reserveFileID(), db.opt.Dir)
tab, err := table.CreateTable(fname, b.Finish(false), opts)
tab, err := table.CreateTable(fname, b)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -768,7 +768,7 @@ func createEmptyTable(db *DB) *table.Table {
b.Add(y.KeyWithTs([]byte("foo"), 1), y.ValueStruct{}, 0)

// Open table in memory to avoid adding changes to manifest file.
tab, err := table.OpenInMemoryTable(b.Finish(true), db.lc.reserveFileID(), &opts)
tab, err := table.OpenInMemoryTable(b.Finish(), db.lc.reserveFileID(), &opts)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func buildTable(t *testing.T, keyValues [][]string, bopts table.Options) *table.
}, 0)
}

tbl, err := table.CreateTable(filename, b.Finish(false), bopts)
tbl, err := table.CreateTable(filename, b)
require.NoError(t, err)
return tbl
}
Expand Down
9 changes: 8 additions & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/table"
"github.com/dgraph-io/badger/v2/y"
)

// Note: If you add a new option X make sure you also add a WithX method on Options.
Expand Down Expand Up @@ -161,7 +162,10 @@ func DefaultOptions(path string) Options {
}
}

func buildTableOptions(opt Options) table.Options {
func buildTableOptions(db *DB) table.Options {
opt := db.opt
dk, err := db.registry.LatestDataKey()
y.Check(err)
return table.Options{
SyncWrites: opt.SyncWrites,
ReadOnly: opt.ReadOnly,
Expand All @@ -171,6 +175,9 @@ func buildTableOptions(opt Options) table.Options {
ChkMode: opt.ChecksumVerificationMode,
Compression: opt.Compression,
ZSTDCompressionLevel: opt.ZSTDCompressionLevel,
BlockCache: db.blockCache,
IndexCache: db.indexCache,
DataKey: dk,
}
}

Expand Down
Loading