From d8b2a0490754e3037b089e475b7557f097f74604 Mon Sep 17 00:00:00 2001 From: Yumin Xia Date: Mon, 29 Jul 2019 17:15:05 -0700 Subject: [PATCH 1/5] add rocksdb --- db.go | 5 + db/rocks_db.go | 342 ++++++++++++++++++++++++++++++++++++++++++++ db/rocks_db_test.go | 32 +++++ 3 files changed, 379 insertions(+) create mode 100644 db/rocks_db.go create mode 100644 db/rocks_db_test.go diff --git a/db.go b/db.go index d88df398c..cced5c9ac 100644 --- a/db.go +++ b/db.go @@ -32,6 +32,11 @@ const ( // - may be faster is some use-cases (random reads - indexer) // - use boltdb build tag (go build -tags boltdb) BoltDBBackend DBBackendType = "boltdb" + // RocksDBBackend represents rocksdb (uses tecbot's wrapper) + // - EXPERIMENTAL + // - requires gcc + // - use boltdb build tag (go build -tags rocksdb) + RocksDBBackend DBBackendType = "rocksdb" ) type dbCreator func(name string, dir string) (DB, error) diff --git a/db/rocks_db.go b/db/rocks_db.go new file mode 100644 index 000000000..500a6ee18 --- /dev/null +++ b/db/rocks_db.go @@ -0,0 +1,342 @@ +// +build rocksdb + +package db + +import ( + "bytes" + "fmt" + "path/filepath" + "runtime" + + "github.com/stumble/gorocksdb" +) + +func init() { + dbCreator := func(name string, dir string) (DB, error) { + return NewRocksDB(name, dir) + } + registerDBCreator(RocksDBBackend, dbCreator, false) +} + +var _ DB = (*RocksDB)(nil) + +type RocksDB struct { + db *gorocksdb.DB + ro *gorocksdb.ReadOptions + wo *gorocksdb.WriteOptions + woSync *gorocksdb.WriteOptions +} + +func NewRocksDB(name string, dir string) (*RocksDB, error) { + // default rocksdb option, good enough for most cases, including heavy workloads. + // 1GB table cache, 512MB write buffer(may use 50% more on heavy workloads). + // compression: snappy as default, need to -lsnappy to enable. + bbto := gorocksdb.NewDefaultBlockBasedTableOptions() + bbto.SetBlockCache(gorocksdb.NewLRUCache(1 << 30)) + bbto.SetFilterPolicy(gorocksdb.NewBloomFilter(10)) + + opts := gorocksdb.NewDefaultOptions() + opts.SetBlockBasedTableFactory(bbto) + opts.SetCreateIfMissing(true) + opts.IncreaseParallelism(runtime.NumCPU()) + // 1.5GB maximum memory use for writebuffer. + opts.OptimizeLevelStyleCompaction(512 * 1024 * 1024) + return NewRocksDBWithOptions(name, dir, opts) +} + +func NewRocksDBWithOptions(name string, dir string, opts *gorocksdb.Options) (*RocksDB, error) { + dbPath := filepath.Join(dir, name+".db") + db, err := gorocksdb.OpenDb(opts, dbPath) + if err != nil { + return nil, err + } + ro := gorocksdb.NewDefaultReadOptions() + wo := gorocksdb.NewDefaultWriteOptions() + woSync := gorocksdb.NewDefaultWriteOptions() + woSync.SetSync(true) + database := &RocksDB{ + db: db, + ro: ro, + wo: wo, + woSync: woSync, + } + return database, nil +} + +// Implements DB. +func (db *RocksDB) Get(key []byte) []byte { + key = nonNilBytes(key) + res, err := db.db.Get(db.ro, key) + if err != nil { + res.Free() + panic(err) + } + return moveSliceToBytes(res) +} + +// Implements DB. +func (db *RocksDB) Has(key []byte) bool { + return db.Get(key) != nil +} + +// Implements DB. +func (db *RocksDB) Set(key []byte, value []byte) { + key = nonNilBytes(key) + value = nonNilBytes(value) + err := db.db.Put(db.wo, key, value) + if err != nil { + panic(err) + } +} + +// Implements DB. +func (db *RocksDB) SetSync(key []byte, value []byte) { + key = nonNilBytes(key) + value = nonNilBytes(value) + err := db.db.Put(db.woSync, key, value) + if err != nil { + panic(err) + } +} + +// Implements DB. 
+func (db *RocksDB) Delete(key []byte) { + key = nonNilBytes(key) + err := db.db.Delete(db.wo, key) + if err != nil { + panic(err) + } +} + +// Implements DB. +func (db *RocksDB) DeleteSync(key []byte) { + key = nonNilBytes(key) + err := db.db.Delete(db.woSync, key) + if err != nil { + panic(err) + } +} + +func (db *RocksDB) DB() *gorocksdb.DB { + return db.db +} + +// Implements DB. +func (db *RocksDB) Close() { + db.ro.Destroy() + db.wo.Destroy() + db.woSync.Destroy() + db.db.Close() +} + +// Implements DB. +func (db *RocksDB) Print() { + itr := db.Iterator(nil, nil) + defer itr.Close() + for ; itr.Valid(); itr.Next() { + key := itr.Key() + value := itr.Value() + fmt.Printf("[%X]:\t[%X]\n", key, value) + } +} + +// Implements DB. +func (db *RocksDB) Stats() map[string]string { + keys := []string{"rocksdb.stats"} + stats := make(map[string]string, len(keys)) + for _, key := range keys { + stats[key] = db.db.GetProperty(key) + } + return stats +} + +//---------------------------------------- +// Batch + +// Implements DB. +func (db *RocksDB) NewBatch() Batch { + batch := gorocksdb.NewWriteBatch() + return &rocksDBBatch{db, batch} +} + +type rocksDBBatch struct { + db *RocksDB + batch *gorocksdb.WriteBatch +} + +// Implements Batch. +func (mBatch *rocksDBBatch) Set(key, value []byte) { + mBatch.batch.Put(key, value) +} + +// Implements Batch. +func (mBatch *rocksDBBatch) Delete(key []byte) { + mBatch.batch.Delete(key) +} + +// Implements Batch. +func (mBatch *rocksDBBatch) Write() { + err := mBatch.db.db.Write(mBatch.db.wo, mBatch.batch) + if err != nil { + panic(err) + } +} + +// Implements Batch. +func (mBatch *rocksDBBatch) WriteSync() { + err := mBatch.db.db.Write(mBatch.db.woSync, mBatch.batch) + if err != nil { + panic(err) + } +} + +// Implements Batch. +func (mBatch *rocksDBBatch) Close() { + mBatch.batch.Destroy() +} + +//---------------------------------------- +// Iterator +// NOTE This is almost identical to db/go_level_db.Iterator +// Before creating a third version, refactor. + +func (db *RocksDB) Iterator(start, end []byte) Iterator { + itr := db.db.NewIterator(db.ro) + return newRocksDBIterator(itr, start, end, false) +} + +func (db *RocksDB) ReverseIterator(start, end []byte) Iterator { + itr := db.db.NewIterator(db.ro) + return newRocksDBIterator(itr, start, end, true) +} + +var _ Iterator = (*rocksDBIterator)(nil) + +type rocksDBIterator struct { + source *gorocksdb.Iterator + start, end []byte + isReverse bool + isInvalid bool +} + +func newRocksDBIterator(source *gorocksdb.Iterator, start, end []byte, isReverse bool) *rocksDBIterator { + if isReverse { + if end == nil { + source.SeekToLast() + } else { + source.Seek(end) + if source.Valid() { + eoakey := moveSliceToBytes(source.Key()) // end or after key + if bytes.Compare(end, eoakey) <= 0 { + source.Prev() + } + } else { + source.SeekToLast() + } + } + } else { + if start == nil { + source.SeekToFirst() + } else { + source.Seek(start) + } + } + return &rocksDBIterator{ + source: source, + start: start, + end: end, + isReverse: isReverse, + isInvalid: false, + } +} + +func (itr rocksDBIterator) Domain() ([]byte, []byte) { + return itr.start, itr.end +} + +func (itr rocksDBIterator) Valid() bool { + + // Once invalid, forever invalid. + if itr.isInvalid { + return false + } + + // Panic on DB error. No way to recover. + itr.assertNoError() + + // If source is invalid, invalid. + if !itr.source.Valid() { + itr.isInvalid = true + return false + } + + // If key is end or past it, invalid. 
+ var start = itr.start + var end = itr.end + var key = moveSliceToBytes(itr.source.Key()) + if itr.isReverse { + if start != nil && bytes.Compare(key, start) < 0 { + itr.isInvalid = true + return false + } + } else { + if end != nil && bytes.Compare(end, key) <= 0 { + itr.isInvalid = true + return false + } + } + + // It's valid. + return true +} + +func (itr rocksDBIterator) Key() []byte { + itr.assertNoError() + itr.assertIsValid() + return moveSliceToBytes(itr.source.Key()) +} + +func (itr rocksDBIterator) Value() []byte { + itr.assertNoError() + itr.assertIsValid() + return moveSliceToBytes(itr.source.Value()) +} + +func (itr rocksDBIterator) Next() { + itr.assertNoError() + itr.assertIsValid() + if itr.isReverse { + itr.source.Prev() + } else { + itr.source.Next() + } +} + +func (itr rocksDBIterator) Close() { + itr.source.Close() +} + +func (itr rocksDBIterator) assertNoError() { + if err := itr.source.Err(); err != nil { + panic(err) + } +} + +func (itr rocksDBIterator) assertIsValid() { + if !itr.Valid() { + panic("rocksDBIterator is invalid") + } +} + +// moveSliceToBytes will free the slice and copy out a go []byte +// This function can be applied on *Slice returned from Key() and Value() +// of an Iterator, because they are marked as freed. +func moveSliceToBytes(s *gorocksdb.Slice) []byte { + defer s.Free() + if !s.Exists() { + return nil + } + v := make([]byte, len(s.Data())) + copy(v, s.Data()) + return v +} diff --git a/db/rocks_db_test.go b/db/rocks_db_test.go new file mode 100644 index 000000000..52455d448 --- /dev/null +++ b/db/rocks_db_test.go @@ -0,0 +1,32 @@ +// +build rocksdb + +package db + +import ( + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/assert" + + cmn "github.com/tendermint/tm-cmn/common" +) + +func TestRocksDBBackend(t *testing.T) { + name := fmt.Sprintf("test_%x", cmn.RandStr(12)) + dir := os.TempDir() + db := NewDB(name, RocksDBBackend, dir) + defer cleanupDBDir(dir, name) + + _, ok := db.(*RocksDB) + assert.True(t, ok) +} + +func TestCLevelDBStats(t *testing.T) { + name := fmt.Sprintf("test_%x", cmn.RandStr(12)) + dir := os.TempDir() + db := NewDB(name, RocksDBBackend, dir) + defer cleanupDBDir(dir, name) + + assert.NotEmpty(t, db.Stats()) +} From 059f930059ccc3968a5706374bcf23410b84b48e Mon Sep 17 00:00:00 2001 From: Yumin Xia Date: Wed, 31 Jul 2019 12:13:01 -0700 Subject: [PATCH 2/5] move codes --- db/rocks_db.go => rocks_db.go | 0 db/rocks_db_test.go => rocks_db_test.go | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename db/rocks_db.go => rocks_db.go (100%) rename db/rocks_db_test.go => rocks_db_test.go (100%) diff --git a/db/rocks_db.go b/rocks_db.go similarity index 100% rename from db/rocks_db.go rename to rocks_db.go diff --git a/db/rocks_db_test.go b/rocks_db_test.go similarity index 100% rename from db/rocks_db_test.go rename to rocks_db_test.go From f50c457cd582381f2618eb5ed5e2faba44a9be7b Mon Sep 17 00:00:00 2001 From: Yumin Xia Date: Wed, 31 Jul 2019 12:23:31 -0700 Subject: [PATCH 3/5] dir update and misc fixes --- db.go | 2 +- rocks_db.go | 1 - rocks_db_test.go | 2 -- 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/db.go b/db.go index cced5c9ac..fa06f1b1b 100644 --- a/db.go +++ b/db.go @@ -35,7 +35,7 @@ const ( // RocksDBBackend represents rocksdb (uses tecbot's wrapper) // - EXPERIMENTAL // - requires gcc - // - use boltdb build tag (go build -tags rocksdb) + // - use rocksdb build tag (go build -tags rocksdb) RocksDBBackend DBBackendType = "rocksdb" ) diff --git a/rocks_db.go b/rocks_db.go index 
500a6ee18..81cb967cc 100644 --- a/rocks_db.go +++ b/rocks_db.go @@ -68,7 +68,6 @@ func (db *RocksDB) Get(key []byte) []byte { key = nonNilBytes(key) res, err := db.db.Get(db.ro, key) if err != nil { - res.Free() panic(err) } return moveSliceToBytes(res) diff --git a/rocks_db_test.go b/rocks_db_test.go index 52455d448..7f960608c 100644 --- a/rocks_db_test.go +++ b/rocks_db_test.go @@ -8,8 +8,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - - cmn "github.com/tendermint/tm-cmn/common" ) func TestRocksDBBackend(t *testing.T) { From 5414ac1c7a0d6d1afbcf4f1761baf02c2e8d348d Mon Sep 17 00:00:00 2001 From: Yumin Xia Date: Wed, 31 Jul 2019 12:24:59 -0700 Subject: [PATCH 4/5] fix tests --- rocks_db_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rocks_db_test.go b/rocks_db_test.go index 7f960608c..15ad9f9c7 100644 --- a/rocks_db_test.go +++ b/rocks_db_test.go @@ -11,7 +11,7 @@ import ( ) func TestRocksDBBackend(t *testing.T) { - name := fmt.Sprintf("test_%x", cmn.RandStr(12)) + name := fmt.Sprintf("test_%x", randStr(12)) dir := os.TempDir() db := NewDB(name, RocksDBBackend, dir) defer cleanupDBDir(dir, name) @@ -21,7 +21,7 @@ func TestRocksDBBackend(t *testing.T) { } func TestCLevelDBStats(t *testing.T) { - name := fmt.Sprintf("test_%x", cmn.RandStr(12)) + name := fmt.Sprintf("test_%x", randStr(12)) dir := os.TempDir() db := NewDB(name, RocksDBBackend, dir) defer cleanupDBDir(dir, name) From 9ec1059cfeea8a6323de078f70b4da1de4261561 Mon Sep 17 00:00:00 2001 From: Yumin Xia Date: Wed, 18 Sep 2019 11:25:22 -0700 Subject: [PATCH 5/5] Update db.go Co-Authored-By: Anton Kaliaev --- db.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db.go b/db.go index fa06f1b1b..7c754286b 100644 --- a/db.go +++ b/db.go @@ -32,7 +32,7 @@ const ( // - may be faster is some use-cases (random reads - indexer) // - use boltdb build tag (go build -tags boltdb) BoltDBBackend DBBackendType = "boltdb" - // RocksDBBackend represents rocksdb (uses tecbot's wrapper) + // RocksDBBackend represents rocksdb (uses Stumble fork github.com/stumble/gorocksdb of the tecbot wrapper) // - EXPERIMENTAL // - requires gcc // - use rocksdb build tag (go build -tags rocksdb)
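
The sketch below is not part of the patch series; it is a minimal usage example of the backend the series adds. It assumes the post-patch-2 layout (rocks_db.go in the root db package), a build with -tags rocksdb, and only the functions visible in the diffs above (NewDB, Set, Get, NewBatch, Iterator, Close). The store name and directory are illustrative placeholders.

// +build rocksdb

package db

import "fmt"

// rocksDBUsageSketch opens a RocksDB-backed store through the generic NewDB
// factory, writes keys both directly and via a write batch, reads one back,
// and walks the whole key space with the iterator defined in rocks_db.go.
// The name "example" and the /tmp directory are placeholders, not defaults.
func rocksDBUsageSketch() {
	db := NewDB("example", RocksDBBackend, "/tmp")
	defer db.Close()

	// Single writes and reads go straight through the gorocksdb handle.
	db.Set([]byte("foo"), []byte("bar"))
	fmt.Printf("foo = %s\n", db.Get([]byte("foo")))

	// Batched writes are applied in a single Write call by rocksDBBatch.Write.
	batch := db.NewBatch()
	batch.Set([]byte("baz"), []byte("qux"))
	batch.Delete([]byte("foo"))
	batch.Write()
	batch.Close()

	// nil, nil iterates the entire key range, as RocksDB.Print does.
	itr := db.Iterator(nil, nil)
	defer itr.Close()
	for ; itr.Valid(); itr.Next() {
		fmt.Printf("[%X]:\t[%X]\n", itr.Key(), itr.Value())
	}
}

Because the init() in rocks_db.go is compiled only under the rocksdb build tag, a build without -tags rocksdb never registers the RocksDBBackend creator, and a file carrying the same tag (like this sketch) is simply excluded from such a build.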
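
Patch 1 hard-codes its tuning inside NewRocksDB (1GB block cache, a bloom filter, and OptimizeLevelStyleCompaction with a 512MB write-buffer budget) but also exposes NewRocksDBWithOptions for callers that want different settings. The sketch below is likewise not part of the series: it reuses only the gorocksdb option calls already present in NewRocksDB, and the smaller sizes are arbitrary illustrative values, not recommendations.

// +build rocksdb

package db

import (
	"runtime"

	"github.com/stumble/gorocksdb"
)

// openSmallCacheRocksDB mirrors NewRocksDB but trades throughput for a
// lighter memory footprint: a 256MB block cache and a 128MB write-buffer
// budget. The sizes are illustrative only.
func openSmallCacheRocksDB(name, dir string) (*RocksDB, error) {
	bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
	bbto.SetBlockCache(gorocksdb.NewLRUCache(256 << 20)) // 256MB vs. the 1GB default in NewRocksDB
	bbto.SetFilterPolicy(gorocksdb.NewBloomFilter(10))

	opts := gorocksdb.NewDefaultOptions()
	opts.SetBlockBasedTableFactory(bbto)
	opts.SetCreateIfMissing(true)
	opts.IncreaseParallelism(runtime.NumCPU())
	opts.OptimizeLevelStyleCompaction(128 * 1024 * 1024) // smaller than the 512MB budget in NewRocksDB

	return NewRocksDBWithOptions(name, dir, opts)
}

Both constructors still go through cgo, so gcc must be available at build time, per the note on RocksDBBackend in db.go.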