Skip to content

Commit

Permalink
optimized one db read (#4694)
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 authored Jul 12, 2022
1 parent e0845f2 commit b161c27
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 32 deletions.
9 changes: 5 additions & 4 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/ledgerwatch/erigon/eth/ethconsensusconfig"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/erigon/turbo/services"
"google.golang.org/protobuf/types/known/emptypb"

Expand Down Expand Up @@ -135,8 +136,8 @@ type Ethereum struct {
txPool2Send *txpool2.Send
txPool2GrpcServer txpool_proto.TxpoolServer
notifyMiningAboutNewTxs chan struct{}

downloader *downloader.Downloader
forkValidator *engineapi.ForkValidator
downloader *downloader.Downloader
}

// New creates a new Ethereum object (including the
Expand Down Expand Up @@ -507,8 +508,8 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
if config.Ethstats != "" {
headCh = make(chan *types.Block, 1)
}

backend.stagedSync, err = stages2.NewStagedSync(backend.sentryCtx, backend.log, backend.chainDB, stack.Config().P2P, *config, backend.sentriesClient, tmpdir, backend.notifications, backend.downloaderClient, allSnapshots, headCh, inMemoryExecution)
backend.forkValidator = engineapi.NewForkValidator(currentBlock.NumberU64(), inMemoryExecution)
backend.stagedSync, err = stages2.NewStagedSync(backend.sentryCtx, backend.log, backend.chainDB, stack.Config().P2P, *config, backend.sentriesClient, tmpdir, backend.notifications, backend.downloaderClient, allSnapshots, headCh, backend.forkValidator)
if err != nil {
return nil, err
}
Expand Down
25 changes: 15 additions & 10 deletions eth/stagedsync/stage_finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,25 @@ import (
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/ethdb/cbor"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/log/v3"
)

type FinishCfg struct {
db kv.RwDB
tmpDir string
log log.Logger
headCh chan *types.Block
db kv.RwDB
tmpDir string
log log.Logger
headCh chan *types.Block
forkValidator *engineapi.ForkValidator
}

func StageFinishCfg(db kv.RwDB, tmpDir string, logger log.Logger, headCh chan *types.Block) FinishCfg {
func StageFinishCfg(db kv.RwDB, tmpDir string, logger log.Logger, headCh chan *types.Block, forkValidator *engineapi.ForkValidator) FinishCfg {
return FinishCfg{
db: db,
log: logger,
tmpDir: tmpDir,
headCh: headCh,
db: db,
log: logger,
tmpDir: tmpDir,
headCh: headCh,
forkValidator: forkValidator,
}
}

Expand All @@ -56,12 +59,14 @@ func FinishForward(s *StageState, tx kv.RwTx, cfg FinishCfg, initialCycle bool)
if executionAt <= s.BlockNumber {
return nil
}

rawdb.WriteHeadBlockHash(tx, rawdb.ReadHeadHeaderHash(tx))
err = s.Update(tx, executionAt)
if err != nil {
return err
}
if cfg.forkValidator != nil {
cfg.forkValidator.NotifyCurrentHeight(executionAt)
}

if initialCycle {
if err := params.SetErigonVersion(tx, params.VersionKeyFinished); err != nil {
Expand Down
28 changes: 15 additions & 13 deletions turbo/engineapi/fork_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
package engineapi

import (
"fmt"

"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
Expand Down Expand Up @@ -47,6 +45,8 @@ type ForkValidator struct {
extendingForkHeadHash common.Hash
// this is the function we use to perform payload validation.
validatePayload validatePayloadFunc
// this is the current point where we processed the chain so far.
currentHeight uint64
}

// abs64 is a utility method that given an int64, it returns its absolute value in uint64.
Expand All @@ -57,16 +57,18 @@ func abs64(n int64) uint64 {
return uint64(n)
}

func NewForkValidatorMock() *ForkValidator {
func NewForkValidatorMock(currentHeight uint64) *ForkValidator {
return &ForkValidator{
sideForksBlock: make(map[common.Hash]forkSegment),
currentHeight: currentHeight,
}
}

func NewForkValidator(validatePayload validatePayloadFunc) *ForkValidator {
func NewForkValidator(currentHeight uint64, validatePayload validatePayloadFunc) *ForkValidator {
return &ForkValidator{
sideForksBlock: make(map[common.Hash]forkSegment),
validatePayload: validatePayload,
currentHeight: currentHeight,
}
}

Expand All @@ -75,6 +77,11 @@ func (fv *ForkValidator) ExtendingForkHeadHash() common.Hash {
return fv.extendingForkHeadHash
}

// NotifyCurrentHeight is to be called at the end of the stage cycle and repressent the last processed block.
func (fv *ForkValidator) NotifyCurrentHeight(currentHeight uint64) {
fv.currentHeight = currentHeight
}

// FlushExtendingFork flush the current extending fork if fcu chooses its head hash as the its forkchoice.
func (fv *ForkValidator) FlushExtendingFork(tx kv.RwTx) error {
// Flush changes to db.
Expand All @@ -97,12 +104,7 @@ func (fv *ForkValidator) ValidatePayload(tx kv.RwTx, header *types.Header, body
status = remote.EngineStatus_ACCEPTED
return
}
currentHeight := rawdb.ReadCurrentBlockNumber(tx)
if currentHeight == nil {
criticalError = fmt.Errorf("could not read block number.")
return
}
defer fv.clean(*currentHeight)
defer fv.clean()

if extendCanonical {
// If the new block extends the canonical chain we update extendingFork.
Expand Down Expand Up @@ -133,7 +135,7 @@ func (fv *ForkValidator) ValidatePayload(tx kv.RwTx, header *types.Header, body
}

// if the block is not in range of maxForkDepth from head then we do not validate it.
if abs64(int64(*currentHeight)-header.Number.Int64()) > maxForkDepth {
if abs64(int64(fv.currentHeight)-header.Number.Int64()) > maxForkDepth {
status = remote.EngineStatus_ACCEPTED
return
}
Expand Down Expand Up @@ -200,9 +202,9 @@ func (fv *ForkValidator) Clear(tx kv.RwTx) {
}

// clean wipes out all outdated sideforks whose distance exceed the height of the head.
func (fv *ForkValidator) clean(currentHeight uint64) {
func (fv *ForkValidator) clean() {
for hash, sb := range fv.sideForksBlock {
if abs64(int64(currentHeight)-sb.header.Number.Int64()) > maxForkDepth {
if abs64(int64(fv.currentHeight)-sb.header.Number.Int64()) > maxForkDepth {
delete(fv.sideForksBlock, hash)
}
}
Expand Down
4 changes: 2 additions & 2 deletions turbo/stages/mock_sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey

mock.Sync = stagedsync.New(
stagedsync.DefaultStages(mock.Ctx, prune,
stagedsync.StageHeadersCfg(mock.DB, mock.sentriesClient.Hd, mock.sentriesClient.Bd, *mock.ChainConfig, sendHeaderRequest, propagateNewBlockHashes, penalize, cfg.BatchSize, false, false, allSnapshots, snapshotsDownloader, blockReader, mock.tmpdir, mock.Notifications.Events, mock.Notifications, engineapi.NewForkValidatorMock()),
stagedsync.StageHeadersCfg(mock.DB, mock.sentriesClient.Hd, mock.sentriesClient.Bd, *mock.ChainConfig, sendHeaderRequest, propagateNewBlockHashes, penalize, cfg.BatchSize, false, false, allSnapshots, snapshotsDownloader, blockReader, mock.tmpdir, mock.Notifications.Events, mock.Notifications, engineapi.NewForkValidatorMock(1)),
stagedsync.StageCumulativeIndexCfg(mock.DB),
stagedsync.StageBlockHashesCfg(mock.DB, mock.tmpdir, mock.ChainConfig),
stagedsync.StageBodiesCfg(
Expand Down Expand Up @@ -348,7 +348,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
stagedsync.StageLogIndexCfg(mock.DB, prune, mock.tmpdir),
stagedsync.StageCallTracesCfg(mock.DB, prune, 0, mock.tmpdir),
stagedsync.StageTxLookupCfg(mock.DB, prune, mock.tmpdir, allSnapshots, isBor),
stagedsync.StageFinishCfg(mock.DB, mock.tmpdir, mock.Log, nil), true),
stagedsync.StageFinishCfg(mock.DB, mock.tmpdir, mock.Log, nil, nil), true),
stagedsync.DefaultUnwindOrder,
stagedsync.DefaultPruneOrder,
)
Expand Down
6 changes: 3 additions & 3 deletions turbo/stages/stageloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func NewStagedSync(
snapDownloader proto_downloader.DownloaderClient,
snapshots *snapshotsync.RoSnapshots,
headCh chan *types.Block,
execPayload func(kv.RwTx, *types.Header, *types.RawBody, uint64, []*types.Header, []*types.RawBody) error,
forkValidator *engineapi.ForkValidator,
) (*stagedsync.Sync, error) {
var blockReader services.FullBlockReader
if cfg.Snapshot.Enabled {
Expand Down Expand Up @@ -365,7 +365,7 @@ func NewStagedSync(
tmpdir,
notifications.Events,
notifications,
engineapi.NewForkValidator(execPayload)),
forkValidator),
stagedsync.StageCumulativeIndexCfg(db),
stagedsync.StageBlockHashesCfg(db, tmpdir, controlServer.ChainConfig),
stagedsync.StageBodiesCfg(
Expand Down Expand Up @@ -404,7 +404,7 @@ func NewStagedSync(
stagedsync.StageLogIndexCfg(db, cfg.Prune, tmpdir),
stagedsync.StageCallTracesCfg(db, cfg.Prune, 0, tmpdir),
stagedsync.StageTxLookupCfg(db, cfg.Prune, tmpdir, snapshots, isBor),
stagedsync.StageFinishCfg(db, tmpdir, logger, headCh), runInTestMode),
stagedsync.StageFinishCfg(db, tmpdir, logger, headCh, forkValidator), runInTestMode),
stagedsync.DefaultUnwindOrder,
stagedsync.DefaultPruneOrder,
), nil
Expand Down

0 comments on commit b161c27

Please sign in to comment.