From ff3feb16218a014497c216df20eb1e802a1c3c8a Mon Sep 17 00:00:00 2001 From: Giulio Date: Wed, 15 May 2024 20:51:57 +0200 Subject: [PATCH 01/13] save --- eth/stagedsync/exec3.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index 4aef00db17b..e5924de2d93 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -290,7 +290,7 @@ func ExecV3(ctx context.Context, blockNum = doms.BlockNum() initialBlockNum := blockNum outputTxNum.Store(doms.TxNum()) - + fmt.Println(maxBlockNum, blockNum) if maxBlockNum-blockNum > 16 { log.Info(fmt.Sprintf("[%s] starting", execStage.LogPrefix()), "from", blockNum, "to", maxBlockNum, "fromTxNum", doms.TxNum(), "offsetFromBlockBeginning", offsetFromBlockBeginning, "initialCycle", initialCycle, "useExternalTx", useExternalTx) From 8bf5bed646a32c946b082b31b7ef9ee4660beda2 Mon Sep 17 00:00:00 2001 From: Giulio Date: Wed, 15 May 2024 20:53:59 +0200 Subject: [PATCH 02/13] save --- eth/stagedsync/exec3.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index e5924de2d93..339d21f500b 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -188,6 +188,7 @@ func ExecV3(ctx context.Context, // if we are behind the commitment, we can't execute anything // this can heppen if progress in domain is higher than progress in blocks if errors.Is(err, state2.ErrBehindCommitment) { + fmt.Println(err) return nil } if err != nil { @@ -290,7 +291,6 @@ func ExecV3(ctx context.Context, blockNum = doms.BlockNum() initialBlockNum := blockNum outputTxNum.Store(doms.TxNum()) - fmt.Println(maxBlockNum, blockNum) if maxBlockNum-blockNum > 16 { log.Info(fmt.Sprintf("[%s] starting", execStage.LogPrefix()), "from", blockNum, "to", maxBlockNum, "fromTxNum", doms.TxNum(), "offsetFromBlockBeginning", offsetFromBlockBeginning, "initialCycle", initialCycle, "useExternalTx", useExternalTx) From 4001ad90bf01d12e46f416be73a825ed0bb6fba7 Mon Sep 17 00:00:00 2001 From: Giulio Date: Wed, 15 May 2024 21:10:15 +0200 Subject: [PATCH 03/13] save --- eth/stagedsync/exec3.go | 1 - eth/stagedsync/stage_execute.go | 4 ---- turbo/execution/eth1/forkchoice.go | 19 +++++++++++++++++++ 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index 339d21f500b..c6763d273cf 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -188,7 +188,6 @@ func ExecV3(ctx context.Context, // if we are behind the commitment, we can't execute anything // this can heppen if progress in domain is higher than progress in blocks if errors.Is(err, state2.ErrBehindCommitment) { - fmt.Println(err) return nil } if err != nil { diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 6dc3fc44309..e9c17e70505 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -284,7 +284,6 @@ func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64 return err } - logPrefix := s.LogPrefix() var to = prevStageProgress if toBlock > 0 { to = cmp.Min(prevStageProgress, toBlock) @@ -292,9 +291,6 @@ func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64 if to < s.BlockNumber { return nil } - if to > s.BlockNumber+16 { - logger.Info(fmt.Sprintf("[%s] Blocks execution", logPrefix), "from", s.BlockNumber, "to", to) - } parallel := txc.Tx == nil if err := ExecV3(ctx, s, u, workersCount, cfg, txc, parallel, to, logger, initialCycle); err != nil { diff --git a/turbo/execution/eth1/forkchoice.go b/turbo/execution/eth1/forkchoice.go index d79c3695940..0d091b16a51 100644 --- a/turbo/execution/eth1/forkchoice.go +++ b/turbo/execution/eth1/forkchoice.go @@ -2,6 +2,7 @@ package eth1 import ( "context" + "errors" "fmt" "runtime" "slices" @@ -13,6 +14,7 @@ import ( execution "github.com/ledgerwatch/erigon-lib/gointerfaces/executionproto" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/rawdbv3" + "github.com/ledgerwatch/erigon-lib/state" "github.com/ledgerwatch/erigon-lib/wrap" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/eth/consensuschain" @@ -41,6 +43,13 @@ func sendForkchoiceErrorWithoutWaiting(ch chan forkchoiceOutcome, err error) { } } +func isDomainAheadOfBlocks(tx kv.RwTx) bool { + doms, err := state.NewSharedDomains(tx, log.New()) + defer doms.Close() + return errors.Is(err, state.ErrBehindCommitment) + +} + // verifyForkchoiceHashes verifies the finalized and safe hash of the forkchoice state func (e *EthereumExecutionModule) verifyForkchoiceHashes(ctx context.Context, tx kv.Tx, blockHash, finalizedHash, safeHash common.Hash) (bool, error) { // Client software MUST return -38002: Invalid forkchoice state error if the payload referenced by @@ -138,6 +147,15 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original } defer tx.Rollback() + if isDomainAheadOfBlocks(tx) { + sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ + LatestValidHash: gointerfaces.ConvertHashToH256(common.Hash{}), + Status: execution.ExecutionStatus_TooFarAway, + ValidationError: "domain ahead of blocks", + }) + return + } + blockHash := originalBlockHash finishProgressBefore, err := stages.GetStageProgress(tx, stages.Finish) @@ -351,6 +369,7 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original // Update forks... writeForkChoiceHashes(tx, blockHash, safeHash, finalizedHash) status := execution.ExecutionStatus_Success + if headHash != blockHash { status = execution.ExecutionStatus_BadBlock validationError = "headHash and blockHash mismatch" From 9d8901f5e8401d3d3735d73757865460a9e2076f Mon Sep 17 00:00:00 2001 From: Giulio Date: Wed, 15 May 2024 21:11:03 +0200 Subject: [PATCH 04/13] save --- turbo/execution/eth1/forkchoice.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/turbo/execution/eth1/forkchoice.go b/turbo/execution/eth1/forkchoice.go index 0d091b16a51..af07b5181ed 100644 --- a/turbo/execution/eth1/forkchoice.go +++ b/turbo/execution/eth1/forkchoice.go @@ -45,9 +45,11 @@ func sendForkchoiceErrorWithoutWaiting(ch chan forkchoiceOutcome, err error) { func isDomainAheadOfBlocks(tx kv.RwTx) bool { doms, err := state.NewSharedDomains(tx, log.New()) + if err != nil { + return errors.Is(err, state.ErrBehindCommitment) + } defer doms.Close() - return errors.Is(err, state.ErrBehindCommitment) - + return false } // verifyForkchoiceHashes verifies the finalized and safe hash of the forkchoice state From f471f5aa7af25ef03cbdb2571861072d05ba6488 Mon Sep 17 00:00:00 2001 From: Giulio Date: Wed, 15 May 2024 21:48:03 +0200 Subject: [PATCH 05/13] save --- turbo/execution/eth1/forkchoice.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/turbo/execution/eth1/forkchoice.go b/turbo/execution/eth1/forkchoice.go index af07b5181ed..26d50c746f9 100644 --- a/turbo/execution/eth1/forkchoice.go +++ b/turbo/execution/eth1/forkchoice.go @@ -149,15 +149,6 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original } defer tx.Rollback() - if isDomainAheadOfBlocks(tx) { - sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ - LatestValidHash: gointerfaces.ConvertHashToH256(common.Hash{}), - Status: execution.ExecutionStatus_TooFarAway, - ValidationError: "domain ahead of blocks", - }) - return - } - blockHash := originalBlockHash finishProgressBefore, err := stages.GetStageProgress(tx, stages.Finish) @@ -330,6 +321,14 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original } } } + if isDomainAheadOfBlocks(tx) { + sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ + LatestValidHash: gointerfaces.ConvertHashToH256(common.Hash{}), + Status: execution.ExecutionStatus_TooFarAway, + ValidationError: "domain ahead of blocks", + }) + return + } // Set Progress for headers and bodies accordingly. if err := stages.SaveStageProgress(tx, stages.Headers, fcuHeader.Number.Uint64()); err != nil { From ed1f61713eda1a15d6404a4319f495da292c123b Mon Sep 17 00:00:00 2001 From: Giulio Date: Wed, 15 May 2024 23:20:37 +0200 Subject: [PATCH 06/13] save --- core/state/rw_v3.go | 24 ++++++++++++++++++++---- eth/stagedsync/exec3.go | 10 ++++++++-- eth/stagedsync/stage_execute.go | 2 +- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/core/state/rw_v3.go b/core/state/rw_v3.go index 9b2cd419fca..92719813d93 100644 --- a/core/state/rw_v3.go +++ b/core/state/rw_v3.go @@ -37,7 +37,7 @@ type StateV3 struct { trace bool } -func NewStateV3(domains *libstate.SharedDomains, logger log.Logger) *StateV3 { +func NewStateV3(domains *libstate.SharedDomains, accumulator *shards.Accumulator, logger log.Logger) *StateV3 { return &StateV3{ domains: domains, triggers: map[uint64]*TxTask{}, @@ -345,14 +345,16 @@ type StateWriterBufferedV3 struct { accountDels map[string]*accounts.Account storagePrevs map[string][]byte codePrevs map[string]uint64 + accumulator *shards.Accumulator tx kv.Tx } -func NewStateWriterBufferedV3(rs *StateV3) *StateWriterBufferedV3 { +func NewStateWriterBufferedV3(rs *StateV3, accumulator *shards.Accumulator) *StateWriterBufferedV3 { return &StateWriterBufferedV3{ - rs: rs, - writeLists: newWriteList(), + rs: rs, + writeLists: newWriteList(), + accumulator: accumulator, //trace: true, } } @@ -395,6 +397,9 @@ func (w *StateWriterBufferedV3) UpdateAccountData(address common.Address, origin } } value := accounts.SerialiseV3(account) + if w.accumulator != nil { + w.accumulator.ChangeAccount(address, account.Incarnation, value) + } w.writeLists[kv.AccountsDomain.String()].Push(string(address[:]), value) return nil @@ -404,6 +409,9 @@ func (w *StateWriterBufferedV3) UpdateAccountCode(address common.Address, incarn if w.trace { fmt.Printf("code: %x, %x, valLen: %d\n", address.Bytes(), codeHash, len(code)) } + if w.accumulator != nil { + w.accumulator.ChangeCode(address, incarnation, code) + } w.writeLists[kv.CodeDomain.String()].Push(string(address[:]), code) return nil } @@ -412,6 +420,9 @@ func (w *StateWriterBufferedV3) DeleteAccount(address common.Address, original * if w.trace { fmt.Printf("del acc: %x\n", address) } + if w.accumulator != nil { + w.accumulator.DeleteAccount(address) + } w.writeLists[kv.AccountsDomain.String()].Push(string(address.Bytes()), nil) return nil } @@ -425,6 +436,11 @@ func (w *StateWriterBufferedV3) WriteAccountStorage(address common.Address, inca if w.trace { fmt.Printf("storage: %x,%x,%x\n", address, *key, value.Bytes()) } + if w.accumulator != nil && key != nil && value != nil { + k := *key + v := value.Bytes() + w.accumulator.ChangeStorage(address, incarnation, k, v) + } return nil } diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index c6763d273cf..bd950b64244 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -44,6 +44,7 @@ import ( "github.com/ledgerwatch/erigon/eth/ethconfig/estimate" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/turbo/services" + "github.com/ledgerwatch/erigon/turbo/shards" ) var execStepsInDB = metrics.NewGauge(`exec_steps_in_db`) //nolint @@ -305,7 +306,12 @@ func ExecV3(ctx context.Context, var count uint64 var lock sync.RWMutex - rs := state.NewStateV3(doms, logger) + shouldReportToTxPool := maxBlockNum-blockNum <= 8 + var accumulator *shards.Accumulator + if shouldReportToTxPool { + accumulator = cfg.accumulator + } + rs := state.NewStateV3(doms, accumulator, logger) ////TODO: owner of `resultCh` is main goroutine, but owner of `retryQueue` is applyLoop. // Now rwLoop closing both (because applyLoop we completely restart) @@ -910,7 +916,7 @@ Loop: return err } doms.SetTxNum(inputTxNum) - rs = state.NewStateV3(doms, logger) + rs = state.NewStateV3(doms, nil, logger) applyWorker.ResetTx(applyTx) applyWorker.ResetState(rs) diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index e9c17e70505..8f7c0615d4b 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -351,7 +351,7 @@ func unwindExec3(u *UnwindState, s *StageState, txc wrap.TxContainer, ctx contex } else { domains = txc.Doms } - rs := state.NewStateV3(domains, logger) + rs := state.NewStateV3(domains, accumulator, logger) // unwind all txs of u.UnwindPoint block. 1 txn in begin/end of block - system txs txNum, err := rawdbv3.TxNums.Min(txc.Tx, u.UnwindPoint+1) if err != nil { From b5ed2aa2ac3427a541355e89a26e71a505e55a85 Mon Sep 17 00:00:00 2001 From: Giulio Date: Wed, 15 May 2024 23:22:05 +0200 Subject: [PATCH 07/13] save --- eth/stagedsync/stage_mining_exec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/stagedsync/stage_mining_exec.go b/eth/stagedsync/stage_mining_exec.go index f6a21970149..cdecd0b6898 100644 --- a/eth/stagedsync/stage_mining_exec.go +++ b/eth/stagedsync/stage_mining_exec.go @@ -404,7 +404,7 @@ func filterBadTransactions(transactions []types.Transaction, config chain.Config filtered = append(filtered, transaction) transactions = transactions[1:] } - logger.Debug("Filtration", "initial", initialCnt, "no sender", noSenderCnt, "no account", noAccountCnt, "nonce too low", nonceTooLowCnt, "nonceTooHigh", missedTxs, "sender not EOA", notEOACnt, "fee too low", feeTooLowCnt, "overflow", overflowCnt, "balance too low", balanceTooLowCnt, "filtered", len(filtered)) + logger.Info("Filtration", "initial", initialCnt, "no sender", noSenderCnt, "no account", noAccountCnt, "nonce too low", nonceTooLowCnt, "nonceTooHigh", missedTxs, "sender not EOA", notEOACnt, "fee too low", feeTooLowCnt, "overflow", overflowCnt, "balance too low", balanceTooLowCnt, "filtered", len(filtered)) return filtered, nil } From bf0cde0531413978e6f365610b3811b368479343 Mon Sep 17 00:00:00 2001 From: Giulio Date: Wed, 15 May 2024 23:34:59 +0200 Subject: [PATCH 08/13] save --- erigon-lib/txpool/pool.go | 1 + 1 file changed, 1 insertion(+) diff --git a/erigon-lib/txpool/pool.go b/erigon-lib/txpool/pool.go index 1b4f38eb9e0..f16a565d2ab 100644 --- a/erigon-lib/txpool/pool.go +++ b/erigon-lib/txpool/pool.go @@ -833,6 +833,7 @@ func toBlobs(_blobs [][]byte) []gokzg4844.Blob { } func (p *TxPool) validateTx(txn *types.TxSlot, isLocal bool, stateCache kvcache.CacheView) txpoolcfg.DiscardReason { + txn.Traced = true isShanghai := p.isShanghai() || p.isAgra() if isShanghai { if txn.DataLen > fixedgas.MaxInitCodeSize { From 90345fa344aad6be4c95bf1f4520ee750d90429a Mon Sep 17 00:00:00 2001 From: Giulio Date: Wed, 15 May 2024 23:38:09 +0200 Subject: [PATCH 09/13] save --- eth/backend.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/backend.go b/eth/backend.go index 21d57665de5..22d4a83b92b 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -475,7 +475,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger } go func() { - logEvery := time.NewTicker(180 * time.Second) + logEvery := time.NewTicker(2 * time.Second) defer logEvery.Stop() var logItems []interface{} From 2e43296b98f7e61edce1e7d00b837d58bdd785a7 Mon Sep 17 00:00:00 2001 From: Giulio Date: Thu, 16 May 2024 01:31:20 +0200 Subject: [PATCH 10/13] save --- erigon-lib/txpool/pool.go | 1 - 1 file changed, 1 deletion(-) diff --git a/erigon-lib/txpool/pool.go b/erigon-lib/txpool/pool.go index f16a565d2ab..1b4f38eb9e0 100644 --- a/erigon-lib/txpool/pool.go +++ b/erigon-lib/txpool/pool.go @@ -833,7 +833,6 @@ func toBlobs(_blobs [][]byte) []gokzg4844.Blob { } func (p *TxPool) validateTx(txn *types.TxSlot, isLocal bool, stateCache kvcache.CacheView) txpoolcfg.DiscardReason { - txn.Traced = true isShanghai := p.isShanghai() || p.isAgra() if isShanghai { if txn.DataLen > fixedgas.MaxInitCodeSize { From ffb48d9a425317903573caf05a1b83e3e0ad1ab9 Mon Sep 17 00:00:00 2001 From: Giulio Date: Thu, 16 May 2024 02:45:30 +0200 Subject: [PATCH 11/13] save --- cmd/state/exec3/state.go | 15 ++++++++------- core/state/rw_v3.go | 23 +++++++++++++++++++---- eth/stagedsync/exec3.go | 8 ++++---- eth/stagedsync/stage_execute.go | 2 +- eth/stagedsync/stage_execute_test.go | 2 +- 5 files changed, 33 insertions(+), 17 deletions(-) diff --git a/cmd/state/exec3/state.go b/cmd/state/exec3/state.go index de7fce77946..273a25093eb 100644 --- a/cmd/state/exec3/state.go +++ b/cmd/state/exec3/state.go @@ -21,6 +21,7 @@ import ( "github.com/ledgerwatch/erigon/core/vm" "github.com/ledgerwatch/erigon/core/vm/evmtypes" "github.com/ledgerwatch/erigon/turbo/services" + "github.com/ledgerwatch/erigon/turbo/shards" ) type Worker struct { @@ -53,7 +54,7 @@ type Worker struct { dirs datadir.Dirs } -func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs) *Worker { +func NewWorker(lock sync.Locker, logger log.Logger, accumulator *shards.Accumulator, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs) *Worker { w := &Worker{ lock: lock, logger: logger, @@ -62,7 +63,7 @@ func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, backgro rs: rs, background: background, blockReader: blockReader, - stateWriter: state.NewStateWriterV3(rs), + stateWriter: state.NewStateWriterV3(rs, accumulator), stateReader: state.NewStateReaderV3(rs.Domains()), chainConfig: chainConfig, @@ -83,10 +84,10 @@ func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, backgro return w } -func (rw *Worker) ResetState(rs *state.StateV3) { +func (rw *Worker) ResetState(rs *state.StateV3, accumulator *shards.Accumulator) { rw.rs = rs rw.SetReader(state.NewStateReaderV3(rs.Domains())) - rw.stateWriter = state.NewStateWriterV3(rs) + rw.stateWriter = state.NewStateWriterV3(rs, accumulator) } func (rw *Worker) Tx() kv.Tx { return rw.chainTx } @@ -268,7 +269,7 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask) { } } -func NewWorkersPool(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) { +func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) { reconWorkers = make([]*Worker, workerCount) resultChSize := workerCount * 8 @@ -279,7 +280,7 @@ func NewWorkersPool(lock sync.Locker, logger log.Logger, ctx context.Context, ba ctx, cancel := context.WithCancel(ctx) g, ctx := errgroup.WithContext(ctx) for i := 0; i < workerCount; i++ { - reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine, dirs) + reconWorkers[i] = NewWorker(lock, logger, accumulator, ctx, background, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine, dirs) } if background { for i := 0; i < workerCount; i++ { @@ -305,7 +306,7 @@ func NewWorkersPool(lock sync.Locker, logger log.Logger, ctx context.Context, ba //applyWorker.ResetTx(nil) } } - applyWorker = NewWorker(lock, logger, ctx, false, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine, dirs) + applyWorker = NewWorker(lock, logger, accumulator, ctx, false, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine, dirs) return reconWorkers, applyWorker, rws, clear, wait } diff --git a/core/state/rw_v3.go b/core/state/rw_v3.go index 92719813d93..acfc047bd4f 100644 --- a/core/state/rw_v3.go +++ b/core/state/rw_v3.go @@ -37,7 +37,7 @@ type StateV3 struct { trace bool } -func NewStateV3(domains *libstate.SharedDomains, accumulator *shards.Accumulator, logger log.Logger) *StateV3 { +func NewStateV3(domains *libstate.SharedDomains, logger log.Logger) *StateV3 { return &StateV3{ domains: domains, triggers: map[uint64]*TxTask{}, @@ -462,13 +462,14 @@ func (w *StateWriterBufferedV3) CreateContract(address common.Address) error { // StateWriterV3 - used by parallel workers to accumulate updates and then send them to conflict-resolution. type StateWriterV3 struct { - rs *StateV3 - trace bool + rs *StateV3 + trace bool + accumulator *shards.Accumulator tx kv.Tx } -func NewStateWriterV3(rs *StateV3) *StateWriterV3 { +func NewStateWriterV3(rs *StateV3, accumulator *shards.Accumulator) *StateWriterV3 { return &StateWriterV3{ rs: rs, //trace: true, @@ -504,6 +505,9 @@ func (w *StateWriterV3) UpdateAccountData(address common.Address, original, acco } } value := accounts.SerialiseV3(account) + if w.accumulator != nil { + w.accumulator.ChangeAccount(address, account.Incarnation, value) + } if err := w.rs.domains.DomainPut(kv.AccountsDomain, address[:], nil, value, nil, 0); err != nil { return err @@ -518,6 +522,9 @@ func (w *StateWriterV3) UpdateAccountCode(address common.Address, incarnation ui if err := w.rs.domains.DomainPut(kv.CodeDomain, address[:], nil, code, nil, 0); err != nil { return err } + if w.accumulator != nil { + w.accumulator.ChangeCode(address, incarnation, code) + } return nil } @@ -528,6 +535,9 @@ func (w *StateWriterV3) DeleteAccount(address common.Address, original *accounts if err := w.rs.domains.DomainDel(kv.AccountsDomain, address[:], nil, nil, 0); err != nil { return err } + if w.accumulator != nil { + w.accumulator.DeleteAccount(address) + } return nil } @@ -543,6 +553,11 @@ func (w *StateWriterV3) WriteAccountStorage(address common.Address, incarnation if len(v) == 0 { return w.rs.domains.DomainDel(kv.StorageDomain, composite, nil, nil, 0) } + if w.accumulator != nil && key != nil && value != nil { + k := *key + w.accumulator.ChangeStorage(address, incarnation, k, v) + } + return w.rs.domains.DomainPut(kv.StorageDomain, composite, nil, v, nil, 0) } diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index bd950b64244..a1eaefe08d7 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -311,7 +311,7 @@ func ExecV3(ctx context.Context, if shouldReportToTxPool { accumulator = cfg.accumulator } - rs := state.NewStateV3(doms, accumulator, logger) + rs := state.NewStateV3(doms, logger) ////TODO: owner of `resultCh` is main goroutine, but owner of `retryQueue` is applyLoop. // Now rwLoop closing both (because applyLoop we completely restart) @@ -324,7 +324,7 @@ func ExecV3(ctx context.Context, rwsConsumed := make(chan struct{}, 1) defer close(rwsConsumed) - execWorkers, applyWorker, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs) + execWorkers, applyWorker, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), accumulator, logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs) defer stopWorkers() applyWorker.DiscardReadList() @@ -916,10 +916,10 @@ Loop: return err } doms.SetTxNum(inputTxNum) - rs = state.NewStateV3(doms, nil, logger) + rs = state.NewStateV3(doms, logger) applyWorker.ResetTx(applyTx) - applyWorker.ResetState(rs) + applyWorker.ResetState(rs, accumulator) return nil }(); err != nil { diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 8f7c0615d4b..e9c17e70505 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -351,7 +351,7 @@ func unwindExec3(u *UnwindState, s *StageState, txc wrap.TxContainer, ctx contex } else { domains = txc.Doms } - rs := state.NewStateV3(domains, accumulator, logger) + rs := state.NewStateV3(domains, logger) // unwind all txs of u.UnwindPoint block. 1 txn in begin/end of block - system txs txNum, err := rawdbv3.TxNums.Min(txc.Tx, u.UnwindPoint+1) if err != nil { diff --git a/eth/stagedsync/stage_execute_test.go b/eth/stagedsync/stage_execute_test.go index 6f5e628e530..76b43ece23d 100644 --- a/eth/stagedsync/stage_execute_test.go +++ b/eth/stagedsync/stage_execute_test.go @@ -21,7 +21,7 @@ func apply(tx kv.RwTx, logger log.Logger) (beforeBlock, afterBlock testGenHook, if err != nil { panic(err) } - rs := state.NewStateV3(domains, logger) + rs := state.NewStateV3(domains, nil, logger) stateWriter := state.NewStateWriterBufferedV3(rs) stateWriter.SetTx(tx) From c8d1f9f3762bcc263257df7372784204bf8fbec1 Mon Sep 17 00:00:00 2001 From: Giulio Date: Thu, 16 May 2024 02:47:10 +0200 Subject: [PATCH 12/13] save --- eth/backend.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/backend.go b/eth/backend.go index 22d4a83b92b..21d57665de5 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -475,7 +475,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger } go func() { - logEvery := time.NewTicker(2 * time.Second) + logEvery := time.NewTicker(180 * time.Second) defer logEvery.Stop() var logItems []interface{} From 6b60831854c87afadd6a6ed6086eb7b47db81486 Mon Sep 17 00:00:00 2001 From: Giulio Date: Thu, 16 May 2024 02:48:54 +0200 Subject: [PATCH 13/13] save --- eth/stagedsync/stage_execute_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eth/stagedsync/stage_execute_test.go b/eth/stagedsync/stage_execute_test.go index 76b43ece23d..0b84f2c4a45 100644 --- a/eth/stagedsync/stage_execute_test.go +++ b/eth/stagedsync/stage_execute_test.go @@ -21,8 +21,8 @@ func apply(tx kv.RwTx, logger log.Logger) (beforeBlock, afterBlock testGenHook, if err != nil { panic(err) } - rs := state.NewStateV3(domains, nil, logger) - stateWriter := state.NewStateWriterBufferedV3(rs) + rs := state.NewStateV3(domains, logger) + stateWriter := state.NewStateWriterBufferedV3(rs, nil) stateWriter.SetTx(tx) return func(n, from, numberOfBlocks uint64) {