diff --git a/cmd/geth/main.go b/cmd/geth/main.go index e0f29bce77..acea1fd10d 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -163,6 +163,9 @@ var ( utils.GpoMaxGasPriceFlag, utils.EWASMInterpreterFlag, utils.EVMInterpreterFlag, + utils.ParallelTxFlag, + utils.ParallelTxNumFlag, + utils.ParallelTxQueueSizeFlag, utils.MinerNotifyFullFlag, configFileFlag, utils.CatalystFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 8f5141907f..b73d17e5f8 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -26,6 +26,7 @@ import ( "math/big" "os" "path/filepath" + "runtime" godebug "runtime/debug" "strconv" "strings" @@ -802,6 +803,19 @@ var ( Usage: "External EVM configuration (default = built-in interpreter)", Value: "", } + ParallelTxFlag = cli.BoolFlag{ + Name: "parallel", + Usage: "Enable the experimental parallel transaction execution mode, only valid in full sync mode (default = false)", + } + ParallelTxNumFlag = cli.IntFlag{ + Name: "parallel.num", + Usage: "Number of slot for transaction execution, only valid in parallel mode (runtime calculated, no fixed default value)", + } + ParallelTxQueueSizeFlag = cli.IntFlag{ + Name: "parallel.queuesize", + Usage: "Max number of Tx that can be queued to a slot, only valid in parallel mode (advanced option)", + Value: 20, + } // Init network InitNetworkSize = cli.IntFlag{ @@ -1642,6 +1656,32 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.GlobalIsSet(RangeLimitFlag.Name) { cfg.RangeLimit = ctx.GlobalBool(RangeLimitFlag.Name) } + if ctx.GlobalIsSet(ParallelTxFlag.Name) { + cfg.ParallelTxMode = ctx.GlobalBool(ParallelTxFlag.Name) + // The best prallel num will be tuned later, we do a simple parallel num set here + numCpu := runtime.NumCPU() + var parallelNum int + if ctx.GlobalIsSet(ParallelTxNumFlag.Name) { + // first of all, we use "--parallel.num", but "--parallel.num 0" is not allowed + parallelNum = ctx.GlobalInt(ParallelTxNumFlag.Name) + if parallelNum < 1 { + parallelNum = 1 + } + } else if numCpu == 1 { + parallelNum = 1 // single CPU core + } else if numCpu < 10 { + parallelNum = numCpu - 1 + } else { + parallelNum = 8 // we found concurrency 8 is slightly better than 15 + } + cfg.ParallelTxNum = parallelNum + // set up queue size, it is an advanced option + if ctx.GlobalIsSet(ParallelTxQueueSizeFlag.Name) { + cfg.ParallelTxQueueSize = ctx.GlobalInt(ParallelTxQueueSizeFlag.Name) + } else { + cfg.ParallelTxQueueSize = 20 // default queue size, will be optimized + } + } // Read the value from the flag no matter if it's set or not. cfg.Preimages = ctx.GlobalBool(CachePreimagesFlag.Name) if cfg.NoPruning && !cfg.Preimages { diff --git a/core/blockchain.go b/core/blockchain.go index 6c87ffc708..96c607a0ef 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -240,12 +240,13 @@ type BlockChain struct { running int32 // 0 if chain is running, 1 when stopped procInterrupt int32 // interrupt signaler for block processing - engine consensus.Engine - prefetcher Prefetcher - validator Validator // Block and state validator interface - processor Processor // Block transaction processor interface - vmConfig vm.Config - pipeCommit bool + engine consensus.Engine + prefetcher Prefetcher + validator Validator // Block and state validator interface + processor Processor // Block transaction processor interface + vmConfig vm.Config + pipeCommit bool + parallelExecution bool shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. 
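// The slot-count selection in SetEthConfig above reduces to a small pure function.
// The sketch below is illustrative only (the standalone package and the helper name
// defaultParallelNum are not part of this change); it mirrors the inline heuristic so
// the CPU-count thresholds are easy to read. A node would typically be started with
// something like: geth --parallel --parallel.num 4 --parallel.queuesize 20
package main

import (
	"fmt"
	"runtime"
)

// defaultParallelNum mirrors the fallback used when --parallel.num is not given:
// keep one core for the dispatcher and cap the slot count at 8 on large machines.
func defaultParallelNum(numCPU int) int {
	switch {
	case numCPU <= 1:
		return 1
	case numCPU < 10:
		return numCPU - 1
	default:
		return 8 // concurrency 8 was measured slightly better than 15
	}
}

func main() {
	fmt.Println("parallel slots:", defaultParallelNum(runtime.NumCPU()))
}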
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion. @@ -2111,11 +2112,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er statedb.StartPrefetcher("chain") var followupInterrupt uint32 // For diff sync, it may fallback to full sync, so we still do prefetch - if len(block.Transactions()) >= prefetchTxNumber { - throwaway := statedb.Copy() - go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) { - bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt) - }(time.Now(), block, throwaway, &followupInterrupt) + // parallel mode has a pipeline, similar to this prefetch, to save CPU we disable this prefetch for parallel + if !bc.parallelExecution { + if len(block.Transactions()) >= prefetchTxNumber { + throwaway := statedb.Copy() + go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) { + bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt) + }(time.Now(), block, throwaway, &followupInterrupt) + } } //Process block using the parent state as reference point substart := time.Now() @@ -3100,3 +3104,16 @@ func EnablePersistDiff(limit uint64) BlockChainOption { return chain } } + +func EnableParallelProcessor(parallelNum int, queueSize int) BlockChainOption { + return func(chain *BlockChain) *BlockChain { + if chain.snaps == nil { + // disable parallel processor if snapshot is not enabled to avoid concurrent issue for SecureTrie + log.Info("parallel processor is not enabled since snapshot is not enabled") + return chain + } + chain.parallelExecution = true + chain.processor = NewParallelStateProcessor(chain.Config(), chain, chain.engine, parallelNum, queueSize) + return chain + } +} diff --git a/core/state/journal.go b/core/state/journal.go index d86823c2ca..b3a2956f75 100644 --- a/core/state/journal.go +++ b/core/state/journal.go @@ -142,7 +142,11 @@ type ( ) func (ch createObjectChange) revert(s *StateDB) { - delete(s.stateObjects, *ch.account) + if s.parallel.isSlotDB { + delete(s.parallel.dirtiedStateObjectsInSlot, *ch.account) + } else { + s.deleteStateObj(*ch.account) + } delete(s.stateObjectsDirty, *ch.account) } diff --git a/core/state/state_object.go b/core/state/state_object.go index 298f4305ba..36adf786d6 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -486,6 +486,14 @@ func (s *StateObject) deepCopy(db *StateDB) *StateObject { return stateObject } +func (s *StateObject) MergeSlotObject(db Database, dirtyObjs *StateObject, keys StateKeys) { + for key := range keys { + // better to do s.GetState(db, key) to load originStorage for this key? + // since originStorage was in dirtyObjs, but it works even originStorage miss the state object. 
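// EnableParallelProcessor above follows the same functional-option shape as the existing
// EnablePersistDiff: a BlockChainOption is simply func(*BlockChain) *BlockChain. The
// helper below is a hypothetical illustration (not part of the patch; the name withOptions
// is made up) of how such options compose; the real NewBlockChain call site is elided.
// (assumes: import "github.com/ethereum/go-ethereum/core")
func withOptions(chain *core.BlockChain, opts ...core.BlockChainOption) *core.BlockChain {
	for _, opt := range opts {
		chain = opt(chain) // e.g. core.EnableParallelProcessor(parallelNum, queueSize)
	}
	return chain
}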
+ s.SetState(db, key, dirtyObjs.GetState(db, key)) + } +} + // // Attribute accessors // diff --git a/core/state/statedb.go b/core/state/statedb.go index 5ea84f4032..3a4297ea2f 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -67,6 +67,97 @@ func (n *proofList) Delete(key []byte) error { panic("not supported") } +type StateKeys map[common.Hash]struct{} + +type StateObjectSyncMap struct { + sync.Map +} + +func (s *StateObjectSyncMap) LoadStateObject(addr common.Address) (*StateObject, bool) { + stateObject, ok := s.Load(addr) + if !ok { + return nil, ok + } + return stateObject.(*StateObject), ok +} + +func (s *StateObjectSyncMap) StoreStateObject(addr common.Address, stateObject *StateObject) { + s.Store(addr, stateObject) +} + +// loadStateObj is the entry for loading state object from stateObjects in StateDB or stateObjects in parallel +func (s *StateDB) loadStateObj(addr common.Address) (*StateObject, bool) { + if s.isParallel { + return s.parallel.stateObjects.LoadStateObject(addr) + } + obj, ok := s.stateObjects[addr] + return obj, ok +} + +// storeStateObj is the entry for storing state object to stateObjects in StateDB or stateObjects in parallel +func (s *StateDB) storeStateObj(addr common.Address, stateObject *StateObject) { + if s.isParallel { + s.parallel.stateObjects.Store(addr, stateObject) + } else { + s.stateObjects[addr] = stateObject + } +} + +// deleteStateObj is the entry for deleting state object to stateObjects in StateDB or stateObjects in parallel +func (s *StateDB) deleteStateObj(addr common.Address) { + if s.isParallel { + s.parallel.stateObjects.Delete(addr) + } else { + delete(s.stateObjects, addr) + } +} + +// For parallel mode only, keep the change list for later conflict detect +type SlotChangeList struct { + TxIndex int + StateObjectSuicided map[common.Address]struct{} + StateChangeSet map[common.Address]StateKeys + BalanceChangeSet map[common.Address]struct{} + CodeChangeSet map[common.Address]struct{} + AddrStateChangeSet map[common.Address]struct{} + NonceChangeSet map[common.Address]struct{} +} + +// For parallel mode only +type ParallelState struct { + isSlotDB bool // isSlotDB denotes StateDB is used in slot + + // stateObjects holds the state objects in the base slot db + // the reason for using stateObjects instead of stateObjects on the outside is + // we need a thread safe map to hold state objects since there are many slots will read + // state objects from it; + // And we will merge all the changes made by the concurrent slot into it. + stateObjects *StateObjectSyncMap + + baseTxIndex int // slotDB is created base on this tx index. + dirtiedStateObjectsInSlot map[common.Address]*StateObject + // for conflict check + balanceChangesInSlot map[common.Address]struct{} // the address's balance has been changed + balanceReadsInSlot map[common.Address]struct{} // the address's balance has been read and used. + codeReadsInSlot map[common.Address]struct{} + codeChangesInSlot map[common.Address]struct{} + stateReadsInSlot map[common.Address]StateKeys + stateChangesInSlot map[common.Address]StateKeys // no need record value + // Actions such as SetCode, Suicide will change address's state. + // Later call like Exist(), Empty(), HasSuicided() depend on the address's state. + addrStateReadsInSlot map[common.Address]struct{} + addrStateChangesInSlot map[common.Address]struct{} + stateObjectsSuicidedInSlot map[common.Address]struct{} + nonceChangesInSlot map[common.Address]struct{} + // Transaction will pay gas fee to system address. 
+ // Parallel execution will clear system address's balance at first, in order to maintain transaction's + // gas fee value. Normal transaction will access system address twice, otherwise it means the transaction + // needs real system address's balance, the transaction will be marked redo with keepSystemAddressBalance = true + systemAddress common.Address + systemAddressOpsCount int + keepSystemAddressBalance bool +} + // StateDB structs within the ethereum protocol are used to store anything // within the merkle trie. StateDBs take care of caching and storing // nested states. It's the general query interface to retrieve: @@ -101,6 +192,9 @@ type StateDB struct { stateObjectsPending map[common.Address]struct{} // State objects finalized but not yet written to the trie stateObjectsDirty map[common.Address]struct{} // State objects modified in the current execution + isParallel bool + parallel ParallelState // to keep all the parallel execution elements + // DB error. // State objects are used by the consensus core and VM which are // unable to deal with database-level errors. Any error that occurs @@ -147,12 +241,36 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) return newStateDB(root, db, snaps) } +// NewSlotDB creates a new State DB based on the provided StateDB. +// With parallel, each execution slot would have its own StateDB. +func NewSlotDB(db *StateDB, systemAddr common.Address, baseTxIndex int, keepSystem bool) *StateDB { + slotDB := db.CopyForSlot() + slotDB.originalRoot = db.originalRoot + slotDB.parallel.baseTxIndex = baseTxIndex + slotDB.parallel.systemAddress = systemAddr + slotDB.parallel.systemAddressOpsCount = 0 + slotDB.parallel.keepSystemAddressBalance = keepSystem + + // All transactions will pay gas fee to the systemAddr at the end, this address is + // deemed to conflict, we handle it specially, clear it now and set it back to the main + // StateDB later; + // But there are transactions that will try to read systemAddr's balance, such as: + // https://bscscan.com/tx/0xcd69755be1d2f55af259441ff5ee2f312830b8539899e82488a21e85bc121a2a. + // It will trigger transaction redo and keepSystem will be marked as true. 
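// Hedged sketch of how a dispatcher would combine NewSlotDB, SystemAddressRedo and
// MergeSlotDB (all introduced by this patch). The helper name executeInSlot and the run
// callback are illustrative only; the real execution path is ParallelStateProcessor in
// core/state_processor.go further below.
// (assumes imports: consensus, core/state, core/types from go-ethereum)
func executeInSlot(mainDB *state.StateDB, tx *types.Transaction, txIndex, mergedTxIndex int,
	run func(*types.Transaction, *state.StateDB) (*types.Receipt, error)) (state.SlotChangeList, error) {
	// first attempt: the system address balance is cleared so the gas-fee write does not conflict
	slotDB := state.NewSlotDB(mainDB, consensus.SystemAddress, mergedTxIndex, false)
	receipt, err := run(tx, slotDB)
	if err == nil && slotDB.SystemAddressRedo() {
		// the tx really reads the system address balance: redo it on a slot DB that keeps it
		slotDB = state.NewSlotDB(mainDB, consensus.SystemAddress, mergedTxIndex, true)
		receipt, err = run(tx, slotDB)
	}
	if err != nil {
		return state.SlotChangeList{}, err
	}
	// fold the slot's dirty objects back into the main StateDB; the returned change list is
	// kept for conflict detection against transactions that executed concurrently
	return mainDB.MergeSlotDB(slotDB, receipt, txIndex), nil
}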
+ if !keepSystem { + slotDB.SetBalance(systemAddr, big.NewInt(0)) + } + + return slotDB +} + func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) { sdb := &StateDB{ db: db, originalRoot: root, snaps: snaps, stateObjects: make(map[common.Address]*StateObject, defaultNumOfSlots), + parallel: ParallelState{}, stateObjectsPending: make(map[common.Address]struct{}, defaultNumOfSlots), stateObjectsDirty: make(map[common.Address]struct{}, defaultNumOfSlots), logs: make(map[common.Hash][]*types.Log, defaultNumOfSlots), @@ -178,6 +296,190 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, return sdb, nil } +func (s *StateDB) getStateObjectFromStateObjects(addr common.Address) (*StateObject, bool) { + if s.parallel.isSlotDB { + obj, ok := s.parallel.dirtiedStateObjectsInSlot[addr] + if ok { + return obj, ok + } + } + return s.loadStateObj(addr) +} + +// RevertSlotDB keep its read list for conflict detect and discard its state changes except its own balance change, +// if the transaction execution is reverted, +func (s *StateDB) RevertSlotDB(from common.Address) { + s.parallel.stateObjectsSuicidedInSlot = make(map[common.Address]struct{}) + s.parallel.stateChangesInSlot = make(map[common.Address]StateKeys) + s.parallel.balanceChangesInSlot = make(map[common.Address]struct{}, 1) + s.parallel.balanceChangesInSlot[from] = struct{}{} + s.parallel.addrStateChangesInSlot = make(map[common.Address]struct{}) + s.parallel.nonceChangesInSlot = make(map[common.Address]struct{}) +} + +// PrepareForParallel prepares for state db to be used in parallel execution mode. +func (s *StateDB) PrepareForParallel() { + s.isParallel = true + s.parallel.stateObjects = &StateObjectSyncMap{} +} + +// MergeSlotDB is for Parallel execution mode, when the transaction has been +// finalized(dirty -> pending) on execution slot, the execution results should be +// merged back to the main StateDB. +// And it will return and keep the slot's change list for later conflict detect. +func (s *StateDB) MergeSlotDB(slotDb *StateDB, slotReceipt *types.Receipt, txIndex int) SlotChangeList { + // receipt.Logs use unified log index within a block + // align slotDB's log index to the block stateDB's logSize + for _, l := range slotReceipt.Logs { + l.Index += s.logSize + } + s.logSize += slotDb.logSize + + // before merge, pay the gas fee first: AddBalance to consensus.SystemAddress + systemAddress := slotDb.parallel.systemAddress + if slotDb.parallel.keepSystemAddressBalance { + s.SetBalance(systemAddress, slotDb.GetBalance(systemAddress)) + } else { + s.AddBalance(systemAddress, slotDb.GetBalance(systemAddress)) + } + + // only merge dirty objects + addressesToPrefetch := make([][]byte, 0, len(slotDb.stateObjectsDirty)) + for addr := range slotDb.stateObjectsDirty { + if _, exist := s.stateObjectsDirty[addr]; !exist { + s.stateObjectsDirty[addr] = struct{}{} + } + // system address is EOA account, it should have no storage change + if addr == systemAddress { + continue + } + + // stateObjects: KV, balance, nonce... 
+ dirtyObj, ok := slotDb.getStateObjectFromStateObjects(addr) + if !ok { + log.Error("parallel merge, but dirty object not exist!", "txIndex:", slotDb.txIndex, "addr", addr) + continue + } + mainObj, exist := s.loadStateObj(addr) + if !exist { + // addr not exist on main DB, do ownership transfer + dirtyObj.db = s + dirtyObj.finalise(true) // true: prefetch on dispatcher + s.storeStateObj(addr, dirtyObj) + delete(slotDb.parallel.dirtiedStateObjectsInSlot, addr) // transfer ownership + } else { + // addr already in main DB, do merge: balance, KV, code, State(create, suicide) + // can not do copy or ownership transfer directly, since dirtyObj could have outdated + // data(may be update within the conflict window) + + var newMainObj *StateObject + if _, created := slotDb.parallel.addrStateChangesInSlot[addr]; created { + // there are 3 kinds of state change: + // 1.Suicide + // 2.Empty Delete + // 3.createObject + // a.AddBalance,SetState to an unexist or deleted(suicide, empty delete) address. + // b.CreateAccount: like DAO the fork, regenerate a account carry its balance without KV + // For these state change, do ownership transafer for efficiency: + log.Debug("MergeSlotDB state object merge: addr state change") + dirtyObj.db = s + newMainObj = dirtyObj + delete(slotDb.parallel.dirtiedStateObjectsInSlot, addr) // transfer ownership + if dirtyObj.deleted { + // remove the addr from snapAccounts&snapStorage only when object is deleted. + // "deleted" is not equal to "snapDestructs", since createObject() will add an addr for + // snapDestructs to destroy previous object, while it will keep the addr in snapAccounts & snapAccounts + delete(s.snapAccounts, addr) + delete(s.snapStorage, addr) + } + } else { + // deepCopy a temporary *StateObject for safety, since slot could read the address, + // dispatch should avoid overwrite the StateObject directly otherwise, it could + // crash for: concurrent map iteration and map write + newMainObj = mainObj.deepCopy(s) + if _, balanced := slotDb.parallel.balanceChangesInSlot[addr]; balanced { + log.Debug("merge state object: Balance", + "newMainObj.Balance()", newMainObj.Balance(), + "dirtyObj.Balance()", dirtyObj.Balance()) + newMainObj.SetBalance(dirtyObj.Balance()) + } + if _, coded := slotDb.parallel.codeChangesInSlot[addr]; coded { + log.Debug("merge state object: Code") + newMainObj.code = dirtyObj.code + newMainObj.data.CodeHash = dirtyObj.data.CodeHash + newMainObj.dirtyCode = true + } + if keys, stated := slotDb.parallel.stateChangesInSlot[addr]; stated { + log.Debug("merge state object: KV") + newMainObj.MergeSlotObject(s.db, dirtyObj, keys) + } + // dirtyObj.Nonce() should not be less than newMainObj + newMainObj.setNonce(dirtyObj.Nonce()) + } + newMainObj.finalise(true) // true: prefetch on dispatcher + // update the object + s.storeStateObj(addr, newMainObj) + } + addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure + } + + if s.prefetcher != nil && len(addressesToPrefetch) > 0 { + s.prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr) // prefetch for trie node of account + } + + for addr := range slotDb.stateObjectsPending { + if _, exist := s.stateObjectsPending[addr]; !exist { + s.stateObjectsPending[addr] = struct{}{} + } + } + + // slotDb.logs: logs will be kept in receipts, no need to do merge + + for hash, preimage := range slotDb.preimages { + s.preimages[hash] = preimage + } + if s.accessList != nil { + // fixme: accessList is not enabled yet, but it should use merge 
rather than overwrite Copy + s.accessList = slotDb.accessList.Copy() + } + + if slotDb.snaps != nil { + for k := range slotDb.snapDestructs { + // There could be a race condition for parallel transaction execution + // One transaction add balance 0 to an empty address, will delete it(delete empty is enabled). + // While another concurrent transaction could add a none-zero balance to it, make it not empty + // We fixed it by add a addr state read record for add balance 0 + s.snapDestructs[k] = struct{}{} + } + + // slotDb.snapAccounts should be empty, comment out and to be deleted later + // for k, v := range slotDb.snapAccounts { + // s.snapAccounts[k] = v + // } + // slotDb.snapStorage should be empty, comment out and to be deleted later + // for k, v := range slotDb.snapStorage { + // temp := make(map[string][]byte) + // for kk, vv := range v { + // temp[kk] = vv + // } + // s.snapStorage[k] = temp + // } + } + + // to create a new object to store change list for conflict detect, + // since slot db reuse is disabled, we do not need to do copy. + changeList := SlotChangeList{ + TxIndex: txIndex, + StateObjectSuicided: slotDb.parallel.stateObjectsSuicidedInSlot, + StateChangeSet: slotDb.parallel.stateChangesInSlot, + BalanceChangeSet: slotDb.parallel.balanceChangesInSlot, + CodeChangeSet: slotDb.parallel.codeChangesInSlot, + AddrStateChangeSet: slotDb.parallel.addrStateChangesInSlot, + NonceChangeSet: slotDb.parallel.nonceChangesInSlot, + } + return changeList +} + // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the // state trie concurrently while the state is mutated so that when we reach the // commit phase, most of the needed data is already hot. @@ -335,6 +637,12 @@ func (s *StateDB) Empty(addr common.Address) bool { // GetBalance retrieves the balance from the given address or 0 if object not found func (s *StateDB) GetBalance(addr common.Address) *big.Int { + if s.parallel.isSlotDB { + s.parallel.balanceReadsInSlot[addr] = struct{}{} + if addr == s.parallel.systemAddress { + s.parallel.systemAddressOpsCount++ + } + } stateObject := s.getStateObject(addr) if stateObject != nil { return stateObject.Balance() @@ -347,7 +655,6 @@ func (s *StateDB) GetNonce(addr common.Address) uint64 { if stateObject != nil { return stateObject.Nonce() } - return 0 } @@ -361,7 +668,41 @@ func (s *StateDB) BlockHash() common.Hash { return s.bhash } +// BaseTxIndex returns the tx index that slot db based. +func (s *StateDB) BaseTxIndex() int { + return s.parallel.baseTxIndex +} + +func (s *StateDB) CodeReadsInSlot() map[common.Address]struct{} { + return s.parallel.codeReadsInSlot +} + +func (s *StateDB) AddressReadsInSlot() map[common.Address]struct{} { + return s.parallel.addrStateReadsInSlot +} + +func (s *StateDB) StateReadsInSlot() map[common.Address]StateKeys { + return s.parallel.stateReadsInSlot +} + +func (s *StateDB) BalanceReadsInSlot() map[common.Address]struct{} { + return s.parallel.balanceReadsInSlot +} + +// For most of the transactions, systemAddressOpsCount should be 2: +// one for SetBalance(0) on NewSlotDB() +// the other is for AddBalance(GasFee) at the end. 
+// (systemAddressOpsCount > 2) means the transaction tries to access systemAddress, in +// this case, we should redo and keep its balance on NewSlotDB() +func (s *StateDB) SystemAddressRedo() bool { + return s.parallel.systemAddressOpsCount > 2 +} + func (s *StateDB) GetCode(addr common.Address) []byte { + if s.parallel.isSlotDB { + s.parallel.codeReadsInSlot[addr] = struct{}{} + } + stateObject := s.getStateObject(addr) if stateObject != nil { return stateObject.Code(s.db) @@ -370,6 +711,10 @@ func (s *StateDB) GetCode(addr common.Address) []byte { } func (s *StateDB) GetCodeSize(addr common.Address) int { + if s.parallel.isSlotDB { + s.parallel.codeReadsInSlot[addr] = struct{}{} // code size is part of code + } + stateObject := s.getStateObject(addr) if stateObject != nil { return stateObject.CodeSize(s.db) @@ -378,6 +723,10 @@ func (s *StateDB) GetCodeSize(addr common.Address) int { } func (s *StateDB) GetCodeHash(addr common.Address) common.Hash { + if s.parallel.isSlotDB { + s.parallel.codeReadsInSlot[addr] = struct{}{} // code hash is part of code + } + stateObject := s.getStateObject(addr) if stateObject == nil { return common.Hash{} @@ -387,6 +736,13 @@ func (s *StateDB) GetCodeHash(addr common.Address) common.Hash { // GetState retrieves a value from the given account's storage trie. func (s *StateDB) GetState(addr common.Address, hash common.Hash) common.Hash { + if s.parallel.isSlotDB { + if s.parallel.stateReadsInSlot[addr] == nil { + s.parallel.stateReadsInSlot[addr] = make(map[common.Hash]struct{}, defaultNumOfSlots) + } + s.parallel.stateReadsInSlot[addr][hash] = struct{}{} + } + stateObject := s.getStateObject(addr) if stateObject != nil { return stateObject.GetState(s.db, hash) @@ -433,6 +789,13 @@ func (s *StateDB) GetStorageProofByHash(a common.Address, key common.Hash) ([][] // GetCommittedState retrieves a value from the given account's committed storage trie. func (s *StateDB) GetCommittedState(addr common.Address, hash common.Hash) common.Hash { + if s.parallel.isSlotDB { + if s.parallel.stateReadsInSlot[addr] == nil { + s.parallel.stateReadsInSlot[addr] = make(map[common.Hash]struct{}, defaultNumOfSlots) + } + s.parallel.stateReadsInSlot[addr][hash] = struct{}{} + } + stateObject := s.getStateObject(addr) if stateObject != nil { return stateObject.GetCommittedState(s.db, hash) @@ -471,16 +834,58 @@ func (s *StateDB) HasSuicided(addr common.Address) bool { // AddBalance adds amount to the account associated with addr. func (s *StateDB) AddBalance(addr common.Address, amount *big.Int) { + if s.parallel.isSlotDB { + if amount.Sign() != 0 { + s.parallel.balanceChangesInSlot[addr] = struct{}{} + // add balance will perform a read operation first + s.parallel.balanceReadsInSlot[addr] = struct{}{} + } else { + // if amount == 0, no balance change, but there is still an empty check. + // take this empty check as addr state read(create, suicide, empty delete) + s.parallel.addrStateReadsInSlot[addr] = struct{}{} + } + if addr == s.parallel.systemAddress { + s.parallel.systemAddressOpsCount++ + } + } + stateObject := s.GetOrNewStateObject(addr) if stateObject != nil { + if s.parallel.isSlotDB { + if _, ok := s.parallel.dirtiedStateObjectsInSlot[addr]; !ok { + newStateObject := stateObject.deepCopy(s) + newStateObject.AddBalance(amount) + s.parallel.dirtiedStateObjectsInSlot[addr] = newStateObject + return + } + } stateObject.AddBalance(amount) } } // SubBalance subtracts amount from the account associated with addr. 
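// Standalone toy (not from the patch) of the copy-on-write rule that the slot-mode branch
// in AddBalance above, and the SubBalance/SetBalance/SetNonce/SetCode/SetState/Suicide
// variants that follow, all apply: the first write to an address deep-copies the shared
// object into dirtiedStateObjectsInSlot, so a slot never mutates an object other slots
// may still be reading.
package main

import "fmt"

type obj struct{ balance int64 }

type slot struct {
	base  map[string]*obj // shared between slots, read-only for a slot
	dirty map[string]*obj // private copies, created on first write
}

func (s *slot) addBalance(addr string, amount int64) {
	o, ok := s.dirty[addr]
	if !ok {
		cp := *s.base[addr] // copy-on-write on first touch
		o = &cp
		s.dirty[addr] = o
	}
	o.balance += amount
}

func main() {
	s := &slot{base: map[string]*obj{"a": {balance: 1}}, dirty: map[string]*obj{}}
	s.addBalance("a", 2)
	fmt.Println(s.base["a"].balance, s.dirty["a"].balance) // prints: 1 3
}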
func (s *StateDB) SubBalance(addr common.Address, amount *big.Int) { + if s.parallel.isSlotDB { + if amount.Sign() != 0 { + s.parallel.balanceChangesInSlot[addr] = struct{}{} + // unlike add, sub 0 balance will not touch empty object + s.parallel.balanceReadsInSlot[addr] = struct{}{} + } + if addr == s.parallel.systemAddress { + s.parallel.systemAddressOpsCount++ + } + } + stateObject := s.GetOrNewStateObject(addr) if stateObject != nil { + if s.parallel.isSlotDB { + if _, ok := s.parallel.dirtiedStateObjectsInSlot[addr]; !ok { + newStateObject := stateObject.deepCopy(s) + newStateObject.SubBalance(amount) + s.parallel.dirtiedStateObjectsInSlot[addr] = newStateObject + return + } + } stateObject.SubBalance(amount) } } @@ -488,13 +893,46 @@ func (s *StateDB) SubBalance(addr common.Address, amount *big.Int) { func (s *StateDB) SetBalance(addr common.Address, amount *big.Int) { stateObject := s.GetOrNewStateObject(addr) if stateObject != nil { + if s.parallel.isSlotDB { + s.parallel.balanceChangesInSlot[addr] = struct{}{} + if addr == s.parallel.systemAddress { + s.parallel.systemAddressOpsCount++ + } + + if _, ok := s.parallel.dirtiedStateObjectsInSlot[addr]; !ok { + newStateObject := stateObject.deepCopy(s) + newStateObject.SetBalance(amount) + s.parallel.dirtiedStateObjectsInSlot[addr] = newStateObject + return + } + } stateObject.SetBalance(amount) } } +// Generally sender's nonce will be increased by 1 for each transaction +// But if the contract tries to create a new contract, its nonce will be advanced +// for each opCreate or opCreate2. Nonce is key to transaction execution, once it is +// changed for contract created, the concurrent transaction will be marked invalid if +// they accessed the address. +func (s *StateDB) NonceChanged(addr common.Address) { + if s.parallel.isSlotDB { + log.Debug("NonceChanged", "txIndex", s.txIndex, "addr", addr) + s.parallel.nonceChangesInSlot[addr] = struct{}{} + } +} + func (s *StateDB) SetNonce(addr common.Address, nonce uint64) { stateObject := s.GetOrNewStateObject(addr) if stateObject != nil { + if s.parallel.isSlotDB { + if _, ok := s.parallel.dirtiedStateObjectsInSlot[addr]; !ok { + newStateObject := stateObject.deepCopy(s) + newStateObject.SetNonce(nonce) + s.parallel.dirtiedStateObjectsInSlot[addr] = newStateObject + return + } + } stateObject.SetNonce(nonce) } } @@ -502,6 +940,16 @@ func (s *StateDB) SetNonce(addr common.Address, nonce uint64) { func (s *StateDB) SetCode(addr common.Address, code []byte) { stateObject := s.GetOrNewStateObject(addr) if stateObject != nil { + if s.parallel.isSlotDB { + s.parallel.codeChangesInSlot[addr] = struct{}{} + + if _, ok := s.parallel.dirtiedStateObjectsInSlot[addr]; !ok { + newStateObject := stateObject.deepCopy(s) + newStateObject.SetCode(crypto.Keccak256Hash(code), code) + s.parallel.dirtiedStateObjectsInSlot[addr] = newStateObject + return + } + } stateObject.SetCode(crypto.Keccak256Hash(code), code) } } @@ -509,6 +957,29 @@ func (s *StateDB) SetCode(addr common.Address, code []byte) { func (s *StateDB) SetState(addr common.Address, key, value common.Hash) { stateObject := s.GetOrNewStateObject(addr) if stateObject != nil { + if s.parallel.isSlotDB { + if s.parallel.baseTxIndex+1 == s.txIndex { + // we check if state is unchanged + // only when current transaction is the next transaction to be committed + if stateObject.GetState(s.db, key) == value { + log.Debug("Skip set same state", "baseTxIndex", s.parallel.baseTxIndex, + "txIndex", s.txIndex) + return + } + } + + if 
s.parallel.stateChangesInSlot[addr] == nil { + s.parallel.stateChangesInSlot[addr] = make(StateKeys, defaultNumOfSlots) + } + s.parallel.stateChangesInSlot[addr][key] = struct{}{} + + if _, ok := s.parallel.dirtiedStateObjectsInSlot[addr]; !ok { + newStateObject := stateObject.deepCopy(s) + newStateObject.SetState(s.db, key, value) + s.parallel.dirtiedStateObjectsInSlot[addr] = newStateObject + return + } + } stateObject.SetState(s.db, key, value) } } @@ -532,14 +1003,28 @@ func (s *StateDB) Suicide(addr common.Address) bool { if stateObject == nil { return false } + s.journal.append(suicideChange{ account: &addr, prev: stateObject.suicided, prevbalance: new(big.Int).Set(stateObject.Balance()), }) + + if s.parallel.isSlotDB { + s.parallel.stateObjectsSuicidedInSlot[addr] = struct{}{} + s.parallel.addrStateChangesInSlot[addr] = struct{}{} + if _, ok := s.parallel.dirtiedStateObjectsInSlot[addr]; !ok { + // do copy-on-write for suicide "write" + newStateObject := stateObject.deepCopy(s) + newStateObject.markSuicided() + newStateObject.data.Balance = new(big.Int) + s.parallel.dirtiedStateObjectsInSlot[addr] = newStateObject + return true + } + } + stateObject.markSuicided() stateObject.data.Balance = new(big.Int) - return true } @@ -585,6 +1070,10 @@ func (s *StateDB) deleteStateObject(obj *StateObject) { // the object is not found or was deleted in this execution context. If you need // to differentiate between non-existent/just-deleted, use getDeletedStateObject. func (s *StateDB) getStateObject(addr common.Address) *StateObject { + if s.parallel.isSlotDB { + s.parallel.addrStateReadsInSlot[addr] = struct{}{} + } + if obj := s.getDeletedStateObject(addr); obj != nil && !obj.deleted { return obj } @@ -669,7 +1158,7 @@ func (s *StateDB) preloadStateObject(address []common.Address) []*StateObject { // destructed object instead of wiping all knowledge about the state object. func (s *StateDB) getDeletedStateObject(addr common.Address) *StateObject { // Prefer live objects if any is available - if obj := s.stateObjects[addr]; obj != nil { + if obj, _ := s.getStateObjectFromStateObjects(addr); obj != nil { return obj } // If no live objects are available, attempt to use snapshots @@ -734,7 +1223,11 @@ func (s *StateDB) getDeletedStateObject(addr common.Address) *StateObject { } func (s *StateDB) SetStateObject(object *StateObject) { - s.stateObjects[object.Address()] = object + if s.parallel.isSlotDB { + s.parallel.dirtiedStateObjectsInSlot[object.Address()] = object + } else { + s.storeStateObj(object.Address(), object) + } } // GetOrNewStateObject retrieves a state object or create a new state object if nil. @@ -749,12 +1242,19 @@ func (s *StateDB) GetOrNewStateObject(addr common.Address) *StateObject { // createObject creates a new state object. If there is an existing account with // the given address, it is overwritten and returned as the second return value. func (s *StateDB) createObject(addr common.Address) (newobj, prev *StateObject) { + if s.parallel.isSlotDB { + s.parallel.addrStateReadsInSlot[addr] = struct{}{} // will try to get the previous object. + s.parallel.addrStateChangesInSlot[addr] = struct{}{} + } + prev = s.getDeletedStateObject(addr) // Note, prev might have been deleted, we need that! var prevdestruct bool if s.snap != nil && prev != nil { _, prevdestruct = s.snapDestructs[prev.address] if !prevdestruct { + // createObject for deleted object will destroy the previous trie node first + // and update the trie tree with the new object on block commit. 
s.snapDestructs[prev.address] = struct{}{} } } @@ -787,17 +1287,21 @@ func (s *StateDB) CreateAccount(addr common.Address) { if prev != nil { newObj.setBalance(prev.data.Balance) } + if s.parallel.isSlotDB { + s.parallel.balanceReadsInSlot[addr] = struct{}{} // read the balance of previous object + s.parallel.dirtiedStateObjectsInSlot[addr] = newObj + } } -func (db *StateDB) ForEachStorage(addr common.Address, cb func(key, value common.Hash) bool) error { - so := db.getStateObject(addr) +func (s *StateDB) ForEachStorage(addr common.Address, cb func(key, value common.Hash) bool) error { + so := s.getStateObject(addr) if so == nil { return nil } - it := trie.NewIterator(so.getTrie(db.db).NodeIterator(nil)) + it := trie.NewIterator(so.getTrie(s.db).NodeIterator(nil)) for it.Next() { - key := common.BytesToHash(db.trie.GetKey(it.Key)) + key := common.BytesToHash(s.trie.GetKey(it.Key)) if value, dirty := so.dirtyStorage[key]; dirty { if !cb(key, value) { return nil @@ -834,6 +1338,7 @@ func (s *StateDB) Copy() *StateDB { preimages: make(map[common.Hash][]byte, len(s.preimages)), journal: newJournal(), hasher: crypto.NewKeccakState(), + parallel: ParallelState{}, } // Copy the dirty states, logs, and preimages for addr := range s.journal.dirties { @@ -841,11 +1346,11 @@ func (s *StateDB) Copy() *StateDB { // and in the Finalise-method, there is a case where an object is in the journal but not // in the stateObjects: OOG after touch on ripeMD prior to Byzantium. Thus, we need to check for // nil - if object, exist := s.stateObjects[addr]; exist { + if object, exist := s.getStateObjectFromStateObjects(addr); exist { // Even though the original object is dirty, we are not copying the journal, // so we need to make sure that anyside effect the journal would have caused // during a commit (or similar op) is already applied to the copy. - state.stateObjects[addr] = object.deepCopy(state) + state.storeStateObj(addr, object.deepCopy(state)) state.stateObjectsDirty[addr] = struct{}{} // Mark the copy dirty to force internal (code/state) commits state.stateObjectsPending[addr] = struct{}{} // Mark the copy pending to force external (account) commits @@ -855,14 +1360,16 @@ func (s *StateDB) Copy() *StateDB { // loop above will be a no-op, since the copy's journal is empty. // Thus, here we iterate over stateObjects, to enable copies of copies for addr := range s.stateObjectsPending { - if _, exist := state.stateObjects[addr]; !exist { - state.stateObjects[addr] = s.stateObjects[addr].deepCopy(state) + if _, exist := state.getStateObjectFromStateObjects(addr); !exist { + object, _ := s.getStateObjectFromStateObjects(addr) + state.storeStateObj(addr, object.deepCopy(state)) } state.stateObjectsPending[addr] = struct{}{} } for addr := range s.stateObjectsDirty { - if _, exist := state.stateObjects[addr]; !exist { - state.stateObjects[addr] = s.stateObjects[addr].deepCopy(state) + if _, exist := state.getStateObjectFromStateObjects(addr); !exist { + object, _ := s.getStateObjectFromStateObjects(addr) + state.storeStateObj(addr, object.deepCopy(state)) } state.stateObjectsDirty[addr] = struct{}{} } @@ -920,6 +1427,80 @@ func (s *StateDB) Copy() *StateDB { return state } +// Copy all the basic fields, initialize the memory ones +func (s *StateDB) CopyForSlot() *StateDB { + parallel := ParallelState{ + // use base(dispatcher) slot db's stateObjects. 
+ // It is a SyncMap, only readable to slot, not writable + stateObjects: s.parallel.stateObjects, + stateObjectsSuicidedInSlot: make(map[common.Address]struct{}, 10), + codeReadsInSlot: make(map[common.Address]struct{}, defaultNumOfSlots), + codeChangesInSlot: make(map[common.Address]struct{}, 10), + stateChangesInSlot: make(map[common.Address]StateKeys, defaultNumOfSlots), + stateReadsInSlot: make(map[common.Address]StateKeys, defaultNumOfSlots), + balanceChangesInSlot: make(map[common.Address]struct{}, defaultNumOfSlots), + balanceReadsInSlot: make(map[common.Address]struct{}, defaultNumOfSlots), + addrStateReadsInSlot: make(map[common.Address]struct{}, defaultNumOfSlots), + addrStateChangesInSlot: make(map[common.Address]struct{}, 10), + nonceChangesInSlot: make(map[common.Address]struct{}, 10), + isSlotDB: true, + dirtiedStateObjectsInSlot: make(map[common.Address]*StateObject, defaultNumOfSlots), + } + state := &StateDB{ + db: s.db, + trie: s.db.CopyTrie(s.trie), + stateObjects: make(map[common.Address]*StateObject), // replaced by parallel.stateObjects in parallel mode + stateObjectsPending: make(map[common.Address]struct{}, defaultNumOfSlots), + stateObjectsDirty: make(map[common.Address]struct{}, defaultNumOfSlots), + refund: s.refund, // should be 0 + logs: make(map[common.Hash][]*types.Log, defaultNumOfSlots), + logSize: 0, + preimages: make(map[common.Hash][]byte, len(s.preimages)), + journal: newJournal(), + hasher: crypto.NewKeccakState(), + snapDestructs: make(map[common.Address]struct{}), + snapAccounts: make(map[common.Address][]byte), + snapStorage: make(map[common.Address]map[string][]byte), + isParallel: true, + parallel: parallel, + } + + for hash, preimage := range s.preimages { + state.preimages[hash] = preimage + } + + if s.snaps != nil { + // In order for the miner to be able to use and make additions + // to the snapshot tree, we need to copy that aswell. + // Otherwise, any block mined by ourselves will cause gaps in the tree, + // and force the miner to operate trie-backed only + state.snaps = s.snaps + state.snap = s.snap + // deep copy needed + state.snapDestructs = make(map[common.Address]struct{}) + for k, v := range s.snapDestructs { + state.snapDestructs[k] = v + } + // + state.snapAccounts = make(map[common.Address][]byte) + for k, v := range s.snapAccounts { + state.snapAccounts[k] = v + } + state.snapStorage = make(map[common.Address]map[string][]byte) + for k, v := range s.snapStorage { + temp := make(map[string][]byte) + for kk, vv := range v { + temp[kk] = vv + } + state.snapStorage[k] = temp + } + // trie prefetch should be done by dispacther on StateObject Merge, + // disable it in parallel slot + // state.prefetcher = s.prefetcher + } + return state +} + // Snapshot returns an identifier for the current revision of the state. 
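// Illustrative fragment (hypothetical variable names, would sit inside a dispatcher
// function) showing how CopyForSlot is reached in practice: PrepareForParallel switches
// the main StateDB to the thread-safe object map, and every NewSlotDB call, which uses
// CopyForSlot internally, gives a slot an empty private write set while sharing that map
// for reads.
mainDB.PrepareForParallel()
slotA := state.NewSlotDB(mainDB, consensus.SystemAddress, mergedTxIndex, false)
slotB := state.NewSlotDB(mainDB, consensus.SystemAddress, mergedTxIndex, false)
// slotA and slotB share parallel.stateObjects for lookups, but each keeps its own
// dirtiedStateObjectsInSlot plus read/change sets, which MergeSlotDB later folds back.
_, _ = slotA, slotB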
func (s *StateDB) Snapshot() int { id := s.nextRevisionId @@ -966,7 +1547,7 @@ func (s *StateDB) WaitPipeVerification() error { func (s *StateDB) Finalise(deleteEmptyObjects bool) { addressesToPrefetch := make([][]byte, 0, len(s.journal.dirties)) for addr := range s.journal.dirties { - obj, exist := s.stateObjects[addr] + obj, exist := s.getStateObjectFromStateObjects(addr) if !exist { // ripeMD is 'touched' at block 1714175, in tx 0x1237f737031e40bcde4a8b7e717b2d15e3ecadfe49bb1bbc71ee9deb09c6fcf2 // That tx goes out of gas, and although the notion of 'touched' does not exist there, the @@ -977,6 +1558,9 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) { continue } if obj.suicided || (deleteEmptyObjects && obj.empty()) { + if s.parallel.isSlotDB { + s.parallel.addrStateChangesInSlot[addr] = struct{}{} // empty an StateObject is a state change + } obj.deleted = true // If state snapshotting is active, also mark the destruction there. @@ -989,7 +1573,12 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) { delete(s.snapStorage, obj.address) // Clear out any previously updated storage data (may be recreated via a ressurrect) } } else { - obj.finalise(true) // Prefetch slots in the background + // 1.none parallel mode, we do obj.finalise(true) as normal + // 2.with parallel mode, we do obj.finalise(true) on dispatcher, not on slot routine + // obj.finalise(true) will clear its dirtyStorage, will make prefetch broken. + if !s.isParallel || !s.parallel.isSlotDB { + obj.finalise(true) // Prefetch slots in the background + } } if _, exist := s.stateObjectsPending[addr]; !exist { s.stateObjectsPending[addr] = struct{}{} @@ -1047,11 +1636,10 @@ func (s *StateDB) AccountsIntermediateRoot() { // first, giving the account prefeches just a few more milliseconds of time // to pull useful data from disk. for addr := range s.stateObjectsPending { - if obj := s.stateObjects[addr]; !obj.deleted { + if obj, _ := s.getStateObjectFromStateObjects(addr); !obj.deleted { wg.Add(1) tasks <- func() { obj.updateRoot(s.db) - // If state snapshotting is active, cache the data til commit. 
Note, this // update mechanism is not symmetric to the deletion, because whereas it is // enough to track account updates at commit time, deletions need tracking @@ -1110,7 +1698,7 @@ func (s *StateDB) StateIntermediateRoot() common.Hash { } usedAddrs := make([][]byte, 0, len(s.stateObjectsPending)) for addr := range s.stateObjectsPending { - if obj := s.stateObjects[addr]; obj.deleted { + if obj, _ := s.getStateObjectFromStateObjects(addr); obj.deleted { s.deleteStateObject(obj) } else { s.updateStateObject(obj) @@ -1335,7 +1923,7 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er if s.snap != nil { for addr := range s.stateObjectsDirty { - if obj := s.stateObjects[addr]; !obj.deleted { + if obj, _ := s.getStateObjectFromStateObjects(addr); !obj.deleted { if obj.code != nil && obj.dirtyCode { diffLayer.Codes = append(diffLayer.Codes, types.DiffCode{ Hash: common.BytesToHash(obj.CodeHash()), @@ -1347,7 +1935,7 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er } for addr := range s.stateObjectsDirty { - if obj := s.stateObjects[addr]; !obj.deleted { + if obj, _ := s.getStateObjectFromStateObjects(addr); !obj.deleted { // Write any contract code associated with the state object tasks <- func() { // Write any storage changes in the state object to its storage trie @@ -1418,7 +2006,7 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er func() error { codeWriter := s.db.TrieDB().DiskDB().NewBatch() for addr := range s.stateObjectsDirty { - if obj := s.stateObjects[addr]; !obj.deleted { + if obj, _ := s.getStateObjectFromStateObjects(addr); !obj.deleted { if obj.code != nil && obj.dirtyCode { rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code) obj.dirtyCode = false diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index acbbf1cd2f..5554375560 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -19,6 +19,7 @@ package state import ( "bytes" "encoding/binary" + "encoding/hex" "fmt" "math" "math/big" @@ -34,6 +35,10 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) +var ( + systemAddress = common.HexToAddress("0xffffFFFfFFffffffffffffffFfFFFfffFFFfFFfE") +) + // Tests that updating a state trie does not leak any database writes prior to // actually committing the state. 
func TestUpdateLeaks(t *testing.T) { @@ -932,3 +937,397 @@ func TestStateDBAccessList(t *testing.T) { t.Fatalf("expected empty, got %d", got) } } + +func TestSuicide(t *testing.T) { + // Create an initial state with a few accounts + memDb := rawdb.NewMemoryDatabase() + db := NewDatabase(memDb) + state, _ := New(common.Hash{}, db, nil) + state.PrepareForParallel() + + slotDb := NewSlotDB(state, systemAddress, 0, false) + + addr := common.BytesToAddress([]byte("so")) + slotDb.SetBalance(addr, big.NewInt(1)) + + result := slotDb.Suicide(addr) + if !result { + t.Fatalf("expected account suicide, got %v", result) + } + + if _, ok := slotDb.parallel.stateObjectsSuicidedInSlot[addr]; !ok { + t.Fatalf("address should exist in stateObjectsSuicidedInSlot") + } + + if _, ok := slotDb.parallel.addrStateChangesInSlot[addr]; !ok { + t.Fatalf("address should exist in addrStateChangesInSlot") + } + + if _, ok := slotDb.parallel.dirtiedStateObjectsInSlot[addr]; !ok { + t.Fatalf("address should exist in dirtiedStateObjectsInSlot") + } + + hasSuicide := slotDb.HasSuicided(addr) + if !hasSuicide { + t.Fatalf("address should be suicided") + } + + if _, ok := slotDb.parallel.addrStateReadsInSlot[addr]; !ok { + t.Fatalf("address should exist in addrStateReadsInSlot") + } +} + +func TestSetAndGetState(t *testing.T) { + memDb := rawdb.NewMemoryDatabase() + db := NewDatabase(memDb) + state, _ := New(common.Hash{}, db, nil) + state.PrepareForParallel() + + slotDb := NewSlotDB(state, systemAddress, 0, false) + + addr := common.BytesToAddress([]byte("so")) + state.SetBalance(addr, big.NewInt(1)) + + slotDb.SetState(addr, common.BytesToHash([]byte("test key")), common.BytesToHash([]byte("test store"))) + + if _, ok := slotDb.parallel.dirtiedStateObjectsInSlot[addr]; !ok { + t.Fatalf("address should exist in dirtiedStateObjectsInSlot") + } + + if _, ok := slotDb.parallel.stateChangesInSlot[addr]; !ok { + t.Fatalf("address should exist in stateChangesInSlot") + } + + oldValueRead := state.GetState(addr, common.BytesToHash([]byte("test key"))) + emptyHash := common.Hash{} + if oldValueRead != emptyHash { + t.Fatalf("value read in old state should be empty") + } + + valueRead := slotDb.GetState(addr, common.BytesToHash([]byte("test key"))) + if valueRead != common.BytesToHash([]byte("test store")) { + t.Fatalf("value read should be equal to the stored value") + } + + if _, ok := slotDb.parallel.stateReadsInSlot[addr]; !ok { + t.Fatalf("address should exist in stateReadsInSlot") + } +} + +func TestSetAndGetCode(t *testing.T) { + memDb := rawdb.NewMemoryDatabase() + db := NewDatabase(memDb) + state, _ := New(common.Hash{}, db, nil) + state.PrepareForParallel() + + slotDb := NewSlotDB(state, systemAddress, 0, false) + + addr := common.BytesToAddress([]byte("so")) + state.SetBalance(addr, big.NewInt(1)) + + if _, ok := slotDb.parallel.dirtiedStateObjectsInSlot[addr]; ok { + t.Fatalf("address should not exist in dirtiedStateObjectsInSlot") + } + + slotDb.SetCode(addr, []byte("test code")) + + if _, ok := slotDb.parallel.dirtiedStateObjectsInSlot[addr]; !ok { + t.Fatalf("address should exist in dirtiedStateObjectsInSlot") + } + + if _, ok := slotDb.parallel.codeChangesInSlot[addr]; !ok { + t.Fatalf("address should exist in codeChangesInSlot") + } + + codeRead := slotDb.GetCode(addr) + if string(codeRead) != "test code" { + t.Fatalf("code read should be equal to the code stored") + } + + if _, ok := slotDb.parallel.codeReadsInSlot[addr]; !ok { + t.Fatalf("address should exist in codeReadsInSlot") + } +} + +func TestGetCodeSize(t 
*testing.T) { + memDb := rawdb.NewMemoryDatabase() + db := NewDatabase(memDb) + state, _ := New(common.Hash{}, db, nil) + state.PrepareForParallel() + + slotDb := NewSlotDB(state, systemAddress, 0, false) + + addr := common.BytesToAddress([]byte("so")) + state.SetBalance(addr, big.NewInt(1)) + + slotDb.SetCode(addr, []byte("test code")) + + codeSize := slotDb.GetCodeSize(addr) + if codeSize != 9 { + t.Fatalf("code size should be 9") + } + + if _, ok := slotDb.parallel.codeReadsInSlot[addr]; !ok { + t.Fatalf("address should exist in codeReadsInSlot") + } +} + +func TestGetCodeHash(t *testing.T) { + memDb := rawdb.NewMemoryDatabase() + db := NewDatabase(memDb) + state, _ := New(common.Hash{}, db, nil) + state.PrepareForParallel() + + slotDb := NewSlotDB(state, systemAddress, 0, false) + + addr := common.BytesToAddress([]byte("so")) + state.SetBalance(addr, big.NewInt(1)) + + slotDb.SetCode(addr, []byte("test code")) + + codeSize := slotDb.GetCodeHash(addr) + print(hex.EncodeToString(codeSize[:])) + if hex.EncodeToString(codeSize[:]) != "6e73fa02f7828b28608b078b007a4023fb40453c3e102b83828a3609a94d8cbb" { + t.Fatalf("code hash should be 6e73fa02f7828b28608b078b007a4023fb40453c3e102b83828a3609a94d8cbb") + } + if _, ok := slotDb.parallel.codeReadsInSlot[addr]; !ok { + t.Fatalf("address should exist in codeReadsInSlot") + } +} + +func TestSetNonce(t *testing.T) { + memDb := rawdb.NewMemoryDatabase() + db := NewDatabase(memDb) + state, _ := New(common.Hash{}, db, nil) + state.PrepareForParallel() + + slotDb := NewSlotDB(state, systemAddress, 0, false) + + addr := common.BytesToAddress([]byte("so")) + state.SetBalance(addr, big.NewInt(1)) + state.SetNonce(addr, 1) + + slotDb.SetNonce(addr, 2) + + oldNonce := state.GetNonce(addr) + if oldNonce != 1 { + t.Fatalf("old nonce should be 1") + } + + newNonce := slotDb.GetNonce(addr) + if newNonce != 2 { + t.Fatalf("new nonce should be 2") + } + if _, ok := slotDb.parallel.dirtiedStateObjectsInSlot[addr]; !ok { + t.Fatalf("address should exist in dirtiedStateObjectsInSlot") + } +} + +func TestSetAndGetBalance(t *testing.T) { + memDb := rawdb.NewMemoryDatabase() + db := NewDatabase(memDb) + state, _ := New(common.Hash{}, db, nil) + state.PrepareForParallel() + + slotDb := NewSlotDB(state, systemAddress, 0, true) + + addr := systemAddress + state.SetBalance(addr, big.NewInt(1)) + + slotDb.SetBalance(addr, big.NewInt(2)) + + oldBalance := state.GetBalance(addr) + if oldBalance.Int64() != 1 { + t.Fatalf("old balance should be 1") + } + + if _, ok := slotDb.parallel.dirtiedStateObjectsInSlot[addr]; !ok { + t.Fatalf("address should exist in dirtiedStateObjectsInSlot") + } + + if _, ok := slotDb.parallel.balanceChangesInSlot[addr]; !ok { + t.Fatalf("address should exist in balanceChangesInSlot") + } + + if slotDb.parallel.systemAddressOpsCount != 1 { + t.Fatalf("systemAddressOpsCount should be 1") + } + + newBalance := slotDb.GetBalance(addr) + if newBalance.Int64() != 2 { + t.Fatalf("new nonce should be 2") + } + + if _, ok := slotDb.parallel.balanceReadsInSlot[addr]; !ok { + t.Fatalf("address should exist in balanceReadsInSlot") + } + + if slotDb.parallel.systemAddressOpsCount != 2 { + t.Fatalf("systemAddressOpsCount should be 1") + } +} + +func TestSubBalance(t *testing.T) { + memDb := rawdb.NewMemoryDatabase() + db := NewDatabase(memDb) + state, _ := New(common.Hash{}, db, nil) + state.PrepareForParallel() + + slotDb := NewSlotDB(state, systemAddress, 0, true) + + addr := systemAddress + state.SetBalance(addr, big.NewInt(2)) + + slotDb.SubBalance(addr, 
big.NewInt(1)) + + oldBalance := state.GetBalance(addr) + if oldBalance.Int64() != 2 { + t.Fatalf("old balance should be 1") + } + + if _, ok := slotDb.parallel.dirtiedStateObjectsInSlot[addr]; !ok { + t.Fatalf("address should exist in dirtiedStateObjectsInSlot") + } + + if _, ok := slotDb.parallel.balanceChangesInSlot[addr]; !ok { + t.Fatalf("address should exist in balanceChangesInSlot") + } + + if _, ok := slotDb.parallel.balanceReadsInSlot[addr]; !ok { + t.Fatalf("address should exist in balanceReadsInSlot") + } + + if slotDb.parallel.systemAddressOpsCount != 1 { + t.Fatalf("systemAddressOpsCount should be 1") + } + + newBalance := slotDb.GetBalance(addr) + if newBalance.Int64() != 1 { + t.Fatalf("new nonce should be 2") + } +} + +func TestAddBalance(t *testing.T) { + memDb := rawdb.NewMemoryDatabase() + db := NewDatabase(memDb) + state, _ := New(common.Hash{}, db, nil) + state.PrepareForParallel() + + slotDb := NewSlotDB(state, systemAddress, 0, true) + + addr := systemAddress + state.SetBalance(addr, big.NewInt(2)) + + slotDb.AddBalance(addr, big.NewInt(1)) + + oldBalance := state.GetBalance(addr) + if oldBalance.Int64() != 2 { + t.Fatalf("old balance should be 1") + } + + if _, ok := slotDb.parallel.dirtiedStateObjectsInSlot[addr]; !ok { + t.Fatalf("address should exist in dirtiedStateObjectsInSlot") + } + + if _, ok := slotDb.parallel.balanceChangesInSlot[addr]; !ok { + t.Fatalf("address should exist in balanceChangesInSlot") + } + + if _, ok := slotDb.parallel.balanceReadsInSlot[addr]; !ok { + t.Fatalf("address should exist in balanceReadsInSlot") + } + + if slotDb.parallel.systemAddressOpsCount != 1 { + t.Fatalf("systemAddressOpsCount should be 1") + } + + newBalance := slotDb.GetBalance(addr) + if newBalance.Int64() != 3 { + t.Fatalf("new nonce should be 2") + } +} + +func TestEmpty(t *testing.T) { + memDb := rawdb.NewMemoryDatabase() + db := NewDatabase(memDb) + state, _ := New(common.Hash{}, db, nil) + state.PrepareForParallel() + + slotDb := NewSlotDB(state, systemAddress, 0, true) + + addr := systemAddress + state.SetBalance(addr, big.NewInt(2)) + + empty := slotDb.Empty(addr) + if empty { + t.Fatalf("address should exist") + } + + if _, ok := slotDb.parallel.addrStateReadsInSlot[addr]; !ok { + t.Fatalf("address should exist in addrStateReadsInSlot") + } +} + +func TestExist(t *testing.T) { + memDb := rawdb.NewMemoryDatabase() + db := NewDatabase(memDb) + state, _ := New(common.Hash{}, db, nil) + state.PrepareForParallel() + + slotDb := NewSlotDB(state, systemAddress, 0, true) + + addr := systemAddress + state.SetBalance(addr, big.NewInt(2)) + + exist := slotDb.Exist(addr) + if !exist { + t.Fatalf("address should exist") + } + + if _, ok := slotDb.parallel.addrStateReadsInSlot[addr]; !ok { + t.Fatalf("address should exist in addrStateReadsInSlot") + } +} + +func TestMergeSlotDB(t *testing.T) { + memDb := rawdb.NewMemoryDatabase() + db := NewDatabase(memDb) + state, _ := New(common.Hash{}, db, nil) + state.PrepareForParallel() + + oldSlotDb := NewSlotDB(state, systemAddress, 0, true) + + newSlotDb := NewSlotDB(state, systemAddress, 0, true) + + addr := systemAddress + newSlotDb.SetBalance(addr, big.NewInt(2)) + newSlotDb.SetState(addr, common.BytesToHash([]byte("test key")), common.BytesToHash([]byte("test store"))) + newSlotDb.SetCode(addr, []byte("test code")) + newSlotDb.Suicide(addr) + + changeList := oldSlotDb.MergeSlotDB(newSlotDb, &types.Receipt{}, 0) + + if _, ok := changeList.StateObjectSuicided[addr]; !ok { + t.Fatalf("address should exist in StateObjectSuicided") + 
} + + if _, ok := changeList.StateObjectSuicided[addr]; !ok { + t.Fatalf("address should exist in StateObjectSuicided") + } + + if _, ok := changeList.StateChangeSet[addr]; !ok { + t.Fatalf("address should exist in StateChangeSet") + } + + if _, ok := changeList.BalanceChangeSet[addr]; !ok { + t.Fatalf("address should exist in StateChangeSet") + } + + if _, ok := changeList.CodeChangeSet[addr]; !ok { + t.Fatalf("address should exist in CodeChangeSet") + } + + if _, ok := changeList.AddrStateChangeSet[addr]; !ok { + t.Fatalf("address should exist in AddrStateChangeSet") + } +} diff --git a/core/state_processor.go b/core/state_processor.go index 14fe9b4b92..38fe88ef99 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -22,6 +22,7 @@ import ( "fmt" "math/big" "math/rand" + "runtime" "sync" "time" @@ -58,7 +59,6 @@ type StateProcessor struct { engine consensus.Engine // Consensus engine used for block rewards } -// NewStateProcessor initialises a new StateProcessor. func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *StateProcessor { return &StateProcessor{ config: config, @@ -67,6 +67,28 @@ func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consen } } +// add for parallel executions +type ParallelStateProcessor struct { + StateProcessor + parallelNum int // leave a CPU to dispatcher + queueSize int // parallel slot's maximum number of pending Txs + txResultChan chan *ParallelTxResult // to notify dispatcher that a tx is done + slotState []*SlotState // idle, or pending messages + mergedTxIndex int // the latest finalized tx index + debugErrorRedoNum int + debugConflictRedoNum int +} + +func NewParallelStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine, parallelNum int, queueSize int) *ParallelStateProcessor { + processor := &ParallelStateProcessor{ + StateProcessor: *NewStateProcessor(config, bc, engine), + parallelNum: parallelNum, + queueSize: queueSize, + } + processor.init() + return processor +} + type LightStateProcessor struct { check int64 StateProcessor @@ -370,23 +392,564 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty return diffLayer.Receipts, allLogs, gasUsed, nil } -// Process processes the state changes according to the Ethereum rules by running -// the transaction messages using the statedb and applying any rewards to both -// the processor (coinbase) and any included uncles. -// -// Process returns the receipts and logs accumulated during the process and -// returns the amount of gas that was used in the process. If any of the -// transactions failed to execute due to insufficient gas it will return an error. -func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error) { +type SlotState struct { + tailTxReq *ParallelTxRequest // tail pending Tx of the slot, should be accessed on dispatcher only. + pendingTxReqChan chan *ParallelTxRequest + pendingTxReqList []*ParallelTxRequest // maintained by dispatcher for dispatch policy + mergedChangeList []state.SlotChangeList + slotdbChan chan *state.StateDB // dispatch will create and send this slotDB to slot +} + +type ParallelTxResult struct { + updateSlotDB bool // for redo and pending tx quest, slot needs new slotDB, + keepSystem bool // for redo, should keep system address's balance + slotIndex int // slot index + err error // to describe error message? 
+ txReq *ParallelTxRequest + receipt *types.Receipt + slotDB *state.StateDB // if updated, it is not equal to txReq.slotDB +} + +type ParallelTxRequest struct { + txIndex int + tx *types.Transaction + slotDB *state.StateDB + gasLimit uint64 + msg types.Message + block *types.Block + vmConfig vm.Config + bloomProcessor *AsyncReceiptBloomGenerator + usedGas *uint64 + waitTxChan chan struct{} + curTxChan chan struct{} +} + +// to create and start the execution slot goroutines +func (p *ParallelStateProcessor) init() { + log.Info("Parallel execution mode is enabled", "Parallel Num", p.parallelNum, + "CPUNum", runtime.NumCPU(), + "QueueSize", p.queueSize) + p.txResultChan = make(chan *ParallelTxResult, p.parallelNum) + p.slotState = make([]*SlotState, p.parallelNum) + + for i := 0; i < p.parallelNum; i++ { + p.slotState[i] = &SlotState{ + slotdbChan: make(chan *state.StateDB, 1), + pendingTxReqChan: make(chan *ParallelTxRequest, p.queueSize), + } + // start the slot's goroutine + go func(slotIndex int) { + p.runSlotLoop(slotIndex) // this loop will be permanent live + }(i) + } +} + +// conflict check uses conflict window, it will check all state changes from (cfWindowStart + 1) +// to the previous Tx, if any state in readDb is updated in changeList, then it is conflicted +func (p *ParallelStateProcessor) hasStateConflict(readDb *state.StateDB, changeList state.SlotChangeList) bool { + // check KV change + reads := readDb.StateReadsInSlot() + writes := changeList.StateChangeSet + for readAddr, readKeys := range reads { + if _, exist := changeList.AddrStateChangeSet[readAddr]; exist { + log.Debug("conflict: read addr changed state", "addr", readAddr) + return true + } + if writeKeys, ok := writes[readAddr]; ok { + // readAddr exist + for writeKey := range writeKeys { + // same addr and same key, mark conflicted + if _, ok := readKeys[writeKey]; ok { + log.Debug("conflict: state conflict", "addr", readAddr, "key", writeKey) + return true + } + } + } + } + // check balance change + balanceReads := readDb.BalanceReadsInSlot() + balanceWrite := changeList.BalanceChangeSet + for readAddr := range balanceReads { + if _, exist := changeList.AddrStateChangeSet[readAddr]; exist { + // SystemAddress is special, SystemAddressRedo() is prepared for it. + // Since txIndex = 0 will create StateObject for SystemAddress, skip its state change check + if readAddr != consensus.SystemAddress { + log.Debug("conflict: read addr changed balance", "addr", readAddr) + return true + } + } + if _, ok := balanceWrite[readAddr]; ok { + if readAddr != consensus.SystemAddress { + log.Debug("conflict: balance conflict", "addr", readAddr) + return true + } + } + } + + // check code change + codeReads := readDb.CodeReadsInSlot() + codeWrite := changeList.CodeChangeSet + for readAddr := range codeReads { + if _, exist := changeList.AddrStateChangeSet[readAddr]; exist { + log.Debug("conflict: read addr changed code", "addr", readAddr) + return true + } + if _, ok := codeWrite[readAddr]; ok { + log.Debug("conflict: code conflict", "addr", readAddr) + return true + } + } + + // check address state change: create, suicide... + addrReads := readDb.AddressReadsInSlot() + addrWrite := changeList.AddrStateChangeSet + nonceWrite := changeList.NonceChangeSet + for readAddr := range addrReads { + if _, ok := addrWrite[readAddr]; ok { + // SystemAddress is special, SystemAddressRedo() is prepared for it. 
+ // Since txIndex = 0 will create StateObject for SystemAddress, skip its state change check + if readAddr != consensus.SystemAddress { + log.Debug("conflict: address state conflict", "addr", readAddr) + return true + } + } + if _, ok := nonceWrite[readAddr]; ok { + log.Debug("conflict: address nonce conflict", "addr", readAddr) + return true + } + } + + return false +} + +// for parallel execute, we put contracts of same address in a slot, +// since these txs probably would have conflicts +func (p *ParallelStateProcessor) queueSameToAddress(txReq *ParallelTxRequest) bool { + txToAddr := txReq.tx.To() + // To() == nil means contract creation, no same To address + if txToAddr == nil { + return false + } + for i, slot := range p.slotState { + if slot.tailTxReq == nil { // this slot is idle + continue + } + for _, pending := range slot.pendingTxReqList { + // To() == nil means contract creation, skip it. + if pending.tx.To() == nil { + continue + } + // same to address, put it on slot's pending list. + if *txToAddr == *pending.tx.To() { + select { + case slot.pendingTxReqChan <- txReq: + slot.tailTxReq = txReq + slot.pendingTxReqList = append(slot.pendingTxReqList, txReq) + log.Debug("queue same To address", "Slot", i, "txIndex", txReq.txIndex) + return true + default: + log.Debug("queue same To address, but queue is full", "Slot", i, "txIndex", txReq.txIndex) + break // try next slot + } + } + } + } + return false +} + +// for parallel execute, we put contracts of same address in a slot, +// since these txs probably would have conflicts +func (p *ParallelStateProcessor) queueSameFromAddress(txReq *ParallelTxRequest) bool { + txFromAddr := txReq.msg.From() + for i, slot := range p.slotState { + if slot.tailTxReq == nil { // this slot is idle + continue + } + for _, pending := range slot.pendingTxReqList { + // same from address, put it on slot's pending list. + if txFromAddr == pending.msg.From() { + select { + case slot.pendingTxReqChan <- txReq: + slot.tailTxReq = txReq + slot.pendingTxReqList = append(slot.pendingTxReqList, txReq) + log.Debug("queue same From address", "Slot", i, "txIndex", txReq.txIndex) + return true + default: + log.Debug("queue same From address, but queue is full", "Slot", i, "txIndex", txReq.txIndex) + break // try next slot + } + } + } + } + return false +} + +// if there is idle slot, dispatch the msg to the first idle slot +func (p *ParallelStateProcessor) dispatchToIdleSlot(statedb *state.StateDB, txReq *ParallelTxRequest) bool { + for i, slot := range p.slotState { + if slot.tailTxReq == nil { + if len(slot.mergedChangeList) == 0 { + // first transaction of a slot, there is no usable SlotDB, have to create one for it. + txReq.slotDB = state.NewSlotDB(statedb, consensus.SystemAddress, p.mergedTxIndex, false) + } + log.Debug("dispatchToIdleSlot", "Slot", i, "txIndex", txReq.txIndex) + slot.tailTxReq = txReq + slot.pendingTxReqList = append(slot.pendingTxReqList, txReq) + slot.pendingTxReqChan <- txReq + return true + } + } + return false +} + +// wait until the next Tx is executed and its result is merged to the main stateDB +func (p *ParallelStateProcessor) waitUntilNextTxDone(statedb *state.StateDB, gp *GasPool) *ParallelTxResult { + var result *ParallelTxResult + for { + result = <-p.txResultChan + // slot may request new slotDB, if slotDB is outdated + // such as: + // tx in pending tx request, previous tx in same queue is likely "damaged" the slotDB + // tx redo for conflict + // tx stage 1 failed, nonce out of order... 
+ if result.updateSlotDB { + // the target slot is waiting for new slotDB + slotState := p.slotState[result.slotIndex] + slotDB := state.NewSlotDB(statedb, consensus.SystemAddress, p.mergedTxIndex, result.keepSystem) + slotState.slotdbChan <- slotDB + continue + } + // ok, the tx result is valid and can be merged + break + } + + if err := gp.SubGas(result.receipt.GasUsed); err != nil { + log.Error("gas limit reached", "block", result.txReq.block.Number(), + "txIndex", result.txReq.txIndex, "GasUsed", result.receipt.GasUsed, "gp.Gas", gp.Gas()) + } + + resultSlotIndex := result.slotIndex + resultTxIndex := result.txReq.txIndex + resultSlotState := p.slotState[resultSlotIndex] + resultSlotState.pendingTxReqList = resultSlotState.pendingTxReqList[1:] + if resultSlotState.tailTxReq.txIndex == resultTxIndex { + log.Debug("ProcessParallel slot is idle", "Slot", resultSlotIndex) + resultSlotState.tailTxReq = nil + } + + // Slot's mergedChangeList is produced by dispatcher, while consumed by slot. + // It is safe, since write and read is in sequential, do write -> notify -> read + // It is not good, but work right now. + changeList := statedb.MergeSlotDB(result.slotDB, result.receipt, resultTxIndex) + resultSlotState.mergedChangeList = append(resultSlotState.mergedChangeList, changeList) + + if resultTxIndex != p.mergedTxIndex+1 { + log.Error("ProcessParallel tx result out of order", "resultTxIndex", resultTxIndex, + "p.mergedTxIndex", p.mergedTxIndex) + } + p.mergedTxIndex = resultTxIndex + // notify the following Tx, it is merged, + // todo(optimize): if next tx is in same slot, it do not need to wait; save this channel cost. + close(result.txReq.curTxChan) + return result +} + +func (p *ParallelStateProcessor) execInSlot(slotIndex int, txReq *ParallelTxRequest) *ParallelTxResult { + txIndex := txReq.txIndex + tx := txReq.tx + slotDB := txReq.slotDB + slotGasLimit := txReq.gasLimit // not accurate, but it is ok for block import. + msg := txReq.msg + block := txReq.block + header := block.Header() + cfg := txReq.vmConfig + bloomProcessor := txReq.bloomProcessor + + blockContext := NewEVMBlockContext(header, p.bc, nil) // can share blockContext within a block for efficiency + vmenv := vm.NewEVM(blockContext, vm.TxContext{}, slotDB, p.config, cfg) + + var receipt *types.Receipt + var result *ExecutionResult + var err error + var evm *vm.EVM + + slotDB.Prepare(tx.Hash(), block.Hash(), txIndex) + log.Debug("exec In Slot", "Slot", slotIndex, "txIndex", txIndex, "slotDB.baseTxIndex", slotDB.BaseTxIndex()) + + gpSlot := new(GasPool).AddGas(slotGasLimit) // each slot would use its own gas pool, and will do gaslimit check later + evm, result, err = applyTransactionStageExecution(msg, gpSlot, slotDB, vmenv) + log.Debug("Stage Execution done", "Slot", slotIndex, "txIndex", txIndex, "slotDB.baseTxIndex", slotDB.BaseTxIndex()) + + // wait until the previous tx is finalized. + if txReq.waitTxChan != nil { + log.Debug("Stage wait previous Tx done", "Slot", slotIndex, "txIndex", txIndex) + <-txReq.waitTxChan // close the channel + } + + // in parallel mode, tx can run into trouble, for example: err="nonce too high" + // in these cases, we will wait and re-run. 
+ if err != nil { + p.debugErrorRedoNum++ + log.Debug("Stage Execution err", "Slot", slotIndex, "txIndex", txIndex, + "current slotDB.baseTxIndex", slotDB.BaseTxIndex(), "err", err) + redoResult := &ParallelTxResult{ + updateSlotDB: true, + slotIndex: slotIndex, + txReq: txReq, + receipt: receipt, + err: err, + } + p.txResultChan <- redoResult + slotDB = <-p.slotState[slotIndex].slotdbChan + slotDB.Prepare(tx.Hash(), block.Hash(), txIndex) + log.Debug("Stage Execution get new slotdb to redo", "Slot", slotIndex, + "txIndex", txIndex, "new slotDB.baseTxIndex", slotDB.BaseTxIndex()) + gpSlot = new(GasPool).AddGas(slotGasLimit) + evm, result, err = applyTransactionStageExecution(msg, gpSlot, slotDB, vmenv) + if err != nil { + log.Error("Stage Execution redo, error", err) + } + } + + // do conflict detect + hasConflict := false + systemAddrConflict := false + log.Debug("Stage Execution done, do conflict check", "Slot", slotIndex, "txIndex", txIndex) + if slotDB.SystemAddressRedo() { + hasConflict = true + systemAddrConflict = true + } else { + for index := 0; index < p.parallelNum; index++ { + if index == slotIndex { + continue + } + + // check all finalizedDb from current slot's + for _, changeList := range p.slotState[index].mergedChangeList { + if changeList.TxIndex <= slotDB.BaseTxIndex() { + continue + } + if p.hasStateConflict(slotDB, changeList) { + log.Debug("Stage Execution conflict", "Slot", slotIndex, + "txIndex", txIndex, " conflict slot", index, "slotDB.baseTxIndex", slotDB.BaseTxIndex(), + "conflict txIndex", changeList.TxIndex) + hasConflict = true + break + } + } + if hasConflict { + break + } + } + } + + if hasConflict { + p.debugConflictRedoNum++ + // re-run should not have conflict, since it has the latest world state. + redoResult := &ParallelTxResult{ + updateSlotDB: true, + keepSystem: systemAddrConflict, + slotIndex: slotIndex, + txReq: txReq, + receipt: receipt, + err: err, + } + p.txResultChan <- redoResult + slotDB = <-p.slotState[slotIndex].slotdbChan + slotDB.Prepare(tx.Hash(), block.Hash(), txIndex) + gpSlot = new(GasPool).AddGas(slotGasLimit) + evm, result, err = applyTransactionStageExecution(msg, gpSlot, slotDB, vmenv) + if err != nil { + log.Error("Stage Execution conflict redo, error", err) + } + } + + // goroutine unsafe operation will be handled from here for safety + gasConsumed := slotGasLimit - gpSlot.Gas() + if gasConsumed != result.UsedGas { + log.Error("gasConsumed != result.UsedGas mismatch", + "gasConsumed", gasConsumed, "result.UsedGas", result.UsedGas) + } + + log.Debug("ok to finalize this TX", + "Slot", slotIndex, "txIndex", txIndex, "result.UsedGas", result.UsedGas, "txReq.usedGas", *txReq.usedGas) + // ok, time to do finalize, stage2 should not be parallel + receipt, err = applyTransactionStageFinalization(evm, result, msg, p.config, slotDB, header, tx, txReq.usedGas, bloomProcessor) + + if result.Failed() { + // if Tx is reverted, all its state change will be discarded + log.Debug("TX reverted?", "Slot", slotIndex, "txIndex", txIndex, "result.Err", result.Err) + slotDB.RevertSlotDB(msg.From()) + } + + return &ParallelTxResult{ + updateSlotDB: false, + slotIndex: slotIndex, + txReq: txReq, + receipt: receipt, + slotDB: slotDB, + err: err, + } +} + +func (p *ParallelStateProcessor) runSlotLoop(slotIndex int) { + curSlot := p.slotState[slotIndex] + for { + // wait for new TxReq + txReq := <-curSlot.pendingTxReqChan + // receive a dispatched message + log.Debug("SlotLoop received a new TxReq", "Slot", slotIndex, "txIndex", txReq.txIndex) + + // 
SlotDB create rational: + // ** for a dispatched tx, + // the slot should be idle, it is better to create a new SlotDB, since new Tx is not related to previous Tx + // ** for a queued tx, + // it is better to create a new SlotDB, since COW is used. + if txReq.slotDB == nil { + result := &ParallelTxResult{ + updateSlotDB: true, + slotIndex: slotIndex, + err: nil, + } + p.txResultChan <- result + txReq.slotDB = <-curSlot.slotdbChan + } + result := p.execInSlot(slotIndex, txReq) + log.Debug("SlotLoop the TxReq is done", "Slot", slotIndex, "err", result.err) + p.txResultChan <- result + } +} + +// clear slot state for each block. +func (p *ParallelStateProcessor) resetState(txNum int, statedb *state.StateDB) { + if txNum == 0 { + return + } + p.mergedTxIndex = -1 + p.debugErrorRedoNum = 0 + p.debugConflictRedoNum = 0 + + statedb.PrepareForParallel() + + for _, slot := range p.slotState { + slot.tailTxReq = nil + slot.mergedChangeList = make([]state.SlotChangeList, 0) + slot.pendingTxReqList = make([]*ParallelTxRequest, 0) + } +} + +// Implement BEP-130: Parallel Transaction Execution. +func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error) { var ( usedGas = new(uint64) header = block.Header() - allLogs []*types.Log gp = new(GasPool).AddGas(block.GasLimit()) ) + var receipts = make([]*types.Receipt, 0) + txNum := len(block.Transactions()) + p.resetState(txNum, statedb) + + // Iterate over and process the individual transactions + posa, isPoSA := p.engine.(consensus.PoSA) + commonTxs := make([]*types.Transaction, 0, txNum) + // usually do have two tx, one for validator set contract, another for system reward contract. + systemTxs := make([]*types.Transaction, 0, 2) + + signer, _, bloomProcessor := p.preExecute(block, statedb, cfg, true) + var waitTxChan, curTxChan chan struct{} + for i, tx := range block.Transactions() { + if isPoSA { + if isSystemTx, err := posa.IsSystemTransaction(tx, block.Header()); err != nil { + return statedb, nil, nil, 0, err + } else if isSystemTx { + systemTxs = append(systemTxs, tx) + continue + } + } + + // can be moved it into slot for efficiency, but signer is not concurrent safe + msg, err := tx.AsMessage(signer) + if err != nil { + return statedb, nil, nil, 0, err + } + + // parallel start, wrap an exec message, which will be dispatched to a slot + waitTxChan = curTxChan // can be nil, if this is the tx of first batch, otherwise, it is previous Tx's wait channel + curTxChan = make(chan struct{}, 1) + + txReq := &ParallelTxRequest{ + txIndex: i, + tx: tx, + slotDB: nil, + gasLimit: gp.Gas(), + msg: msg, + block: block, + vmConfig: cfg, + bloomProcessor: bloomProcessor, + usedGas: usedGas, + waitTxChan: waitTxChan, + curTxChan: curTxChan, + } + + // to optimize the for { for {} } loop code style? it is ok right now. + for { + if p.queueSameFromAddress(txReq) { + break + } + + if p.queueSameToAddress(txReq) { + break + } + // if idle slot available, just dispatch and process next tx. + if p.dispatchToIdleSlot(statedb, txReq) { + break + } + log.Debug("ProcessParallel no slot available, wait", "txIndex", txReq.txIndex) + // no idle slot, wait until a tx is executed and merged. 
+ result := p.waitUntilNextTxDone(statedb, gp) + + // update tx result + if result.err != nil { + log.Warn("ProcessParallel a failed tx", "resultSlotIndex", result.slotIndex, + "resultTxIndex", result.txReq.txIndex, "result.err", result.err) + return statedb, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", result.txReq.txIndex, result.txReq.tx.Hash().Hex(), result.err) + } + + commonTxs = append(commonTxs, result.txReq.tx) + receipts = append(receipts, result.receipt) + } + } + + // wait until all tx request are done + for len(commonTxs)+len(systemTxs) < txNum { + result := p.waitUntilNextTxDone(statedb, gp) + // update tx result + if result.err != nil { + log.Warn("ProcessParallel a failed tx", "resultSlotIndex", result.slotIndex, + "resultTxIndex", result.txReq.txIndex, "result.err", result.err) + return statedb, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", result.txReq.txIndex, result.txReq.tx.Hash().Hex(), result.err) + } + commonTxs = append(commonTxs, result.txReq.tx) + receipts = append(receipts, result.receipt) + } + + // len(commonTxs) could be 0, such as: https://bscscan.com/block/14580486 + if len(commonTxs) > 0 { + log.Info("ProcessParallel tx all done", "block", header.Number, "usedGas", *usedGas, + "txNum", txNum, + "len(commonTxs)", len(commonTxs), + "errorNum", p.debugErrorRedoNum, + "conflictNum", p.debugConflictRedoNum, + "redoRate(%)", 100*(p.debugErrorRedoNum+p.debugConflictRedoNum)/len(commonTxs)) + } + allLogs, err := p.postExecute(block, statedb, &commonTxs, &receipts, &systemTxs, usedGas, bloomProcessor) + return statedb, receipts, allLogs, *usedGas, err +} + +// Before transactions are executed, do shared preparation for Process() & ProcessParallel() +func (p *StateProcessor) preExecute(block *types.Block, statedb *state.StateDB, cfg vm.Config, parallel bool) (types.Signer, *vm.EVM, *AsyncReceiptBloomGenerator) { signer := types.MakeSigner(p.bc.chainConfig, block.Number()) statedb.TryPreload(block, signer) - var receipts = make([]*types.Receipt, 0) // Mutate the block and state according to any hard-fork specs if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 { misc.ApplyDAOHardFork(statedb) @@ -394,20 +957,59 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg // Handle upgrade build-in system contract code systemcontracts.UpgradeBuildInSystemContract(p.config, block.Number(), statedb) - blockContext := NewEVMBlockContext(header, p.bc, nil) - vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg) + // with parallel mode, vmenv will be created inside of slot + var vmenv *vm.EVM + if !parallel { + blockContext := NewEVMBlockContext(block.Header(), p.bc, nil) + vmenv = vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg) + } + // initialise bloom processors + bloomProcessor := NewAsyncReceiptBloomGenerator(len(block.Transactions())) + statedb.MarkFullProcessed() + + return signer, vmenv, bloomProcessor +} + +func (p *StateProcessor) postExecute(block *types.Block, statedb *state.StateDB, commonTxs *[]*types.Transaction, + receipts *[]*types.Receipt, systemTxs *[]*types.Transaction, usedGas *uint64, bloomProcessor *AsyncReceiptBloomGenerator) ([]*types.Log, error) { + allLogs := make([]*types.Log, 0, len(*receipts)) + + bloomProcessor.Close() + + // Finalize the block, applying any consensus engine specific extras (e.g. 
block rewards) + err := p.engine.Finalize(p.bc, block.Header(), statedb, commonTxs, block.Uncles(), receipts, systemTxs, usedGas) + if err != nil { + return allLogs, err + } + for _, receipt := range *receipts { + allLogs = append(allLogs, receipt.Logs...) + } + return allLogs, nil +} + +// Process processes the state changes according to the Ethereum rules by running +// the transaction messages using the statedb and applying any rewards to both +// the processor (coinbase) and any included uncles. +// +// Process returns the receipts and logs accumulated during the process and +// returns the amount of gas that was used in the process. If any of the +// transactions failed to execute due to insufficient gas it will return an error. +func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error) { + var ( + usedGas = new(uint64) + header = block.Header() + gp = new(GasPool).AddGas(block.GasLimit()) + ) + var receipts = make([]*types.Receipt, 0) txNum := len(block.Transactions()) + commonTxs := make([]*types.Transaction, 0, txNum) // Iterate over and process the individual transactions posa, isPoSA := p.engine.(consensus.PoSA) - commonTxs := make([]*types.Transaction, 0, txNum) - - // initilise bloom processors - bloomProcessors := NewAsyncReceiptBloomGenerator(txNum) - statedb.MarkFullProcessed() - // usually do have two tx, one for validator set contract, another for system reward contract. systemTxs := make([]*types.Transaction, 0, 2) + + signer, vmenv, bloomProcessor := p.preExecute(block, statedb, cfg, false) for i, tx := range block.Transactions() { if isPoSA { if isSystemTx, err := posa.IsSystemTransaction(tx, block.Header()); err != nil { @@ -423,7 +1025,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg return statedb, nil, nil, 0, err } statedb.Prepare(tx.Hash(), block.Hash(), i) - receipt, err := applyTransaction(msg, p.config, p.bc, nil, gp, statedb, header, tx, usedGas, vmenv, bloomProcessors) + receipt, err := applyTransaction(msg, p.config, p.bc, nil, gp, statedb, header, tx, usedGas, vmenv, bloomProcessor) if err != nil { return statedb, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) } @@ -431,21 +1033,59 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg commonTxs = append(commonTxs, tx) receipts = append(receipts, receipt) } - bloomProcessors.Close() - // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) - err := p.engine.Finalize(p.bc, header, statedb, &commonTxs, block.Uncles(), &receipts, &systemTxs, usedGas) + allLogs, err := p.postExecute(block, statedb, &commonTxs, &receipts, &systemTxs, usedGas, bloomProcessor) + return statedb, receipts, allLogs, *usedGas, err +} + +func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, evm *vm.EVM, receiptProcessors ...ReceiptProcessor) (*types.Receipt, error) { + // Create a new context to be used in the EVM environment. + txContext := NewEVMTxContext(msg) + evm.Reset(txContext, statedb) + + // Apply the transaction to the current state (included in the env). 
+ result, err := ApplyMessage(evm, msg, gp) if err != nil { - return statedb, receipts, allLogs, *usedGas, err + return nil, err } - for _, receipt := range receipts { - allLogs = append(allLogs, receipt.Logs...) + + // Update the state with pending changes. + var root []byte + if config.IsByzantium(header.Number) { + statedb.Finalise(true) + } else { + root = statedb.IntermediateRoot(config.IsEIP158(header.Number)).Bytes() } + *usedGas += result.UsedGas - return statedb, receipts, allLogs, *usedGas, nil + // Create a new receipt for the transaction, storing the intermediate root and gas used + // by the tx. + receipt := &types.Receipt{Type: tx.Type(), PostState: root, CumulativeGasUsed: *usedGas} + if result.Failed() { + receipt.Status = types.ReceiptStatusFailed + } else { + receipt.Status = types.ReceiptStatusSuccessful + } + receipt.TxHash = tx.Hash() + receipt.GasUsed = result.UsedGas + + // If the transaction created a contract, store the creation address in the receipt. + if msg.To() == nil { + receipt.ContractAddress = crypto.CreateAddress(evm.TxContext.Origin, tx.Nonce()) + } + + // Set the receipt logs and create the bloom filter. + receipt.Logs = statedb.GetLogs(tx.Hash()) + receipt.BlockHash = statedb.BlockHash() + receipt.BlockNumber = header.Number + receipt.TransactionIndex = uint(statedb.TxIndex()) + for _, receiptProcessor := range receiptProcessors { + receiptProcessor.Apply(receipt) + } + return receipt, err } -func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, evm *vm.EVM, receiptProcessors ...ReceiptProcessor) (*types.Receipt, error) { +func applyTransactionStageExecution(msg types.Message, gp *GasPool, statedb *state.StateDB, evm *vm.EVM) (*vm.EVM, *ExecutionResult, error) { // Create a new context to be used in the EVM environment. txContext := NewEVMTxContext(msg) evm.Reset(txContext, statedb) @@ -453,9 +1093,13 @@ func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainCon // Apply the transaction to the current state (included in the env). result, err := ApplyMessage(evm, msg, gp) if err != nil { - return nil, err + return nil, nil, err } + return evm, result, err +} + +func applyTransactionStageFinalization(evm *vm.EVM, result *ExecutionResult, msg types.Message, config *params.ChainConfig, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, receiptProcessors ...ReceiptProcessor) (*types.Receipt, error) { // Update the state with pending changes. var root []byte if config.IsByzantium(header.Number) { @@ -489,7 +1133,7 @@ func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainCon for _, receiptProcessor := range receiptProcessors { receiptProcessor.Apply(receipt) } - return receipt, err + return receipt, nil } // ApplyTransaction attempts to apply a transaction to the given state database diff --git a/core/vm/evm.go b/core/vm/evm.go index 53e2e8797b..c7c8e0596c 100644 --- a/core/vm/evm.go +++ b/core/vm/evm.go @@ -475,6 +475,7 @@ func (evm *EVM) create(caller ContractRef, codeAndHash *codeAndHash, gas uint64, } nonce := evm.StateDB.GetNonce(caller.Address()) evm.StateDB.SetNonce(caller.Address(), nonce+1) + evm.StateDB.NonceChanged(caller.Address()) // We add this to the access list _before_ taking a snapshot. 
Even if the creation fails, // the access-list change should not be rolled back if evm.chainRules.IsBerlin { diff --git a/core/vm/interface.go b/core/vm/interface.go index ad9b05d666..c3d99aaa76 100644 --- a/core/vm/interface.go +++ b/core/vm/interface.go @@ -31,6 +31,7 @@ type StateDB interface { AddBalance(common.Address, *big.Int) GetBalance(common.Address) *big.Int + NonceChanged(common.Address) GetNonce(common.Address) uint64 SetNonce(common.Address, uint64) diff --git a/eth/backend.go b/eth/backend.go index ab93006437..8551b57eaa 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -202,6 +202,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { bcOps := make([]core.BlockChainOption, 0) if config.DiffSync { bcOps = append(bcOps, core.EnableLightProcessor) + } else if config.ParallelTxMode { + bcOps = append(bcOps, core.EnableParallelProcessor(config.ParallelTxNum, config.ParallelTxQueueSize)) } if config.PipeCommit { bcOps = append(bcOps, core.EnablePipelineCommit) diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 09baad1e1c..94998acd6c 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -135,10 +135,13 @@ type Config struct { NoPruning bool // Whether to disable pruning and flush everything to disk DirectBroadcast bool - DisableSnapProtocol bool //Whether disable snap protocol + DisableSnapProtocol bool // Whether disable snap protocol DiffSync bool // Whether support diff sync PipeCommit bool RangeLimit bool + ParallelTxMode bool // Whether to execute transaction in parallel mode when do full sync + ParallelTxNum int // Number of slot for transaction execution + ParallelTxQueueSize int // Max number of Tx that can be queued to a slot TxLookupLimit uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved. diff --git a/go.mod b/go.mod index c58a45dfd2..41b8a28076 100644 --- a/go.mod +++ b/go.mod @@ -58,6 +58,7 @@ require ( github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c github.com/panjf2000/ants/v2 v2.4.5 github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 + github.com/prometheus/client_golang v1.0.0 github.com/prometheus/tsdb v0.7.1 github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 // indirect github.com/rjeczalik/notify v0.9.1 diff --git a/metrics/exp/exp.go b/metrics/exp/exp.go index 3ebe8cc68a..563a55bf65 100644 --- a/metrics/exp/exp.go +++ b/metrics/exp/exp.go @@ -8,6 +8,8 @@ import ( "net/http" "sync" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics/prometheus" @@ -44,6 +46,7 @@ func Exp(r metrics.Registry) { // http.HandleFunc("/debug/vars", e.expHandler) // haven't found an elegant way, so just use a different endpoint http.Handle("/debug/metrics", h) + http.Handle("/debug/metrics/go_prometheus", promhttp.Handler()) http.Handle("/debug/metrics/prometheus", prometheus.Handler(r)) } @@ -58,6 +61,7 @@ func ExpHandler(r metrics.Registry) http.Handler { func Setup(address string) { m := http.NewServeMux() m.Handle("/debug/metrics", ExpHandler(metrics.DefaultRegistry)) + m.Handle("/debug/metrics/go_prometheus", promhttp.Handler()) m.Handle("/debug/metrics/prometheus", prometheus.Handler(metrics.DefaultRegistry)) log.Info("Starting metrics server", "addr", fmt.Sprintf("http://%s/debug/metrics", address)) go func() {
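As a closing note on the dispatcher/slot handshake introduced in core/state_processor.go above: every ParallelTxRequest carries waitTxChan (the previous transaction's channel) and curTxChan (its own), and waitUntilNextTxDone closes curTxChan once that transaction's result has been merged into the main StateDB, so conflict checking and finalization still happen in block order even though execution is speculative. The standalone sketch below is illustrative only and not part of the patch (for brevity each goroutine closes its own channel, whereas in the patch the dispatcher closes it after the merge); it shows the ordering guarantee this channel chain provides:

package main

import (
	"fmt"
	"sync"
)

func main() {
	const txNum = 5
	// merged[i] is closed once "tx i" has been merged; tx i+1 waits on it,
	// mirroring the waitTxChan/curTxChan pair in ParallelTxRequest.
	merged := make([]chan struct{}, txNum)
	for i := range merged {
		merged[i] = make(chan struct{})
	}

	var wg sync.WaitGroup
	for i := 0; i < txNum; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// speculative execution against a per-slot state copy would happen here
			if i > 0 {
				<-merged[i-1] // block until the previous tx has been merged
			}
			fmt.Printf("tx %d merged in order\n", i)
			close(merged[i]) // broadcast completion to the next tx
		}(i)
	}
	wg.Wait()
}

Closing the channel, rather than sending on it, gives a one-shot broadcast that any number of waiters can observe; the patch itself flags a follow-up optimization here, namely skipping the wait when the next transaction runs in the same slot.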