Skip to content

Commit

Permalink
opt(memory): Use z.Calloc for allocating KVList (#1563)
Browse files Browse the repository at this point in the history
KVs can take up a lot of memory in the stream framework. With this change, we allocate them using z.Allocator, and allow callers to KeyToList to use the allocator to generate KVs as well. After we call Send, we release them.

Also change Stream Framework to spit out StreamDone markers.
  • Loading branch information
manishrjain authored Oct 13, 2020
1 parent 928087d commit 8d26d52
Show file tree
Hide file tree
Showing 12 changed files with 241 additions and 74 deletions.
6 changes: 6 additions & 0 deletions backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,17 @@ func (stream *Stream) Backup(w io.Writer, since uint64) (uint64, error) {

var maxVersion uint64
stream.Send = func(list *pb.KVList) error {
out := list.Kv[:0]
for _, kv := range list.Kv {
if maxVersion < kv.Version {
maxVersion = kv.Version
}
if !kv.StreamDone {
// Don't pick stream done changes.
out = append(out, kv)
}
}
list.Kv = out
return writeTo(list, w)
}

Expand Down
25 changes: 21 additions & 4 deletions badger/cmd/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package cmd

import (
"fmt"
"io"
"math"
"os"
Expand Down Expand Up @@ -53,6 +54,8 @@ func init() {
streamCmd.Flags().Uint32VarP(&compressionType, "compression", "", 0,
"Option to configure the compression type in output DB. "+
"0 to disable, 1 for Snappy, and 2 for ZSTD.")
streamCmd.Flags().StringVarP(&keyPath, "encryption-key-file", "e", "",
"Path of the encryption key file.")
}

func stream(cmd *cobra.Command, args []string) error {
Expand All @@ -74,23 +77,37 @@ func stream(cmd *cobra.Command, args []string) error {
if numVersions <= 0 {
numVersions = math.MaxInt32
}
encKey, err := getKey(keyPath)
if err != nil {
return err
}
inOpt := badger.DefaultOptions(sstDir).
WithReadOnly(readOnly).
WithValueThreshold(1 << 10 /* 1KB */).
WithNumVersionsToKeep(numVersions)
WithNumVersionsToKeep(numVersions).
WithBlockCacheSize(100 << 20).
WithIndexCacheSize(200 << 20).
WithEncryptionKey(encKey)

// Options for output DB.
if compressionType < 0 || compressionType > 2 {
return errors.Errorf(
"compression value must be one of 0 (disabled), 1 (Snappy), or 2 (ZSTD)")
}
outOpt := inOpt.WithDir(outDir).WithValueDir(outDir).
WithCompression(options.CompressionType(compressionType)).WithReadOnly(false)
outOpt := inOpt.
WithDir(outDir).
WithValueDir(outDir).
WithNumVersionsToKeep(numVersions).
WithCompression(options.CompressionType(compressionType)).
WithReadOnly(false)

inDB, err := badger.OpenManaged(inOpt)
if err != nil {
return y.Wrapf(err, "cannot open DB at %s", sstDir)
}
defer inDB.Close()
return inDB.StreamDB(outOpt)

err = inDB.StreamDB(outOpt)
fmt.Println("Done.")
return err
}
13 changes: 13 additions & 0 deletions badger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"runtime"

"github.com/dgraph-io/badger/v2/badger/cmd"
"github.com/dgraph-io/ristretto/z"
"github.com/dustin/go-humanize"
)

func main() {
Expand All @@ -38,5 +40,16 @@ func main() {
}()
runtime.SetBlockProfileRate(100)
runtime.GOMAXPROCS(128)

out := z.CallocNoRef(1)
fmt.Printf("jemalloc enabled: %v\n", len(out) > 0)
z.StatsPrint()
z.Free(out)

cmd.Execute()
fmt.Printf("Num Allocated Bytes at program end: %s\n",
humanize.IBytes(uint64(z.NumAllocBytes())))
if z.NumAllocBytes() > 0 {
z.PrintLeaks()
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ go 1.12
require (
github.com/DataDog/zstd v1.4.1
github.com/cespare/xxhash v1.1.0
github.com/dgraph-io/ristretto v0.0.4-0.20201007164332-9739cfa2564b
github.com/dgraph-io/ristretto v0.0.4-0.20201013194302-6d6fac64beae
github.com/dustin/go-humanize v1.0.0
github.com/golang/protobuf v1.3.1
github.com/golang/snappy v0.0.1
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.0.4-0.20201007164332-9739cfa2564b h1:CfEwAh85rqfhhE/DrDCbFdcwVpZ8ESnFRhLatiKdDYM=
github.com/dgraph-io/ristretto v0.0.4-0.20201007164332-9739cfa2564b/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs=
github.com/dgraph-io/ristretto v0.0.4-0.20201012224315-0af15dd47cb9 h1:/E/ew7/iVTZnsWhUGi+dNpcWl+4D6FfojPmqX7nghWI=
github.com/dgraph-io/ristretto v0.0.4-0.20201012224315-0af15dd47cb9/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs=
github.com/dgraph-io/ristretto v0.0.4-0.20201013194302-6d6fac64beae h1:yh5085twGpsgfuu56DXKOM3SKyZKQPskJIoMNb3jzos=
github.com/dgraph-io/ristretto v0.0.4-0.20201013194302-6d6fac64beae/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
Expand Down
115 changes: 76 additions & 39 deletions pb/badgerpb2.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pb/badgerpb2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ message KV {

message KVList {
repeated KV kv = 1;

// alloc_ref used internally for memory management.
uint64 alloc_ref = 10;
}

message ManifestChangeSet {
Expand Down
Loading

0 comments on commit 8d26d52

Please sign in to comment.