Skip to content

Commit

Permalink
pevm opt: fall back to sequential processor when the TxDAG's depth of …
Browse files Browse the repository at this point in the history
…dependencies is too deep
  • Loading branch information
andyzhang2023 committed Jan 10, 2025
1 parent d60b170 commit 746438b
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 28 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ var (
utils.ParallelTxDAGFlag,
utils.ParallelTxDAGFileFlag,
utils.ParallelTxDAGSenderPrivFlag,
utils.ParallelTxDATMaxDepthRatioFlag,
configFileFlag,
utils.LogDebugFlag,
utils.LogBacktraceAtFlag,
Expand Down
11 changes: 11 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,13 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
Category: flags.VMCategory,
}

ParallelTxDATMaxDepthRatioFlag = &cli.Float64Flag{
Name: "parallel.txdag-max-depth-ratio",
Usage: "A ratio used to decide whether to execute transactions in parallel; execution falls back to the sequential processor if the ratio of the TxDAG depth to the transaction count exceeds this value (default = 0.9)",
Value: 0.9,
Category: flags.VMCategory,
}

VMOpcodeOptimizeFlag = &cli.BoolFlag{
Name: "vm.opcode.optimize",
Usage: "enable opcode optimization",
Expand Down Expand Up @@ -2057,6 +2064,10 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
cfg.ParallelTxDAGFile = ctx.String(ParallelTxDAGFileFlag.Name)
}

if ctx.IsSet(ParallelTxDATMaxDepthRatioFlag.Name) {
cfg.ParallelTxDAGMaxDepthRatio = ctx.Float64(ParallelTxDATMaxDepthRatioFlag.Name)
}

if ctx.IsSet(ParallelTxDAGSenderPrivFlag.Name) {
priHex := ctx.String(ParallelTxDAGSenderPrivFlag.Name)
if cfg.Miner.ParallelTxDAGSenderPriv, err = crypto.HexToECDSA(priHex); err != nil {
Expand Down
16 changes: 14 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1971,6 +1971,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
)

blockProcessedInParallel := false
var (
tooDeep bool
depth int
)
// skip block process if we already have the state, receipts and logs from mining work
if !(receiptExist && logExist && stateExist) {
// Retrieve the parent block and it's state to execute on top
Expand Down Expand Up @@ -2016,10 +2020,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)

statedb.SetExpectedStateRoot(block.Root())

// find out whether the block's dependency DAG is too deep to be processed in parallel;
// if it is, fall back to serial processing
txCount := len(block.Transactions())
_, depth = BuildTxLevels(txCount, bc.vmConfig.TxDAG)
tooDeep = float64(depth)/float64(txCount) > bc.vmConfig.TxDAGMaxDepthRatio

// Process block using the parent state as reference point
pstart = time.Now()
txDAGMissButNecessary := bc.vmConfig.TxDAG == nil && (bc.vmConfig.EnableParallelUnorderedMerge || bc.vmConfig.EnableTxParallelMerge)
useSerialProcessor := !bc.vmConfig.EnableParallelExec || txDAGMissButNecessary
useSerialProcessor := !bc.vmConfig.EnableParallelExec || txDAGMissButNecessary || tooDeep
if useSerialProcessor {
receipts, logs, usedGas, err = bc.serialProcessor.Process(block, statedb, bc.vmConfig)
blockProcessedInParallel = false
Expand Down Expand Up @@ -2143,7 +2153,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
"accountUpdates", common.PrettyDuration(timers.AccountUpdates),
"storageUpdates", common.PrettyDuration(timers.StorageUpdates),
"accountHashes", common.PrettyDuration(timers.AccountHashes),
"storageHashes", common.PrettyDuration(timers.StorageHashes))
"storageHashes", common.PrettyDuration(timers.StorageHashes),
"tooDeep", tooDeep, "depth", depth,
)

// Write the block to the chain and get the status.
var (
Expand Down
45 changes: 27 additions & 18 deletions core/parallel_state_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,6 @@ func (tl TxLevel) predictTxDAG(dag types.TxDAG) {

func NewTxLevels(all []*PEVMTxRequest, dag types.TxDAG) TxLevels {
var levels TxLevels = make(TxLevels, 0, 8)
var currLevel int = 0

var enlargeLevelsIfNeeded = func(currLevel int, levels *TxLevels) {
if len(*levels) <= currLevel {
for i := len(*levels); i <= currLevel; i++ {
Expand All @@ -367,22 +365,37 @@ func NewTxLevels(all []*PEVMTxRequest, dag types.TxDAG) TxLevels {
return TxLevels{all}
}

marked := make(map[int]int, len(all))
for _, tx := range all {
dep := dag.TxDep(tx.txIndex)
// build the levels from the DAG
marked, _ := BuildTxLevels(len(all), dag)
// put the transactions into the levels
for txIndex, tx := range all {
level := marked[txIndex]
enlargeLevelsIfNeeded(level, &levels)
levels[level] = append(levels[level], tx)
}
return levels
}

func BuildTxLevels(txCount int, dag types.TxDAG) (marked map[int]int, depth int) {
if dag == nil {
return make(map[int]int), 0
}
// marked records the level that each transaction should be placed in
marked = make(map[int]int, txCount)
// depth is the cursor for the next level to place transactions in
depth = 0
for txIndex := 0; txIndex < txCount; txIndex++ {
dep := dag.TxDep(txIndex)
switch true {
case dep != nil && dep.CheckFlag(types.ExcludedTxFlag),
dep != nil && dep.CheckFlag(types.NonDependentRelFlag):
// excluded tx, occupies the whole level
// or dependent-to-all tx, occupies the whole level, too
levels = append(levels, TxLevel{tx})
marked[tx.txIndex], currLevel = len(levels)-1, len(levels)
marked[txIndex], depth = depth, depth+1

case dep == nil || len(dep.TxIndexes) == 0:
// dependent on none
enlargeLevelsIfNeeded(currLevel, &levels)
levels[currLevel] = append(levels[currLevel], tx)
marked[tx.txIndex] = currLevel
// dependent on none, just put it in the current level
marked[txIndex] = depth

case dep != nil && len(dep.TxIndexes) > 0:
// dependent on others
Expand All @@ -395,19 +408,15 @@ func NewTxLevels(all []*PEVMTxRequest, dag types.TxDAG) TxLevels {
}
if prevLevel < 0 {
// broken DAG, just ignore it
enlargeLevelsIfNeeded(currLevel, &levels)
levels[currLevel] = append(levels[currLevel], tx)
marked[tx.txIndex] = currLevel
marked[txIndex] = depth
continue
}
enlargeLevelsIfNeeded(prevLevel+1, &levels)
levels[prevLevel+1] = append(levels[prevLevel+1], tx)
// record the level of this tx
marked[tx.txIndex] = prevLevel + 1
marked[txIndex] = prevLevel + 1

default:
panic("unexpected case")
}
}
return levels
return marked, depth
}
1 change: 1 addition & 0 deletions core/vm/interpreter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Config struct {
TxDAG types.TxDAG
EnableParallelUnorderedMerge bool // Whether to enable unordered merge in parallel mode
EnableTxParallelMerge bool // Whether to enable parallel merge in parallel mode
TxDAGMaxDepthRatio float64
}

// ScopeContext contains the things that are per-call, such as stack and memory,
Expand Down
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
EnableTxParallelMerge: config.ParallelTxParallelMerge,
ParallelTxNum: config.ParallelTxNum,
EnableOpcodeOptimizations: config.EnableOpcodeOptimizing,
TxDAGMaxDepthRatio: config.ParallelTxDAGMaxDepthRatio,
}
cacheConfig = &core.CacheConfig{
TrieCleanLimit: config.TrieCleanCache,
Expand Down
16 changes: 8 additions & 8 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,14 @@ type Config struct {
RollupDisableTxPoolAdmission bool
RollupHaltOnIncompatibleProtocolVersion string

ParallelTxMode bool // Whether to execute transaction in parallel mode when do full sync
ParallelTxNum int // Number of slot for transaction execution
EnableOpcodeOptimizing bool
EnableParallelTxDAG bool
ParallelTxDAGFile string
ParallelTxUnorderedMerge bool // Whether to enable unordered merge in parallel mode
ParallelTxParallelMerge bool

ParallelTxMode bool // Whether to execute transaction in parallel mode when do full sync
ParallelTxNum int // Number of slot for transaction execution
EnableOpcodeOptimizing bool
EnableParallelTxDAG bool
ParallelTxDAGFile string
ParallelTxUnorderedMerge bool // Whether to enable unordered merge in parallel mode
ParallelTxParallelMerge bool
ParallelTxDAGMaxDepthRatio float64
}

// CreateConsensusEngine creates a consensus engine for the given chain config.
Expand Down

0 comments on commit 746438b

Please sign in to comment.