Skip to content

Commit

Permalink
Merge pull request #12 from setunapo/parallel_dev
Browse files Browse the repository at this point in the history
[R4R] Implement BEP 130: Parallel Transaction Execution
  • Loading branch information
realuncle authored May 26, 2022
2 parents 859186f + a630f16 commit fbd87ec
Show file tree
Hide file tree
Showing 14 changed files with 1,780 additions and 65 deletions.
3 changes: 3 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ var (
utils.GpoMaxGasPriceFlag,
utils.EWASMInterpreterFlag,
utils.EVMInterpreterFlag,
utils.ParallelTxFlag,
utils.ParallelTxNumFlag,
utils.ParallelTxQueueSizeFlag,
utils.MinerNotifyFullFlag,
configFileFlag,
utils.CatalystFlag,
Expand Down
40 changes: 40 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"math/big"
"os"
"path/filepath"
"runtime"
godebug "runtime/debug"
"strconv"
"strings"
Expand Down Expand Up @@ -802,6 +803,19 @@ var (
Usage: "External EVM configuration (default = built-in interpreter)",
Value: "",
}
ParallelTxFlag = cli.BoolFlag{
Name: "parallel",
Usage: "Enable the experimental parallel transaction execution mode, only valid in full sync mode (default = false)",
}
ParallelTxNumFlag = cli.IntFlag{
Name: "parallel.num",
Usage: "Number of slot for transaction execution, only valid in parallel mode (runtime calculated, no fixed default value)",
}
ParallelTxQueueSizeFlag = cli.IntFlag{
Name: "parallel.queuesize",
Usage: "Max number of Tx that can be queued to a slot, only valid in parallel mode (advanced option)",
Value: 20,
}

// Init network
InitNetworkSize = cli.IntFlag{
Expand Down Expand Up @@ -1642,6 +1656,32 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.GlobalIsSet(RangeLimitFlag.Name) {
cfg.RangeLimit = ctx.GlobalBool(RangeLimitFlag.Name)
}
if ctx.GlobalIsSet(ParallelTxFlag.Name) {
cfg.ParallelTxMode = ctx.GlobalBool(ParallelTxFlag.Name)
// The best prallel num will be tuned later, we do a simple parallel num set here
numCpu := runtime.NumCPU()
var parallelNum int
if ctx.GlobalIsSet(ParallelTxNumFlag.Name) {
// first of all, we use "--parallel.num", but "--parallel.num 0" is not allowed
parallelNum = ctx.GlobalInt(ParallelTxNumFlag.Name)
if parallelNum < 1 {
parallelNum = 1
}
} else if numCpu == 1 {
parallelNum = 1 // single CPU core
} else if numCpu < 10 {
parallelNum = numCpu - 1
} else {
parallelNum = 8 // we found concurrency 8 is slightly better than 15
}
cfg.ParallelTxNum = parallelNum
// set up queue size, it is an advanced option
if ctx.GlobalIsSet(ParallelTxQueueSizeFlag.Name) {
cfg.ParallelTxQueueSize = ctx.GlobalInt(ParallelTxQueueSizeFlag.Name)
} else {
cfg.ParallelTxQueueSize = 20 // default queue size, will be optimized
}
}
// Read the value from the flag no matter if it's set or not.
cfg.Preimages = ctx.GlobalBool(CachePreimagesFlag.Name)
if cfg.NoPruning && !cfg.Preimages {
Expand Down
39 changes: 28 additions & 11 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,13 @@ type BlockChain struct {
running int32 // 0 if chain is running, 1 when stopped
procInterrupt int32 // interrupt signaler for block processing

engine consensus.Engine
prefetcher Prefetcher
validator Validator // Block and state validator interface
processor Processor // Block transaction processor interface
vmConfig vm.Config
pipeCommit bool
engine consensus.Engine
prefetcher Prefetcher
validator Validator // Block and state validator interface
processor Processor // Block transaction processor interface
vmConfig vm.Config
pipeCommit bool
parallelExecution bool

shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
Expand Down Expand Up @@ -2111,11 +2112,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
statedb.StartPrefetcher("chain")
var followupInterrupt uint32
// For diff sync, it may fallback to full sync, so we still do prefetch
if len(block.Transactions()) >= prefetchTxNumber {
throwaway := statedb.Copy()
go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
}(time.Now(), block, throwaway, &followupInterrupt)
// parallel mode has a pipeline, similar to this prefetch, to save CPU we disable this prefetch for parallel
if !bc.parallelExecution {
if len(block.Transactions()) >= prefetchTxNumber {
throwaway := statedb.Copy()
go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
}(time.Now(), block, throwaway, &followupInterrupt)
}
}
//Process block using the parent state as reference point
substart := time.Now()
Expand Down Expand Up @@ -3100,3 +3104,16 @@ func EnablePersistDiff(limit uint64) BlockChainOption {
return chain
}
}

func EnableParallelProcessor(parallelNum int, queueSize int) BlockChainOption {
return func(chain *BlockChain) *BlockChain {
if chain.snaps == nil {
// disable parallel processor if snapshot is not enabled to avoid concurrent issue for SecureTrie
log.Info("parallel processor is not enabled since snapshot is not enabled")
return chain
}
chain.parallelExecution = true
chain.processor = NewParallelStateProcessor(chain.Config(), chain, chain.engine, parallelNum, queueSize)
return chain
}
}
6 changes: 5 additions & 1 deletion core/state/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ type (
)

func (ch createObjectChange) revert(s *StateDB) {
delete(s.stateObjects, *ch.account)
if s.parallel.isSlotDB {
delete(s.parallel.dirtiedStateObjectsInSlot, *ch.account)
} else {
s.deleteStateObj(*ch.account)
}
delete(s.stateObjectsDirty, *ch.account)
}

Expand Down
8 changes: 8 additions & 0 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,14 @@ func (s *StateObject) deepCopy(db *StateDB) *StateObject {
return stateObject
}

func (s *StateObject) MergeSlotObject(db Database, dirtyObjs *StateObject, keys StateKeys) {
for key := range keys {
// better to do s.GetState(db, key) to load originStorage for this key?
// since originStorage was in dirtyObjs, but it works even originStorage miss the state object.
s.SetState(db, key, dirtyObjs.GetState(db, key))
}
}

//
// Attribute accessors
//
Expand Down
Loading

0 comments on commit fbd87ec

Please sign in to comment.