From 9db1a4497b2714eaa5ed0390cd2d6641b38a2472 Mon Sep 17 00:00:00 2001 From: Alessandro Sforzin Date: Fri, 9 Aug 2024 17:20:05 +0200 Subject: [PATCH] Reinstate BoltDB and ClevelDB as backend DBs (#177) * Revert "remove deprecated boltdb and cleveldb (#155)" This reverts commit badc0b8f567fbe20ee55e913e50175ef38ab64a3. We decided to reinstate boltDB and clevelDB and mark them as deprecated until a future version of CometBFT in which we'll drop cometbft-db and support only 1 backend DB. * updated cleveldb Iterator API docs to conform to the changes made in #168 * updated go.mod * updated boltDB Iterator APIs to conform to the changes made in #168 * added changelog entry * Formatting Co-authored-by: Daniel * formatting to please the linter * added additional context in the docs of backend types constants --------- Co-authored-by: Daniel --- .../177-reinstate-boltdb-cleveldb.md | 2 + Makefile | 14 +- README.md | 13 ++ boltdb.go | 213 ++++++++++++++++++ boltdb_batch.go | 87 +++++++ boltdb_iterator.go | 142 ++++++++++++ boltdb_test.go | 36 +++ cleveldb.go | 205 +++++++++++++++++ cleveldb_batch.go | 82 +++++++ cleveldb_iterator.go | 138 ++++++++++++ cleveldb_test.go | 101 +++++++++ db.go | 13 +- go.mod | 2 + go.sum | 4 + util_test.go | 5 + 15 files changed, 1054 insertions(+), 3 deletions(-) create mode 100644 .changelog/unreleased/dependencies/177-reinstate-boltdb-cleveldb.md create mode 100644 boltdb.go create mode 100644 boltdb_batch.go create mode 100644 boltdb_iterator.go create mode 100644 boltdb_test.go create mode 100644 cleveldb.go create mode 100644 cleveldb_batch.go create mode 100644 cleveldb_iterator.go create mode 100644 cleveldb_test.go diff --git a/.changelog/unreleased/dependencies/177-reinstate-boltdb-cleveldb.md b/.changelog/unreleased/dependencies/177-reinstate-boltdb-cleveldb.md new file mode 100644 index 0000000..2a0f7c1 --- /dev/null +++ b/.changelog/unreleased/dependencies/177-reinstate-boltdb-cleveldb.md @@ -0,0 +1,2 @@ +- reinstate BoltDB and 
ClevelDB as backend DBs + ([\#177](https://github.com/cometbft/cometbft-db/pull/177)) \ No newline at end of file diff --git a/Makefile b/Makefile index 8f95ae7..9a14560 100644 --- a/Makefile +++ b/Makefile @@ -19,11 +19,21 @@ test: @go test $(PACKAGES) -v .PHONY: test +test-cleveldb: + @echo "--> Running go test" + @go test $(PACKAGES) -tags cleveldb -v +.PHONY: test-cleveldb + test-rocksdb: @echo "--> Running go test" @go test $(PACKAGES) -tags rocksdb -v .PHONY: test-rocksdb +test-boltdb: + @echo "--> Running go test" + @go test $(PACKAGES) -tags boltdb -v +.PHONY: test-boltdb + test-badgerdb: @echo "--> Running go test" @go test $(PACKAGES) -tags badgerdb -v @@ -35,7 +45,7 @@ test-pebble: test-all: @echo "--> Running go test" - @go test $(PACKAGES) -tags rocksdb,grocksdb_clean_link,badgerdb,pebbledb -v + @go test $(PACKAGES) -tags cleveldb,boltdb,rocksdb,grocksdb_clean_link,badgerdb,pebbledb -v .PHONY: test-all test-all-with-coverage: @@ -46,7 +56,7 @@ test-all-with-coverage: -race \ -coverprofile=coverage.txt \ -covermode=atomic \ - -tags=memdb,goleveldb,rocksdb,grocksdb_clean_link,badgerdb,pebbledb \ + -tags=memdb,goleveldb,cleveldb,boltdb,rocksdb,grocksdb_clean_link,badgerdb,pebbledb \ -v .PHONY: test-all-with-coverage diff --git a/README.md b/README.md index 348caf8..ce4f152 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,19 @@ Go 1.22+ sets, and tests. Used for [IAVL](https://github.com/tendermint/iavl) working sets when the pruning strategy allows it. +- **[LevelDB](https://github.com/google/leveldb) [DEPRECATED]:** A [Go + wrapper](https://github.com/jmhodges/levigo) around + [LevelDB](https://github.com/google/leveldb). Uses LSM-trees for on-disk + storage, which have good performance for write-heavy workloads, particularly + on spinning disks, but requires periodic compaction to maintain decent read + performance and reclaim disk space. Does not support transactions. 
+ +- **[BoltDB](https://github.com/etcd-io/bbolt) [DEPRECATED]:** A + [fork](https://github.com/etcd-io/bbolt) of + [BoltDB](https://github.com/boltdb/bolt). Uses B+trees for on-disk storage, + which have good performance for read-heavy workloads and range scans. Supports + serializable ACID transactions. + - **[RocksDB](https://github.com/linxGnu/grocksdb) [experimental]:** A [Go wrapper](https://github.com/linxGnu/grocksdb) around [RocksDB](https://rocksdb.org). Similarly to LevelDB (above) it uses LSM-trees diff --git a/boltdb.go b/boltdb.go new file mode 100644 index 0000000..b5db174 --- /dev/null +++ b/boltdb.go @@ -0,0 +1,213 @@ +//go:build boltdb +// +build boltdb + +package db + +import ( + "errors" + "fmt" + "os" + "path/filepath" + + "go.etcd.io/bbolt" +) + +var bucket = []byte("tm") + +func init() { + registerDBCreator(BoltDBBackend, func(name, dir string) (DB, error) { + return NewBoltDB(name, dir) + }) +} + +// BoltDB is a wrapper around etcd's fork of bolt (https://github.com/etcd-io/bbolt). +// +// NOTE: All operations (including Set, Delete) are synchronous by default. One +// can globally turn it off by using NoSync config option (not recommended). +// +// A single bucket ([]byte("tm")) is used per a database instance. This could +// lead to performance issues when/if there will be lots of keys. +type BoltDB struct { + db *bbolt.DB +} + +var _ DB = (*BoltDB)(nil) + +// NewBoltDB returns a BoltDB with default options. +// +// Deprecated: boltdb is deprecated and will be removed in the future. +func NewBoltDB(name, dir string) (DB, error) { + return NewBoltDBWithOpts(name, dir, bbolt.DefaultOptions) +} + +// NewBoltDBWithOpts allows you to supply *bbolt.Options. ReadOnly: true is not +// supported because NewBoltDBWithOpts creates a global bucket. 
+func NewBoltDBWithOpts(name string, dir string, opts *bbolt.Options) (DB, error) { + if opts.ReadOnly { + return nil, errors.New("ReadOnly: true is not supported") + } + + dbPath := filepath.Join(dir, name+".db") + db, err := bbolt.Open(dbPath, os.ModePerm, opts) + if err != nil { + return nil, err + } + + // create a global bucket + err = db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(bucket) + return err + }) + if err != nil { + return nil, err + } + + return &BoltDB{db: db}, nil +} + +// Get implements DB. +func (bdb *BoltDB) Get(key []byte) (value []byte, err error) { + if len(key) == 0 { + return nil, errKeyEmpty + } + err = bdb.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(bucket) + if v := b.Get(key); v != nil { + value = append([]byte{}, v...) + } + return nil + }) + if err != nil { + return nil, err + } + return +} + +// Has implements DB. +func (bdb *BoltDB) Has(key []byte) (bool, error) { + bytes, err := bdb.Get(key) + if err != nil { + return false, err + } + return bytes != nil, nil +} + +// Set implements DB. +func (bdb *BoltDB) Set(key, value []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if value == nil { + return errValueNil + } + err := bdb.db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(bucket) + return b.Put(key, value) + }) + if err != nil { + return err + } + return nil +} + +// SetSync implements DB. +func (bdb *BoltDB) SetSync(key, value []byte) error { + return bdb.Set(key, value) +} + +// Delete implements DB. +func (bdb *BoltDB) Delete(key []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + err := bdb.db.Update(func(tx *bbolt.Tx) error { + return tx.Bucket(bucket).Delete(key) + }) + if err != nil { + return err + } + return nil +} + +// DeleteSync implements DB. +func (bdb *BoltDB) DeleteSync(key []byte) error { + return bdb.Delete(key) +} + +// Close implements DB. +func (bdb *BoltDB) Close() error { + return bdb.db.Close() +} + +// Print implements DB. 
+func (bdb *BoltDB) Print() error { + stats := bdb.db.Stats() + fmt.Printf("%v\n", stats) + + err := bdb.db.View(func(tx *bbolt.Tx) error { + tx.Bucket(bucket).ForEach(func(k, v []byte) error { + fmt.Printf("[%X]:\t[%X]\n", k, v) + return nil + }) + return nil + }) + if err != nil { + return err + } + return nil +} + +// Stats implements DB. +func (bdb *BoltDB) Stats() map[string]string { + stats := bdb.db.Stats() + m := make(map[string]string) + + // Freelist stats + m["FreePageN"] = fmt.Sprintf("%v", stats.FreePageN) + m["PendingPageN"] = fmt.Sprintf("%v", stats.PendingPageN) + m["FreeAlloc"] = fmt.Sprintf("%v", stats.FreeAlloc) + m["FreelistInuse"] = fmt.Sprintf("%v", stats.FreelistInuse) + + // Transaction stats + m["TxN"] = fmt.Sprintf("%v", stats.TxN) + m["OpenTxN"] = fmt.Sprintf("%v", stats.OpenTxN) + + return m +} + +// NewBatch implements DB. +func (bdb *BoltDB) NewBatch() Batch { + return newBoltDBBatch(bdb) +} + +// WARNING: Any concurrent writes or reads will block until the iterator is +// closed. +func (bdb *BoltDB) Iterator(start, end []byte) (Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, errKeyEmpty + } + tx, err := bdb.db.Begin(false) + if err != nil { + return nil, err + } + return newBoltDBIterator(tx, start, end, false), nil +} + +// WARNING: Any concurrent writes or reads will block until the iterator is +// closed. +func (bdb *BoltDB) ReverseIterator(start, end []byte) (Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, errKeyEmpty + } + tx, err := bdb.db.Begin(false) + if err != nil { + return nil, err + } + return newBoltDBIterator(tx, start, end, true), nil +} + +func (bdb *BoltDB) Compact(start, end []byte) error { + // There is no explicit CompactRange support in BoltDB, only a function that copies the + // entire DB from one place to another while doing deletions. Hence we do not support it. 
+ return nil +} diff --git a/boltdb_batch.go b/boltdb_batch.go new file mode 100644 index 0000000..cd22c67 --- /dev/null +++ b/boltdb_batch.go @@ -0,0 +1,87 @@ +//go:build boltdb +// +build boltdb + +package db + +import "go.etcd.io/bbolt" + +// boltDBBatch stores operations internally and dumps them to BoltDB on Write(). +type boltDBBatch struct { + db *BoltDB + ops []operation +} + +var _ Batch = (*boltDBBatch)(nil) + +func newBoltDBBatch(db *BoltDB) *boltDBBatch { + return &boltDBBatch{ + db: db, + ops: []operation{}, + } +} + +// Set implements Batch. +func (b *boltDBBatch) Set(key, value []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if value == nil { + return errValueNil + } + if b.ops == nil { + return errBatchClosed + } + b.ops = append(b.ops, operation{opTypeSet, key, value}) + return nil +} + +// Delete implements Batch. +func (b *boltDBBatch) Delete(key []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if b.ops == nil { + return errBatchClosed + } + b.ops = append(b.ops, operation{opTypeDelete, key, nil}) + return nil +} + +// Write implements Batch. +func (b *boltDBBatch) Write() error { + if b.ops == nil { + return errBatchClosed + } + err := b.db.db.Batch(func(tx *bbolt.Tx) error { + bkt := tx.Bucket(bucket) + for _, op := range b.ops { + switch op.opType { + case opTypeSet: + if err := bkt.Put(op.key, op.value); err != nil { + return err + } + case opTypeDelete: + if err := bkt.Delete(op.key); err != nil { + return err + } + } + } + return nil + }) + if err != nil { + return err + } + // Make sure batch cannot be used afterwards. Callers should still call Close(), for errors. + return b.Close() +} + +// WriteSync implements Batch. +func (b *boltDBBatch) WriteSync() error { + return b.Write() +} + +// Close implements Batch. 
+func (b *boltDBBatch) Close() error { + b.ops = nil + return nil +} diff --git a/boltdb_iterator.go b/boltdb_iterator.go new file mode 100644 index 0000000..d7fd883 --- /dev/null +++ b/boltdb_iterator.go @@ -0,0 +1,142 @@ +//go:build boltdb +// +build boltdb + +package db + +import ( + "bytes" + + "go.etcd.io/bbolt" +) + +// boltDBIterator allows you to iterate on range of keys/values given some +// start / end keys (nil & nil will result in doing full scan). +type boltDBIterator struct { + tx *bbolt.Tx + + itr *bbolt.Cursor + start []byte + end []byte + + currentKey []byte + currentValue []byte + + isInvalid bool + isReverse bool +} + +var _ Iterator = (*boltDBIterator)(nil) + +// newBoltDBIterator creates a new boltDBIterator. +func newBoltDBIterator(tx *bbolt.Tx, start, end []byte, isReverse bool) *boltDBIterator { + itr := tx.Bucket(bucket).Cursor() + + var ck, cv []byte + if isReverse { + switch { + case end == nil: + ck, cv = itr.Last() + default: + _, _ = itr.Seek(end) // after key + ck, cv = itr.Prev() // return to end key + } + } else { + switch { + case start == nil: + ck, cv = itr.First() + default: + ck, cv = itr.Seek(start) + } + } + + return &boltDBIterator{ + tx: tx, + itr: itr, + start: start, + end: end, + currentKey: ck, + currentValue: cv, + isReverse: isReverse, + isInvalid: false, + } +} + +// Domain implements Iterator. +func (itr *boltDBIterator) Domain() ([]byte, []byte) { + return itr.start, itr.end +} + +// Valid implements Iterator. 
+func (itr *boltDBIterator) Valid() bool { + if itr.isInvalid { + return false + } + + if itr.Error() != nil { + itr.isInvalid = true + return false + } + + // iterated to the end of the cursor + if itr.currentKey == nil { + itr.isInvalid = true + return false + } + + if itr.isReverse { + if itr.start != nil && bytes.Compare(itr.currentKey, itr.start) < 0 { + itr.isInvalid = true + return false + } + } else { + if itr.end != nil && bytes.Compare(itr.end, itr.currentKey) <= 0 { + itr.isInvalid = true + return false + } + } + + // Valid + return true +} + +// Next implements Iterator. +func (itr *boltDBIterator) Next() { + itr.assertIsValid() + if itr.isReverse { + itr.currentKey, itr.currentValue = itr.itr.Prev() + } else { + itr.currentKey, itr.currentValue = itr.itr.Next() + } +} + +// Key implements Iterator. +// The caller should not modify the contents of the returned slice. +// Instead, the caller should make a copy and work on the copy. +func (itr *boltDBIterator) Key() []byte { + itr.assertIsValid() + return itr.currentKey +} + +// Value implements Iterator. +// The caller should not modify the contents of the returned slice. +// Instead, the caller should make a copy and work on the copy. +func (itr *boltDBIterator) Value() []byte { + itr.assertIsValid() + return itr.currentValue +} + +// Error implements Iterator. +func (itr *boltDBIterator) Error() error { + return nil +} + +// Close implements Iterator. 
+func (itr *boltDBIterator) Close() error { + return itr.tx.Rollback() +} + +func (itr *boltDBIterator) assertIsValid() { + if !itr.Valid() { + panic("iterator is invalid") + } +} diff --git a/boltdb_test.go b/boltdb_test.go new file mode 100644 index 0000000..e68c85b --- /dev/null +++ b/boltdb_test.go @@ -0,0 +1,36 @@ +//go:build boltdb +// +build boltdb + +package db + +import ( + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestBoltDBNewBoltDB(t *testing.T) { + name := fmt.Sprintf("test_%x", randStr(12)) + dir := os.TempDir() + defer cleanupDBDir(dir, name) + + db, err := NewBoltDB(name, dir) + require.NoError(t, err) + db.Close() +} + +func BenchmarkBoltDBRandomReadsWrites(b *testing.B) { + name := fmt.Sprintf("test_%x", randStr(12)) + db, err := NewBoltDB(name, "") + if err != nil { + b.Fatal(err) + } + defer func() { + db.Close() + cleanupDBDir("", name) + }() + + benchmarkRandomReadsWrites(b, db) +} diff --git a/cleveldb.go b/cleveldb.go new file mode 100644 index 0000000..d0d1cce --- /dev/null +++ b/cleveldb.go @@ -0,0 +1,205 @@ +//go:build cleveldb +// +build cleveldb + +package db + +import ( + "fmt" + "path/filepath" + + "github.com/jmhodges/levigo" +) + +func init() { + dbCreator := func(name string, dir string) (DB, error) { + return NewCLevelDB(name, dir) + } + registerDBCreator(CLevelDBBackend, dbCreator) +} + +// CLevelDB uses the C LevelDB database via a Go wrapper. +type CLevelDB struct { + db *levigo.DB + ro *levigo.ReadOptions + wo *levigo.WriteOptions + woSync *levigo.WriteOptions +} + +var _ DB = (*CLevelDB)(nil) + +// NewCLevelDB creates a new CLevelDB. +// +// Deprecated: cleveldb is deprecated and will be removed in the future. 
+func NewCLevelDB(name string, dir string) (*CLevelDB, error) { + dbPath := filepath.Join(dir, name+".db") + + opts := levigo.NewOptions() + opts.SetCache(levigo.NewLRUCache(1 << 30)) + opts.SetCreateIfMissing(true) + db, err := levigo.Open(dbPath, opts) + if err != nil { + return nil, err + } + ro := levigo.NewReadOptions() + wo := levigo.NewWriteOptions() + woSync := levigo.NewWriteOptions() + woSync.SetSync(true) + database := &CLevelDB{ + db: db, + ro: ro, + wo: wo, + woSync: woSync, + } + return database, nil +} + +// Get implements DB. +func (db *CLevelDB) Get(key []byte) ([]byte, error) { + if len(key) == 0 { + return nil, errKeyEmpty + } + res, err := db.db.Get(db.ro, key) + if err != nil { + return nil, err + } + return res, nil +} + +// Has implements DB. +func (db *CLevelDB) Has(key []byte) (bool, error) { + bytes, err := db.Get(key) + if err != nil { + return false, err + } + return bytes != nil, nil +} + +// Set implements DB. +func (db *CLevelDB) Set(key []byte, value []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if value == nil { + return errValueNil + } + if err := db.db.Put(db.wo, key, value); err != nil { + return err + } + return nil +} + +// SetSync implements DB. +func (db *CLevelDB) SetSync(key []byte, value []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if value == nil { + return errValueNil + } + if err := db.db.Put(db.woSync, key, value); err != nil { + return err + } + return nil +} + +// Delete implements DB. +func (db *CLevelDB) Delete(key []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if err := db.db.Delete(db.wo, key); err != nil { + return err + } + return nil +} + +// DeleteSync implements DB. 
+func (db *CLevelDB) DeleteSync(key []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if err := db.db.Delete(db.woSync, key); err != nil { + return err + } + return nil +} + +// Compact implements DB and compacts the given range of the DB +func (db *CLevelDB) Compact(start, end []byte) error { + // CompactRange of clevelDB does not return anything + db.db.CompactRange(levigo.Range{Start: start, Limit: end}) + return nil +} + +// FIXME This should not be exposed +func (db *CLevelDB) DB() *levigo.DB { + return db.db +} + +// Close implements DB. +func (db *CLevelDB) Close() error { + db.db.Close() + db.ro.Close() + db.wo.Close() + db.woSync.Close() + return nil +} + +// Print implements DB. +func (db *CLevelDB) Print() error { + itr, err := db.Iterator(nil, nil) + if err != nil { + return err + } + defer itr.Close() + for ; itr.Valid(); itr.Next() { + key := itr.Key() + value := itr.Value() + fmt.Printf("[%X]:\t[%X]\n", key, value) + } + return nil +} + +// Stats implements DB. +func (db *CLevelDB) Stats() map[string]string { + keys := []string{ + "leveldb.aliveiters", + "leveldb.alivesnaps", + "leveldb.blockpool", + "leveldb.cachedblock", + "leveldb.num-files-at-level{n}", + "leveldb.openedtables", + "leveldb.sstables", + "leveldb.stats", + } + + stats := make(map[string]string, len(keys)) + for _, key := range keys { + str := db.db.PropertyValue(key) + stats[key] = str + } + return stats +} + +// NewBatch implements DB. +func (db *CLevelDB) NewBatch() Batch { + return newCLevelDBBatch(db) +} + +// Iterator implements DB. +func (db *CLevelDB) Iterator(start, end []byte) (Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, errKeyEmpty + } + itr := db.db.NewIterator(db.ro) + return newCLevelDBIterator(itr, start, end, false), nil +} + +// ReverseIterator implements DB. 
+func (db *CLevelDB) ReverseIterator(start, end []byte) (Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, errKeyEmpty + } + itr := db.db.NewIterator(db.ro) + return newCLevelDBIterator(itr, start, end, true), nil +} diff --git a/cleveldb_batch.go b/cleveldb_batch.go new file mode 100644 index 0000000..b77bd52 --- /dev/null +++ b/cleveldb_batch.go @@ -0,0 +1,82 @@ +//go:build cleveldb +// +build cleveldb + +package db + +import "github.com/jmhodges/levigo" + +// cLevelDBBatch is a LevelDB batch. +type cLevelDBBatch struct { + db *CLevelDB + batch *levigo.WriteBatch +} + +func newCLevelDBBatch(db *CLevelDB) *cLevelDBBatch { + return &cLevelDBBatch{ + db: db, + batch: levigo.NewWriteBatch(), + } +} + +// Set implements Batch. +func (b *cLevelDBBatch) Set(key, value []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if value == nil { + return errValueNil + } + if b.batch == nil { + return errBatchClosed + } + b.batch.Put(key, value) + return nil +} + +// Delete implements Batch. +func (b *cLevelDBBatch) Delete(key []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if b.batch == nil { + return errBatchClosed + } + b.batch.Delete(key) + return nil +} + +// Write implements Batch. +func (b *cLevelDBBatch) Write() error { + if b.batch == nil { + return errBatchClosed + } + err := b.db.db.Write(b.db.wo, b.batch) + if err != nil { + return err + } + // Make sure batch cannot be used afterwards. Callers should still call Close(), for errors. + return b.Close() +} + +// WriteSync implements Batch. +func (b *cLevelDBBatch) WriteSync() error { + if b.batch == nil { + return errBatchClosed + } + err := b.db.db.Write(b.db.woSync, b.batch) + if err != nil { + return err + } + // Make sure batch cannot be used afterwards. Callers should still call Close(), for errors. + b.Close() + return nil +} + +// Close implements Batch. 
+func (b *cLevelDBBatch) Close() error { + if b.batch != nil { + b.batch.Close() + b.batch = nil + } + return nil +} diff --git a/cleveldb_iterator.go b/cleveldb_iterator.go new file mode 100644 index 0000000..7ed0344 --- /dev/null +++ b/cleveldb_iterator.go @@ -0,0 +1,138 @@ +//go:build cleveldb +// +build cleveldb + +package db + +import ( + "bytes" + + "github.com/jmhodges/levigo" +) + +// cLevelDBIterator is a cLevelDB iterator. +type cLevelDBIterator struct { + source *levigo.Iterator + start, end []byte + isReverse bool + isInvalid bool +} + +var _ Iterator = (*cLevelDBIterator)(nil) + +func newCLevelDBIterator(source *levigo.Iterator, start, end []byte, isReverse bool) *cLevelDBIterator { + if isReverse { + if end == nil || len(end) == 0 { + source.SeekToLast() + } else { + source.Seek(end) + if source.Valid() { + eoakey := source.Key() // end or after key + if bytes.Compare(end, eoakey) <= 0 { + source.Prev() + } + } else { + source.SeekToLast() + } + } + } else { + if start == nil || len(start) == 0 { + source.SeekToFirst() + } else { + source.Seek(start) + } + } + return &cLevelDBIterator{ + source: source, + start: start, + end: end, + isReverse: isReverse, + isInvalid: false, + } +} + +// Domain implements Iterator. +func (itr cLevelDBIterator) Domain() ([]byte, []byte) { + return itr.start, itr.end +} + +// Valid implements Iterator. +func (itr cLevelDBIterator) Valid() bool { + // Once invalid, forever invalid. + if itr.isInvalid { + return false + } + + // If source errors, invalid. + if itr.source.GetError() != nil { + itr.isInvalid = true + return false + } + + // If source is invalid, invalid. + if !itr.source.Valid() { + itr.isInvalid = true + return false + } + + // If key is end or past it, invalid. 
+ start := itr.start + end := itr.end + key := itr.source.Key() + if itr.isReverse { + if start != nil && bytes.Compare(key, start) < 0 { + itr.isInvalid = true + return false + } + } else { + if end != nil && bytes.Compare(end, key) <= 0 { + itr.isInvalid = true + return false + } + } + + // It's valid. + return true +} + +// Key implements Iterator. +// The caller should not modify the contents of the returned slice. +// Instead, the caller should make a copy and work on the copy. +func (itr cLevelDBIterator) Key() []byte { + itr.assertIsValid() + return itr.source.Key() +} + +// Value implements Iterator. +// The caller should not modify the contents of the returned slice. +// Instead, the caller should make a copy and work on the copy. +func (itr cLevelDBIterator) Value() []byte { + itr.assertIsValid() + return itr.source.Value() +} + +// Next implements Iterator. +func (itr cLevelDBIterator) Next() { + itr.assertIsValid() + if itr.isReverse { + itr.source.Prev() + } else { + itr.source.Next() + } +} + +// Error implements Iterator. +func (itr cLevelDBIterator) Error() error { + return itr.source.GetError() +} + +// Close implements Iterator. 
+func (itr cLevelDBIterator) Close() error { + itr.source.Close() + return nil +} + +func (itr cLevelDBIterator) assertIsValid() { + if !itr.Valid() { + panic("iterator is invalid") + } +} diff --git a/cleveldb_test.go b/cleveldb_test.go new file mode 100644 index 0000000..42a1bd9 --- /dev/null +++ b/cleveldb_test.go @@ -0,0 +1,101 @@ +//go:build cleveldb +// +build cleveldb + +package db + +import ( + "bytes" + "fmt" + "math/rand" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func BenchmarkRandomReadsWrites2(b *testing.B) { + b.StopTimer() + + numItems := int64(1000000) + internal := map[int64]int64{} + for i := 0; i < int(numItems); i++ { + internal[int64(i)] = int64(0) + } + db, err := NewCLevelDB(fmt.Sprintf("test_%x", randStr(12)), "") + if err != nil { + b.Fatal(err.Error()) + return + } + + b.StartTimer() + + for i := 0; i < b.N; i++ { + // Write something + { + idx := (int64(rand.Int()) % numItems) + internal[idx]++ + val := internal[idx] + idxBytes := int642Bytes(int64(idx)) + valBytes := int642Bytes(int64(val)) + db.Set( + idxBytes, + valBytes, + ) + } + // Read something + { + idx := (int64(rand.Int()) % numItems) + val := internal[idx] + idxBytes := int642Bytes(int64(idx)) + valBytes, err := db.Get(idxBytes) + if err != nil { + b.Error(err) + } + if val == 0 { + if !bytes.Equal(valBytes, nil) { + b.Errorf("Expected %v for %v, got %X", + nil, idx, valBytes) + break + } + } else { + if len(valBytes) != 8 { + b.Errorf("Expected length 8 for %v, got %X", + idx, valBytes) + break + } + valGot := bytes2Int64(valBytes) + if val != valGot { + b.Errorf("Expected %v for %v, got %v", + val, idx, valGot) + break + } + } + } + } + + db.Close() +} + +func TestCLevelDBBackend(t *testing.T) { + name := fmt.Sprintf("test_%x", randStr(12)) + // Can't use "" (current directory) or "./" here because levigo.Open returns: + // "Error initializing DB: IO error: test_XXX.db: Invalid argument" + dir := os.TempDir() + 
db, err := NewDB(name, CLevelDBBackend, dir) + require.NoError(t, err) + defer cleanupDBDir(dir, name) + + _, ok := db.(*CLevelDB) + assert.True(t, ok) +} + +func TestCLevelDBStats(t *testing.T) { + name := fmt.Sprintf("test_%x", randStr(12)) + dir := os.TempDir() + db, err := NewDB(name, CLevelDBBackend, dir) + require.NoError(t, err) + defer cleanupDBDir(dir, name) + + assert.NotEmpty(t, db.Stats()) +} diff --git a/db.go b/db.go index 3123fae..517d2cd 100644 --- a/db.go +++ b/db.go @@ -13,11 +13,22 @@ const ( // popular implementation) // - pure go // - stable - // - unmaintained + // - unmaintained GoLevelDBBackend BackendType = "goleveldb" + // CLevelDBBackend represents cleveldb (uses levigo wrapper) + // - fast + // - requires gcc + // - use cleveldb build tag (go build -tags cleveldb) + CLevelDBBackend BackendType = "cleveldb" // MemDBBackend represents in-memory key value store, which is mostly used // for testing. MemDBBackend BackendType = "memdb" + // BoltDBBackend represents bolt (uses etcd's fork of bolt - + // github.com/etcd-io/bbolt) + // - EXPERIMENTAL + // - may be faster in some use-cases (random reads - indexer) + // - use boltdb build tag (go build -tags boltdb) + BoltDBBackend BackendType = "boltdb" // RocksDBBackend represents rocksdb (uses github.com/tecbot/gorocksdb) // - EXPERIMENTAL // - requires gcc diff --git a/go.mod b/go.mod index 9fcfd0a..16ad5cc 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,11 @@ require ( github.com/cockroachdb/pebble v1.1.1 github.com/dgraph-io/badger/v4 v4.2.0 github.com/google/btree v1.1.2 + github.com/jmhodges/levigo v1.0.0 github.com/linxGnu/grocksdb v1.8.14 github.com/stretchr/testify v1.9.0 github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca + go.etcd.io/bbolt v1.3.10 ) require ( diff --git a/go.sum b/go.sum index 14dbc9c..ffc19d1 100644 --- a/go.sum +++ b/go.sum @@ -173,6 +173,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru
v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/jmhodges/levigo v1.0.0 h1:q5EC36kV79HWeTBWsod3mG11EgStG3qArTKcvlksN1U= +github.com/jmhodges/levigo v1.0.0/go.mod h1:Q6Qx+uH3RAqyK4rFQroq9RL7mdkABMcfhEI+nNuzMJQ= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -268,6 +270,8 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.etcd.io/bbolt v1.3.10 h1:+BqfJTcCzTItrop8mq/lbzL8wSGtj94UO/3U31shqG0= +go.etcd.io/bbolt v1.3.10/go.mod h1:bK3UQLPJZly7IlNmV7uVHJDxfe5aK9Ll93e/74Y9oEQ= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= diff --git a/util_test.go b/util_test.go index 68e6b8e..411abe1 100644 --- a/util_test.go +++ b/util_test.go @@ -25,6 +25,11 @@ func TestPrefixIteratorNoMatchNil(t *testing.T) { // Empty iterator for db populated after iterator created. 
func TestPrefixIteratorNoMatch1(t *testing.T) { for backend := range backends { + if backend == BoltDBBackend { + t.Log("bolt does not support concurrent writes while iterating") + continue + } + t.Run(fmt.Sprintf("Prefix w/ backend %s", backend), func(t *testing.T) { db, dir := newTempDB(t, backend) defer os.RemoveAll(dir)