Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimized 3 db reads on ValidatePayload #4694

Merged
merged 1 commit into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/ledgerwatch/erigon/eth/ethconsensusconfig"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/erigon/turbo/services"
"google.golang.org/protobuf/types/known/emptypb"

Expand Down Expand Up @@ -135,8 +136,8 @@ type Ethereum struct {
txPool2Send *txpool2.Send
txPool2GrpcServer txpool_proto.TxpoolServer
notifyMiningAboutNewTxs chan struct{}

downloader *downloader.Downloader
forkValidator *engineapi.ForkValidator
downloader *downloader.Downloader
}

// New creates a new Ethereum object (including the
Expand Down Expand Up @@ -507,8 +508,8 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
if config.Ethstats != "" {
headCh = make(chan *types.Block, 1)
}

backend.stagedSync, err = stages2.NewStagedSync(backend.sentryCtx, backend.log, backend.chainDB, stack.Config().P2P, *config, backend.sentriesClient, tmpdir, backend.notifications, backend.downloaderClient, allSnapshots, headCh, inMemoryExecution)
backend.forkValidator = engineapi.NewForkValidator(currentBlock.NumberU64(), inMemoryExecution)
backend.stagedSync, err = stages2.NewStagedSync(backend.sentryCtx, backend.log, backend.chainDB, stack.Config().P2P, *config, backend.sentriesClient, tmpdir, backend.notifications, backend.downloaderClient, allSnapshots, headCh, backend.forkValidator)
if err != nil {
return nil, err
}
Expand Down
25 changes: 15 additions & 10 deletions eth/stagedsync/stage_finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,25 @@ import (
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/ethdb/cbor"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/log/v3"
)

type FinishCfg struct {
db kv.RwDB
tmpDir string
log log.Logger
headCh chan *types.Block
db kv.RwDB
tmpDir string
log log.Logger
headCh chan *types.Block
forkValidator *engineapi.ForkValidator
}

func StageFinishCfg(db kv.RwDB, tmpDir string, logger log.Logger, headCh chan *types.Block) FinishCfg {
func StageFinishCfg(db kv.RwDB, tmpDir string, logger log.Logger, headCh chan *types.Block, forkValidator *engineapi.ForkValidator) FinishCfg {
return FinishCfg{
db: db,
log: logger,
tmpDir: tmpDir,
headCh: headCh,
db: db,
log: logger,
tmpDir: tmpDir,
headCh: headCh,
forkValidator: forkValidator,
}
}

Expand All @@ -56,12 +59,14 @@ func FinishForward(s *StageState, tx kv.RwTx, cfg FinishCfg, initialCycle bool)
if executionAt <= s.BlockNumber {
return nil
}

rawdb.WriteHeadBlockHash(tx, rawdb.ReadHeadHeaderHash(tx))
err = s.Update(tx, executionAt)
if err != nil {
return err
}
if cfg.forkValidator != nil {
cfg.forkValidator.NotifyCurrentHeight(executionAt)
}

if initialCycle {
if err := params.SetErigonVersion(tx, params.VersionKeyFinished); err != nil {
Expand Down
28 changes: 15 additions & 13 deletions turbo/engineapi/fork_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
package engineapi

import (
"fmt"

"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
Expand Down Expand Up @@ -47,6 +45,8 @@ type ForkValidator struct {
extendingForkHeadHash common.Hash
// this is the function we use to perform payload validation.
validatePayload validatePayloadFunc
// this is the current point where we processed the chain so far.
currentHeight uint64
}

// abs64 is a utility method that given an int64, it returns its absolute value in uint64.
Expand All @@ -57,16 +57,18 @@ func abs64(n int64) uint64 {
return uint64(n)
}

func NewForkValidatorMock() *ForkValidator {
func NewForkValidatorMock(currentHeight uint64) *ForkValidator {
return &ForkValidator{
sideForksBlock: make(map[common.Hash]forkSegment),
currentHeight: currentHeight,
}
}

func NewForkValidator(validatePayload validatePayloadFunc) *ForkValidator {
func NewForkValidator(currentHeight uint64, validatePayload validatePayloadFunc) *ForkValidator {
return &ForkValidator{
sideForksBlock: make(map[common.Hash]forkSegment),
validatePayload: validatePayload,
currentHeight: currentHeight,
}
}

Expand All @@ -75,6 +77,11 @@ func (fv *ForkValidator) ExtendingForkHeadHash() common.Hash {
return fv.extendingForkHeadHash
}

// NotifyCurrentHeight is to be called at the end of the stage cycle and repressent the last processed block.
func (fv *ForkValidator) NotifyCurrentHeight(currentHeight uint64) {
fv.currentHeight = currentHeight
}

// FlushExtendingFork flush the current extending fork if fcu chooses its head hash as the its forkchoice.
func (fv *ForkValidator) FlushExtendingFork(tx kv.RwTx) error {
// Flush changes to db.
Expand All @@ -97,12 +104,7 @@ func (fv *ForkValidator) ValidatePayload(tx kv.RwTx, header *types.Header, body
status = remote.EngineStatus_ACCEPTED
return
}
currentHeight := rawdb.ReadCurrentBlockNumber(tx)
if currentHeight == nil {
criticalError = fmt.Errorf("could not read block number.")
return
}
defer fv.clean(*currentHeight)
defer fv.clean()

if extendCanonical {
// If the new block extends the canonical chain we update extendingFork.
Expand Down Expand Up @@ -133,7 +135,7 @@ func (fv *ForkValidator) ValidatePayload(tx kv.RwTx, header *types.Header, body
}

// if the block is not in range of maxForkDepth from head then we do not validate it.
if abs64(int64(*currentHeight)-header.Number.Int64()) > maxForkDepth {
if abs64(int64(fv.currentHeight)-header.Number.Int64()) > maxForkDepth {
status = remote.EngineStatus_ACCEPTED
return
}
Expand Down Expand Up @@ -200,9 +202,9 @@ func (fv *ForkValidator) Clear(tx kv.RwTx) {
}

// clean wipes out all outdated sideforks whose distance exceed the height of the head.
func (fv *ForkValidator) clean(currentHeight uint64) {
func (fv *ForkValidator) clean() {
for hash, sb := range fv.sideForksBlock {
if abs64(int64(currentHeight)-sb.header.Number.Int64()) > maxForkDepth {
if abs64(int64(fv.currentHeight)-sb.header.Number.Int64()) > maxForkDepth {
delete(fv.sideForksBlock, hash)
}
}
Expand Down
4 changes: 2 additions & 2 deletions turbo/stages/mock_sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey

mock.Sync = stagedsync.New(
stagedsync.DefaultStages(mock.Ctx, prune,
stagedsync.StageHeadersCfg(mock.DB, mock.sentriesClient.Hd, mock.sentriesClient.Bd, *mock.ChainConfig, sendHeaderRequest, propagateNewBlockHashes, penalize, cfg.BatchSize, false, false, allSnapshots, snapshotsDownloader, blockReader, mock.tmpdir, mock.Notifications.Events, mock.Notifications, engineapi.NewForkValidatorMock()),
stagedsync.StageHeadersCfg(mock.DB, mock.sentriesClient.Hd, mock.sentriesClient.Bd, *mock.ChainConfig, sendHeaderRequest, propagateNewBlockHashes, penalize, cfg.BatchSize, false, false, allSnapshots, snapshotsDownloader, blockReader, mock.tmpdir, mock.Notifications.Events, mock.Notifications, engineapi.NewForkValidatorMock(1)),
stagedsync.StageCumulativeIndexCfg(mock.DB),
stagedsync.StageBlockHashesCfg(mock.DB, mock.tmpdir, mock.ChainConfig),
stagedsync.StageBodiesCfg(
Expand Down Expand Up @@ -348,7 +348,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
stagedsync.StageLogIndexCfg(mock.DB, prune, mock.tmpdir),
stagedsync.StageCallTracesCfg(mock.DB, prune, 0, mock.tmpdir),
stagedsync.StageTxLookupCfg(mock.DB, prune, mock.tmpdir, allSnapshots, isBor),
stagedsync.StageFinishCfg(mock.DB, mock.tmpdir, mock.Log, nil), true),
stagedsync.StageFinishCfg(mock.DB, mock.tmpdir, mock.Log, nil, nil), true),
stagedsync.DefaultUnwindOrder,
stagedsync.DefaultPruneOrder,
)
Expand Down
6 changes: 3 additions & 3 deletions turbo/stages/stageloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func NewStagedSync(
snapDownloader proto_downloader.DownloaderClient,
snapshots *snapshotsync.RoSnapshots,
headCh chan *types.Block,
execPayload func(kv.RwTx, *types.Header, *types.RawBody, uint64, []*types.Header, []*types.RawBody) error,
forkValidator *engineapi.ForkValidator,
) (*stagedsync.Sync, error) {
var blockReader services.FullBlockReader
if cfg.Snapshot.Enabled {
Expand Down Expand Up @@ -365,7 +365,7 @@ func NewStagedSync(
tmpdir,
notifications.Events,
notifications,
engineapi.NewForkValidator(execPayload)),
forkValidator),
stagedsync.StageCumulativeIndexCfg(db),
stagedsync.StageBlockHashesCfg(db, tmpdir, controlServer.ChainConfig),
stagedsync.StageBodiesCfg(
Expand Down Expand Up @@ -404,7 +404,7 @@ func NewStagedSync(
stagedsync.StageLogIndexCfg(db, cfg.Prune, tmpdir),
stagedsync.StageCallTracesCfg(db, cfg.Prune, 0, tmpdir),
stagedsync.StageTxLookupCfg(db, cfg.Prune, tmpdir, snapshots, isBor),
stagedsync.StageFinishCfg(db, tmpdir, logger, headCh), runInTestMode),
stagedsync.StageFinishCfg(db, tmpdir, logger, headCh, forkValidator), runInTestMode),
stagedsync.DefaultUnwindOrder,
stagedsync.DefaultPruneOrder,
), nil
Expand Down