From c8dcba7f85c7f3382f2682663b459aada25cb1d4 Mon Sep 17 00:00:00 2001 From: Joshua Gutow Date: Tue, 27 Oct 2020 17:37:35 -0400 Subject: [PATCH] Splits generic db from enodes packages (#1162) * Renames * NewGenericDB -> New * Splits generic db from enodes packages This enables other packages to create leveldbs outside of the enode package. --- .../backend/internal/db/generic_db.go | 183 ++++++++++++++++++ .../{enodes => db}/generic_db_test.go | 18 +- .../istanbul/backend/internal/enodes/db.go | 68 ------- .../backend/internal/enodes/generic_db.go | 116 ----------- .../backend/internal/enodes/val_enode_db.go | 29 +-- .../internal/enodes/version_certificate_db.go | 17 +- 6 files changed, 216 insertions(+), 215 deletions(-) create mode 100644 consensus/istanbul/backend/internal/db/generic_db.go rename consensus/istanbul/backend/internal/{enodes => db}/generic_db_test.go (76%) delete mode 100644 consensus/istanbul/backend/internal/enodes/generic_db.go diff --git a/consensus/istanbul/backend/internal/db/generic_db.go b/consensus/istanbul/backend/internal/db/generic_db.go new file mode 100644 index 000000000000..932525669b3d --- /dev/null +++ b/consensus/istanbul/backend/internal/db/generic_db.go @@ -0,0 +1,183 @@ +// Copyright 2017 The Celo Authors +// This file is part of the celo library. +// +// The celo library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The celo library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the celo library. If not, see . + +package db + +import ( + "bytes" + "encoding/binary" + "os" + + "github.com/syndtr/goleveldb/leveldb" + lvlerrors "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/storage" + "github.com/syndtr/goleveldb/leveldb/util" + + "github.com/ethereum/go-ethereum/log" +) + +const ( + dbVersionKey = "version" // Version of the database to flush if changes +) + +// GenericDB manages a levelDB database +type GenericDB struct { + db *leveldb.DB + writeOptions *opt.WriteOptions +} + +type GenericEntry interface{} + +// New will open a new db at the given file path with the given version. +// If the path is empty, the db will be created in memory. +// If there is a version mismatch in the existing db, the contents are flushed. +func New(dbVersion int64, path string, logger log.Logger, writeOptions *opt.WriteOptions) (*GenericDB, error) { + db, err := NewDB(dbVersion, path, logger) + if err != nil { + return nil, err + } + return &GenericDB{ + db: db, + writeOptions: writeOptions, + }, nil +} + +// Close flushes and closes the database files. +func (gdb *GenericDB) Close() error { + return gdb.db.Close() +} + +// Upsert iterates through each provided entry and determines if the entry is +// new. If there is an existing entry in the db, `onUpdatedEntry` is called. +// If there is no existing entry, `onNewEntry` is called. Db content modifications are left to those functions +// by providing a leveldb Batch that is written after all entries are processed. +func (gdb *GenericDB) Upsert( + entries []GenericEntry, + getExistingEntry func(entry GenericEntry) (GenericEntry, error), + onUpdatedEntry func(batch *leveldb.Batch, existingEntry GenericEntry, newEntry GenericEntry) error, + onNewEntry func(batch *leveldb.Batch, entry GenericEntry) error, +) error { + batch := new(leveldb.Batch) + for _, entry := range entries { + existingEntry, err := getExistingEntry(entry) + isNew := err == leveldb.ErrNotFound + if !isNew && err != nil { + return err + } + if isNew { + if err := onNewEntry(batch, entry); err != nil { + return err + } + } else { + if err := onUpdatedEntry(batch, existingEntry, entry); err != nil { + return err + } + } + } + + if batch.Len() > 0 { + err := gdb.Write(batch) + if err != nil { + return err + } + } + return nil +} + +// Get gets the bytes at a given key in the db +func (gdb *GenericDB) Get(key []byte) ([]byte, error) { + return gdb.db.Get(key, nil) +} + +// Write writes a Batch to modify the db +func (gdb *GenericDB) Write(batch *leveldb.Batch) error { + return gdb.db.Write(batch, gdb.writeOptions) +} + +// Iterate will iterate through each entry in the db whose key has the prefix +// keyPrefix, and call `onEntry` with the bytes of the key (without the prefix) +// and the bytes of the value +func (gdb *GenericDB) Iterate(keyPrefix []byte, onEntry func([]byte, []byte) error) error { + iter := gdb.db.NewIterator(util.BytesPrefix(keyPrefix), nil) + defer iter.Release() + + for iter.Next() { + key := iter.Key()[len(keyPrefix):] + err := onEntry(key, iter.Value()) + if err != nil { + return err + } + } + return iter.Error() +} + +// newDB creates/opens a leveldb persistent database at the given path. +// If no path is given, an in-memory, temporary database is constructed. +func NewDB(dbVersion int64, path string, logger log.Logger) (*leveldb.DB, error) { + if path == "" { + return NewMemoryDB() + } + return NewPersistentDB(dbVersion, path, logger) +} + +// newMemoryDB creates a new in-memory node database without a persistent backend. +func NewMemoryDB() (*leveldb.DB, error) { + db, err := leveldb.Open(storage.NewMemStorage(), nil) + if err != nil { + return nil, err + } + return db, nil +} + +// newPersistentNodeDB creates/opens a leveldb backed persistent database, +// also flushing its contents in case of a version mismatch. +func NewPersistentDB(dbVersion int64, path string, logger log.Logger) (*leveldb.DB, error) { + opts := &opt.Options{OpenFilesCacheCapacity: 5} + db, err := leveldb.OpenFile(path, opts) + if _, iscorrupted := err.(*lvlerrors.ErrCorrupted); iscorrupted { + db, err = leveldb.RecoverFile(path, nil) + } + if err != nil { + return nil, err + } + currentVer := make([]byte, binary.MaxVarintLen64) + currentVer = currentVer[:binary.PutVarint(currentVer, dbVersion)] + + blob, err := db.Get([]byte(dbVersionKey), nil) + switch err { + case leveldb.ErrNotFound: + // Version not found (i.e. empty cache), insert it + if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil { + db.Close() + return nil, err + } + + case nil: + // Version present, flush if different + if !bytes.Equal(blob, currentVer) { + oldVersion, _ := binary.Varint(blob) + newVersion, _ := binary.Varint(currentVer) + logger.Info("DB version has changed. Creating a new leveldb.", "old version", oldVersion, "new version", newVersion) + db.Close() + if err = os.RemoveAll(path); err != nil { + return nil, err + } + return NewPersistentDB(dbVersion, path, logger) + } + } + return db, nil +} diff --git a/consensus/istanbul/backend/internal/enodes/generic_db_test.go b/consensus/istanbul/backend/internal/db/generic_db_test.go similarity index 76% rename from consensus/istanbul/backend/internal/enodes/generic_db_test.go rename to consensus/istanbul/backend/internal/db/generic_db_test.go index 01b6ed14b0d7..739709a68139 100644 --- a/consensus/istanbul/backend/internal/enodes/generic_db_test.go +++ b/consensus/istanbul/backend/internal/db/generic_db_test.go @@ -1,4 +1,4 @@ -package enodes +package db import ( "testing" @@ -10,7 +10,7 @@ import ( type mockEntry struct{} func TestUpsert(t *testing.T) { - vedb, err := newGenericDB(int64(0), "", log.New(), nil) + gdb, err := New(int64(0), "", log.New(), nil) if err != nil { t.Fatal("Failed to create DB") } @@ -38,7 +38,7 @@ func TestUpsert(t *testing.T) { } for i, testCase := range testCases { - onExistingEntryCalled, onNewEntryCalled, err := upsertEntry(vedb, testCase.ExistingEntry, testCase.NewEntry) + onExistingEntryCalled, onNewEntryCalled, err := upsertEntry(gdb, testCase.ExistingEntry, testCase.NewEntry) if err != nil { t.Fatal("Failed to upsert entry") } @@ -51,29 +51,29 @@ func TestUpsert(t *testing.T) { } } -func upsertEntry(vedb *genericDB, existingEntry *mockEntry, newEntry *mockEntry) (bool, bool, error) { +func upsertEntry(gdb *GenericDB, existingEntry *mockEntry, newEntry *mockEntry) (bool, bool, error) { var ( onExistingEntryCalled bool onNewEntryCalled bool ) - getExistingEntry := func(_ genericEntry) (genericEntry, error) { + getExistingEntry := func(_ GenericEntry) (GenericEntry, error) { if existingEntry == nil { return nil, leveldb.ErrNotFound } return existingEntry, nil } - onExistingEntry := func(_ *leveldb.Batch, _ genericEntry, _ genericEntry) error { + onExistingEntry := func(_ *leveldb.Batch, _ GenericEntry, _ GenericEntry) error { onExistingEntryCalled = true return nil } - onNewEntry := func(_ *leveldb.Batch, _ genericEntry) error { + onNewEntry := func(_ *leveldb.Batch, _ GenericEntry) error { onNewEntryCalled = true return nil } - err := vedb.Upsert( - []genericEntry{genericEntry(newEntry)}, + err := gdb.Upsert( + []GenericEntry{GenericEntry(newEntry)}, getExistingEntry, onExistingEntry, onNewEntry, diff --git a/consensus/istanbul/backend/internal/enodes/db.go b/consensus/istanbul/backend/internal/enodes/db.go index 75a0920a2236..ce082f3e6bff 100644 --- a/consensus/istanbul/backend/internal/enodes/db.go +++ b/consensus/istanbul/backend/internal/enodes/db.go @@ -17,18 +17,9 @@ package enodes import ( - "bytes" - "encoding/binary" "errors" - "os" - - "github.com/syndtr/goleveldb/leveldb" - lvlerrors "github.com/syndtr/goleveldb/leveldb/errors" - "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/syndtr/goleveldb/leveldb/storage" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" ) @@ -37,8 +28,6 @@ var ( ) const ( - dbVersionKey = "version" // Version of the database to flush if changes - dbAddressPrefix = "address:" // Identifier to prefix node entries with dbNodeIDPrefix = "nodeid:" // Identifier to prefix node entries with ) @@ -50,60 +39,3 @@ func addressKey(address common.Address) []byte { func nodeIDKey(nodeID enode.ID) []byte { return append([]byte(dbNodeIDPrefix), nodeID.Bytes()...) } - -// newDB creates/opens a leveldb persistent database at the given path. -// If no path is given, an in-memory, temporary database is constructed. -func newDB(dbVersion int64, path string, logger log.Logger) (*leveldb.DB, error) { - if path == "" { - return newMemoryDB() - } - return newPersistentDB(int64(valEnodeDBVersion), path, logger) -} - -// newMemoryDB creates a new in-memory node database without a persistent backend. -func newMemoryDB() (*leveldb.DB, error) { - db, err := leveldb.Open(storage.NewMemStorage(), nil) - if err != nil { - return nil, err - } - return db, nil -} - -// newPersistentNodeDB creates/opens a leveldb backed persistent database, -// also flushing its contents in case of a version mismatch. -func newPersistentDB(dbVersion int64, path string, logger log.Logger) (*leveldb.DB, error) { - opts := &opt.Options{OpenFilesCacheCapacity: 5} - db, err := leveldb.OpenFile(path, opts) - if _, iscorrupted := err.(*lvlerrors.ErrCorrupted); iscorrupted { - db, err = leveldb.RecoverFile(path, nil) - } - if err != nil { - return nil, err - } - currentVer := make([]byte, binary.MaxVarintLen64) - currentVer = currentVer[:binary.PutVarint(currentVer, dbVersion)] - - blob, err := db.Get([]byte(dbVersionKey), nil) - switch err { - case leveldb.ErrNotFound: - // Version not found (i.e. empty cache), insert it - if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil { - db.Close() - return nil, err - } - - case nil: - // Version present, flush if different - if !bytes.Equal(blob, currentVer) { - oldVersion, _ := binary.Varint(blob) - newVersion, _ := binary.Varint(currentVer) - logger.Info("DB version has changed. Creating a new leveldb.", "old version", oldVersion, "new version", newVersion) - db.Close() - if err = os.RemoveAll(path); err != nil { - return nil, err - } - return newPersistentDB(dbVersion, path, logger) - } - } - return db, nil -} diff --git a/consensus/istanbul/backend/internal/enodes/generic_db.go b/consensus/istanbul/backend/internal/enodes/generic_db.go deleted file mode 100644 index 857f981000ff..000000000000 --- a/consensus/istanbul/backend/internal/enodes/generic_db.go +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright 2017 The Celo Authors -// This file is part of the celo library. -// -// The celo library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The celo library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the celo library. If not, see . - -package enodes - -import ( - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/syndtr/goleveldb/leveldb/util" - - "github.com/ethereum/go-ethereum/log" -) - -// genericDB manages a levelDB database -type genericDB struct { - db *leveldb.DB - writeOptions *opt.WriteOptions -} - -type genericEntry interface{} - -// newGenericDB will open a new db at the given file path with the given version. -// If the path is empty, the db will be created in memory. -// If there is a version mismatch in the existing db, the contents are flushed. -func newGenericDB(dbVersion int64, path string, logger log.Logger, writeOptions *opt.WriteOptions) (*genericDB, error) { - db, err := newDB(dbVersion, path, logger) - if err != nil { - return nil, err - } - return &genericDB{ - db: db, - writeOptions: writeOptions, - }, nil -} - -// Close flushes and closes the database files. -func (vedb *genericDB) Close() error { - return vedb.db.Close() -} - -// Upsert iterates through each provided entry and determines if the entry is -// new. If there is an existing entry in the db, `onUpdatedEntry` is called. -// If there is no existing entry, `onNewEntry` is called. Db content modifications are left to those functions -// by providing a leveldb Batch that is written after all entries are processed. -func (vedb *genericDB) Upsert( - entries []genericEntry, - getExistingEntry func(entry genericEntry) (genericEntry, error), - onUpdatedEntry func(batch *leveldb.Batch, existingEntry genericEntry, newEntry genericEntry) error, - onNewEntry func(batch *leveldb.Batch, entry genericEntry) error, -) error { - batch := new(leveldb.Batch) - for _, entry := range entries { - existingEntry, err := getExistingEntry(entry) - isNew := err == leveldb.ErrNotFound - if !isNew && err != nil { - return err - } - if isNew { - if err := onNewEntry(batch, entry); err != nil { - return err - } - } else { - if err := onUpdatedEntry(batch, existingEntry, entry); err != nil { - return err - } - } - } - - if batch.Len() > 0 { - err := vedb.Write(batch) - if err != nil { - return err - } - } - return nil -} - -// Get gets the bytes at a given key in the db -func (vedb *genericDB) Get(key []byte) ([]byte, error) { - return vedb.db.Get(key, nil) -} - -// Write writes a Batch to modify the db -func (vedb *genericDB) Write(batch *leveldb.Batch) error { - return vedb.db.Write(batch, vedb.writeOptions) -} - -// Iterate will iterate through each entry in the db whose key has the prefix -// keyPrefix, and call `onEntry` with the bytes of the key (without the prefix) -// and the bytes of the value -func (vedb *genericDB) Iterate(keyPrefix []byte, onEntry func([]byte, []byte) error) error { - iter := vedb.db.NewIterator(util.BytesPrefix(keyPrefix), nil) - defer iter.Release() - - for iter.Next() { - key := iter.Key()[len(keyPrefix):] - err := onEntry(key, iter.Value()) - if err != nil { - return err - } - } - return iter.Error() -} diff --git a/consensus/istanbul/backend/internal/enodes/val_enode_db.go b/consensus/istanbul/backend/internal/enodes/val_enode_db.go index 345a7f1fcae9..56c0b44056a5 100644 --- a/consensus/istanbul/backend/internal/enodes/val_enode_db.go +++ b/consensus/istanbul/backend/internal/enodes/val_enode_db.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/consensus/istanbul" + "github.com/ethereum/go-ethereum/consensus/istanbul/backend/internal/db" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" @@ -54,7 +55,7 @@ type ValidatorEnodeHandler interface { ClearValidatorPeers() } -func addressEntryFromGenericEntry(entry genericEntry) (*istanbul.AddressEntry, error) { +func addressEntryFromGenericEntry(entry db.GenericEntry) (*istanbul.AddressEntry, error) { addressEntry, ok := entry.(*istanbul.AddressEntry) if !ok { return nil, errIncorrectEntryType @@ -65,7 +66,7 @@ func addressEntryFromGenericEntry(entry genericEntry) (*istanbul.AddressEntry, e // ValidatorEnodeDB represents a Map that can be accessed either // by address or enode type ValidatorEnodeDB struct { - gdb *genericDB + gdb *db.GenericDB lock sync.RWMutex handler ValidatorEnodeHandler logger log.Logger @@ -76,7 +77,7 @@ type ValidatorEnodeDB struct { func OpenValidatorEnodeDB(path string, handler ValidatorEnodeHandler) (*ValidatorEnodeDB, error) { logger := log.New("db", "ValidatorEnodeDB") - gdb, err := newGenericDB(int64(valEnodeDBVersion), path, logger, &opt.WriteOptions{NoWriteMerge: true}) + gdb, err := db.New(int64(valEnodeDBVersion), path, logger, &opt.WriteOptions{NoWriteMerge: true}) if err != nil { logger.Error("Error creating db", "err", err) return nil, err @@ -199,7 +200,7 @@ func (vet *ValidatorEnodeDB) GetValEnodes(valAddresses []common.Address) (map[co func (vet *ValidatorEnodeDB) UpsertHighestKnownVersion(valEnodeEntries []*istanbul.AddressEntry) error { logger := vet.logger.New("func", "UpsertHighestKnownVersion") - onNewEntry := func(batch *leveldb.Batch, entry genericEntry) error { + onNewEntry := func(batch *leveldb.Batch, entry db.GenericEntry) error { addressEntry, err := addressEntryFromGenericEntry(entry) if err != nil { return err @@ -215,7 +216,7 @@ func (vet *ValidatorEnodeDB) UpsertHighestKnownVersion(valEnodeEntries []*istanb return nil } - onUpdatedEntry := func(batch *leveldb.Batch, existingEntry genericEntry, newEntry genericEntry) error { + onUpdatedEntry := func(batch *leveldb.Batch, existingEntry db.GenericEntry, newEntry db.GenericEntry) error { existingAddressEntry, err := addressEntryFromGenericEntry(existingEntry) if err != nil { return err @@ -259,7 +260,7 @@ func (vet *ValidatorEnodeDB) UpsertVersionAndEnode(valEnodeEntries []*istanbul.A peersToRemove := make([]*enode.Node, 0, len(valEnodeEntries)) peersToAdd := make(map[common.Address]*enode.Node) - onNewEntry := func(batch *leveldb.Batch, entry genericEntry) error { + onNewEntry := func(batch *leveldb.Batch, entry db.GenericEntry) error { addressEntry, err := addressEntryFromGenericEntry(entry) if err != nil { return err @@ -276,7 +277,7 @@ func (vet *ValidatorEnodeDB) UpsertVersionAndEnode(valEnodeEntries []*istanbul.A return nil } - onUpdatedEntry := func(batch *leveldb.Batch, existingEntry genericEntry, newEntry genericEntry) error { + onUpdatedEntry := func(batch *leveldb.Batch, existingEntry db.GenericEntry, newEntry db.GenericEntry) error { existingAddressEntry, err := addressEntryFromGenericEntry(existingEntry) if err != nil { return err @@ -334,7 +335,7 @@ func (vet *ValidatorEnodeDB) UpsertVersionAndEnode(valEnodeEntries []*istanbul.A func (vet *ValidatorEnodeDB) UpdateQueryEnodeStats(valEnodeEntries []*istanbul.AddressEntry) error { logger := vet.logger.New("func", "UpdateEnodeQueryStats") - onNewEntry := func(batch *leveldb.Batch, entry genericEntry) error { + onNewEntry := func(batch *leveldb.Batch, entry db.GenericEntry) error { addressEntry, err := addressEntryFromGenericEntry(entry) if err != nil { return err @@ -350,7 +351,7 @@ func (vet *ValidatorEnodeDB) UpdateQueryEnodeStats(valEnodeEntries []*istanbul.A return nil } - onUpdatedEntry := func(batch *leveldb.Batch, existingEntry genericEntry, newEntry genericEntry) error { + onUpdatedEntry := func(batch *leveldb.Batch, existingEntry db.GenericEntry, newEntry db.GenericEntry) error { existingAddressEntry, err := addressEntryFromGenericEntry(existingEntry) if err != nil { return err @@ -390,13 +391,13 @@ func (vet *ValidatorEnodeDB) UpdateQueryEnodeStats(valEnodeEntries []*istanbul.A // and/or connect the corresponding validator connenctions. The validator connections // should be managed be a separate thread (see https://github.com/celo-org/celo-blockchain/issues/607) func (vet *ValidatorEnodeDB) upsert(valEnodeEntries []*istanbul.AddressEntry, - onNewEntry func(batch *leveldb.Batch, entry genericEntry) error, - onUpdatedEntry func(batch *leveldb.Batch, existingEntry genericEntry, newEntry genericEntry) error) error { + onNewEntry func(batch *leveldb.Batch, entry db.GenericEntry) error, + onUpdatedEntry func(batch *leveldb.Batch, existingEntry db.GenericEntry, newEntry db.GenericEntry) error) error { logger := vet.logger.New("func", "Upsert") vet.lock.Lock() defer vet.lock.Unlock() - getExistingEntry := func(entry genericEntry) (genericEntry, error) { + getExistingEntry := func(entry db.GenericEntry) (db.GenericEntry, error) { addressEntry, err := addressEntryFromGenericEntry(entry) if err != nil { return entry, err @@ -404,9 +405,9 @@ func (vet *ValidatorEnodeDB) upsert(valEnodeEntries []*istanbul.AddressEntry, return vet.getAddressEntry(addressEntry.Address) } - entries := make([]genericEntry, len(valEnodeEntries)) + entries := make([]db.GenericEntry, len(valEnodeEntries)) for i, valEnodeEntry := range valEnodeEntries { - entries[i] = genericEntry(valEnodeEntry) + entries[i] = db.GenericEntry(valEnodeEntry) } if err := vet.gdb.Upsert(entries, getExistingEntry, onUpdatedEntry, onNewEntry); err != nil { diff --git a/consensus/istanbul/backend/internal/enodes/version_certificate_db.go b/consensus/istanbul/backend/internal/enodes/version_certificate_db.go index 65a4ca7f17b0..85bc2c85f6ee 100644 --- a/consensus/istanbul/backend/internal/enodes/version_certificate_db.go +++ b/consensus/istanbul/backend/internal/enodes/version_certificate_db.go @@ -27,6 +27,7 @@ import ( "github.com/syndtr/goleveldb/leveldb/opt" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/istanbul/backend/internal/db" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" @@ -38,7 +39,7 @@ const ( // VersionCertificateDB stores type VersionCertificateDB struct { - gdb *genericDB + gdb *db.GenericDB logger log.Logger } @@ -52,7 +53,7 @@ type VersionCertificateEntry struct { Signature []byte } -func versionCertificateEntryFromGenericEntry(entry genericEntry) (*VersionCertificateEntry, error) { +func versionCertificateEntryFromGenericEntry(entry db.GenericEntry) (*VersionCertificateEntry, error) { signedAnnVersionEntry, ok := entry.(*VersionCertificateEntry) if !ok { return nil, errIncorrectEntryType @@ -96,7 +97,7 @@ func (entry *VersionCertificateEntry) String() string { func OpenVersionCertificateDB(path string) (*VersionCertificateDB, error) { logger := log.New("db", "VersionCertificateDB") - gdb, err := newGenericDB(int64(versionCertificateDBVersion), path, logger, &opt.WriteOptions{NoWriteMerge: true}) + gdb, err := db.New(int64(versionCertificateDBVersion), path, logger, &opt.WriteOptions{NoWriteMerge: true}) if err != nil { logger.Error("Error creating db", "err", err) return nil, err @@ -137,7 +138,7 @@ func (svdb *VersionCertificateDB) Upsert(savEntries []*VersionCertificateEntry) var newEntries []*VersionCertificateEntry - getExistingEntry := func(entry genericEntry) (genericEntry, error) { + getExistingEntry := func(entry db.GenericEntry) (db.GenericEntry, error) { savEntry, err := versionCertificateEntryFromGenericEntry(entry) if err != nil { return entry, err @@ -145,7 +146,7 @@ func (svdb *VersionCertificateDB) Upsert(savEntries []*VersionCertificateEntry) return svdb.Get(savEntry.Address) } - onNewEntry := func(batch *leveldb.Batch, entry genericEntry) error { + onNewEntry := func(batch *leveldb.Batch, entry db.GenericEntry) error { savEntry, err := versionCertificateEntryFromGenericEntry(entry) if err != nil { return err @@ -161,7 +162,7 @@ func (svdb *VersionCertificateDB) Upsert(savEntries []*VersionCertificateEntry) return nil } - onUpdatedEntry := func(batch *leveldb.Batch, existingEntry genericEntry, newEntry genericEntry) error { + onUpdatedEntry := func(batch *leveldb.Batch, existingEntry db.GenericEntry, newEntry db.GenericEntry) error { existingSav, err := versionCertificateEntryFromGenericEntry(existingEntry) if err != nil { return err @@ -177,9 +178,9 @@ func (svdb *VersionCertificateDB) Upsert(savEntries []*VersionCertificateEntry) return onNewEntry(batch, newEntry) } - entries := make([]genericEntry, len(savEntries)) + entries := make([]db.GenericEntry, len(savEntries)) for i, sav := range savEntries { - entries[i] = genericEntry(sav) + entries[i] = db.GenericEntry(sav) } if err := svdb.gdb.Upsert(entries, getExistingEntry, onUpdatedEntry, onNewEntry); err != nil {