Skip to content

Commit

Permalink
More efficient header verification of headers for Parlia when snapsho…
Browse files Browse the repository at this point in the history
…ts are used (#3998)

* Update stageloop.go

* Print

* Consider snapshot headers as parlia checkpoints

* Not fail after not loading snapshot

* Lazy snapshots

* Print number of validators

* More printing

* Use epoch instead of checkpoint interval

* Reduce logging

* Fix compilation

* Remove trace jump dest

* Fix lint

* Not store snapshots every epoch

* Separate snapshot for verification and finalisation

Co-authored-by: Alex Sharp <[email protected]>
Co-authored-by: Alexey Sharp <[email protected]>
  • Loading branch information
3 people authored Apr 29, 2022
1 parent 6903587 commit 285c782
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 110 deletions.
50 changes: 25 additions & 25 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1149,24 +1149,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
}
vmConfig := &vm.Config{}

genesis, chainConfig := byChain(chain)
var engine consensus.Engine
config := &ethconfig.Defaults
if chainConfig.Clique != nil {
c := params.CliqueSnapshot
c.DBPath = filepath.Join(datadir, "clique", "db")
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, c, config.Miner.Notify, config.Miner.Noverify, "", true, datadir)
} else if chainConfig.Aura != nil {
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, &params.AuRaConfig{DBPath: filepath.Join(datadir, "aura")}, config.Miner.Notify, config.Miner.Noverify, "", true, datadir)
} else if chainConfig.Parlia != nil {
consensusConfig := &params.ParliaConfig{DBPath: filepath.Join(datadir, "parlia")}
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir)
} else if chainConfig.Bor != nil {
consensusConfig := &config.Bor
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, HeimdallURL, false, datadir)
} else { //ethash
engine = ethash.NewFaker()
}
genesis, _ := byChain(chain)

events := privateapi.NewEvents()

Expand All @@ -1184,13 +1167,6 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
var batchSize datasize.ByteSize
must(batchSize.UnmarshalText([]byte(batchSizeStr)))

br := getBlockReader(chainConfig)
blockDownloaderWindow := 65536
sentryControlServer, err := sentry.NewControlServer(db, "", chainConfig, genesisBlock.Hash(), engine, 1, nil, blockDownloaderWindow, br)
if err != nil {
panic(err)
}

cfg := ethconfig.Defaults
cfg.Prune = pm
cfg.BatchSize = batchSize
Expand All @@ -1206,6 +1182,30 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
snDir := &dir.Rw{Path: filepath.Join(datadir, "snapshots")}
cfg.SnapshotDir = snDir
}
var engine consensus.Engine
config := &ethconfig.Defaults
if chainConfig.Clique != nil {
c := params.CliqueSnapshot
c.DBPath = filepath.Join(datadir, "clique", "db")
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, c, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, allSn)
} else if chainConfig.Aura != nil {
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, &params.AuRaConfig{DBPath: filepath.Join(datadir, "aura")}, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, allSn)
} else if chainConfig.Parlia != nil {
consensusConfig := &params.ParliaConfig{DBPath: filepath.Join(datadir, "parlia")}
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, allSn)
} else if chainConfig.Bor != nil {
consensusConfig := &config.Bor
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, HeimdallURL, false, datadir, allSn)
} else { //ethash
engine = ethash.NewFaker()
}

br := getBlockReader(chainConfig)
blockDownloaderWindow := 65536
sentryControlServer, err := sentry.NewControlServer(db, "", chainConfig, genesisBlock.Hash(), engine, 1, nil, blockDownloaderWindow, br)
if err != nil {
panic(err)
}

sync, err := stages2.NewStagedSync(context.Background(), logger, db, p2p.Config{}, cfg,
chainConfig.TerminalTotalDifficulty, sentryControlServer, tmpdir,
Expand Down
15 changes: 8 additions & 7 deletions cmd/state/commands/erigon2.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,16 +205,17 @@ func Erigon2(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log.
}
}()

engine := initConsensusEngine(chainConfig, logger)
var blockReader interfaces.FullBlockReader
var allSnapshots *snapshotsync.RoSnapshots
syncMode := ethconfig.SyncModeByChainName(chainConfig.ChainName, syncmodeCli)
if syncMode == ethconfig.SnapSync {
allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapshotCfg(true, false), path.Join(datadir, "snapshots"))
allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapshotCfg(true, false), path.Join(datadir, "snapshots"))
defer allSnapshots.Close()
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
} else {
blockReader = snapshotsync.NewBlockReader()
}
engine := initConsensusEngine(chainConfig, logger, allSnapshots)

for !interrupt {
blockNum++
Expand Down Expand Up @@ -598,23 +599,23 @@ func (ww *WriterWrapper) CreateContract(address common.Address) error {
return nil
}

func initConsensusEngine(chainConfig *params.ChainConfig, logger log.Logger) (engine consensus.Engine) {
func initConsensusEngine(chainConfig *params.ChainConfig, logger log.Logger, snapshots *snapshotsync.RoSnapshots) (engine consensus.Engine) {
config := ethconfig.Defaults

switch {
case chainConfig.Clique != nil:
c := params.CliqueSnapshot
c.DBPath = filepath.Join(datadir, "clique", "db")
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, c, config.Miner.Notify, config.Miner.Noverify, "", true, datadir)
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, c, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, snapshots)
case chainConfig.Aura != nil:
consensusConfig := &params.AuRaConfig{DBPath: filepath.Join(datadir, "aura")}
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir)
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, snapshots)
case chainConfig.Parlia != nil:
consensusConfig := &params.ParliaConfig{DBPath: filepath.Join(datadir, "parlia")}
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir)
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, snapshots)
case chainConfig.Bor != nil:
consensusConfig := &config.Bor
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "http://localhost:1317", false, datadir)
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "http://localhost:1317", false, datadir, snapshots)
default: //ethash
engine = ethash.NewFaker()
}
Expand Down
8 changes: 4 additions & 4 deletions consensus/parlia/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (api *API) GetSnapshot(number *rpc.BlockNumber) (*Snapshot, error) {
if header == nil {
return nil, errUnknownBlock
}
return api.parlia.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil)
return api.parlia.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil, false /* verify */)
}

// GetSnapshotAtHash retrieves the state snapshot at a given block.
Expand All @@ -51,7 +51,7 @@ func (api *API) GetSnapshotAtHash(hash common.Hash) (*Snapshot, error) {
if header == nil {
return nil, errUnknownBlock
}
return api.parlia.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil)
return api.parlia.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil, false /* verify */)
}

// GetValidators retrieves the list of validators at the specified block.
Expand All @@ -67,7 +67,7 @@ func (api *API) GetValidators(number *rpc.BlockNumber) ([]common.Address, error)
if header == nil {
return nil, errUnknownBlock
}
snap, err := api.parlia.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil)
snap, err := api.parlia.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil, false /* verify */)
if err != nil {
return nil, err
}
Expand All @@ -80,7 +80,7 @@ func (api *API) GetValidatorsAtHash(hash common.Hash) ([]common.Address, error)
if header == nil {
return nil, errUnknownBlock
}
snap, err := api.parlia.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil)
snap, err := api.parlia.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil, false /* verify */)
if err != nil {
return nil, err
}
Expand Down
66 changes: 32 additions & 34 deletions consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
)

const (
Expand Down Expand Up @@ -232,14 +233,16 @@ type Parlia struct {
slashABI abi.ABI

// The fields below are for testing only
fakeDiff bool // Skip difficulty verifications
forks []uint64 // Forks extracted from the chainConfig
fakeDiff bool // Skip difficulty verifications
forks []uint64 // Forks extracted from the chainConfig
snapshots *snapshotsync.RoSnapshots
}

// New creates a Parlia consensus engine.
func New(
chainConfig *params.ChainConfig,
db kv.RwDB,
snapshots *snapshotsync.RoSnapshots,
) *Parlia {
// get parlia config
parliaConfig := chainConfig.Parlia
Expand Down Expand Up @@ -276,6 +279,7 @@ func New(
slashABI: sABI,
signer: types.LatestSigner(chainConfig),
forks: forkid.GatherForks(chainConfig),
snapshots: snapshots,
}

return c
Expand Down Expand Up @@ -392,7 +396,7 @@ func (p *Parlia) verifyCascadingFields(chain consensus.ChainHeaderReader, header
return consensus.ErrUnknownAncestor
}

snap, err := p.snapshot(chain, number-1, header.ParentHash, parents)
snap, err := p.snapshot(chain, number-1, header.ParentHash, parents, true /* verify */)
if err != nil {
return err
}
Expand Down Expand Up @@ -438,7 +442,7 @@ func (p *Parlia) verifySeal(chain consensus.ChainHeaderReader, header *types.Hea
return errUnknownBlock
}
// Retrieve the snapshot needed to verify this header and cache it
snap, err := p.snapshot(chain, number-1, header.ParentHash, parents)
snap, err := p.snapshot(chain, number-1, header.ParentHash, parents, true /* verify */)
if err != nil {
return err
}
Expand Down Expand Up @@ -481,7 +485,7 @@ func (p *Parlia) verifySeal(chain consensus.ChainHeaderReader, header *types.Hea
}

// snapshot retrieves the authorization snapshot at a given point in time.
func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash common.Hash, parents []*types.Header) (*Snapshot, error) {
func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash common.Hash, parents []*types.Header, verify bool) (*Snapshot, error) {
// Search for a snapshot in memory or on disk for checkpoints
var (
headers []*types.Header
Expand All @@ -500,31 +504,26 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
if s, err := loadSnapshot(p.config, p.signatures, p.db, number, hash); err == nil {
log.Trace("Loaded snapshot from disk", "number", number, "hash", hash)
snap = s
break
if !verify || snap != nil {
break
}
}
}

// If we're at the genesis, snapshot the initial state.
if number == 0 {
checkpoint := chain.GetHeaderByNumber(number)
if checkpoint != nil {
// get checkpoint data
hash := checkpoint.Hash()

validatorBytes := checkpoint.Extra[extraVanity : len(checkpoint.Extra)-extraSeal]
// get validators from headers
validators, err := ParseValidators(validatorBytes)
if err != nil {
return nil, err
if (verify && number%p.config.Epoch == 0) || number == 0 {
if (p.snapshots != nil && number <= p.snapshots.BlocksAvailable()) || number == 0 {
// Headers included into the snapshots have to be trusted as checkpoints
checkpoint := chain.GetHeader(hash, number)
if checkpoint != nil {
validatorBytes := checkpoint.Extra[extraVanity : len(checkpoint.Extra)-extraSeal]
// get validators from headers
validators, err := ParseValidators(validatorBytes)
if err != nil {
return nil, err
}
// new snapshot
snap = newSnapshot(p.config, p.signatures, number, hash, validators)
break
}

// new snapshot
snap = newSnapshot(p.config, p.signatures, number, hash, validators)
if err := snap.store(p.db); err != nil {
return nil, err
}
log.Info("Stored checkpoint snapshot to disk", "number", number, "hash", hash)
break
}
}

Expand Down Expand Up @@ -557,7 +556,6 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
for i := 0; i < len(headers)/2; i++ {
headers[i], headers[len(headers)-1-i] = headers[len(headers)-1-i], headers[i]
}

snap, err := snap.apply(headers, chain, parents, p.chainConfig.ChainID)
if err != nil {
return nil, err
Expand Down Expand Up @@ -590,7 +588,7 @@ func (p *Parlia) Prepare(chain consensus.ChainHeaderReader, header *types.Header
header.Nonce = types.BlockNonce{}

number := header.Number.Uint64()
snap, err := p.snapshot(chain, number-1, header.ParentHash, nil)
snap, err := p.snapshot(chain, number-1, header.ParentHash, nil, false /* verify */)
if err != nil {
return err
}
Expand Down Expand Up @@ -685,7 +683,7 @@ func (p *Parlia) finalize(header *types.Header, state *state.IntraBlockState, tx
txs = userTxs
// warn if not in majority fork
number := header.Number.Uint64()
snap, err := p.snapshot(chain, number-1, header.ParentHash, nil)
snap, err := p.snapshot(chain, number-1, header.ParentHash, nil, false /* verify */)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -805,7 +803,7 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res
val, signFn := p.val, p.signFn
p.lock.RUnlock()

snap, err := p.snapshot(chain, number-1, header.ParentHash, nil)
snap, err := p.snapshot(chain, number-1, header.ParentHash, nil, false /* verify */)
if err != nil {
return err
}
Expand Down Expand Up @@ -875,7 +873,7 @@ func (p *Parlia) SealHash(header *types.Header) common.Hash {
// CalcDifficulty is the difficulty adjustment algorithm. It returns the difficulty
// that a new block should have.
func (p *Parlia) CalcDifficulty(chain consensus.ChainHeaderReader, time, parentTime uint64, parentDifficulty *big.Int, parentNumber uint64, parentHash, parentUncleHash common.Hash, parentSeal []rlp.RawValue) *big.Int {
snap, err := p.snapshot(chain, parentNumber, parentHash, nil)
snap, err := p.snapshot(chain, parentNumber, parentHash, nil, false /* verify */)
if err != nil {
return nil
}
Expand Down Expand Up @@ -950,7 +948,7 @@ func (p *Parlia) shouldWaitForCurrentBlockProcess(chain consensus.ChainHeaderRea
}

func (p *Parlia) EnoughDistance(chain consensus.ChainReader, header *types.Header) bool {
snap, err := p.snapshot(chain, header.Number.Uint64()-1, header.ParentHash, nil)
snap, err := p.snapshot(chain, header.Number.Uint64()-1, header.ParentHash, nil, false /* verify */)
if err != nil {
return true
}
Expand All @@ -962,7 +960,7 @@ func (p *Parlia) IsLocalBlock(header *types.Header) bool {
}

func (p *Parlia) AllowLightProcess(chain consensus.ChainReader, currentHeader *types.Header) bool {
snap, err := p.snapshot(chain, currentHeader.Number.Uint64()-1, currentHeader.ParentHash, nil)
snap, err := p.snapshot(chain, currentHeader.Number.Uint64()-1, currentHeader.ParentHash, nil, false /* verify */)
if err != nil {
return true
}
Expand Down
Loading

0 comments on commit 285c782

Please sign in to comment.