Skip to content

Commit

Permalink
E35: Fix transaction pool being unable to have txs (#10365)
Browse files Browse the repository at this point in the history
Changes in state were not being "accumulated" in the TxPool's cache
  • Loading branch information
Giulio2002 authored May 16, 2024
1 parent 99f15f8 commit 1368da6
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 22 deletions.
15 changes: 8 additions & 7 deletions cmd/state/exec3/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/core/vm/evmtypes"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/shards"
)

type Worker struct {
Expand Down Expand Up @@ -53,7 +54,7 @@ type Worker struct {
dirs datadir.Dirs
}

func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs) *Worker {
func NewWorker(lock sync.Locker, logger log.Logger, accumulator *shards.Accumulator, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs) *Worker {
w := &Worker{
lock: lock,
logger: logger,
Expand All @@ -62,7 +63,7 @@ func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, backgro
rs: rs,
background: background,
blockReader: blockReader,
stateWriter: state.NewStateWriterV3(rs),
stateWriter: state.NewStateWriterV3(rs, accumulator),
stateReader: state.NewStateReaderV3(rs.Domains()),
chainConfig: chainConfig,

Expand All @@ -83,10 +84,10 @@ func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, backgro
return w
}

func (rw *Worker) ResetState(rs *state.StateV3) {
func (rw *Worker) ResetState(rs *state.StateV3, accumulator *shards.Accumulator) {
rw.rs = rs
rw.SetReader(state.NewStateReaderV3(rs.Domains()))
rw.stateWriter = state.NewStateWriterV3(rs)
rw.stateWriter = state.NewStateWriterV3(rs, accumulator)
}

func (rw *Worker) Tx() kv.Tx { return rw.chainTx }
Expand Down Expand Up @@ -268,7 +269,7 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask) {
}
}

func NewWorkersPool(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) {
func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) {
reconWorkers = make([]*Worker, workerCount)

resultChSize := workerCount * 8
Expand All @@ -279,7 +280,7 @@ func NewWorkersPool(lock sync.Locker, logger log.Logger, ctx context.Context, ba
ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < workerCount; i++ {
reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine, dirs)
reconWorkers[i] = NewWorker(lock, logger, accumulator, ctx, background, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine, dirs)
}
if background {
for i := 0; i < workerCount; i++ {
Expand All @@ -305,7 +306,7 @@ func NewWorkersPool(lock sync.Locker, logger log.Logger, ctx context.Context, ba
//applyWorker.ResetTx(nil)
}
}
applyWorker = NewWorker(lock, logger, ctx, false, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine, dirs)
applyWorker = NewWorker(lock, logger, accumulator, ctx, false, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine, dirs)

return reconWorkers, applyWorker, rws, clear, wait
}
43 changes: 37 additions & 6 deletions core/state/rw_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,14 +345,16 @@ type StateWriterBufferedV3 struct {
accountDels map[string]*accounts.Account
storagePrevs map[string][]byte
codePrevs map[string]uint64
accumulator *shards.Accumulator

tx kv.Tx
}

func NewStateWriterBufferedV3(rs *StateV3) *StateWriterBufferedV3 {
func NewStateWriterBufferedV3(rs *StateV3, accumulator *shards.Accumulator) *StateWriterBufferedV3 {
return &StateWriterBufferedV3{
rs: rs,
writeLists: newWriteList(),
rs: rs,
writeLists: newWriteList(),
accumulator: accumulator,
//trace: true,
}
}
Expand Down Expand Up @@ -395,6 +397,9 @@ func (w *StateWriterBufferedV3) UpdateAccountData(address common.Address, origin
}
}
value := accounts.SerialiseV3(account)
if w.accumulator != nil {
w.accumulator.ChangeAccount(address, account.Incarnation, value)
}
w.writeLists[kv.AccountsDomain.String()].Push(string(address[:]), value)

return nil
Expand All @@ -404,6 +409,9 @@ func (w *StateWriterBufferedV3) UpdateAccountCode(address common.Address, incarn
if w.trace {
fmt.Printf("code: %x, %x, valLen: %d\n", address.Bytes(), codeHash, len(code))
}
if w.accumulator != nil {
w.accumulator.ChangeCode(address, incarnation, code)
}
w.writeLists[kv.CodeDomain.String()].Push(string(address[:]), code)
return nil
}
Expand All @@ -412,6 +420,9 @@ func (w *StateWriterBufferedV3) DeleteAccount(address common.Address, original *
if w.trace {
fmt.Printf("del acc: %x\n", address)
}
if w.accumulator != nil {
w.accumulator.DeleteAccount(address)
}
w.writeLists[kv.AccountsDomain.String()].Push(string(address.Bytes()), nil)
return nil
}
Expand All @@ -425,6 +436,11 @@ func (w *StateWriterBufferedV3) WriteAccountStorage(address common.Address, inca
if w.trace {
fmt.Printf("storage: %x,%x,%x\n", address, *key, value.Bytes())
}
if w.accumulator != nil && key != nil && value != nil {
k := *key
v := value.Bytes()
w.accumulator.ChangeStorage(address, incarnation, k, v)
}
return nil
}

Expand All @@ -446,13 +462,14 @@ func (w *StateWriterBufferedV3) CreateContract(address common.Address) error {

// StateWriterV3 - used by parallel workers to accumulate updates and then send them to conflict-resolution.
type StateWriterV3 struct {
rs *StateV3
trace bool
rs *StateV3
trace bool
accumulator *shards.Accumulator

tx kv.Tx
}

func NewStateWriterV3(rs *StateV3) *StateWriterV3 {
func NewStateWriterV3(rs *StateV3, accumulator *shards.Accumulator) *StateWriterV3 {
return &StateWriterV3{
rs: rs,
//trace: true,
Expand Down Expand Up @@ -488,6 +505,9 @@ func (w *StateWriterV3) UpdateAccountData(address common.Address, original, acco
}
}
value := accounts.SerialiseV3(account)
if w.accumulator != nil {
w.accumulator.ChangeAccount(address, account.Incarnation, value)
}

if err := w.rs.domains.DomainPut(kv.AccountsDomain, address[:], nil, value, nil, 0); err != nil {
return err
Expand All @@ -502,6 +522,9 @@ func (w *StateWriterV3) UpdateAccountCode(address common.Address, incarnation ui
if err := w.rs.domains.DomainPut(kv.CodeDomain, address[:], nil, code, nil, 0); err != nil {
return err
}
if w.accumulator != nil {
w.accumulator.ChangeCode(address, incarnation, code)
}
return nil
}

Expand All @@ -512,6 +535,9 @@ func (w *StateWriterV3) DeleteAccount(address common.Address, original *accounts
if err := w.rs.domains.DomainDel(kv.AccountsDomain, address[:], nil, nil, 0); err != nil {
return err
}
if w.accumulator != nil {
w.accumulator.DeleteAccount(address)
}
return nil
}

Expand All @@ -527,6 +553,11 @@ func (w *StateWriterV3) WriteAccountStorage(address common.Address, incarnation
if len(v) == 0 {
return w.rs.domains.DomainDel(kv.StorageDomain, composite, nil, nil, 0)
}
if w.accumulator != nil && key != nil && value != nil {
k := *key
w.accumulator.ChangeStorage(address, incarnation, k, v)
}

return w.rs.domains.DomainPut(kv.StorageDomain, composite, nil, v, nil, 0)
}

Expand Down
11 changes: 8 additions & 3 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/ledgerwatch/erigon/eth/ethconfig/estimate"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/shards"
)

var execStepsInDB = metrics.NewGauge(`exec_steps_in_db`) //nolint
Expand Down Expand Up @@ -290,7 +291,6 @@ func ExecV3(ctx context.Context,
blockNum = doms.BlockNum()
initialBlockNum := blockNum
outputTxNum.Store(doms.TxNum())

if maxBlockNum-blockNum > 16 {
log.Info(fmt.Sprintf("[%s] starting", execStage.LogPrefix()),
"from", blockNum, "to", maxBlockNum, "fromTxNum", doms.TxNum(), "offsetFromBlockBeginning", offsetFromBlockBeginning, "initialCycle", initialCycle, "useExternalTx", useExternalTx)
Expand All @@ -306,6 +306,11 @@ func ExecV3(ctx context.Context,
var count uint64
var lock sync.RWMutex

shouldReportToTxPool := maxBlockNum-blockNum <= 8
var accumulator *shards.Accumulator
if shouldReportToTxPool {
accumulator = cfg.accumulator
}
rs := state.NewStateV3(doms, logger)

////TODO: owner of `resultCh` is main goroutine, but owner of `retryQueue` is applyLoop.
Expand All @@ -319,7 +324,7 @@ func ExecV3(ctx context.Context,
rwsConsumed := make(chan struct{}, 1)
defer close(rwsConsumed)

execWorkers, applyWorker, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs)
execWorkers, applyWorker, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), accumulator, logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs)
defer stopWorkers()
applyWorker.DiscardReadList()

Expand Down Expand Up @@ -914,7 +919,7 @@ Loop:
rs = state.NewStateV3(doms, logger)

applyWorker.ResetTx(applyTx)
applyWorker.ResetState(rs)
applyWorker.ResetState(rs, accumulator)

return nil
}(); err != nil {
Expand Down
4 changes: 0 additions & 4 deletions eth/stagedsync/stage_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,17 +284,13 @@ func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64
return err
}

logPrefix := s.LogPrefix()
var to = prevStageProgress
if toBlock > 0 {
to = cmp.Min(prevStageProgress, toBlock)
}
if to < s.BlockNumber {
return nil
}
if to > s.BlockNumber+16 {
logger.Info(fmt.Sprintf("[%s] Blocks execution", logPrefix), "from", s.BlockNumber, "to", to)
}

parallel := txc.Tx == nil
if err := ExecV3(ctx, s, u, workersCount, cfg, txc, parallel, to, logger, initialCycle); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion eth/stagedsync/stage_execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func apply(tx kv.RwTx, logger log.Logger) (beforeBlock, afterBlock testGenHook,
panic(err)
}
rs := state.NewStateV3(domains, logger)
stateWriter := state.NewStateWriterBufferedV3(rs)
stateWriter := state.NewStateWriterBufferedV3(rs, nil)
stateWriter.SetTx(tx)

return func(n, from, numberOfBlocks uint64) {
Expand Down
2 changes: 1 addition & 1 deletion eth/stagedsync/stage_mining_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func filterBadTransactions(transactions []types.Transaction, config chain.Config
filtered = append(filtered, transaction)
transactions = transactions[1:]
}
logger.Debug("Filtration", "initial", initialCnt, "no sender", noSenderCnt, "no account", noAccountCnt, "nonce too low", nonceTooLowCnt, "nonceTooHigh", missedTxs, "sender not EOA", notEOACnt, "fee too low", feeTooLowCnt, "overflow", overflowCnt, "balance too low", balanceTooLowCnt, "filtered", len(filtered))
logger.Info("Filtration", "initial", initialCnt, "no sender", noSenderCnt, "no account", noAccountCnt, "nonce too low", nonceTooLowCnt, "nonceTooHigh", missedTxs, "sender not EOA", notEOACnt, "fee too low", feeTooLowCnt, "overflow", overflowCnt, "balance too low", balanceTooLowCnt, "filtered", len(filtered))
return filtered, nil
}

Expand Down
20 changes: 20 additions & 0 deletions turbo/execution/eth1/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package eth1

import (
"context"
"errors"
"fmt"
"runtime"
"slices"
Expand All @@ -13,6 +14,7 @@ import (
execution "github.com/ledgerwatch/erigon-lib/gointerfaces/executionproto"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/rawdbv3"
"github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/erigon-lib/wrap"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/eth/consensuschain"
Expand Down Expand Up @@ -41,6 +43,15 @@ func sendForkchoiceErrorWithoutWaiting(ch chan forkchoiceOutcome, err error) {
}
}

func isDomainAheadOfBlocks(tx kv.RwTx) bool {
doms, err := state.NewSharedDomains(tx, log.New())
if err != nil {
return errors.Is(err, state.ErrBehindCommitment)
}
defer doms.Close()
return false
}

// verifyForkchoiceHashes verifies the finalized and safe hash of the forkchoice state
func (e *EthereumExecutionModule) verifyForkchoiceHashes(ctx context.Context, tx kv.Tx, blockHash, finalizedHash, safeHash common.Hash) (bool, error) {
// Client software MUST return -38002: Invalid forkchoice state error if the payload referenced by
Expand Down Expand Up @@ -310,6 +321,14 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original
}
}
}
if isDomainAheadOfBlocks(tx) {
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(common.Hash{}),
Status: execution.ExecutionStatus_TooFarAway,
ValidationError: "domain ahead of blocks",
})
return
}

// Set Progress for headers and bodies accordingly.
if err := stages.SaveStageProgress(tx, stages.Headers, fcuHeader.Number.Uint64()); err != nil {
Expand Down Expand Up @@ -351,6 +370,7 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original
// Update forks...
writeForkChoiceHashes(tx, blockHash, safeHash, finalizedHash)
status := execution.ExecutionStatus_Success

if headHash != blockHash {
status = execution.ExecutionStatus_BadBlock
validationError = "headHash and blockHash mismatch"
Expand Down

0 comments on commit 1368da6

Please sign in to comment.