Skip to content

Commit

Permalink
feat: add async flush nodebuffer in path schema
Browse files Browse the repository at this point in the history
  • Loading branch information
joeylichang committed Oct 31, 2023
1 parent cf15dff commit 6ab199b
Show file tree
Hide file tree
Showing 16 changed files with 589 additions and 61 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ var (
utils.TransactionHistoryFlag,
utils.StateSchemeFlag,
utils.StateHistoryFlag,
utils.PathDBSyncFlag,
utils.LightServeFlag,
utils.LightIngressFlag,
utils.LightEgressFlag,
Expand Down
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,12 @@ var (
Value: rawdb.HashScheme,
Category: flags.StateCategory,
}
PathDBSyncFlag = &cli.BoolFlag{
Name: "pathdb.sync",
Usage: "sync flush nodes cache to disk in path schema",
Value: false,
Category: flags.StateCategory,
}
StateHistoryFlag = &cli.Uint64Flag{
Name: "history.state",
Usage: "Number of recent blocks to retain state history for (default = 90,000 blocks, 0 = entire chain)",
Expand Down Expand Up @@ -1951,6 +1957,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
log.Warn("The flag --txlookuplimit is deprecated and will be removed, please use --history.transactions")
cfg.TransactionHistory = ctx.Uint64(TransactionHistoryFlag.Name)
}
if ctx.IsSet(PathDBSyncFlag.Name) {
cfg.PathSyncFlush = true
}
if ctx.String(GCModeFlag.Name) == "archive" && cfg.TransactionHistory != 0 {
cfg.TransactionHistory = 0
log.Warn("Disabled transaction unindexing for archive node")
Expand Down
12 changes: 7 additions & 5 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ type CacheConfig struct {
NoTries bool // Insecure settings. Do not have any tries in databases if enabled.
StateHistory uint64 // Number of blocks from head whose state histories are reserved.
StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top
PathSyncFlush bool // Whether sync flush the trienodebuffer of pathdb to disk.

SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
Expand All @@ -174,6 +175,7 @@ func (c *CacheConfig) triedbConfig() *trie.Config {
}
if c.StateScheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{
SyncFlush: c.PathSyncFlush,
StateHistory: c.StateHistory,
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024,
Expand Down Expand Up @@ -1200,7 +1202,7 @@ func (bc *BlockChain) Stop() {
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem())
}
if size, _ := triedb.Size(); size != 0 {
if _, size, _, _ := triedb.Size(); size != 0 {
log.Error("Dangling trie nodes after full cleanup")
}
}
Expand Down Expand Up @@ -1604,8 +1606,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
if current := block.NumberU64(); current > bc.triesInMemory {
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
nodes, imgs = triedb.Size()
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
_, nodes, _, imgs = triedb.Size()
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
)
if nodes > limit || imgs > 4*1024*1024 {
triedb.Cap(limit - ethdb.IdealBatchSize)
Expand Down Expand Up @@ -2089,8 +2091,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
stats.processed++
stats.usedGas += usedGas

dirty, _ := bc.triedb.Size()
stats.report(chain, it.index, dirty, setHead)
trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ := bc.triedb.Size()
stats.report(chain, it.index, trieDiffNodes, trieBufNodes, trieImmutableBufNodes, setHead)

if !setHead {
// After merge we expect few side chains. Simply count
Expand Down
10 changes: 8 additions & 2 deletions core/blockchain_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const statsReportLimit = 8 * time.Second

// report prints statistics if some number of blocks have been processed
// or more than a few seconds have passed since the last message.
func (st *insertStats) report(chain []*types.Block, index int, dirty common.StorageSize, setHead bool) {
func (st *insertStats) report(chain []*types.Block, index int, trieDiffNodes, trieBufNodes, trieImmutableBufNodes common.StorageSize, setHead bool) {
// Fetch the timings for the batch
var (
now = mclock.Now()
Expand All @@ -63,7 +63,13 @@ func (st *insertStats) report(chain []*types.Block, index int, dirty common.Stor
if timestamp := time.Unix(int64(end.Time()), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
}
context = append(context, []interface{}{"dirty", dirty}...)
if trieDiffNodes != 0 { // pathdb
context = append(context, []interface{}{"triediffs", trieDiffNodes}...)
context = append(context, []interface{}{"triedirty", trieBufNodes}...)
context = append(context, []interface{}{"trieimutabledirty", trieImmutableBufNodes}...)
} else {
context = append(context, []interface{}{"triedirty", trieBufNodes}...)
}

if st.queued > 0 {
context = append(context, []interface{}{"queued", st.queued}...)
Expand Down
3 changes: 2 additions & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// dirty cache to the clean cache.
if config.StateScheme == rawdb.PathScheme && config.TrieDirtyCache > pathdb.MaxDirtyBufferSize/1024/1024 {
log.Info("Capped dirty cache size", "provided", common.StorageSize(config.TrieDirtyCache)*1024*1024, "adjusted", common.StorageSize(pathdb.MaxDirtyBufferSize))
config.TrieCleanCache += config.TrieDirtyCache - pathdb.MaxDirtyBufferSize/1024/1024
log.Info("Clean cache size", "provided", common.StorageSize(config.TrieCleanCache)*1024*1024)
config.TrieDirtyCache = pathdb.MaxDirtyBufferSize / 1024 / 1024
}
log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024)
Expand Down Expand Up @@ -225,6 +225,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
Preimages: config.Preimages,
StateHistory: config.StateHistory,
StateScheme: config.StateScheme,
PathSyncFlush: config.PathSyncFlush,
}
)
bcOps := make([]core.BlockChainOption, 0)
Expand Down
1 change: 1 addition & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ type Config struct {
TransactionHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
StateHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose state histories are reserved.
StateScheme string `toml:",omitempty"` // State scheme used to store ethereum state and merkle trie nodes on top
PathSyncFlush bool `toml:",omitempty"` // State scheme used to store ethereum state and merkle trie nodes on top

// RequiredBlocks is a set of block number -> hash mappings which must be in the
// canonical chain of all remote peers. Setting the option makes geth verify the
Expand Down
4 changes: 2 additions & 2 deletions eth/state_accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ func (eth *Ethereum) hashState(ctx context.Context, block *types.Block, reexec u
parent = root
}
if report {
nodes, imgs := triedb.Size()
log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "preimages", imgs)
diff, nodes, immutablenodes, imgs := triedb.Size()
log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "layer", diff, "nodes", nodes, "immutablenodes", immutablenodes, "preimages", imgs)
}
return statedb, func() { triedb.Dereference(block.Root()) }, nil
}
Expand Down
4 changes: 2 additions & 2 deletions eth/tracers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
// if the relevant state is available in disk.
var preferDisk bool
if statedb != nil {
s1, s2 := statedb.Database().TrieDB().Size()
preferDisk = s1+s2 > defaultTracechainMemLimit
s1, s2, s3, s4 := statedb.Database().TrieDB().Size()
preferDisk = s1+s2+s3+s4 > defaultTracechainMemLimit
}
statedb, release, err = api.backend.StateAtBlock(ctx, block, reexec, statedb, false, preferDisk)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions trie/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type backend interface {

// Size returns the current storage size of the memory cache in front of the
// persistent database layer.
Size() common.StorageSize
Size() (common.StorageSize, common.StorageSize, common.StorageSize)

// Update performs a state transition by committing dirty nodes contained
// in the given set in order to update state from the specified parent to
Expand Down Expand Up @@ -207,16 +207,16 @@ func (db *Database) Commit(root common.Hash, report bool) error {

// Size returns the storage size of dirty trie nodes in front of the persistent
// database and the size of cached preimages.
func (db *Database) Size() (common.StorageSize, common.StorageSize) {
func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize, common.StorageSize) {
var (
storages common.StorageSize
preimages common.StorageSize
diffs, nodes, immutablenodes common.StorageSize
preimages common.StorageSize
)
storages = db.backend.Size()
diffs, nodes, immutablenodes = db.backend.Size()
if db.preimages != nil {
preimages = db.preimages.size()
}
return storages, preimages
return diffs, nodes, immutablenodes, preimages
}

// Initialized returns an indicator if the state data is already initialized
Expand Down
4 changes: 2 additions & 2 deletions trie/triedb/hashdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,15 +641,15 @@ func (db *Database) Update(root common.Hash, parent common.Hash, block uint64, n

// Size returns the current storage size of the memory cache in front of the
// persistent database layer.
func (db *Database) Size() common.StorageSize {
func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize) {
db.lock.RLock()
defer db.lock.RUnlock()

// db.dirtiesSize only contains the useful data in the cache, but when reporting
// the total memory consumption, the maintenance metadata is also needed to be
// counted.
var metadataSize = common.StorageSize(len(db.dirties) * cachedNodeSize)
return db.dirtiesSize + db.childrenSize + metadataSize
return 0, db.dirtiesSize + db.childrenSize + metadataSize, 0
}

// Close closes the trie database and releases all held resources.
Expand Down
Loading

0 comments on commit 6ab199b

Please sign in to comment.