Skip to content

Commit

Permalink
Splits generic db from enodes packages (ethereum#1162)
Browse files Browse the repository at this point in the history
    * Renames

    * NewGenericDB -> New

    * Splits generic db from enodes packages
    
        This enables other packages to create leveldbs outside of the enode package.
  • Loading branch information
Joshua Gutow authored Oct 27, 2020
1 parent 2e17f0d commit c8dcba7
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 215 deletions.
183 changes: 183 additions & 0 deletions consensus/istanbul/backend/internal/db/generic_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright 2017 The Celo Authors
// This file is part of the celo library.
//
// The celo library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The celo library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the celo library. If not, see <http://www.gnu.org/licenses/>.

package db

import (
"bytes"
"encoding/binary"
"os"

"github.com/syndtr/goleveldb/leveldb"
lvlerrors "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"

"github.com/ethereum/go-ethereum/log"
)

const (
dbVersionKey = "version" // Version of the database to flush if changes
)

// GenericDB manages a levelDB database
type GenericDB struct {
db *leveldb.DB
writeOptions *opt.WriteOptions
}

type GenericEntry interface{}

// New will open a new db at the given file path with the given version.
// If the path is empty, the db will be created in memory.
// If there is a version mismatch in the existing db, the contents are flushed.
func New(dbVersion int64, path string, logger log.Logger, writeOptions *opt.WriteOptions) (*GenericDB, error) {
db, err := NewDB(dbVersion, path, logger)
if err != nil {
return nil, err
}
return &GenericDB{
db: db,
writeOptions: writeOptions,
}, nil
}

// Close flushes and closes the database files.
func (gdb *GenericDB) Close() error {
return gdb.db.Close()
}

// Upsert iterates through each provided entry and determines if the entry is
// new. If there is an existing entry in the db, `onUpdatedEntry` is called.
// If there is no existing entry, `onNewEntry` is called. Db content modifications are left to those functions
// by providing a leveldb Batch that is written after all entries are processed.
func (gdb *GenericDB) Upsert(
entries []GenericEntry,
getExistingEntry func(entry GenericEntry) (GenericEntry, error),
onUpdatedEntry func(batch *leveldb.Batch, existingEntry GenericEntry, newEntry GenericEntry) error,
onNewEntry func(batch *leveldb.Batch, entry GenericEntry) error,
) error {
batch := new(leveldb.Batch)
for _, entry := range entries {
existingEntry, err := getExistingEntry(entry)
isNew := err == leveldb.ErrNotFound
if !isNew && err != nil {
return err
}
if isNew {
if err := onNewEntry(batch, entry); err != nil {
return err
}
} else {
if err := onUpdatedEntry(batch, existingEntry, entry); err != nil {
return err
}
}
}

if batch.Len() > 0 {
err := gdb.Write(batch)
if err != nil {
return err
}
}
return nil
}

// Get gets the bytes at a given key in the db
func (gdb *GenericDB) Get(key []byte) ([]byte, error) {
return gdb.db.Get(key, nil)
}

// Write writes a Batch to modify the db
func (gdb *GenericDB) Write(batch *leveldb.Batch) error {
return gdb.db.Write(batch, gdb.writeOptions)
}

// Iterate will iterate through each entry in the db whose key has the prefix
// keyPrefix, and call `onEntry` with the bytes of the key (without the prefix)
// and the bytes of the value
func (gdb *GenericDB) Iterate(keyPrefix []byte, onEntry func([]byte, []byte) error) error {
iter := gdb.db.NewIterator(util.BytesPrefix(keyPrefix), nil)
defer iter.Release()

for iter.Next() {
key := iter.Key()[len(keyPrefix):]
err := onEntry(key, iter.Value())
if err != nil {
return err
}
}
return iter.Error()
}

// newDB creates/opens a leveldb persistent database at the given path.
// If no path is given, an in-memory, temporary database is constructed.
func NewDB(dbVersion int64, path string, logger log.Logger) (*leveldb.DB, error) {
if path == "" {
return NewMemoryDB()
}
return NewPersistentDB(dbVersion, path, logger)
}

// newMemoryDB creates a new in-memory node database without a persistent backend.
func NewMemoryDB() (*leveldb.DB, error) {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
if err != nil {
return nil, err
}
return db, nil
}

// newPersistentNodeDB creates/opens a leveldb backed persistent database,
// also flushing its contents in case of a version mismatch.
func NewPersistentDB(dbVersion int64, path string, logger log.Logger) (*leveldb.DB, error) {
opts := &opt.Options{OpenFilesCacheCapacity: 5}
db, err := leveldb.OpenFile(path, opts)
if _, iscorrupted := err.(*lvlerrors.ErrCorrupted); iscorrupted {
db, err = leveldb.RecoverFile(path, nil)
}
if err != nil {
return nil, err
}
currentVer := make([]byte, binary.MaxVarintLen64)
currentVer = currentVer[:binary.PutVarint(currentVer, dbVersion)]

blob, err := db.Get([]byte(dbVersionKey), nil)
switch err {
case leveldb.ErrNotFound:
// Version not found (i.e. empty cache), insert it
if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil {
db.Close()
return nil, err
}

case nil:
// Version present, flush if different
if !bytes.Equal(blob, currentVer) {
oldVersion, _ := binary.Varint(blob)
newVersion, _ := binary.Varint(currentVer)
logger.Info("DB version has changed. Creating a new leveldb.", "old version", oldVersion, "new version", newVersion)
db.Close()
if err = os.RemoveAll(path); err != nil {
return nil, err
}
return NewPersistentDB(dbVersion, path, logger)
}
}
return db, nil
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package enodes
package db

import (
"testing"
Expand All @@ -10,7 +10,7 @@ import (
type mockEntry struct{}

func TestUpsert(t *testing.T) {
vedb, err := newGenericDB(int64(0), "", log.New(), nil)
gdb, err := New(int64(0), "", log.New(), nil)
if err != nil {
t.Fatal("Failed to create DB")
}
Expand Down Expand Up @@ -38,7 +38,7 @@ func TestUpsert(t *testing.T) {
}

for i, testCase := range testCases {
onExistingEntryCalled, onNewEntryCalled, err := upsertEntry(vedb, testCase.ExistingEntry, testCase.NewEntry)
onExistingEntryCalled, onNewEntryCalled, err := upsertEntry(gdb, testCase.ExistingEntry, testCase.NewEntry)
if err != nil {
t.Fatal("Failed to upsert entry")
}
Expand All @@ -51,29 +51,29 @@ func TestUpsert(t *testing.T) {
}
}

func upsertEntry(vedb *genericDB, existingEntry *mockEntry, newEntry *mockEntry) (bool, bool, error) {
func upsertEntry(gdb *GenericDB, existingEntry *mockEntry, newEntry *mockEntry) (bool, bool, error) {
var (
onExistingEntryCalled bool
onNewEntryCalled bool
)

getExistingEntry := func(_ genericEntry) (genericEntry, error) {
getExistingEntry := func(_ GenericEntry) (GenericEntry, error) {
if existingEntry == nil {
return nil, leveldb.ErrNotFound
}
return existingEntry, nil
}
onExistingEntry := func(_ *leveldb.Batch, _ genericEntry, _ genericEntry) error {
onExistingEntry := func(_ *leveldb.Batch, _ GenericEntry, _ GenericEntry) error {
onExistingEntryCalled = true
return nil
}
onNewEntry := func(_ *leveldb.Batch, _ genericEntry) error {
onNewEntry := func(_ *leveldb.Batch, _ GenericEntry) error {
onNewEntryCalled = true
return nil
}

err := vedb.Upsert(
[]genericEntry{genericEntry(newEntry)},
err := gdb.Upsert(
[]GenericEntry{GenericEntry(newEntry)},
getExistingEntry,
onExistingEntry,
onNewEntry,
Expand Down
68 changes: 0 additions & 68 deletions consensus/istanbul/backend/internal/enodes/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,9 @@
package enodes

import (
"bytes"
"encoding/binary"
"errors"
"os"

"github.com/syndtr/goleveldb/leveldb"
lvlerrors "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
)

Expand All @@ -37,8 +28,6 @@ var (
)

const (
dbVersionKey = "version" // Version of the database to flush if changes

dbAddressPrefix = "address:" // Identifier to prefix node entries with
dbNodeIDPrefix = "nodeid:" // Identifier to prefix node entries with
)
Expand All @@ -50,60 +39,3 @@ func addressKey(address common.Address) []byte {
func nodeIDKey(nodeID enode.ID) []byte {
return append([]byte(dbNodeIDPrefix), nodeID.Bytes()...)
}

// newDB creates/opens a leveldb persistent database at the given path.
// If no path is given, an in-memory, temporary database is constructed.
func newDB(dbVersion int64, path string, logger log.Logger) (*leveldb.DB, error) {
if path == "" {
return newMemoryDB()
}
return newPersistentDB(int64(valEnodeDBVersion), path, logger)
}

// newMemoryDB creates a new in-memory node database without a persistent backend.
func newMemoryDB() (*leveldb.DB, error) {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
if err != nil {
return nil, err
}
return db, nil
}

// newPersistentNodeDB creates/opens a leveldb backed persistent database,
// also flushing its contents in case of a version mismatch.
func newPersistentDB(dbVersion int64, path string, logger log.Logger) (*leveldb.DB, error) {
opts := &opt.Options{OpenFilesCacheCapacity: 5}
db, err := leveldb.OpenFile(path, opts)
if _, iscorrupted := err.(*lvlerrors.ErrCorrupted); iscorrupted {
db, err = leveldb.RecoverFile(path, nil)
}
if err != nil {
return nil, err
}
currentVer := make([]byte, binary.MaxVarintLen64)
currentVer = currentVer[:binary.PutVarint(currentVer, dbVersion)]

blob, err := db.Get([]byte(dbVersionKey), nil)
switch err {
case leveldb.ErrNotFound:
// Version not found (i.e. empty cache), insert it
if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil {
db.Close()
return nil, err
}

case nil:
// Version present, flush if different
if !bytes.Equal(blob, currentVer) {
oldVersion, _ := binary.Varint(blob)
newVersion, _ := binary.Varint(currentVer)
logger.Info("DB version has changed. Creating a new leveldb.", "old version", oldVersion, "new version", newVersion)
db.Close()
if err = os.RemoveAll(path); err != nil {
return nil, err
}
return newPersistentDB(dbVersion, path, logger)
}
}
return db, nil
}
Loading

0 comments on commit c8dcba7

Please sign in to comment.