E3: Fixed many hive tests and bugs (See description) (#10409)
Fixes made:
* Added ReceiptsRoot check
* Use the progress of stages.Execution instead of stages.IntermediateHashes to determine the latest valid hash
* Payload validation for a batch of blocks must be done one block at a time (see the sketch after this list), per
https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#engine_newpayloadv1
* Adjust stages.Execution progress when the state root is invalid
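
For context, a minimal, self-contained sketch of what one-block-at-a-time validation means for a payload batch; every name here is illustrative, not Erigon's API:

```go
package main

import (
	"errors"
	"fmt"
)

// payload is an illustrative stand-in for an execution payload/block.
type payload struct{ number uint64 }

// validateBatch checks payloads strictly one block at a time and stops at the
// first invalid one, which is what engine_newPayloadV1 semantics require:
// the caller must be able to report the latest valid ancestor.
func validateBatch(batch []payload, validate func(payload) error) (latestValid uint64, err error) {
	for _, p := range batch {
		if err := validate(p); err != nil {
			return latestValid, fmt.Errorf("block %d invalid: %w", p.number, err)
		}
		latestValid = p.number
	}
	return latestValid, nil
}

func main() {
	bad := errors.New("receipts root mismatch")
	batch := []payload{{1}, {2}, {3}}
	lv, err := validateBatch(batch, func(p payload) error {
		if p.number == 3 {
			return bad
		}
		return nil
	})
	fmt.Println(lv, err) // 2 block 3 invalid: receipts root mismatch
}
```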

![Screenshot 2024-05-20 at 01:52:06 — hive test results](https://github.com/ledgerwatch/erigon/assets/29233688/978ad360-a81c-4ab2-945e-7ccdb373f81a)

Only 24 tests left to fix.

What is left to fix?

* Unwind is buggy and produces bad state for some long reorgs: it crashes on
the first block of a long reorg (8 blocks, mostly empty)
* Block execution deadlocks/stalls/gets stuck on some tests

Also took the time to update the hive repository for E3:
ethereum/hive#1106

PSA: also modified the logging for the snapshots stage
Giulio2002 authored May 20, 2024
1 parent 9f2e943 commit 4373ee1
Showing 7 changed files with 69 additions and 23 deletions.
7 changes: 6 additions & 1 deletion core/blockchain.go
@@ -27,6 +27,7 @@ import (
"golang.org/x/crypto/sha3"

"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/common"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/cmp"
"github.com/ledgerwatch/erigon-lib/common/dbg"
@@ -369,7 +370,7 @@ func InitializeBlockExecution(engine consensus.Engine, chain consensus.ChainHead
return nil
}

func BlockPostValidation(gasUsed, blobGasUsed uint64, h *types.Header) error {
func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receiptHash common.Hash, h *types.Header) error {
if gasUsed != h.GasUsed {
return fmt.Errorf("gas used by execution: %d, in header: %d, headerNum=%d, %x",
gasUsed, h.GasUsed, h.Number.Uint64(), h.Hash())
@@ -379,5 +380,9 @@ func BlockPostValidation(gasUsed, blobGasUsed uint64, h *types.Header) error {
return fmt.Errorf("blobGasUsed by execution: %d, in header: %d, headerNum=%d, %x",
blobGasUsed, *h.BlobGasUsed, h.Number.Uint64(), h.Hash())
}
if checkReceipts && receiptHash != h.ReceiptHash {
return fmt.Errorf("receiptHash mismatch: %x != %x, headerNum=%d, %x",
receiptHash, h.ReceiptHash, h.Number.Uint64(), h.Hash())
}
return nil
}
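
For illustration, a hedged sketch of how an execution loop could drive the extended BlockPostValidation. The call shape and the checkReceipts condition mirror this diff; the wrapper function and its parameters are assumptions, not part of the commit:

```go
package example

import (
	"github.com/ledgerwatch/erigon/core"
	"github.com/ledgerwatch/erigon/core/types"
)

// postValidateBlock feeds the new receipts-root check after a block has been
// executed. The checkReceipts condition mirrors eth/stagedsync/exec3.go below.
func postValidateBlock(header *types.Header, receipts types.Receipts, gasUsed, blobGasUsed uint64,
	statelessExec, noReceipts, isByzantium bool) error {
	// Only compare receipt roots when receipts were actually produced and the
	// fork defines the receipts trie the header commits to.
	checkReceipts := !statelessExec && isByzantium && !noReceipts
	return core.BlockPostValidation(gasUsed, blobGasUsed, checkReceipts, types.DeriveSha(receipts), header)
}
```
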
4 changes: 2 additions & 2 deletions eth/backend.go
Expand Up @@ -566,9 +566,9 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
dirs, notifications, blockReader, blockWriter, backend.agg, backend.silkworm, terseLogger)
chainReader := consensuschain.NewReader(chainConfig, txc.Tx, blockReader, logger)
// We start the mining step
if err := stages2.StateStep(ctx, chainReader, backend.engine, txc, stateSync, header, body, unwindPoint, headersChain, bodiesChain); err != nil {
if err := stages2.StateStep(ctx, chainReader, backend.engine, txc, stateSync, header, body, unwindPoint, headersChain, bodiesChain, config.ImportMode); err != nil {
logger.Warn("Could not validate block", "err", err)
return err
return errors.Join(consensus.ErrInvalidBlock, err)
}
var progress uint64
progress, err = stages.GetStageProgress(txc.Tx, stages.Execution)
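
The switch from returning err to errors.Join(consensus.ErrInvalidBlock, err) lets callers classify validation failures with errors.Is instead of treating them as internal errors. A self-contained sketch of the pattern — the sentinel mirrors consensus.ErrInvalidBlock, everything else is illustrative:

```go
package main

import (
	"errors"
	"fmt"
)

// ErrInvalidBlock mirrors consensus.ErrInvalidBlock: a sentinel that marks a
// block as invalid, as opposed to a critical/internal failure.
var ErrInvalidBlock = errors.New("invalid block")

func stateStep() error {
	execErr := errors.New("receiptHash mismatch") // stand-in for a real validation error
	// errors.Join keeps both the sentinel and the concrete cause in the chain.
	return errors.Join(ErrInvalidBlock, execErr)
}

func main() {
	err := stateStep()
	if errors.Is(err, ErrInvalidBlock) {
		// The engine API layer can now answer newPayload with status INVALID.
		fmt.Println("classified as invalid block:", err)
	}
}
```
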
5 changes: 4 additions & 1 deletion eth/stagedsync/exec3.go
@@ -767,8 +767,9 @@ Loop:
blobGasUsed += txTask.Tx.GetBlobGas()
}
if txTask.Final {
checkReceipts := !cfg.vmConfig.StatelessExec && chainConfig.IsByzantium(txTask.BlockNum) && !cfg.vmConfig.NoReceipts
if txTask.BlockNum > 0 && !skipPostEvaluation { //Disable check for genesis. Maybe need somehow improve it in future - to satisfy TestExecutionSpec
if err := core.BlockPostValidation(usedGas, blobGasUsed, txTask.Header); err != nil {
if err := core.BlockPostValidation(usedGas, blobGasUsed, checkReceipts, types.DeriveSha(receipts), txTask.Header); err != nil {
return fmt.Errorf("%w, txnIdx=%d, %v", consensus.ErrInvalidBlock, txTask.TxIndex, err) //same as in stage_exec.go
}
}
@@ -782,9 +783,11 @@ Loop:
TransactionIndex: uint(txTask.TxIndex),
Type: txTask.Tx.Type(),
CumulativeGasUsed: usedGas,
GasUsed: txTask.UsedGas,
TxHash: txTask.Tx.Hash(),
Logs: txTask.Logs,
}
receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
if txTask.Failed {
receipt.Status = types.ReceiptStatusFailed
} else {
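
The newly populated Bloom matters because the logs bloom is part of the consensus encoding of a receipt that types.DeriveSha hashes, so it must be set before the receipts root is derived; GasUsed is a per-transaction convenience field equal to the delta of CumulativeGasUsed. A small illustrative sketch of that relationship (types are stand-ins, not Erigon's):

```go
package main

import "fmt"

// receipt is a stand-in for types.Receipt with the two gas fields involved here.
type receipt struct {
	CumulativeGasUsed uint64
	GasUsed           uint64
}

// fillGasUsed derives each receipt's own gas from the running cumulative
// total — the relationship txTask.UsedGas satisfies per transaction.
func fillGasUsed(receipts []*receipt) {
	var prev uint64
	for _, r := range receipts {
		r.GasUsed = r.CumulativeGasUsed - prev
		prev = r.CumulativeGasUsed
	}
}

func main() {
	rs := []*receipt{{CumulativeGasUsed: 21000}, {CumulativeGasUsed: 74000}}
	fillGasUsed(rs)
	fmt.Println(rs[0].GasUsed, rs[1].GasUsed) // 21000 53000
}
```
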
11 changes: 3 additions & 8 deletions turbo/engineapi/engine_helpers/fork_validator.go
@@ -58,23 +58,21 @@ type ForkValidator struct {
tmpDir string
// block hashes that are deemed valid
validHashes *lru.Cache[libcommon.Hash, bool]
stateV3 bool

ctx context.Context

// we want fork validator to be thread safe so let
lock sync.Mutex
}

func NewForkValidatorMock(currentHeight uint64, stateV3 bool) *ForkValidator {
func NewForkValidatorMock(currentHeight uint64) *ForkValidator {
validHashes, err := lru.New[libcommon.Hash, bool]("validHashes", maxForkDepth*8)
if err != nil {
panic(err)
}
return &ForkValidator{
currentHeight: currentHeight,
validHashes: validHashes,
stateV3: stateV3,
}
}

@@ -306,11 +304,8 @@ func (fv *ForkValidator) validateAndStorePayload(txc wrap.TxContainer, header *t
latestValidHash = header.Hash()
if validationError != nil {
var latestValidNumber uint64
if fv.stateV3 {
latestValidNumber, criticalError = stages.GetStageProgress(txc.Tx, stages.Execution)
} else {
latestValidNumber, criticalError = stages.GetStageProgress(txc.Tx, stages.IntermediateHashes)
}
latestValidNumber, criticalError = stages.GetStageProgress(txc.Tx, stages.Execution)

if criticalError != nil {
return
}
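
With IntermediateHashes gone in E3, the latest valid block after a failed payload comes from the Execution stage progress. A hedged sketch of the lookup, assuming the usual erigon accessors (rawdb.ReadCanonicalHash) are available; this helper is not part of the commit:

```go
package example

import (
	libcommon "github.com/ledgerwatch/erigon-lib/common"
	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon/core/rawdb"
	"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
)

// latestValidFromExecution reads the Execution stage progress (the last block
// that executed successfully) and resolves its canonical hash, which is what
// gets reported back through the engine API as latestValidHash.
func latestValidFromExecution(tx kv.Tx) (libcommon.Hash, uint64, error) {
	num, err := stages.GetStageProgress(tx, stages.Execution)
	if err != nil {
		return libcommon.Hash{}, 0, err
	}
	hash, err := rawdb.ReadCanonicalHash(tx, num)
	if err != nil {
		return libcommon.Hash{}, 0, err
	}
	return hash, num, nil
}
```
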
24 changes: 20 additions & 4 deletions turbo/snapshotsync/snapshotsync.go
@@ -371,7 +371,15 @@ func WaitForDownloader(ctx context.Context, logPrefix string, headerchain, blobs
if stats, err = snapshotDownloader.Stats(ctx, &proto_downloader.StatsRequest{}); err != nil {
log.Warn("Error while waiting for snapshots progress", "err", err)
} else {
logStats(ctx, stats, downloadStartTime, stagesIdsList, logPrefix, "download")
reason := "download"
if headerchain {
reason = "downloading header-chain"
}
logEnd := "download finished"
if headerchain {
logEnd = "header-chain download finished"
}
logStats(ctx, stats, downloadStartTime, stagesIdsList, logPrefix, reason, logEnd)
}
}
}
@@ -394,7 +402,15 @@ func WaitForDownloader(ctx context.Context, logPrefix string, headerchain, blobs
if stats, err = snapshotDownloader.Stats(ctx, &proto_downloader.StatsRequest{}); err != nil {
log.Warn("Error while waiting for snapshots progress", "err", err)
} else {
logStats(ctx, stats, downloadStartTime, stagesIdsList, logPrefix, "download")
reason := "download"
if headerchain {
reason = "downloading header-chain"
}
logEnd := "download finished"
if headerchain {
logEnd = "header-chain download finished"
}
logStats(ctx, stats, downloadStartTime, stagesIdsList, logPrefix, reason, logEnd)
}
}
}
@@ -484,7 +500,7 @@ func WaitForDownloader(ctx context.Context, logPrefix string, headerchain, blobs
return nil
}

func logStats(ctx context.Context, stats *proto_downloader.StatsReply, startTime time.Time, stagesIdsList []string, logPrefix string, logReason string) {
func logStats(ctx context.Context, stats *proto_downloader.StatsReply, startTime time.Time, stagesIdsList []string, logPrefix string, logReason string, logEnd string) {
var m runtime.MemStats

diagnostics.Send(diagnostics.SyncStagesList{Stages: stagesIdsList})
@@ -504,7 +520,7 @@ func logStats(ctx context.Context, stats *proto_downloader.StatsReply, startTime
})

if stats.Completed {
log.Info(fmt.Sprintf("[%s] download finished", logPrefix), "time", time.Since(startTime).String())
log.Info(fmt.Sprintf("[%s] %s", logPrefix, logEnd), "time", time.Since(startTime).String())
} else {

if stats.MetadataReady < stats.FilesTotal && stats.BytesTotal == 0 {
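
Both WaitForDownloader call sites above pick their progress/finish log strings the same way from the headerchain flag; a tiny helper capturing that choice (an assumption, not in this commit) could look like:

```go
package example

// downloadLogStrings returns the logReason/logEnd pair passed to logStats,
// switching to header-chain wording when only the header chain is downloaded.
func downloadLogStrings(headerchain bool) (reason, logEnd string) {
	if headerchain {
		return "downloading header-chain", "header-chain download finished"
	}
	return "download", "download finished"
}
```
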
5 changes: 3 additions & 2 deletions turbo/stages/mock/mock_sentry.go
@@ -3,6 +3,7 @@ package mock
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"math/big"
"os"
@@ -353,9 +354,9 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
dirs, notifications, mock.BlockReader, blockWriter, mock.agg, nil, terseLogger)
chainReader := consensuschain.NewReader(mock.ChainConfig, txc.Tx, mock.BlockReader, logger)
// We start the mining step
if err := stages2.StateStep(ctx, chainReader, mock.Engine, txc, stateSync, header, body, unwindPoint, headersChain, bodiesChain); err != nil {
if err := stages2.StateStep(ctx, chainReader, mock.Engine, txc, stateSync, header, body, unwindPoint, headersChain, bodiesChain, true); err != nil {
logger.Warn("Could not validate block", "err", err)
return err
return errors.Join(consensus.ErrInvalidBlock, err)
}
var progress uint64
progress, err = stages.GetStageProgress(txc.Tx, stages.Execution)
36 changes: 31 additions & 5 deletions turbo/stages/stageloop.go
@@ -455,13 +455,27 @@ func addAndVerifyBlockStep(batch kv.RwTx, engine consensus.Engine, chainReader c
return nil
}

func StateStep(ctx context.Context, chainReader consensus.ChainReader, engine consensus.Engine, txc wrap.TxContainer, stateSync *stagedsync.Sync, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody) (err error) {
func cleanupProgressIfNeeded(batch kv.RwTx, header *types.Header) error {
// If we fail state root then we have wrong execution stage progress set (+1), we need to decrease by one!
progress, err := stages.GetStageProgress(batch, stages.Execution)
if err != nil {
return err
}
if progress == header.Number.Uint64() && progress > 0 {
progress--
if err := stages.SaveStageProgress(batch, stages.Execution, progress); err != nil {
return err
}
}
return nil
}

func StateStep(ctx context.Context, chainReader consensus.ChainReader, engine consensus.Engine, txc wrap.TxContainer, stateSync *stagedsync.Sync, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody, test bool) (err error) {
defer func() {
if rec := recover(); rec != nil {
err = fmt.Errorf("%+v, trace: %s", rec, dbg.Stack())
}
}() // avoid crash because Erigon's core does many things

// Construct side fork if we have one
if unwindPoint > 0 {
// Run it through the unwind
@@ -484,9 +498,16 @@ func StateStep(ctx context.Context, chainReader consensus.ChainReader, engine co
return err
}
// Run state sync
if err = stateSync.RunNoInterrupt(nil, txc, false /* firstCycle */); err != nil {
return err
if !test {
if err = stateSync.RunNoInterrupt(nil, txc, false /* firstCycle */); err != nil {
if err := cleanupProgressIfNeeded(txc.Tx, currentHeader); err != nil {
return err

}
return err
}
}

}

// If we did not specify header we stop here
@@ -497,10 +518,15 @@ func StateStep(ctx context.Context, chainReader consensus.ChainReader, engine co
if err := addAndVerifyBlockStep(txc.Tx, engine, chainReader, header, body); err != nil {
return err
}
// Run state sync
if err = stateSync.RunNoInterrupt(nil, txc, false /* firstCycle */); err != nil {
if !test {
if err := cleanupProgressIfNeeded(txc.Tx, header); err != nil {
return err
}
}
return err
}

return nil
}

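
A self-contained sketch of the invariant cleanupProgressIfNeeded restores: when block N fails validation after stages.Execution progress has already advanced to N, the progress must roll back to N-1 so the latest-valid lookup in fork_validator.go does not report the failed block as valid. The map below stands in for the staged-sync progress table; it is not Erigon's storage:

```go
package main

import "fmt"

// cleanupProgress mirrors the decrement performed by cleanupProgressIfNeeded:
// only roll back when the recorded progress equals the failed block's number.
func cleanupProgress(progress map[string]uint64, failedBlock uint64) {
	if progress["Execution"] == failedBlock && failedBlock > 0 {
		progress["Execution"] = failedBlock - 1
	}
}

func main() {
	p := map[string]uint64{"Execution": 8}
	cleanupProgress(p, 8)       // block 8 failed its state-root check
	fmt.Println(p["Execution"]) // 7: the last block that is still valid
}
```
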
