From 8d26d52f694f3c1a4d4b649d61b286571234159a Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 13 Oct 2020 14:31:31 -0700 Subject: [PATCH] opt(memory): Use z.Calloc for allocating KVList (#1563) KVs can take up a lot of memory in the stream framework. With this change, we allocate them using z.Allocator, and allow callers to KeyToList to use the allocator to generate KVs as well. After we call Send, we release them. Also change Stream Framework to spit out StreamDone markers. --- backup.go | 6 +++ badger/cmd/stream.go | 25 ++++++++-- badger/main.go | 13 +++++ go.mod | 2 +- go.sum | 6 ++- pb/badgerpb2.pb.go | 115 ++++++++++++++++++++++++++++--------------- pb/badgerpb2.proto | 3 ++ stream.go | 103 +++++++++++++++++++++++++++++++------- stream_test.go | 11 ++++- stream_writer.go | 2 +- test.sh | 17 ++++--- y/y.go | 12 +++++ 12 files changed, 241 insertions(+), 74 deletions(-) diff --git a/backup.go b/backup.go index 3c6242167..1c0032401 100644 --- a/backup.go +++ b/backup.go @@ -116,11 +116,17 @@ func (stream *Stream) Backup(w io.Writer, since uint64) (uint64, error) { var maxVersion uint64 stream.Send = func(list *pb.KVList) error { + out := list.Kv[:0] for _, kv := range list.Kv { if maxVersion < kv.Version { maxVersion = kv.Version } + if !kv.StreamDone { + // Don't pick stream done changes. + out = append(out, kv) + } } + list.Kv = out return writeTo(list, w) } diff --git a/badger/cmd/stream.go b/badger/cmd/stream.go index 05c6cf14e..da6ee875a 100644 --- a/badger/cmd/stream.go +++ b/badger/cmd/stream.go @@ -17,6 +17,7 @@ package cmd import ( + "fmt" "io" "math" "os" @@ -53,6 +54,8 @@ func init() { streamCmd.Flags().Uint32VarP(&compressionType, "compression", "", 0, "Option to configure the compression type in output DB. "+ "0 to disable, 1 for Snappy, and 2 for ZSTD.") + streamCmd.Flags().StringVarP(&keyPath, "encryption-key-file", "e", "", + "Path of the encryption key file.") } func stream(cmd *cobra.Command, args []string) error { @@ -74,23 +77,37 @@ func stream(cmd *cobra.Command, args []string) error { if numVersions <= 0 { numVersions = math.MaxInt32 } + encKey, err := getKey(keyPath) + if err != nil { + return err + } inOpt := badger.DefaultOptions(sstDir). WithReadOnly(readOnly). WithValueThreshold(1 << 10 /* 1KB */). - WithNumVersionsToKeep(numVersions) + WithNumVersionsToKeep(numVersions). + WithBlockCacheSize(100 << 20). + WithIndexCacheSize(200 << 20). + WithEncryptionKey(encKey) // Options for output DB. if compressionType < 0 || compressionType > 2 { return errors.Errorf( "compression value must be one of 0 (disabled), 1 (Snappy), or 2 (ZSTD)") } - outOpt := inOpt.WithDir(outDir).WithValueDir(outDir). - WithCompression(options.CompressionType(compressionType)).WithReadOnly(false) + outOpt := inOpt. + WithDir(outDir). + WithValueDir(outDir). + WithNumVersionsToKeep(numVersions). + WithCompression(options.CompressionType(compressionType)). + WithReadOnly(false) inDB, err := badger.OpenManaged(inOpt) if err != nil { return y.Wrapf(err, "cannot open DB at %s", sstDir) } defer inDB.Close() - return inDB.StreamDB(outOpt) + + err = inDB.StreamDB(outOpt) + fmt.Println("Done.") + return err } diff --git a/badger/main.go b/badger/main.go index 1d6bcd65e..43fe27cf9 100644 --- a/badger/main.go +++ b/badger/main.go @@ -23,6 +23,8 @@ import ( "runtime" "github.com/dgraph-io/badger/v2/badger/cmd" + "github.com/dgraph-io/ristretto/z" + "github.com/dustin/go-humanize" ) func main() { @@ -38,5 +40,16 @@ func main() { }() runtime.SetBlockProfileRate(100) runtime.GOMAXPROCS(128) + + out := z.CallocNoRef(1) + fmt.Printf("jemalloc enabled: %v\n", len(out) > 0) + z.StatsPrint() + z.Free(out) + cmd.Execute() + fmt.Printf("Num Allocated Bytes at program end: %s\n", + humanize.IBytes(uint64(z.NumAllocBytes()))) + if z.NumAllocBytes() > 0 { + z.PrintLeaks() + } } diff --git a/go.mod b/go.mod index a0a0fb988..3208b4b91 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ go 1.12 require ( github.com/DataDog/zstd v1.4.1 github.com/cespare/xxhash v1.1.0 - github.com/dgraph-io/ristretto v0.0.4-0.20201007164332-9739cfa2564b + github.com/dgraph-io/ristretto v0.0.4-0.20201013194302-6d6fac64beae github.com/dustin/go-humanize v1.0.0 github.com/golang/protobuf v1.3.1 github.com/golang/snappy v0.0.1 diff --git a/go.sum b/go.sum index ccd9e6fb0..c64123ee0 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,10 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/ristretto v0.0.4-0.20201007164332-9739cfa2564b h1:CfEwAh85rqfhhE/DrDCbFdcwVpZ8ESnFRhLatiKdDYM= -github.com/dgraph-io/ristretto v0.0.4-0.20201007164332-9739cfa2564b/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs= +github.com/dgraph-io/ristretto v0.0.4-0.20201012224315-0af15dd47cb9 h1:/E/ew7/iVTZnsWhUGi+dNpcWl+4D6FfojPmqX7nghWI= +github.com/dgraph-io/ristretto v0.0.4-0.20201012224315-0af15dd47cb9/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs= +github.com/dgraph-io/ristretto v0.0.4-0.20201013194302-6d6fac64beae h1:yh5085twGpsgfuu56DXKOM3SKyZKQPskJIoMNb3jzos= +github.com/dgraph-io/ristretto v0.0.4-0.20201013194302-6d6fac64beae/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= diff --git a/pb/badgerpb2.pb.go b/pb/badgerpb2.pb.go index ba6f93b41..6da2cf853 100644 --- a/pb/badgerpb2.pb.go +++ b/pb/badgerpb2.pb.go @@ -200,7 +200,9 @@ func (m *KV) GetStreamDone() bool { } type KVList struct { - Kv []*KV `protobuf:"bytes,1,rep,name=kv,proto3" json:"kv,omitempty"` + Kv []*KV `protobuf:"bytes,1,rep,name=kv,proto3" json:"kv,omitempty"` + // alloc_ref used internally for memory management. + AllocRef uint64 `protobuf:"varint,10,opt,name=alloc_ref,json=allocRef,proto3" json:"alloc_ref,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -246,6 +248,13 @@ func (m *KVList) GetKv() []*KV { return nil } +func (m *KVList) GetAllocRef() uint64 { + if m != nil { + return m.AllocRef + } + return 0 +} + type ManifestChangeSet struct { // A set of changes that are applied atomically. Changes []*ManifestChange `protobuf:"bytes,1,rep,name=changes,proto3" json:"changes,omitempty"` @@ -522,44 +531,45 @@ func init() { func init() { proto.RegisterFile("badgerpb2.proto", fileDescriptor_e63e84f9f0d3998c) } var fileDescriptor_e63e84f9f0d3998c = []byte{ - // 589 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x93, 0xcf, 0x6e, 0xda, 0x4e, - 0x10, 0xc7, 0x59, 0xe3, 0xf0, 0x67, 0x48, 0x08, 0xbf, 0xd5, 0xaf, 0x92, 0xa3, 0x2a, 0x94, 0x3a, - 0xaa, 0x8a, 0x2a, 0x15, 0x54, 0xa8, 0x7a, 0x27, 0x04, 0x29, 0x88, 0x44, 0x91, 0xb6, 0x51, 0x14, - 0xf5, 0x82, 0x16, 0x7b, 0x0a, 0x16, 0xf8, 0x8f, 0x76, 0x17, 0xab, 0x3c, 0x44, 0xef, 0x7d, 0xa4, - 0x1e, 0x7b, 0xe8, 0x03, 0x54, 0xe9, 0x8b, 0x54, 0xbb, 0x36, 0x14, 0x0e, 0xbd, 0xcd, 0x7c, 0xe7, - 0xbb, 0x3b, 0xe3, 0xcf, 0x8e, 0xe1, 0x74, 0xc6, 0xfd, 0x39, 0x8a, 0x64, 0xd6, 0xeb, 0x24, 0x22, - 0x56, 0x31, 0xad, 0xee, 0x04, 0xf7, 0x27, 0x01, 0x6b, 0xf2, 0x40, 0x1b, 0x50, 0x5c, 0xe2, 0xc6, - 0x21, 0x2d, 0xd2, 0x3e, 0x66, 0x3a, 0xa4, 0xff, 0xc3, 0x51, 0xca, 0x57, 0x6b, 0x74, 0x2c, 0xa3, - 0x65, 0x09, 0x7d, 0x0e, 0xd5, 0xb5, 0x44, 0x31, 0x0d, 0x51, 0x71, 0xa7, 0x68, 0x2a, 0x15, 0x2d, - 0xdc, 0xa2, 0xe2, 0xd4, 0x81, 0x72, 0x8a, 0x42, 0x06, 0x71, 0xe4, 0xd8, 0x2d, 0xd2, 0xb6, 0xd9, - 0x36, 0xa5, 0xe7, 0x00, 0xf8, 0x25, 0x09, 0x04, 0xca, 0x29, 0x57, 0xce, 0x91, 0x29, 0x56, 0x73, - 0x65, 0xa0, 0x28, 0x05, 0xdb, 0x5c, 0x58, 0x32, 0x17, 0x9a, 0x58, 0x77, 0x92, 0x4a, 0x20, 0x0f, - 0xa7, 0x81, 0xef, 0x40, 0x8b, 0xb4, 0x4f, 0x58, 0x25, 0x13, 0xc6, 0x3e, 0x7d, 0x01, 0xb5, 0xbc, - 0xe8, 0xc7, 0x11, 0x3a, 0xb5, 0x16, 0x69, 0x57, 0x18, 0x64, 0xd2, 0x55, 0x1c, 0xa1, 0xfb, 0x1a, - 0x4a, 0x93, 0x87, 0x9b, 0x40, 0x2a, 0x7a, 0x0e, 0xd6, 0x32, 0x75, 0x48, 0xab, 0xd8, 0xae, 0xf5, - 0x4e, 0x3a, 0x7f, 0x49, 0x4c, 0x1e, 0x98, 0xb5, 0x4c, 0xdd, 0x6b, 0xf8, 0xef, 0x96, 0x47, 0xc1, - 0x67, 0x94, 0x6a, 0xb8, 0xe0, 0xd1, 0x1c, 0x3f, 0xa2, 0xa2, 0x7d, 0x28, 0x7b, 0x26, 0x91, 0xf9, - 0xc1, 0xb3, 0xbd, 0x83, 0x87, 0x76, 0xb6, 0x75, 0xba, 0x5f, 0x2d, 0xa8, 0x1f, 0xd6, 0x68, 0x1d, - 0xac, 0xb1, 0x6f, 0xa0, 0xda, 0xcc, 0x1a, 0xfb, 0xb4, 0x0f, 0xd6, 0x5d, 0x62, 0x80, 0xd6, 0x7b, - 0x17, 0xff, 0xbc, 0xb2, 0x73, 0x97, 0xa0, 0xe0, 0x2a, 0x88, 0x23, 0x66, 0xdd, 0x25, 0xfa, 0x21, - 0x6e, 0x30, 0xc5, 0x95, 0xc1, 0x7d, 0xc2, 0xb2, 0x84, 0x3e, 0x83, 0xd2, 0x12, 0x37, 0x9a, 0x4d, - 0x86, 0xfa, 0x68, 0x89, 0x9b, 0xb1, 0x4f, 0x2f, 0xe1, 0x14, 0x23, 0x4f, 0x6c, 0x12, 0x7d, 0x7c, - 0xca, 0x57, 0xf3, 0xd8, 0xd0, 0xae, 0x1f, 0x7c, 0xc1, 0x68, 0xe7, 0x18, 0xac, 0xe6, 0x31, 0xab, - 0xe3, 0x41, 0x4e, 0x5b, 0x50, 0xf3, 0xe2, 0x30, 0x11, 0x28, 0xcd, 0x53, 0x96, 0x4c, 0xdb, 0x7d, - 0xc9, 0xbd, 0x80, 0xea, 0x6e, 0x46, 0x0a, 0x50, 0x1a, 0xb2, 0xd1, 0xe0, 0x7e, 0xd4, 0x28, 0xe8, - 0xf8, 0x6a, 0x74, 0x33, 0xba, 0x1f, 0x35, 0x88, 0x9b, 0x42, 0x65, 0xb8, 0x40, 0x6f, 0x29, 0xd7, - 0x21, 0x7d, 0x07, 0xb6, 0x99, 0x85, 0x98, 0x59, 0xce, 0xf7, 0x66, 0xd9, 0x5a, 0x3a, 0xba, 0xb5, - 0x08, 0xd4, 0x22, 0x64, 0xc6, 0xaa, 0x37, 0x52, 0xae, 0x43, 0x03, 0xcb, 0x66, 0x3a, 0x74, 0x5f, - 0x41, 0x75, 0x67, 0xca, 0xba, 0x0e, 0xfb, 0xbd, 0x61, 0xa3, 0x40, 0x8f, 0xa1, 0xf2, 0xf8, 0x78, - 0xcd, 0xe5, 0xe2, 0xc3, 0xfb, 0x06, 0x71, 0x3d, 0x28, 0x5f, 0x71, 0xc5, 0x27, 0xb8, 0xd9, 0x83, - 0x44, 0xf6, 0x21, 0x51, 0xb0, 0x7d, 0xae, 0x78, 0xbe, 0xd9, 0x26, 0xd6, 0x4f, 0x15, 0xa4, 0xf9, - 0x46, 0x5b, 0x41, 0xaa, 0x37, 0xd6, 0x13, 0xc8, 0x15, 0xfa, 0x7a, 0x63, 0x35, 0xe3, 0x22, 0xab, - 0xe6, 0xca, 0x40, 0xbd, 0x39, 0x83, 0xfa, 0x21, 0x45, 0x5a, 0x86, 0x22, 0x47, 0xd9, 0x28, 0x5c, - 0xf6, 0xbf, 0x3f, 0x35, 0xc9, 0x8f, 0xa7, 0x26, 0xf9, 0xf5, 0xd4, 0x24, 0xdf, 0x7e, 0x37, 0x0b, - 0x9f, 0x5e, 0xce, 0x03, 0xb5, 0x58, 0xcf, 0x3a, 0x5e, 0x1c, 0x76, 0xfd, 0xb9, 0xe0, 0xc9, 0xe2, - 0x6d, 0x10, 0x77, 0x33, 0x06, 0xdd, 0xb4, 0xd7, 0x4d, 0x66, 0xb3, 0x92, 0xf9, 0x31, 0xfb, 0x7f, - 0x02, 0x00, 0x00, 0xff, 0xff, 0x1d, 0xc3, 0x11, 0xa3, 0xab, 0x03, 0x00, 0x00, + // 604 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x53, 0xdd, 0x6e, 0xda, 0x4c, + 0x10, 0x65, 0x8d, 0xc3, 0xcf, 0x90, 0x10, 0xbe, 0xd5, 0x57, 0xc9, 0x51, 0x15, 0x4a, 0x1d, 0x55, + 0x42, 0x95, 0x0a, 0x2a, 0x54, 0xbd, 0x27, 0x80, 0x14, 0x44, 0xa2, 0x48, 0xdb, 0x28, 0x8a, 0x7a, + 0x83, 0x16, 0x7b, 0x02, 0x16, 0xf8, 0x47, 0xbb, 0x8b, 0x55, 0x1e, 0xa2, 0xf7, 0x7d, 0xa4, 0x5e, + 0xf6, 0xa2, 0x0f, 0x50, 0xa5, 0x2f, 0x52, 0xed, 0xda, 0xa1, 0x70, 0xd1, 0xbb, 0x99, 0x33, 0xe3, + 0x39, 0xe3, 0x73, 0x66, 0xe1, 0x74, 0xce, 0xfd, 0x05, 0x8a, 0x64, 0xde, 0xeb, 0x24, 0x22, 0x56, + 0x31, 0xad, 0xee, 0x00, 0xf7, 0x27, 0x01, 0x6b, 0x7a, 0x4f, 0x1b, 0x50, 0x5c, 0xe1, 0xd6, 0x21, + 0x2d, 0xd2, 0x3e, 0x66, 0x3a, 0xa4, 0xff, 0xc3, 0x51, 0xca, 0xd7, 0x1b, 0x74, 0x2c, 0x83, 0x65, + 0x09, 0x7d, 0x09, 0xd5, 0x8d, 0x44, 0x31, 0x0b, 0x51, 0x71, 0xa7, 0x68, 0x2a, 0x15, 0x0d, 0xdc, + 0xa0, 0xe2, 0xd4, 0x81, 0x72, 0x8a, 0x42, 0x06, 0x71, 0xe4, 0xd8, 0x2d, 0xd2, 0xb6, 0xd9, 0x73, + 0x4a, 0xcf, 0x01, 0xf0, 0x4b, 0x12, 0x08, 0x94, 0x33, 0xae, 0x9c, 0x23, 0x53, 0xac, 0xe6, 0xc8, + 0x40, 0x51, 0x0a, 0xb6, 0x19, 0x58, 0x32, 0x03, 0x4d, 0xac, 0x99, 0xa4, 0x12, 0xc8, 0xc3, 0x59, + 0xe0, 0x3b, 0xd0, 0x22, 0xed, 0x13, 0x56, 0xc9, 0x80, 0x89, 0x4f, 0x5f, 0x41, 0x2d, 0x2f, 0xfa, + 0x71, 0x84, 0x4e, 0xad, 0x45, 0xda, 0x15, 0x06, 0x19, 0x34, 0x8a, 0x23, 0x74, 0x47, 0x50, 0x9a, + 0xde, 0x5f, 0x07, 0x52, 0xd1, 0x73, 0xb0, 0x56, 0xa9, 0x43, 0x5a, 0xc5, 0x76, 0xad, 0x77, 0xd2, + 0xf9, 0xab, 0xc4, 0xf4, 0x9e, 0x59, 0xab, 0x54, 0xd3, 0xf0, 0xf5, 0x3a, 0xf6, 0x66, 0x02, 0x1f, + 0x0d, 0x8d, 0xcd, 0x2a, 0x06, 0x60, 0xf8, 0xe8, 0x5e, 0xc1, 0x7f, 0x37, 0x3c, 0x0a, 0x1e, 0x51, + 0xaa, 0xe1, 0x92, 0x47, 0x0b, 0xfc, 0x84, 0x8a, 0xf6, 0xa1, 0xec, 0x99, 0x44, 0xe6, 0x53, 0xcf, + 0xf6, 0xa6, 0x1e, 0xb6, 0xb3, 0xe7, 0x4e, 0xf7, 0xab, 0x05, 0xf5, 0xc3, 0x1a, 0xad, 0x83, 0x35, + 0xf1, 0x8d, 0xe2, 0x36, 0xb3, 0x26, 0x3e, 0xed, 0x83, 0x75, 0x9b, 0x18, 0xb5, 0xeb, 0xbd, 0x8b, + 0x7f, 0x8e, 0xec, 0xdc, 0x26, 0x28, 0xb8, 0x0a, 0xe2, 0x88, 0x59, 0xb7, 0x89, 0x76, 0xe9, 0x1a, + 0x53, 0x5c, 0x1b, 0x2f, 0x4e, 0x58, 0x96, 0xd0, 0x17, 0x50, 0x5a, 0xe1, 0x56, 0x0b, 0x97, 0xf9, + 0x70, 0xb4, 0xc2, 0xed, 0xc4, 0xa7, 0x97, 0x70, 0x8a, 0x91, 0x27, 0xb6, 0x89, 0xfe, 0x7c, 0xc6, + 0xd7, 0x8b, 0xd8, 0x58, 0x51, 0x3f, 0xf8, 0x83, 0xf1, 0xae, 0x63, 0xb0, 0x5e, 0xc4, 0xac, 0x8e, + 0x07, 0x39, 0x6d, 0x41, 0xcd, 0x8b, 0xc3, 0x44, 0xa0, 0x34, 0x3e, 0x97, 0x0c, 0xed, 0x3e, 0xe4, + 0x5e, 0x40, 0x75, 0xb7, 0x23, 0x05, 0x28, 0x0d, 0xd9, 0x78, 0x70, 0x37, 0x6e, 0x14, 0x74, 0x3c, + 0x1a, 0x5f, 0x8f, 0xef, 0xc6, 0x0d, 0xe2, 0xa6, 0x50, 0x19, 0x2e, 0xd1, 0x5b, 0xc9, 0x4d, 0x48, + 0xdf, 0x83, 0x6d, 0x76, 0x21, 0x66, 0x97, 0xf3, 0xbd, 0x5d, 0x9e, 0x5b, 0x3a, 0x9a, 0x5a, 0x04, + 0x6a, 0x19, 0x32, 0xd3, 0xaa, 0xcf, 0x55, 0x6e, 0x42, 0x23, 0x96, 0xcd, 0x74, 0xe8, 0xbe, 0x81, + 0xea, 0xae, 0x29, 0x63, 0x1d, 0xf6, 0x7b, 0xc3, 0x46, 0x81, 0x1e, 0x43, 0xe5, 0xe1, 0xe1, 0x8a, + 0xcb, 0xe5, 0xc7, 0x0f, 0x0d, 0xe2, 0x7a, 0x50, 0x1e, 0x71, 0xc5, 0xa7, 0xb8, 0xdd, 0x13, 0x89, + 0xec, 0x8b, 0x44, 0xc1, 0xf6, 0xb9, 0xe2, 0xf9, 0xd9, 0x9b, 0x58, 0x5b, 0x15, 0xa4, 0xf9, 0xb9, + 0x5b, 0x41, 0xaa, 0xcf, 0xd9, 0x13, 0xc8, 0x15, 0xfa, 0xfa, 0x9c, 0xb5, 0xc6, 0x45, 0x56, 0xcd, + 0x91, 0x81, 0x7a, 0x7b, 0x06, 0xf5, 0x43, 0x15, 0x69, 0x19, 0x8a, 0x1c, 0x65, 0xa3, 0x70, 0xd9, + 0xff, 0xfe, 0xd4, 0x24, 0x3f, 0x9e, 0x9a, 0xe4, 0xd7, 0x53, 0x93, 0x7c, 0xfb, 0xdd, 0x2c, 0x7c, + 0x7e, 0xbd, 0x08, 0xd4, 0x72, 0x33, 0xef, 0x78, 0x71, 0xd8, 0xf5, 0x17, 0x82, 0x27, 0xcb, 0x77, + 0x41, 0xdc, 0xcd, 0x34, 0xe8, 0xa6, 0xbd, 0x6e, 0x32, 0x9f, 0x97, 0xcc, 0xab, 0xed, 0xff, 0x09, + 0x00, 0x00, 0xff, 0xff, 0xa7, 0xb8, 0x6c, 0x4c, 0xc8, 0x03, 0x00, 0x00, } func (m *KV) Marshal() (dAtA []byte, err error) { @@ -666,6 +676,11 @@ func (m *KVList) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if m.AllocRef != 0 { + i = encodeVarintBadgerpb2(dAtA, i, uint64(m.AllocRef)) + i-- + dAtA[i] = 0x50 + } if len(m.Kv) > 0 { for iNdEx := len(m.Kv) - 1; iNdEx >= 0; iNdEx-- { { @@ -932,6 +947,9 @@ func (m *KVList) Size() (n int) { n += 1 + l + sovBadgerpb2(uint64(l)) } } + if m.AllocRef != 0 { + n += 1 + sovBadgerpb2(uint64(m.AllocRef)) + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -1366,6 +1384,25 @@ func (m *KVList) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 10: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field AllocRef", wireType) + } + m.AllocRef = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBadgerpb2 + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.AllocRef |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipBadgerpb2(dAtA[iNdEx:]) diff --git a/pb/badgerpb2.proto b/pb/badgerpb2.proto index 862e1477c..821185d44 100644 --- a/pb/badgerpb2.proto +++ b/pb/badgerpb2.proto @@ -37,6 +37,9 @@ message KV { message KVList { repeated KV kv = 1; + + // alloc_ref used internally for memory management. + uint64 alloc_ref = 10; } message ManifestChangeSet { diff --git a/stream.go b/stream.go index fdbf76580..d6c5e4317 100644 --- a/stream.go +++ b/stream.go @@ -26,6 +26,7 @@ import ( "github.com/dgraph-io/badger/v2/pb" "github.com/dgraph-io/badger/v2/y" + "github.com/dgraph-io/ristretto/z" humanize "github.com/dustin/go-humanize" "github.com/golang/protobuf/proto" ) @@ -68,6 +69,12 @@ type Stream struct { // with a mismatching key. See example usage in ToList function. Can be left nil to use ToList // function by default. // + // KeyToList has access to z.Allocator accessible via stream.Allocator(itr.ThreadId). This + // allocator can be used to allocate KVs, to decrease the memory pressure on Go GC. Stream + // framework takes care of releasing those resources after calling Send. AllocRef does + // NOT need to be set in the returned KVList, as Stream framework would ignore that field, + // instead using the allocator assigned to that thread id. + // // Note: Calls to KeyToList are concurrent. KeyToList func(key []byte, itr *Iterator) (*pb.KVList, error) @@ -80,11 +87,24 @@ type Stream struct { rangeCh chan keyRange kvChan chan *pb.KVList nextStreamId uint32 + + // Use allocators to generate KVs. + allocatorsMu sync.RWMutex + allocators map[int]*z.Allocator +} + +func (st *Stream) Allocator(threadId int) *z.Allocator { + st.allocatorsMu.RLock() + defer st.allocatorsMu.RUnlock() + return st.allocators[threadId] } // ToList is a default implementation of KeyToList. It picks up all valid versions of the key, // skipping over deleted or expired keys. func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) { + alloc := st.Allocator(itr.ThreadId) + ka := alloc.Copy(key) + list := &pb.KVList{} for ; itr.Valid(); itr.Next() { item := itr.Item() @@ -96,17 +116,20 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) { break } - valCopy, err := item.ValueCopy(nil) - if err != nil { + kv := y.NewKV(alloc) + kv.Key = ka + + if err := item.Value(func(val []byte) error { + kv.Value = alloc.Copy(val) + return nil + + }); err != nil { return nil, err } - kv := &pb.KV{ - Key: item.KeyCopy(nil), - Value: valCopy, - UserMeta: []byte{item.UserMeta()}, - Version: item.Version(), - ExpiresAt: item.ExpiresAt(), - } + kv.Version = item.Version() + kv.ExpiresAt = item.ExpiresAt() + kv.UserMeta = alloc.Copy([]byte{item.UserMeta()}) + list.Kv = append(list.Kv, kv) if st.db.opt.NumVersionsToKeep == 1 { break @@ -151,6 +174,21 @@ func (st *Stream) produceRanges(ctx context.Context) { close(st.rangeCh) } +func (st *Stream) newAllocator(threadId int) *z.Allocator { + st.allocatorsMu.Lock() + var a *z.Allocator + if cur, ok := st.allocators[threadId]; ok && cur.Size() == 0 { + a = cur // Reuse. + } else { + // Current allocator has been used already. Create a new one. + a = z.NewAllocator(batchSize) + // a.Tag = fmt.Sprintf("Stream %d: %s", threadId, st.LogPrefix) + st.allocators[threadId] = a + } + st.allocatorsMu.Unlock() + return a +} + // produceKVs picks up ranges from rangeCh, generates KV lists and sends them to kvChan. func (st *Stream) produceKVs(ctx context.Context, threadId int) error { var size int @@ -175,6 +213,7 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error { streamId := atomic.AddUint32(&st.nextStreamId, 1) outList := new(pb.KVList) + outList.AllocRef = st.newAllocator(threadId).Ref sendIt := func() error { select { @@ -183,9 +222,11 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error { return ctx.Err() } outList = new(pb.KVList) + outList.AllocRef = st.newAllocator(threadId).Ref size = 0 return nil } + var prevKey []byte for itr.Seek(kr.left); itr.Valid(); { // it.Valid would only return true for keys with the provided Prefix in iterOpts. @@ -226,13 +267,12 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error { } } } - if len(outList.Kv) > 0 { - // TODO: Think of a way to indicate that a stream is over. - if err := sendIt(); err != nil { - return err - } - } - return nil + // Mark the stream as done. + outList.Kv = append(outList.Kv, &pb.KV{ + StreamId: streamId, + StreamDone: true, + }) + return sendIt() } for { @@ -258,6 +298,13 @@ func (st *Stream) streamKVs(ctx context.Context) error { defer t.Stop() now := time.Now() + var allocs []*z.Allocator + defer func() { + for _, a := range allocs { + a.Release() + } + }() + sendBatch := func(batch *pb.KVList) error { sz := uint64(proto.Size(batch)) bytesSent += sz @@ -268,6 +315,11 @@ func (st *Stream) streamKVs(ctx context.Context) error { } st.db.opt.Infof("%s Created batch of size: %s in %s.\n", st.LogPrefix, humanize.Bytes(sz), time.Since(t)) + + for _, a := range allocs { + a.Release() + } + allocs = allocs[:0] return nil } @@ -288,6 +340,8 @@ func (st *Stream) streamKVs(ctx context.Context) error { } y.AssertTrue(kvs != nil) batch.Kv = append(batch.Kv, kvs.Kv...) + allocs = append(allocs, z.AllocatorFrom(kvs.AllocRef)) + default: break loop } @@ -309,8 +363,9 @@ outer: continue } speed := bytesSent / durSec - st.db.opt.Infof("%s Time elapsed: %s, bytes sent: %s, speed: %s/sec\n", st.LogPrefix, - y.FixedDuration(dur), humanize.Bytes(bytesSent), humanize.Bytes(speed)) + st.db.opt.Infof("%s Time elapsed: %s, bytes sent: %s, speed: %s/sec, jemalloc: %s\n", + st.LogPrefix, y.FixedDuration(dur), humanize.IBytes(bytesSent), + humanize.IBytes(speed), humanize.IBytes(uint64(z.NumAllocBytes()))) case kvs, ok := <-st.kvChan: if !ok { @@ -318,6 +373,7 @@ outer: } y.AssertTrue(kvs != nil) batch = kvs + allocs = append(allocs, z.AllocatorFrom(kvs.AllocRef)) // Otherwise, slurp more keys into this batch. if err := slurp(batch); err != nil { @@ -385,11 +441,20 @@ func (st *Stream) Orchestrate(ctx context.Context) error { // Wait for key streaming to be over. err := <-kvErr + + for _, a := range st.allocators { + a.Release() + } return err } func (db *DB) newStream() *Stream { - return &Stream{db: db, NumGo: 16, LogPrefix: "Badger.Stream"} + return &Stream{ + db: db, + NumGo: 16, + LogPrefix: "Badger.Stream", + allocators: make(map[int]*z.Allocator), + } } // NewStream creates a new Stream. diff --git a/stream_test.go b/stream_test.go index f3c4b1d48..f0bd8bd0c 100644 --- a/stream_test.go +++ b/stream_test.go @@ -27,6 +27,8 @@ import ( bpb "github.com/dgraph-io/badger/v2/pb" "github.com/dgraph-io/badger/v2/y" + "github.com/dgraph-io/ristretto/z" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/require" ) @@ -50,7 +52,13 @@ type collector struct { } func (c *collector) Send(list *bpb.KVList) error { - c.kv = append(c.kv, list.Kv...) + for _, kv := range list.Kv { + if kv.StreamDone == true { + continue + } + cp := proto.Clone(kv).(*bpb.KV) + c.kv = append(c.kv, cp) + } return nil } @@ -156,6 +164,7 @@ func TestStream(t *testing.T) { require.Equal(t, 50, count, "Count mismatch for pred: %s", pred) } require.NoError(t, db.Close()) + require.Equal(t, int64(0), z.NumAllocBytes()) } func TestStreamWithThreadId(t *testing.T) { diff --git a/stream_writer.go b/stream_writer.go index ecdb079fc..71c934ae5 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -112,7 +112,7 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error { } e := &Entry{ Key: y.KeyWithTs(kv.Key, kv.Version), - Value: kv.Value, + Value: y.Copy(kv.Value), UserMeta: userMeta, ExpiresAt: kv.ExpiresAt, meta: meta, diff --git a/test.sh b/test.sh index 23bb660c2..fdf899e48 100755 --- a/test.sh +++ b/test.sh @@ -15,16 +15,19 @@ pushd badger go build -v . popd +# tags="-tags=jemalloc" +tags="" + # Run the memory intensive tests first. -go test -v -run='TestBigKeyValuePairs$' --manual=true -go test -v -run='TestPushValueLogLimit' --manual=true +go test -v $tags -run='TestBigKeyValuePairs$' --manual=true +go test -v $tags -run='TestPushValueLogLimit' --manual=true # Run the special Truncate test. rm -rf p -go test -v -run='TestTruncateVlogNoClose$' --manual=true +go test -v $tags -run='TestTruncateVlogNoClose$' --manual=true truncate --size=4096 p/000000.vlog -go test -v -run='TestTruncateVlogNoClose2$' --manual=true -go test -v -run='TestTruncateVlogNoClose3$' --manual=true +go test -v $tags -run='TestTruncateVlogNoClose2$' --manual=true +go test -v $tags -run='TestTruncateVlogNoClose3$' --manual=true rm -rf p # Run the normal tests. @@ -32,8 +35,8 @@ echo "==> Starting tests.. " # go test -timeout=25m -v -race github.com/dgraph-io/badger/v2/... for pkg in $packages; do echo "===> Testing $pkg" - go test -timeout=25m -v -race $pkg + go test $tags -timeout=25m -v -race $pkg done echo "===> Testing root level" -go test -timeout=25m -v . -race +go test $tags -timeout=25m -v . -race diff --git a/y/y.go b/y/y.go index cd75c3dfd..e527d42a1 100644 --- a/y/y.go +++ b/y/y.go @@ -29,6 +29,8 @@ import ( "time" "unsafe" + "github.com/dgraph-io/badger/v2/pb" + "github.com/dgraph-io/ristretto/z" "github.com/pkg/errors" ) @@ -465,3 +467,13 @@ func (r *PageBufferReader) Read(p []byte) (int, error) { return read, nil } + +const kvsz = int(unsafe.Sizeof(pb.KV{})) + +func NewKV(alloc *z.Allocator) *pb.KV { + if alloc == nil { + return &pb.KV{} + } + b := alloc.AllocateAligned(kvsz) + return (*pb.KV)(unsafe.Pointer(&b[0])) +}