triedb/pathdb: fix async node buffer diskroot mismatches when journaling (#2083)

* triedb/pathdb: fix async node buffer diskroot mismatches when journaling

* triedb: check whether the async flush is done

* fix: generate new eth config

---------

Co-authored-by: VM <[email protected]>
sysvm and VM authored Dec 26, 2023
1 parent ad09930 commit c255942
Showing 5 changed files with 50 additions and 26 deletions.
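The coordination described in the commit message boils down to a stop-then-wait pattern: journaling first tells the async buffer to stop scheduling new background flushes, then blocks until any in-flight flush has drained, so the disk root it records cannot move underneath it. Below is a minimal, self-contained sketch of that pattern with illustrative names only (the `flusher` type is hypothetical, not the actual asyncnodebuffer):

```go
package main

import (
	"log"
	"sync/atomic"
	"time"
)

// flusher models a buffer that persists its contents from a background goroutine.
// Field and type names are illustrative; the real type is asyncnodebuffer.
type flusher struct {
	isFlushing   atomic.Bool // set while a background flush is running
	stopFlushing atomic.Bool // set once journaling has begun
}

// flush skips its work entirely once journaling has started, otherwise it
// hands the write off to a background goroutine.
func (f *flusher) flush() {
	if f.stopFlushing.Load() {
		return
	}
	f.isFlushing.Store(true)
	go func() {
		defer f.isFlushing.Store(false)
		time.Sleep(100 * time.Millisecond) // stand-in for writing trie nodes to disk
	}()
}

// waitAndStopFlushing forbids further flushes and blocks until the current
// background flush, if any, has finished.
func (f *flusher) waitAndStopFlushing() {
	f.stopFlushing.Store(true)
	for f.isFlushing.Load() {
		time.Sleep(10 * time.Millisecond)
		log.Println("waiting for background flush to finish")
	}
}

func main() {
	f := &flusher{}
	f.flush()
	f.waitAndStopFlushing() // after this returns, the persisted root is stable
	log.Println("safe to journal: disk root can no longer change")
}
```

The real implementation in the asyncnodebuffer.go diff below follows the same shape, polling once per second and logging a warning while it waits.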
6 changes: 6 additions & 0 deletions eth/ethconfig/gen_config.go

Some generated files are not rendered by default.

51 changes: 31 additions & 20 deletions trie/triedb/pathdb/asyncnodebuffer.go
@@ -8,6 +8,7 @@ import (
"time"

"github.com/VictoriaMetrics/fastcache"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/crypto"
@@ -18,12 +19,14 @@ import (

var _ trienodebuffer = &asyncnodebuffer{}

// asyncnodebuffer implement trienodebuffer interface, and aysnc the nodecache
// asyncnodebuffer implement trienodebuffer interface, and async the nodecache
// to disk.
type asyncnodebuffer struct {
mux sync.RWMutex
current *nodecache
background *nodecache
mux sync.RWMutex
current *nodecache
background *nodecache
isFlushing atomic.Bool
stopFlushing atomic.Bool
}

// newAsyncNodeBuffer initializes the async node buffer with the provided nodes.
@@ -112,24 +115,21 @@ func (a *asyncnodebuffer) empty() bool {
return a.current.empty() && a.background.empty()
}

// setSize sets the buffer size to the provided number, and invokes a flush
// operation if the current memory usage exceeds the new limit.
//func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
// b.limit = uint64(size)
// return b.flush(db, clean, id, false)
//}

// flush persists the in-memory dirty trie node into the disk if the configured
// memory threshold is reached. Note, all data must be written atomically.
func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
a.mux.Lock()
defer a.mux.Unlock()

if a.stopFlushing.Load() {
return nil
}

if force {
for {
if atomic.LoadUint64(&a.background.immutable) == 1 {
time.Sleep(time.Duration(DefaultBackgroundFlushInterval) * time.Second)
log.Info("waiting background memory table flush to disk for force flush node buffer")
log.Info("waiting background memory table flushed into disk for forcing flush node buffer")
continue
}
atomic.StoreUint64(&a.current.immutable, 1)
@@ -149,26 +149,36 @@ func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
atomic.StoreUint64(&a.current.immutable, 1)
a.current, a.background = a.background, a.current

go func(persistId uint64) {
a.isFlushing.Store(true)
go func(persistID uint64) {
defer a.isFlushing.Store(false)
for {
err := a.background.flush(db, clean, persistId)
err := a.background.flush(db, clean, persistID)
if err == nil {
log.Debug("succeed to flush background nodecahce to disk", "state_id", persistId)
log.Debug("succeed to flush background nodecache to disk", "state_id", persistID)
return
}
log.Error("failed to flush background nodecahce to disk", "state_id", persistId, "error", err)
log.Error("failed to flush background nodecache to disk", "state_id", persistID, "error", err)
}
}(id)
return nil
}

func (a *asyncnodebuffer) waitAndStopFlushing() {
a.stopFlushing.Store(true)
for a.isFlushing.Load() {
time.Sleep(time.Second)
log.Warn("waiting background memory table flushed into disk")
}
}

func (a *asyncnodebuffer) getAllNodes() map[common.Hash]map[string]*trienode.Node {
a.mux.Lock()
defer a.mux.Unlock()

cached, err := a.current.merge(a.background)
if err != nil {
log.Crit("[BUG] failed to merge nodecache under revert asyncnodebuffer", "error", err)
log.Crit("[BUG] failed to merge node cache under revert async node buffer", "error", err)
}
return cached.nodes
}
@@ -226,6 +236,7 @@ func (nc *nodecache) commit(nodes map[common.Hash]map[string]*trienode.Node) err
if atomic.LoadUint64(&nc.immutable) == 1 {
return errWriteImmutable
}

var (
delta int64
overwrite int64
@@ -325,12 +336,12 @@ func (nc *nodecache) merge(nc1 *nodecache) (*nodecache, error) {
if nc == nil || nc.empty() {
res := copyNodeCache(nc1)
atomic.StoreUint64(&res.immutable, 0)
return nc1, nil
return res, nil
}
if nc1 == nil || nc1.empty() {
res := copyNodeCache(nc)
atomic.StoreUint64(&res.immutable, 0)
return nc, nil
return res, nil
}
if atomic.LoadUint64(&nc.immutable) == atomic.LoadUint64(&nc1.immutable) {
return nil, errIncompatibleMerge
@@ -350,7 +361,7 @@ func (nc *nodecache) merge(nc1 *nodecache) (*nodecache, error) {
}
res.size = immutable.size + mutable.size
res.layers = immutable.layers + mutable.layers
res.limit = immutable.size
res.limit = immutable.limit
res.nodes = make(map[common.Hash]map[string]*trienode.Node)
for acc, subTree := range immutable.nodes {
if _, ok := res.nodes[acc]; !ok {
3 changes: 3 additions & 0 deletions trie/triedb/pathdb/disklayer.go
@@ -72,6 +72,9 @@ type trienodebuffer interface {

// getLayers return the size of cached difflayers.
getLayers() uint64

// waitAndStopFlushing will block until the trie nodes of the trienodebuffer have been written to disk.
waitAndStopFlushing()
}

func NewTrieNodeBuffer(sync bool, limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) trienodebuffer {
12 changes: 7 additions & 5 deletions trie/triedb/pathdb/journal.go
@@ -242,7 +242,7 @@ func (db *Database) loadDiffLayer(parent layer, r *rlp.Stream) (layer, error) {
}

// journal implements the layer interface, marshaling the un-flushed trie nodes
// along with layer meta data into provided byte buffer.
// along with layer metadata into provided byte buffer.
func (dl *diskLayer) journal(w io.Writer) error {
dl.lock.RLock()
defer dl.lock.RUnlock()
@@ -338,6 +338,10 @@ func (dl *diffLayer) journal(w io.Writer) error {
// flattening everything down (bad for reorgs). And this function will mark the
// database as read-only to prevent all following mutation to disk.
func (db *Database) Journal(root common.Hash) error {
// Run the journaling
db.lock.Lock()
defer db.lock.Unlock()

// Retrieve the head layer to journal from.
l := db.tree.get(root)
if l == nil {
@@ -351,10 +355,8 @@ func (db *Database) Journal(root common.Hash) error {
}
start := time.Now()

// Run the journaling
db.lock.Lock()
defer db.lock.Unlock()

// Wait for any in-flight trienodebuffer flush to finish and stop further flushing; the asyncnodebuffer needs a fixed disk root while journaling.
disk.buffer.waitAndStopFlushing()
// Short circuit if the database is in read only mode.
if db.readOnly {
return errSnapshotReadOnly
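For context on the failure mode this reordering closes: if Journal reads the disk layer's root while a background flush is still advancing the persisted state, the root written into the journal can diverge from what actually ends up on disk. A toy, self-contained illustration of that mismatch and of the drain-before-read ordering (the `disk` type and root values are made up for the example, not pathdb types):

```go
package main

import (
	"fmt"
	"sync"
)

// disk models persistent state whose root a background flush advances.
type disk struct {
	mu   sync.Mutex
	root string
}

func (d *disk) setRoot(r string) { d.mu.Lock(); d.root = r; d.mu.Unlock() }
func (d *disk) getRoot() string  { d.mu.Lock(); defer d.mu.Unlock(); return d.root }

func main() {
	d := &disk{root: "0xaa"}
	flushDone := make(chan struct{})

	// Background flush that will move the persisted root to 0xbb.
	go func() {
		d.setRoot("0xbb")
		close(flushDone)
	}()

	// Buggy ordering: journal the root without waiting for the flush;
	// this read races with the goroutine above and may still see 0xaa.
	stale := d.getRoot()

	// Fixed ordering: drain the flush first, then journal the root,
	// which is now guaranteed to match the persisted state (0xbb).
	<-flushDone
	journaled := d.getRoot()

	fmt.Println("without waiting:", stale, "| after waiting:", journaled)
}
```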
4 changes: 3 additions & 1 deletion trie/triedb/pathdb/nodebuffer.go
@@ -221,7 +221,7 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
start = time.Now()
// Although the calculation of b.size has been as accurate as possible,
// some omissions were still found during testing and code review, but
// we are still not sure it is completely accurate. For better protection,
// we are still not sure if it is completely accurate. For better protection,
// some redundancy is added here.
batch = db.NewBatchWithSize(int(float64(b.size) * DefaultBatchRedundancyRate))
)
@@ -241,6 +241,8 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
return nil
}

func (b *nodebuffer) waitAndStopFlushing() {}

// writeNodes writes the trie nodes into the provided database batch.
// Note this function will also inject all the newly written nodes
// into clean cache.
