Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R] Implement BEP 130: Parallel Transaction Execution #12

Merged
merged 8 commits into from
May 26, 2022
3 changes: 3 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ var (
utils.GpoMaxGasPriceFlag,
utils.EWASMInterpreterFlag,
utils.EVMInterpreterFlag,
utils.ParallelTxFlag,
utils.ParallelTxNumFlag,
utils.ParallelTxQueueSizeFlag,
utils.MinerNotifyFullFlag,
configFileFlag,
utils.CatalystFlag,
Expand Down
40 changes: 40 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"math/big"
"os"
"path/filepath"
"runtime"
godebug "runtime/debug"
"strconv"
"strings"
Expand Down Expand Up @@ -802,6 +803,19 @@ var (
Usage: "External EVM configuration (default = built-in interpreter)",
Value: "",
}
ParallelTxFlag = cli.BoolFlag{
Name: "parallel",
Usage: "Enable the experimental parallel transaction execution mode, only valid in full sync mode (default = false)",
}
ParallelTxNumFlag = cli.IntFlag{
Name: "parallel.num",
Usage: "Number of slot for transaction execution, only valid in parallel mode (runtime calculated, no fixed default value)",
}
ParallelTxQueueSizeFlag = cli.IntFlag{
Name: "parallel.queuesize",
Usage: "Max number of Tx that can be queued to a slot, only valid in parallel mode (advanced option)",
Value: 20,
}

// Init network
InitNetworkSize = cli.IntFlag{
Expand Down Expand Up @@ -1642,6 +1656,32 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.GlobalIsSet(RangeLimitFlag.Name) {
cfg.RangeLimit = ctx.GlobalBool(RangeLimitFlag.Name)
}
if ctx.GlobalIsSet(ParallelTxFlag.Name) {
cfg.ParallelTxMode = ctx.GlobalBool(ParallelTxFlag.Name)
// The best prallel num will be tuned later, we do a simple parallel num set here
numCpu := runtime.NumCPU()
var parallelNum int
if ctx.GlobalIsSet(ParallelTxNumFlag.Name) {
// first of all, we use "--parallel.num", but "--parallel.num 0" is not allowed
parallelNum = ctx.GlobalInt(ParallelTxNumFlag.Name)
if parallelNum < 1 {
parallelNum = 1
}
} else if numCpu == 1 {
parallelNum = 1 // single CPU core
} else if numCpu < 10 {
parallelNum = numCpu - 1
} else {
parallelNum = 8 // we found concurrency 8 is slightly better than 15
}
cfg.ParallelTxNum = parallelNum
// set up queue size, it is an advanced option
if ctx.GlobalIsSet(ParallelTxQueueSizeFlag.Name) {
cfg.ParallelTxQueueSize = ctx.GlobalInt(ParallelTxQueueSizeFlag.Name)
} else {
cfg.ParallelTxQueueSize = 20 // default queue size, will be optimized
}
}
// Read the value from the flag no matter if it's set or not.
cfg.Preimages = ctx.GlobalBool(CachePreimagesFlag.Name)
if cfg.NoPruning && !cfg.Preimages {
Expand Down
39 changes: 28 additions & 11 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,13 @@ type BlockChain struct {
running int32 // 0 if chain is running, 1 when stopped
procInterrupt int32 // interrupt signaler for block processing

engine consensus.Engine
prefetcher Prefetcher
validator Validator // Block and state validator interface
processor Processor // Block transaction processor interface
vmConfig vm.Config
pipeCommit bool
engine consensus.Engine
prefetcher Prefetcher
validator Validator // Block and state validator interface
processor Processor // Block transaction processor interface
vmConfig vm.Config
pipeCommit bool
parallelExecution bool

shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
Expand Down Expand Up @@ -2111,11 +2112,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
statedb.StartPrefetcher("chain")
var followupInterrupt uint32
// For diff sync, it may fallback to full sync, so we still do prefetch
if len(block.Transactions()) >= prefetchTxNumber {
throwaway := statedb.Copy()
go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
}(time.Now(), block, throwaway, &followupInterrupt)
// parallel mode has a pipeline, similar to this prefetch, to save CPU we disable this prefetch for parallel
if !bc.parallelExecution {
if len(block.Transactions()) >= prefetchTxNumber {
throwaway := statedb.Copy()
go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
}(time.Now(), block, throwaway, &followupInterrupt)
}
}
//Process block using the parent state as reference point
substart := time.Now()
Expand Down Expand Up @@ -3100,3 +3104,16 @@ func EnablePersistDiff(limit uint64) BlockChainOption {
return chain
}
}

func EnableParallelProcessor(parallelNum int, queueSize int) BlockChainOption {
return func(chain *BlockChain) *BlockChain {
if chain.snaps == nil {
// disable parallel processor if snapshot is not enabled to avoid concurrent issue for SecureTrie
log.Info("parallel processor is not enabled since snapshot is not enabled")
return chain
}
chain.parallelExecution = true
chain.processor = NewParallelStateProcessor(chain.Config(), chain, chain.engine, parallelNum, queueSize)
return chain
}
}
6 changes: 5 additions & 1 deletion core/state/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ type (
)

func (ch createObjectChange) revert(s *StateDB) {
delete(s.stateObjects, *ch.account)
if s.parallel.isSlotDB {
delete(s.parallel.dirtiedStateObjectsInSlot, *ch.account)
} else {
s.deleteStateObj(*ch.account)
}
delete(s.stateObjectsDirty, *ch.account)
}

Expand Down
8 changes: 8 additions & 0 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,14 @@ func (s *StateObject) deepCopy(db *StateDB) *StateObject {
return stateObject
}

func (s *StateObject) MergeSlotObject(db Database, dirtyObjs *StateObject, keys StateKeys) {
for key := range keys {
// better to do s.GetState(db, key) to load originStorage for this key?
// since originStorage was in dirtyObjs, but it works even originStorage miss the state object.
s.SetState(db, key, dirtyObjs.GetState(db, key))
}
}

//
// Attribute accessors
//
Expand Down
Loading