Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

E35: Fix transaction pool being unable to have txs #10365

Merged
merged 13 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions cmd/state/exec3/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/core/vm/evmtypes"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/shards"
)

type Worker struct {
Expand Down Expand Up @@ -53,7 +54,7 @@ type Worker struct {
dirs datadir.Dirs
}

func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs) *Worker {
func NewWorker(lock sync.Locker, logger log.Logger, accumulator *shards.Accumulator, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs) *Worker {
w := &Worker{
lock: lock,
logger: logger,
Expand All @@ -62,7 +63,7 @@ func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, backgro
rs: rs,
background: background,
blockReader: blockReader,
stateWriter: state.NewStateWriterV3(rs),
stateWriter: state.NewStateWriterV3(rs, accumulator),
stateReader: state.NewStateReaderV3(rs.Domains()),
chainConfig: chainConfig,

Expand All @@ -83,10 +84,10 @@ func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, backgro
return w
}

func (rw *Worker) ResetState(rs *state.StateV3) {
func (rw *Worker) ResetState(rs *state.StateV3, accumulator *shards.Accumulator) {
rw.rs = rs
rw.SetReader(state.NewStateReaderV3(rs.Domains()))
rw.stateWriter = state.NewStateWriterV3(rs)
rw.stateWriter = state.NewStateWriterV3(rs, accumulator)
}

func (rw *Worker) Tx() kv.Tx { return rw.chainTx }
Expand Down Expand Up @@ -268,7 +269,7 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask) {
}
}

func NewWorkersPool(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) {
func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) {
reconWorkers = make([]*Worker, workerCount)

resultChSize := workerCount * 8
Expand All @@ -279,7 +280,7 @@ func NewWorkersPool(lock sync.Locker, logger log.Logger, ctx context.Context, ba
ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < workerCount; i++ {
reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine, dirs)
reconWorkers[i] = NewWorker(lock, logger, accumulator, ctx, background, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine, dirs)
}
if background {
for i := 0; i < workerCount; i++ {
Expand All @@ -305,7 +306,7 @@ func NewWorkersPool(lock sync.Locker, logger log.Logger, ctx context.Context, ba
//applyWorker.ResetTx(nil)
}
}
applyWorker = NewWorker(lock, logger, ctx, false, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine, dirs)
applyWorker = NewWorker(lock, logger, accumulator, ctx, false, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine, dirs)

return reconWorkers, applyWorker, rws, clear, wait
}
43 changes: 37 additions & 6 deletions core/state/rw_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,14 +345,16 @@ type StateWriterBufferedV3 struct {
accountDels map[string]*accounts.Account
storagePrevs map[string][]byte
codePrevs map[string]uint64
accumulator *shards.Accumulator

tx kv.Tx
}

func NewStateWriterBufferedV3(rs *StateV3) *StateWriterBufferedV3 {
func NewStateWriterBufferedV3(rs *StateV3, accumulator *shards.Accumulator) *StateWriterBufferedV3 {
return &StateWriterBufferedV3{
rs: rs,
writeLists: newWriteList(),
rs: rs,
writeLists: newWriteList(),
accumulator: accumulator,
//trace: true,
}
}
Expand Down Expand Up @@ -395,6 +397,9 @@ func (w *StateWriterBufferedV3) UpdateAccountData(address common.Address, origin
}
}
value := accounts.SerialiseV3(account)
if w.accumulator != nil {
w.accumulator.ChangeAccount(address, account.Incarnation, value)
}
w.writeLists[kv.AccountsDomain.String()].Push(string(address[:]), value)

return nil
Expand All @@ -404,6 +409,9 @@ func (w *StateWriterBufferedV3) UpdateAccountCode(address common.Address, incarn
if w.trace {
fmt.Printf("code: %x, %x, valLen: %d\n", address.Bytes(), codeHash, len(code))
}
if w.accumulator != nil {
w.accumulator.ChangeCode(address, incarnation, code)
}
w.writeLists[kv.CodeDomain.String()].Push(string(address[:]), code)
return nil
}
Expand All @@ -412,6 +420,9 @@ func (w *StateWriterBufferedV3) DeleteAccount(address common.Address, original *
if w.trace {
fmt.Printf("del acc: %x\n", address)
}
if w.accumulator != nil {
w.accumulator.DeleteAccount(address)
}
w.writeLists[kv.AccountsDomain.String()].Push(string(address.Bytes()), nil)
return nil
}
Expand All @@ -425,6 +436,11 @@ func (w *StateWriterBufferedV3) WriteAccountStorage(address common.Address, inca
if w.trace {
fmt.Printf("storage: %x,%x,%x\n", address, *key, value.Bytes())
}
if w.accumulator != nil && key != nil && value != nil {
k := *key
v := value.Bytes()
w.accumulator.ChangeStorage(address, incarnation, k, v)
}
return nil
}

Expand All @@ -446,13 +462,14 @@ func (w *StateWriterBufferedV3) CreateContract(address common.Address) error {

// StateWriterV3 - used by parallel workers to accumulate updates and then send them to conflict-resolution.
type StateWriterV3 struct {
rs *StateV3
trace bool
rs *StateV3
trace bool
accumulator *shards.Accumulator

tx kv.Tx
}

func NewStateWriterV3(rs *StateV3) *StateWriterV3 {
func NewStateWriterV3(rs *StateV3, accumulator *shards.Accumulator) *StateWriterV3 {
return &StateWriterV3{
rs: rs,
//trace: true,
Expand Down Expand Up @@ -488,6 +505,9 @@ func (w *StateWriterV3) UpdateAccountData(address common.Address, original, acco
}
}
value := accounts.SerialiseV3(account)
if w.accumulator != nil {
w.accumulator.ChangeAccount(address, account.Incarnation, value)
}

if err := w.rs.domains.DomainPut(kv.AccountsDomain, address[:], nil, value, nil, 0); err != nil {
return err
Expand All @@ -502,6 +522,9 @@ func (w *StateWriterV3) UpdateAccountCode(address common.Address, incarnation ui
if err := w.rs.domains.DomainPut(kv.CodeDomain, address[:], nil, code, nil, 0); err != nil {
return err
}
if w.accumulator != nil {
w.accumulator.ChangeCode(address, incarnation, code)
}
return nil
}

Expand All @@ -512,6 +535,9 @@ func (w *StateWriterV3) DeleteAccount(address common.Address, original *accounts
if err := w.rs.domains.DomainDel(kv.AccountsDomain, address[:], nil, nil, 0); err != nil {
return err
}
if w.accumulator != nil {
w.accumulator.DeleteAccount(address)
}
return nil
}

Expand All @@ -527,6 +553,11 @@ func (w *StateWriterV3) WriteAccountStorage(address common.Address, incarnation
if len(v) == 0 {
return w.rs.domains.DomainDel(kv.StorageDomain, composite, nil, nil, 0)
}
if w.accumulator != nil && key != nil && value != nil {
k := *key
w.accumulator.ChangeStorage(address, incarnation, k, v)
}

return w.rs.domains.DomainPut(kv.StorageDomain, composite, nil, v, nil, 0)
}

Expand Down
11 changes: 8 additions & 3 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/ledgerwatch/erigon/eth/ethconfig/estimate"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/shards"
)

var execStepsInDB = metrics.NewGauge(`exec_steps_in_db`) //nolint
Expand Down Expand Up @@ -290,7 +291,6 @@ func ExecV3(ctx context.Context,
blockNum = doms.BlockNum()
initialBlockNum := blockNum
outputTxNum.Store(doms.TxNum())

if maxBlockNum-blockNum > 16 {
log.Info(fmt.Sprintf("[%s] starting", execStage.LogPrefix()),
"from", blockNum, "to", maxBlockNum, "fromTxNum", doms.TxNum(), "offsetFromBlockBeginning", offsetFromBlockBeginning, "initialCycle", initialCycle, "useExternalTx", useExternalTx)
Expand All @@ -306,6 +306,11 @@ func ExecV3(ctx context.Context,
var count uint64
var lock sync.RWMutex

shouldReportToTxPool := maxBlockNum-blockNum <= 8
var accumulator *shards.Accumulator
if shouldReportToTxPool {
accumulator = cfg.accumulator
}
rs := state.NewStateV3(doms, logger)

////TODO: owner of `resultCh` is main goroutine, but owner of `retryQueue` is applyLoop.
Expand All @@ -319,7 +324,7 @@ func ExecV3(ctx context.Context,
rwsConsumed := make(chan struct{}, 1)
defer close(rwsConsumed)

execWorkers, applyWorker, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs)
execWorkers, applyWorker, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), accumulator, logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs)
defer stopWorkers()
applyWorker.DiscardReadList()

Expand Down Expand Up @@ -914,7 +919,7 @@ Loop:
rs = state.NewStateV3(doms, logger)

applyWorker.ResetTx(applyTx)
applyWorker.ResetState(rs)
applyWorker.ResetState(rs, accumulator)

return nil
}(); err != nil {
Expand Down
4 changes: 0 additions & 4 deletions eth/stagedsync/stage_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,17 +284,13 @@ func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64
return err
}

logPrefix := s.LogPrefix()
var to = prevStageProgress
if toBlock > 0 {
to = cmp.Min(prevStageProgress, toBlock)
}
if to < s.BlockNumber {
return nil
}
if to > s.BlockNumber+16 {
logger.Info(fmt.Sprintf("[%s] Blocks execution", logPrefix), "from", s.BlockNumber, "to", to)
}

parallel := txc.Tx == nil
if err := ExecV3(ctx, s, u, workersCount, cfg, txc, parallel, to, logger, initialCycle); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion eth/stagedsync/stage_execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func apply(tx kv.RwTx, logger log.Logger) (beforeBlock, afterBlock testGenHook,
panic(err)
}
rs := state.NewStateV3(domains, logger)
stateWriter := state.NewStateWriterBufferedV3(rs)
stateWriter := state.NewStateWriterBufferedV3(rs, nil)
stateWriter.SetTx(tx)

return func(n, from, numberOfBlocks uint64) {
Expand Down
2 changes: 1 addition & 1 deletion eth/stagedsync/stage_mining_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func filterBadTransactions(transactions []types.Transaction, config chain.Config
filtered = append(filtered, transaction)
transactions = transactions[1:]
}
logger.Debug("Filtration", "initial", initialCnt, "no sender", noSenderCnt, "no account", noAccountCnt, "nonce too low", nonceTooLowCnt, "nonceTooHigh", missedTxs, "sender not EOA", notEOACnt, "fee too low", feeTooLowCnt, "overflow", overflowCnt, "balance too low", balanceTooLowCnt, "filtered", len(filtered))
logger.Info("Filtration", "initial", initialCnt, "no sender", noSenderCnt, "no account", noAccountCnt, "nonce too low", nonceTooLowCnt, "nonceTooHigh", missedTxs, "sender not EOA", notEOACnt, "fee too low", feeTooLowCnt, "overflow", overflowCnt, "balance too low", balanceTooLowCnt, "filtered", len(filtered))
return filtered, nil
}

Expand Down
20 changes: 20 additions & 0 deletions turbo/execution/eth1/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package eth1

import (
"context"
"errors"
"fmt"
"runtime"
"slices"
Expand All @@ -13,6 +14,7 @@ import (
execution "github.com/ledgerwatch/erigon-lib/gointerfaces/executionproto"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/rawdbv3"
"github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/erigon-lib/wrap"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/eth/consensuschain"
Expand Down Expand Up @@ -41,6 +43,15 @@ func sendForkchoiceErrorWithoutWaiting(ch chan forkchoiceOutcome, err error) {
}
}

func isDomainAheadOfBlocks(tx kv.RwTx) bool {
doms, err := state.NewSharedDomains(tx, log.New())
if err != nil {
return errors.Is(err, state.ErrBehindCommitment)
}
defer doms.Close()
return false
}

// verifyForkchoiceHashes verifies the finalized and safe hash of the forkchoice state
func (e *EthereumExecutionModule) verifyForkchoiceHashes(ctx context.Context, tx kv.Tx, blockHash, finalizedHash, safeHash common.Hash) (bool, error) {
// Client software MUST return -38002: Invalid forkchoice state error if the payload referenced by
Expand Down Expand Up @@ -310,6 +321,14 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original
}
}
}
if isDomainAheadOfBlocks(tx) {
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(common.Hash{}),
Status: execution.ExecutionStatus_TooFarAway,
ValidationError: "domain ahead of blocks",
})
return
}

// Set Progress for headers and bodies accordingly.
if err := stages.SaveStageProgress(tx, stages.Headers, fcuHeader.Number.Uint64()); err != nil {
Expand Down Expand Up @@ -351,6 +370,7 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original
// Update forks...
writeForkChoiceHashes(tx, blockHash, safeHash, finalizedHash)
status := execution.ExecutionStatus_Success

if headHash != blockHash {
status = execution.ExecutionStatus_BadBlock
validationError = "headHash and blockHash mismatch"
Expand Down
Loading