diff --git a/cmd/sentry/download/sentry.go b/cmd/sentry/download/sentry.go index 4faf08a4283..738913d0de0 100644 --- a/cmd/sentry/download/sentry.go +++ b/cmd/sentry/download/sentry.go @@ -19,6 +19,7 @@ import ( "github.com/golang/protobuf/ptypes/empty" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" + //grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/holiman/uint256" "github.com/ledgerwatch/erigon-lib/gointerfaces" @@ -552,6 +553,7 @@ func Sentry(datadir string, sentryAddr string, discoveryDNS []string, cfg *p2p.C sentryServer.discoveryDNS = discoveryDNS <-ctx.Done() + sentryServer.Close() return nil } @@ -946,6 +948,13 @@ func (ss *SentryServerImpl) Messages(req *proto_sentry.MessagesRequest, server p } } +// Close performs cleanup operations for the sentry +func (ss *SentryServerImpl) Close() { + if ss.P2pServer != nil { + ss.P2pServer.Stop() + } +} + // MessageStreams - it's safe to use this class as non-pointer type MessageStreams struct { mu sync.RWMutex diff --git a/eth/backend.go b/eth/backend.go index e80649c6c60..92b691535d0 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -721,5 +721,8 @@ func (s *Ethereum) Stop() error { if s.config.Miner.Enabled { <-s.waitForMiningStop } + for _, sentryServer := range s.sentryServers { + sentryServer.Close() + } return nil } diff --git a/p2p/enode/nodedb.go b/p2p/enode/nodedb.go index 56f2ea506c0..6e491dcce73 100644 --- a/p2p/enode/nodedb.go +++ b/p2p/enode/nodedb.go @@ -28,6 +28,8 @@ import ( "sync" "time" + "github.com/google/btree" + "github.com/c2h5oh/datasize" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" @@ -73,9 +75,23 @@ var zeroIP = make(net.IP, 16) // DB is the node database, storing previously seen nodes and any collected metadata about // them for QoS purposes. 
type DB struct { - kv kv.RwDB // Interface to the database itself - runner sync.Once // Ensures we can start at most one expirer - quit chan struct{} // Channel to signal the expiring thread to stop + kvCache *btree.BTree + kvCacheLock sync.RWMutex + path string // Remember path for log messages + kv kv.RwDB // Interface to the database itself + runner sync.Once // Ensures we can start at most one expirer + quit chan struct{} // Channel to signal the expiring thread to stop +} + +// DbItem is type of items stored in the kvCache's btrees +type DbItem struct { + key []byte + val []byte +} + +func (di *DbItem) Less(than btree.Item) bool { + i := than.(*DbItem) + return bytes.Compare(di.key, i.key) < 0 } // OpenDB opens a node database for storing and retrieving infos about known peers in the @@ -102,6 +118,7 @@ func newMemoryDB(logger log.Logger) (*DB, error) { if err != nil { return nil, err } + db.kvCache = btree.New(32) return db, nil } @@ -146,7 +163,7 @@ func newPersistentDB(logger log.Logger, path string) (*DB, error) { } return newPersistentDB(logger, path) } - return &DB{kv: db, quit: make(chan struct{})}, nil + return &DB{path: path, kvCache: btree.New(32), kv: db, quit: make(chan struct{})}, nil } // nodeKey returns the database key for a node record. 
@@ -213,9 +230,56 @@ func localItemKey(id ID, field string) []byte { return key } +func (db *DB) getFromCache(key []byte) []byte { + db.kvCacheLock.RLock() + defer db.kvCacheLock.RUnlock() + i := db.kvCache.Get(&DbItem{key: key}) + if i != nil { + di := i.(*DbItem) + return di.val + } + return nil +} + +func (db *DB) setToCache(key, val []byte) { + db.kvCacheLock.Lock() + db.kvCache.ReplaceOrInsert(&DbItem{key: key, val: val}) + needCommit := db.kvCache.Len() > 16*1024 + db.kvCacheLock.Unlock() + // Commit outside the lock: commitCache acquires kvCacheLock itself, and sync.RWMutex is not reentrant + if needCommit { + db.commitCache(false /* logit */) + } +} + +func (db *DB) searchCache(key []byte) (foundKey, foundVal, nextKey []byte) { + if key == nil { + return nil, nil, nil + } + db.kvCacheLock.RLock() + defer db.kvCacheLock.RUnlock() + db.kvCache.AscendGreaterOrEqual(&DbItem{key: key}, func(i btree.Item) bool { + di := i.(*DbItem) + if foundKey == nil { + foundKey = di.key + foundVal = di.val + return true + } + nextKey = di.key + return false + }) + return +} + // fetchInt64 retrieves an integer associated with a particular key. func (db *DB) fetchInt64(key []byte) int64 { var val int64 + if blob := db.getFromCache(key); blob != nil { + if v, read := binary.Varint(blob); read > 0 { + return v + } + return 0 + } if err := db.kv.View(context.Background(), func(tx kv.Tx) error { blob, errGet := tx.GetOne(kv.Inodes, key) if errGet != nil { @@ -238,14 +300,17 @@ func (db *DB) storeInt64(key []byte, n int64) error { blob := make([]byte, binary.MaxVarintLen64) blob = blob[:binary.PutVarint(blob, n)] - return db.kv.Update(context.Background(), func(tx kv.RwTx) error { - return tx.Put(kv.Inodes, common.CopyBytes(key), blob) - }) + db.setToCache(common.CopyBytes(key), blob) + return nil } // fetchUint64 retrieves an integer associated with a particular key.
func (db *DB) fetchUint64(key []byte) uint64 { var val uint64 + if blob := db.getFromCache(key); blob != nil { + val, _ = binary.Uvarint(blob) + return val + } if err := db.kv.View(context.Background(), func(tx kv.Tx) error { blob, errGet := tx.GetOne(kv.Inodes, key) if errGet != nil { @@ -265,26 +330,28 @@ func (db *DB) fetchUint64(key []byte) uint64 { func (db *DB) storeUint64(key []byte, n uint64) error { blob := make([]byte, binary.MaxVarintLen64) blob = blob[:binary.PutUvarint(blob, n)] - return db.kv.Update(context.Background(), func(tx kv.RwTx) error { - return tx.Put(kv.Inodes, common.CopyBytes(key), blob) - }) + db.setToCache(common.CopyBytes(key), blob) + return nil } // Node retrieves a node with a given id from the database. func (db *DB) Node(id ID) *Node { var blob []byte - if err := db.kv.View(context.Background(), func(tx kv.Tx) error { - v, errGet := tx.GetOne(kv.Inodes, nodeKey(id)) - if errGet != nil { - return errGet - } - if v != nil { - blob = make([]byte, len(v)) - copy(blob, v) + blob = db.getFromCache(nodeKey(id)) + if blob == nil { + if err := db.kv.View(context.Background(), func(tx kv.Tx) error { + v, errGet := tx.GetOne(kv.Inodes, nodeKey(id)) + if errGet != nil { + return errGet + } + if v != nil { + blob = make([]byte, len(v)) + copy(blob, v) + } + return nil + }); err != nil { + return nil } - return nil - }); err != nil { - return nil } if blob == nil { return nil @@ -311,11 +378,7 @@ func (db *DB) UpdateNode(node *Node) error { if err != nil { return err } - if err := db.kv.Update(context.Background(), func(tx kv.RwTx) error { - return tx.Put(kv.Inodes, nodeKey(node.ID()), blob) - }); err != nil { - return err - } + db.setToCache(nodeKey(node.ID()), blob) return db.storeUint64(nodeItemKey(node.ID(), zeroIP, dbNodeSeq), node.Seq()) } @@ -335,12 +398,24 @@ func (db *DB) Resolve(n *Node) *Node { // DeleteNode deletes all information associated with a node. 
func (db *DB) DeleteNode(id ID) { - deleteRange(db.kv, nodeKey(id)) + deleteRange(db, nodeKey(id)) } -func deleteRange(db kv.RwDB, prefix []byte) { - if err := db.Update(context.Background(), func(tx kv.RwTx) error { - c, err := tx.RwCursor(kv.Inodes) +func deleteRange(db *DB, prefix []byte) { + db.kvCacheLock.Lock() + defer db.kvCacheLock.Unlock() + // First delete relevant entries from the cache + db.kvCache.AscendGreaterOrEqual(&DbItem{key: prefix}, func(i btree.Item) bool { + di := i.(*DbItem) + if !bytes.HasPrefix(di.key, prefix) { + return false + } + di.val = nil // Mark for deletion + return true + }) + // Now mark all other entries for deletion + if err := db.kv.View(context.Background(), func(tx kv.Tx) error { + c, err := tx.Cursor(kv.Inodes) if err != nil { return err } @@ -348,8 +423,9 @@ func deleteRange(db kv.RwDB, prefix []byte) { if err != nil { return err } - if err := c.Delete(k, nil); err != nil { - return nil + if f := db.kvCache.Get(&DbItem{key: k}); f == nil { + // Only copy key if item is missing in the cache + db.kvCache.ReplaceOrInsert(&DbItem{key: common.CopyBytes(k), val: nil}) } } return nil @@ -402,7 +478,8 @@ func (db *DB) expireNodes() { p := []byte(dbNodePrefix) var prevId ID var empty = true - for k, v, err := c.Seek(p); bytes.HasPrefix(k, p); k, v, err = c.Next() { + ci := cachedIter{c: c, db: db} + for k, v, err := ci.Seek(p); bytes.HasPrefix(k, p); k, v, err = ci.Next() { if err != nil { return err } @@ -437,7 +514,7 @@ func (db *DB) expireNodes() { log.Warn("nodeDB.expireNodes failed", "err", err) } for _, td := range toDelete { - deleteRange(db.kv, td) + deleteRange(db, td) } } @@ -518,6 +595,72 @@ func (db *DB) storeLocalSeq(id ID, n uint64) { db.storeUint64(localItemKey(id, dbLocalSeq), n) } +type cachedIter struct { + c kv.Cursor + db *DB + cKey, cVal []byte + cacheKey, cacheVal, cacheNextKey []byte +} + +func (ci *cachedIter) Seek(searchKey []byte) (k, v []byte, err error) { + ci.cKey, ci.cVal, err = ci.c.Seek(searchKey) + 
if err != nil { + return nil, nil, err + } + ci.cacheKey, ci.cacheVal, ci.cacheNextKey = ci.db.searchCache(searchKey) + return ci.Next() +} + +func (ci *cachedIter) Next() (k, v []byte, err error) { + for { + if ci.cKey == nil && ci.cacheKey == nil { + k = nil + v = nil + return + } + if ci.cKey == nil { + k = ci.cacheKey + v = ci.cacheVal + ci.cacheKey, ci.cacheVal, ci.cacheNextKey = ci.db.searchCache(ci.cacheNextKey) + if v != nil { + // if v == nil, it is deleted entry and we try the next record + return + } + continue + } + if ci.cacheKey == nil { + k = ci.cKey + v = ci.cVal + ci.cKey, ci.cVal, err = ci.c.Next() + return + } + switch bytes.Compare(ci.cKey, ci.cacheKey) { + case -1: + k = ci.cKey + v = ci.cVal + ci.cKey, ci.cVal, err = ci.c.Next() + return + case 0: + k = ci.cacheKey + v = ci.cacheVal + ci.cacheKey, ci.cacheVal, ci.cacheNextKey = ci.db.searchCache(ci.cacheNextKey) + ci.cKey, ci.cVal, err = ci.c.Next() + if v != nil { + // if v == nil, it is deleted entry and we try the next record + return + } + case 1: + k = ci.cacheKey + v = ci.cacheVal + ci.cacheKey, ci.cacheVal, ci.cacheNextKey = ci.db.searchCache(ci.cacheNextKey) + if v != nil { + // if v == nil, it is deleted entry and we try the next record + return + } + } + } +} + // QuerySeeds retrieves random nodes to be used as potential seed nodes // for bootstrapping. func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node { @@ -532,6 +675,7 @@ func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node { if err != nil { return err } + ci := &cachedIter{db: db, c: c} seek: for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ { // Seek to a random entry. 
The first byte is incremented by a @@ -541,7 +685,7 @@ rand.Read(id[:]) id[0] = ctr + id[0]%16 var n *Node - for k, v, err := c.Seek(nodeKey(id)); k != nil && n == nil; k, v, err = c.Next() { + for k, v, err := ci.Seek(nodeKey(id)); k != nil && n == nil; k, v, err = ci.Next() { if err != nil { return err } @@ -557,9 +701,12 @@ db.ensureExpirer() pongKey := nodeItemKey(n.ID(), n.IP(), dbNodePong) var lastPongReceived int64 - blob, errGet := tx.GetOne(kv.Inodes, pongKey) - if errGet != nil { - return errGet + blob := db.getFromCache(pongKey) + if blob == nil { + var errGet error + if blob, errGet = tx.GetOne(kv.Inodes, pongKey); errGet != nil { + return errGet + } } if blob != nil { if v, read := binary.Varint(blob); read > 0 { @@ -583,6 +730,42 @@ return nodes } +func (db *DB) commitCache(logit bool) { + db.kvCacheLock.Lock() + defer db.kvCacheLock.Unlock() + entriesUpdated := 0 + entriesDeleted := 0 + if err := db.kv.Update(context.Background(), func(tx kv.RwTx) error { + c, err := tx.RwCursor(kv.Inodes) + if err != nil { + return err + } + db.kvCache.Ascend(func(i btree.Item) bool { + di := i.(*DbItem) + if di.val == nil { + if err = c.Delete(di.key, nil); err != nil { + return false + } + entriesDeleted++ + } else { + if err = c.Put(di.key, di.val); err != nil { + return false + } + entriesUpdated++ + } + return true + }) + return err + }); err != nil { + log.Warn("p2p node database update failed", "path", db.path, "err", err) + } else { + if logit { + log.Info("Successfully updated p2p node database", "path", db.path, "updated", entriesUpdated, "deleted", entriesDeleted) + } + db.kvCache.Clear(true) + } +} + // close flushes and closes the database files.
func (db *DB) Close() { if db.quit == nil { @@ -590,5 +773,6 @@ func (db *DB) Close() { } close(db.quit) db.quit = nil + db.commitCache(true /* logit */) db.kv.Close() }