From 1368da634ae424ab9e6552b3434282598f74ac2e Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Thu, 16 May 2024 03:44:55 +0200 Subject: [PATCH] E35: Fix transaction pool being unable to have txs (#10365) Changes in state were not being "accumulated" in the TxPool's cache --- cmd/state/exec3/state.go | 15 +++++----- core/state/rw_v3.go | 43 ++++++++++++++++++++++++---- eth/stagedsync/exec3.go | 11 +++++-- eth/stagedsync/stage_execute.go | 4 --- eth/stagedsync/stage_execute_test.go | 2 +- eth/stagedsync/stage_mining_exec.go | 2 +- turbo/execution/eth1/forkchoice.go | 20 +++++++++++++ 7 files changed, 75 insertions(+), 22 deletions(-) diff --git a/cmd/state/exec3/state.go b/cmd/state/exec3/state.go index de7fce77946..273a25093eb 100644 --- a/cmd/state/exec3/state.go +++ b/cmd/state/exec3/state.go @@ -21,6 +21,7 @@ import ( "github.com/ledgerwatch/erigon/core/vm" "github.com/ledgerwatch/erigon/core/vm/evmtypes" "github.com/ledgerwatch/erigon/turbo/services" + "github.com/ledgerwatch/erigon/turbo/shards" ) type Worker struct { @@ -53,7 +54,7 @@ type Worker struct { dirs datadir.Dirs } -func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs) *Worker { +func NewWorker(lock sync.Locker, logger log.Logger, accumulator *shards.Accumulator, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs) *Worker { w := &Worker{ lock: lock, logger: logger, @@ -62,7 +63,7 @@ func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, backgro rs: rs, background: background, blockReader: blockReader, - stateWriter: state.NewStateWriterV3(rs), + stateWriter: state.NewStateWriterV3(rs, accumulator), stateReader: state.NewStateReaderV3(rs.Domains()), chainConfig: chainConfig, @@ -83,10 +84,10 @@ func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, backgro return w } -func (rw *Worker) ResetState(rs *state.StateV3) { +func (rw *Worker) ResetState(rs *state.StateV3, accumulator *shards.Accumulator) { rw.rs = rs rw.SetReader(state.NewStateReaderV3(rs.Domains())) - rw.stateWriter = state.NewStateWriterV3(rs) + rw.stateWriter = state.NewStateWriterV3(rs, accumulator) } func (rw *Worker) Tx() kv.Tx { return rw.chainTx } @@ -268,7 +269,7 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask) { } } -func NewWorkersPool(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) { +func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) { reconWorkers = make([]*Worker, workerCount) resultChSize := workerCount * 8 @@ -279,7 +280,7 @@ func NewWorkersPool(lock sync.Locker, logger log.Logger, ctx context.Context, ba ctx, cancel := context.WithCancel(ctx) g, ctx := errgroup.WithContext(ctx) for i := 0; i < workerCount; i++ { - reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine, dirs) + reconWorkers[i] = NewWorker(lock, logger, accumulator, ctx, background, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine, dirs) } if background { for i := 0; i < workerCount; i++ { @@ -305,7 +306,7 @@ func NewWorkersPool(lock sync.Locker, logger log.Logger, ctx context.Context, ba //applyWorker.ResetTx(nil) } } - applyWorker = NewWorker(lock, logger, ctx, false, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine, dirs) + applyWorker = NewWorker(lock, logger, accumulator, ctx, false, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine, dirs) return reconWorkers, applyWorker, rws, clear, wait } diff --git a/core/state/rw_v3.go b/core/state/rw_v3.go index 9b2cd419fca..acfc047bd4f 100644 --- a/core/state/rw_v3.go +++ b/core/state/rw_v3.go @@ -345,14 +345,16 @@ type StateWriterBufferedV3 struct { accountDels map[string]*accounts.Account storagePrevs map[string][]byte codePrevs map[string]uint64 + accumulator *shards.Accumulator tx kv.Tx } -func NewStateWriterBufferedV3(rs *StateV3) *StateWriterBufferedV3 { +func NewStateWriterBufferedV3(rs *StateV3, accumulator *shards.Accumulator) *StateWriterBufferedV3 { return &StateWriterBufferedV3{ - rs: rs, - writeLists: newWriteList(), + rs: rs, + writeLists: newWriteList(), + accumulator: accumulator, //trace: true, } } @@ -395,6 +397,9 @@ func (w *StateWriterBufferedV3) UpdateAccountData(address common.Address, origin } } value := accounts.SerialiseV3(account) + if w.accumulator != nil { + w.accumulator.ChangeAccount(address, account.Incarnation, value) + } w.writeLists[kv.AccountsDomain.String()].Push(string(address[:]), value) return nil @@ -404,6 +409,9 @@ func (w *StateWriterBufferedV3) UpdateAccountCode(address common.Address, incarn if w.trace { fmt.Printf("code: %x, %x, valLen: %d\n", address.Bytes(), codeHash, len(code)) } + if w.accumulator != nil { + w.accumulator.ChangeCode(address, incarnation, code) + } w.writeLists[kv.CodeDomain.String()].Push(string(address[:]), code) return nil } @@ -412,6 +420,9 @@ func (w *StateWriterBufferedV3) DeleteAccount(address common.Address, original * if w.trace { fmt.Printf("del acc: %x\n", address) } + if w.accumulator != nil { + w.accumulator.DeleteAccount(address) + } w.writeLists[kv.AccountsDomain.String()].Push(string(address.Bytes()), nil) return nil } @@ -425,6 +436,11 @@ func (w *StateWriterBufferedV3) WriteAccountStorage(address common.Address, inca if w.trace { fmt.Printf("storage: %x,%x,%x\n", address, *key, value.Bytes()) } + if w.accumulator != nil && key != nil && value != nil { + k := *key + v := value.Bytes() + w.accumulator.ChangeStorage(address, incarnation, k, v) + } return nil } @@ -446,13 +462,14 @@ func (w *StateWriterBufferedV3) CreateContract(address common.Address) error { // StateWriterV3 - used by parallel workers to accumulate updates and then send them to conflict-resolution. type StateWriterV3 struct { - rs *StateV3 - trace bool + rs *StateV3 + trace bool + accumulator *shards.Accumulator tx kv.Tx } -func NewStateWriterV3(rs *StateV3) *StateWriterV3 { +func NewStateWriterV3(rs *StateV3, accumulator *shards.Accumulator) *StateWriterV3 { return &StateWriterV3{ rs: rs, //trace: true, @@ -488,6 +505,9 @@ func (w *StateWriterV3) UpdateAccountData(address common.Address, original, acco } } value := accounts.SerialiseV3(account) + if w.accumulator != nil { + w.accumulator.ChangeAccount(address, account.Incarnation, value) + } if err := w.rs.domains.DomainPut(kv.AccountsDomain, address[:], nil, value, nil, 0); err != nil { return err @@ -502,6 +522,9 @@ func (w *StateWriterV3) UpdateAccountCode(address common.Address, incarnation ui if err := w.rs.domains.DomainPut(kv.CodeDomain, address[:], nil, code, nil, 0); err != nil { return err } + if w.accumulator != nil { + w.accumulator.ChangeCode(address, incarnation, code) + } return nil } @@ -512,6 +535,9 @@ func (w *StateWriterV3) DeleteAccount(address common.Address, original *accounts if err := w.rs.domains.DomainDel(kv.AccountsDomain, address[:], nil, nil, 0); err != nil { return err } + if w.accumulator != nil { + w.accumulator.DeleteAccount(address) + } return nil } @@ -527,6 +553,11 @@ func (w *StateWriterV3) WriteAccountStorage(address common.Address, incarnation if len(v) == 0 { return w.rs.domains.DomainDel(kv.StorageDomain, composite, nil, nil, 0) } + if w.accumulator != nil && key != nil && value != nil { + k := *key + w.accumulator.ChangeStorage(address, incarnation, k, v) + } + return w.rs.domains.DomainPut(kv.StorageDomain, composite, nil, v, nil, 0) } diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index 4aef00db17b..a1eaefe08d7 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -44,6 +44,7 @@ import ( "github.com/ledgerwatch/erigon/eth/ethconfig/estimate" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/turbo/services" + "github.com/ledgerwatch/erigon/turbo/shards" ) var execStepsInDB = metrics.NewGauge(`exec_steps_in_db`) //nolint @@ -290,7 +291,6 @@ func ExecV3(ctx context.Context, blockNum = doms.BlockNum() initialBlockNum := blockNum outputTxNum.Store(doms.TxNum()) - if maxBlockNum-blockNum > 16 { log.Info(fmt.Sprintf("[%s] starting", execStage.LogPrefix()), "from", blockNum, "to", maxBlockNum, "fromTxNum", doms.TxNum(), "offsetFromBlockBeginning", offsetFromBlockBeginning, "initialCycle", initialCycle, "useExternalTx", useExternalTx) @@ -306,6 +306,11 @@ func ExecV3(ctx context.Context, var count uint64 var lock sync.RWMutex + shouldReportToTxPool := maxBlockNum-blockNum <= 8 + var accumulator *shards.Accumulator + if shouldReportToTxPool { + accumulator = cfg.accumulator + } rs := state.NewStateV3(doms, logger) ////TODO: owner of `resultCh` is main goroutine, but owner of `retryQueue` is applyLoop. @@ -319,7 +324,7 @@ func ExecV3(ctx context.Context, rwsConsumed := make(chan struct{}, 1) defer close(rwsConsumed) - execWorkers, applyWorker, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs) + execWorkers, applyWorker, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), accumulator, logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs) defer stopWorkers() applyWorker.DiscardReadList() @@ -914,7 +919,7 @@ Loop: rs = state.NewStateV3(doms, logger) applyWorker.ResetTx(applyTx) - applyWorker.ResetState(rs) + applyWorker.ResetState(rs, accumulator) return nil }(); err != nil { diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 6dc3fc44309..e9c17e70505 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -284,7 +284,6 @@ func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64 return err } - logPrefix := s.LogPrefix() var to = prevStageProgress if toBlock > 0 { to = cmp.Min(prevStageProgress, toBlock) @@ -292,9 +291,6 @@ func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64 if to < s.BlockNumber { return nil } - if to > s.BlockNumber+16 { - logger.Info(fmt.Sprintf("[%s] Blocks execution", logPrefix), "from", s.BlockNumber, "to", to) - } parallel := txc.Tx == nil if err := ExecV3(ctx, s, u, workersCount, cfg, txc, parallel, to, logger, initialCycle); err != nil { diff --git a/eth/stagedsync/stage_execute_test.go b/eth/stagedsync/stage_execute_test.go index 6f5e628e530..0b84f2c4a45 100644 --- a/eth/stagedsync/stage_execute_test.go +++ b/eth/stagedsync/stage_execute_test.go @@ -22,7 +22,7 @@ func apply(tx kv.RwTx, logger log.Logger) (beforeBlock, afterBlock testGenHook, panic(err) } rs := state.NewStateV3(domains, logger) - stateWriter := state.NewStateWriterBufferedV3(rs) + stateWriter := state.NewStateWriterBufferedV3(rs, nil) stateWriter.SetTx(tx) return func(n, from, numberOfBlocks uint64) { diff --git a/eth/stagedsync/stage_mining_exec.go b/eth/stagedsync/stage_mining_exec.go index f6a21970149..cdecd0b6898 100644 --- a/eth/stagedsync/stage_mining_exec.go +++ b/eth/stagedsync/stage_mining_exec.go @@ -404,7 +404,7 @@ func filterBadTransactions(transactions []types.Transaction, config chain.Config filtered = append(filtered, transaction) transactions = transactions[1:] } - logger.Debug("Filtration", "initial", initialCnt, "no sender", noSenderCnt, "no account", noAccountCnt, "nonce too low", nonceTooLowCnt, "nonceTooHigh", missedTxs, "sender not EOA", notEOACnt, "fee too low", feeTooLowCnt, "overflow", overflowCnt, "balance too low", balanceTooLowCnt, "filtered", len(filtered)) + logger.Info("Filtration", "initial", initialCnt, "no sender", noSenderCnt, "no account", noAccountCnt, "nonce too low", nonceTooLowCnt, "nonceTooHigh", missedTxs, "sender not EOA", notEOACnt, "fee too low", feeTooLowCnt, "overflow", overflowCnt, "balance too low", balanceTooLowCnt, "filtered", len(filtered)) return filtered, nil } diff --git a/turbo/execution/eth1/forkchoice.go b/turbo/execution/eth1/forkchoice.go index d79c3695940..26d50c746f9 100644 --- a/turbo/execution/eth1/forkchoice.go +++ b/turbo/execution/eth1/forkchoice.go @@ -2,6 +2,7 @@ package eth1 import ( "context" + "errors" "fmt" "runtime" "slices" @@ -13,6 +14,7 @@ import ( execution "github.com/ledgerwatch/erigon-lib/gointerfaces/executionproto" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/rawdbv3" + "github.com/ledgerwatch/erigon-lib/state" "github.com/ledgerwatch/erigon-lib/wrap" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/eth/consensuschain" @@ -41,6 +43,15 @@ func sendForkchoiceErrorWithoutWaiting(ch chan forkchoiceOutcome, err error) { } } +func isDomainAheadOfBlocks(tx kv.RwTx) bool { + doms, err := state.NewSharedDomains(tx, log.New()) + if err != nil { + return errors.Is(err, state.ErrBehindCommitment) + } + defer doms.Close() + return false +} + // verifyForkchoiceHashes verifies the finalized and safe hash of the forkchoice state func (e *EthereumExecutionModule) verifyForkchoiceHashes(ctx context.Context, tx kv.Tx, blockHash, finalizedHash, safeHash common.Hash) (bool, error) { // Client software MUST return -38002: Invalid forkchoice state error if the payload referenced by @@ -310,6 +321,14 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original } } } + if isDomainAheadOfBlocks(tx) { + sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ + LatestValidHash: gointerfaces.ConvertHashToH256(common.Hash{}), + Status: execution.ExecutionStatus_TooFarAway, + ValidationError: "domain ahead of blocks", + }) + return + } // Set Progress for headers and bodies accordingly. if err := stages.SaveStageProgress(tx, stages.Headers, fcuHeader.Number.Uint64()); err != nil { @@ -351,6 +370,7 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original // Update forks... writeForkChoiceHashes(tx, blockHash, safeHash, finalizedHash) status := execution.ExecutionStatus_Success + if headHash != blockHash { status = execution.ExecutionStatus_BadBlock validationError = "headHash and blockHash mismatch"