Skip to content

Commit

Permalink
triedb: check whether the async flush is done
Browse files Browse the repository at this point in the history
  • Loading branch information
VM committed Dec 22, 2023
1 parent 72d8c83 commit d4f4db1
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 34 deletions.
48 changes: 21 additions & 27 deletions trie/triedb/pathdb/asyncnodebuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/VictoriaMetrics/fastcache"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/crypto"
Expand All @@ -18,14 +19,14 @@ import (

var _ trienodebuffer = &asyncnodebuffer{}

// asyncnodebuffer implement trienodebuffer interface, and aysnc the nodecache
// asyncnodebuffer implement trienodebuffer interface, and async the nodecache
// to disk.
type asyncnodebuffer struct {
mux sync.RWMutex
current *nodecache
background *nodecache
stopFlushing uint64
flushing uint64
isFlushing atomic.Bool
stopFlushing atomic.Bool
}

// newAsyncNodeBuffer initializes the async node buffer with the provided nodes.
Expand Down Expand Up @@ -114,28 +115,21 @@ func (a *asyncnodebuffer) empty() bool {
return a.current.empty() && a.background.empty()
}

// setSize sets the buffer size to the provided number, and invokes a flush
// operation if the current memory usage exceeds the new limit.
//func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
// b.limit = uint64(size)
// return b.flush(db, clean, id, false)
//}

// flush persists the in-memory dirty trie node into the disk if the configured
// memory threshold is reached. Note, all data must be written atomically.
func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
a.mux.Lock()
defer a.mux.Unlock()

if atomic.LoadUint64(&a.stopFlushing) == 1 {
if a.stopFlushing.Load() {
return nil
}

if force {
for {
if atomic.LoadUint64(&a.background.immutable) == 1 {
time.Sleep(time.Duration(DefaultBackgroundFlushInterval) * time.Second)
log.Info("waiting background memory table flush to disk for force flush node buffer")
log.Info("waiting background memory table flushed into disk for forcing flush node buffer")
continue
}
atomic.StoreUint64(&a.current.immutable, 1)
Expand All @@ -155,28 +149,27 @@ func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
atomic.StoreUint64(&a.current.immutable, 1)
a.current, a.background = a.background, a.current

atomic.StoreUint64(&a.flushing, 1)
go func(persistId uint64) {
defer atomic.StoreUint64(&a.flushing, 0)
a.isFlushing.Store(true)
go func(persistID uint64) {
defer a.isFlushing.Store(false)
for {
err := a.background.flush(db, clean, persistId)
err := a.background.flush(db, clean, persistID)
if err == nil {
log.Debug("succeed to flush background nodecahce to disk", "state_id", persistId)
log.Debug("succeed to flush background nodecache to disk", "state_id", persistID)
return
}
log.Error("failed to flush background nodecahce to disk", "state_id", persistId, "error", err)
log.Error("failed to flush background nodecache to disk", "state_id", persistID, "error", err)
}
}(id)
return nil
}

func (a *asyncnodebuffer) waitAndStopFlushing() {
atomic.StoreUint64(&a.stopFlushing, 1)
if atomic.LoadUint64(&a.flushing) == 1 {
time.Sleep(time.Duration(1) * time.Second)
log.Info("waiting background memory table flush to disk")
a.stopFlushing.Store(true)
for a.isFlushing.Load() {
time.Sleep(time.Second)
log.Warn("waiting background memory table flushed into disk")
}
return
}

func (a *asyncnodebuffer) getAllNodes() map[common.Hash]map[string]*trienode.Node {
Expand All @@ -185,7 +178,7 @@ func (a *asyncnodebuffer) getAllNodes() map[common.Hash]map[string]*trienode.Nod

cached, err := a.current.merge(a.background)
if err != nil {
log.Crit("[BUG] failed to merge nodecache under revert asyncnodebuffer", "error", err)
log.Crit("[BUG] failed to merge node cache under revert async node buffer", "error", err)
}
return cached.nodes
}
Expand Down Expand Up @@ -243,6 +236,7 @@ func (nc *nodecache) commit(nodes map[common.Hash]map[string]*trienode.Node) err
if atomic.LoadUint64(&nc.immutable) == 1 {
return errWriteImmutable
}

var (
delta int64
overwrite int64
Expand Down Expand Up @@ -342,12 +336,12 @@ func (nc *nodecache) merge(nc1 *nodecache) (*nodecache, error) {
if nc == nil || nc.empty() {
res := copyNodeCache(nc1)
atomic.StoreUint64(&res.immutable, 0)
return nc1, nil
return res, nil
}
if nc1 == nil || nc1.empty() {
res := copyNodeCache(nc)
atomic.StoreUint64(&res.immutable, 0)
return nc, nil
return res, nil
}
if atomic.LoadUint64(&nc.immutable) == atomic.LoadUint64(&nc1.immutable) {
return nil, errIncompatibleMerge
Expand All @@ -367,7 +361,7 @@ func (nc *nodecache) merge(nc1 *nodecache) (*nodecache, error) {
}
res.size = immutable.size + mutable.size
res.layers = immutable.layers + mutable.layers
res.limit = immutable.size
res.limit = immutable.limit
res.nodes = make(map[common.Hash]map[string]*trienode.Node)
for acc, subTree := range immutable.nodes {
if _, ok := res.nodes[acc]; !ok {
Expand Down
2 changes: 1 addition & 1 deletion trie/triedb/pathdb/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type trienodebuffer interface {
// getLayers return the size of cached difflayers.
getLayers() uint64

// waitFlushing will block unit writing the trie nodes of trienodebuffer to disk
// waitAndStopFlushing will block unit writing the trie nodes of trienodebuffer to disk.
waitAndStopFlushing()
}

Expand Down
4 changes: 2 additions & 2 deletions trie/triedb/pathdb/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (db *Database) loadDiffLayer(parent layer, r *rlp.Stream) (layer, error) {
}

// journal implements the layer interface, marshaling the un-flushed trie nodes
// along with layer meta data into provided byte buffer.
// along with layer metadata into provided byte buffer.
func (dl *diskLayer) journal(w io.Writer) error {
dl.lock.RLock()
defer dl.lock.RUnlock()
Expand Down Expand Up @@ -355,7 +355,7 @@ func (db *Database) Journal(root common.Hash) error {
}
start := time.Now()

// wait and stop the flush trienodebuffer, for async node buffer need fixed diskroot
// wait and stop the flush trienodebuffer, for asyncnodebuffer need fixed diskroot
disk.buffer.waitAndStopFlushing()
// Short circuit if the database is in read only mode.
if db.readOnly {
Expand Down
6 changes: 2 additions & 4 deletions trie/triedb/pathdb/nodebuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
start = time.Now()
// Although the calculation of b.size has been as accurate as possible,
// some omissions were still found during testing and code review, but
// we are still not sure it is completely accurate. For better protection,
// we are still not sure if it is completely accurate. For better protection,
// some redundancy is added here.
batch = db.NewBatchWithSize(int(float64(b.size) * DefaultBatchRedundancyRate))
)
Expand All @@ -241,9 +241,7 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
return nil
}

func (b *nodebuffer) waitAndStopFlushing() {
return
}
func (b *nodebuffer) waitAndStopFlushing() {}

// writeNodes writes the trie nodes into the provided database batch.
// Note this function will also inject all the newly written nodes
Expand Down

0 comments on commit d4f4db1

Please sign in to comment.