Report Valid ExecutionStatus only after all stages #3212

Merged · 14 commits · Jan 7, 2022
1 change: 0 additions & 1 deletion cmd/integration/commands/stages.go
@@ -1119,7 +1119,6 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
sync, err := stages2.NewStagedSync(context.Background(), logger, db, p2p.Config{}, cfg,
chainConfig.TerminalTotalDifficulty, sentryControlServer, tmpdir,
nil, nil, nil, nil,
nil,
)
if err != nil {
panic(err)
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/commands/eth_subscribe_test.go
@@ -47,7 +47,7 @@ func TestEthSubscribe(t *testing.T) {

initialCycle := true
highestSeenHeader := chain.TopBlock.NumberU64()
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
if _, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}

2 changes: 1 addition & 1 deletion cmd/rpcdaemon/commands/send_transaction_test.go
@@ -59,7 +59,7 @@ func TestSendRawTransaction(t *testing.T) {

initialCycle := true
highestSeenHeader := chain.TopBlock.NumberU64()
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
if _, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
}
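Both RPC-daemon tests above (and `mock_sentry.go` further down) now discard an extra leading return value from `stages.StageLoopStep` with `_,`. The new signature itself is not part of this excerpt, so the snippet below is only a hedged sketch of the calling convention the tests imply, in the context of the tests above; the name `headBlockHash` and the comment on how a non-test caller might use it are assumptions, not code from the PR.

```go
// Hypothetical sketch: only the two-value return is taken from the diff,
// the rest is assumed for illustration.
headBlockHash, err := stages.StageLoopStep(
	m.Ctx, m.DB, m.Sync, highestSeenHeader,
	m.Notifications, initialCycle, m.UpdateHead, nil,
)
if err != nil {
	t.Fatal(err)
}
// A production caller could use the returned value (assumed here to be the
// new head block hash) to decide, after every stage has run, which
// ExecutionStatus to report back to the consensus client. The tests simply
// ignore it with `_`.
_ = headBlockHash
```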
4 changes: 2 additions & 2 deletions core/gen_genesis.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion core/genesis.go
@@ -74,7 +74,7 @@ type Genesis struct {
Number uint64 `json:"number"`
GasUsed uint64 `json:"gasUsed"`
ParentHash common.Hash `json:"parentHash"`
BaseFee *big.Int `json:"baseFee"`
BaseFee *big.Int `json:"baseFeePerGas"`
}

// GenesisAlloc specifies the initial state that is part of the genesis block.
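The only change to `core/genesis.go` is the struct tag: the genesis base fee is now (un)marshalled under the JSON key `baseFeePerGas`, the name used by go-ethereum and EIP-1559 tooling, and the regenerated `core/gen_genesis.go` above follows suit. A minimal, self-contained sketch of what the tag controls — the `genesisStub` type is a stand-in for illustration, not Erigon's `Genesis`:

```go
package main

import (
	"encoding/json"
	"fmt"
	"math/big"
)

// Stand-in type: only the renamed field matters here.
type genesisStub struct {
	BaseFee *big.Int `json:"baseFeePerGas"`
}

func main() {
	out, _ := json.Marshal(genesisStub{BaseFee: big.NewInt(1_000_000_000)})
	fmt.Println(string(out)) // {"baseFeePerGas":1000000000}
	// With the old tag the key was "baseFee", so genesis JSON produced by
	// other clients would not round-trip through this struct.
}
```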
6 changes: 2 additions & 4 deletions eth/backend.go
@@ -134,7 +134,6 @@ type Ethereum struct {
// When we receive something here, it means that the beacon chain transitioned
// to proof-of-stake so we start reverse syncing from the header
reverseDownloadCh chan privateapi.PayloadMessage
statusCh chan privateapi.ExecutionStatus
waitingForBeaconChain uint32 // atomic boolean flag
}

@@ -391,7 +390,6 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
backend.pendingBlocks = miner.PendingResultCh
backend.minedBlocks = miner.MiningResultCh
backend.reverseDownloadCh = make(chan privateapi.PayloadMessage)
backend.statusCh = make(chan privateapi.ExecutionStatus, 1)

// proof-of-work mining
mining := stagedsync.New(
@@ -432,7 +430,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
atomic.StoreUint32(&backend.waitingForBeaconChain, 0)
// Initialize ethbackend
ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events,
blockReader, chainConfig, backend.reverseDownloadCh, backend.statusCh, &backend.waitingForBeaconChain,
blockReader, chainConfig, backend.reverseDownloadCh, backend.sentryControlServer.Hd.ExecutionStatusCh, &backend.waitingForBeaconChain,
backend.sentryControlServer.Hd.SkipCycleHack, assembleBlockPOS, config.Miner.EnabledPOS)
miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi)
// If we enabled the proposer flag we initiates the block proposing thread
@@ -509,7 +507,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
backend.stagedSync, err = stages2.NewStagedSync(backend.sentryCtx, backend.logger, backend.chainDB,
stack.Config().P2P, *config, chainConfig.TerminalTotalDifficulty,
backend.sentryControlServer, tmpdir, backend.notifications.Accumulator,
backend.reverseDownloadCh, backend.statusCh, &backend.waitingForBeaconChain,
backend.reverseDownloadCh, &backend.waitingForBeaconChain,
backend.downloaderClient)
if err != nil {
return nil, err
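The `Ethereum` backend no longer carries its own `statusCh`; the channel is created once inside `NewHeaderDownload` (see `header_data_struct.go` below) and both consumers reach it through `sentryControlServer.Hd`. Reassembling the new call sites from this diff, the wiring is roughly the following (the local `hd` variable is added here only for readability):

```go
// Single owner: the execution-status channel now lives on the HeaderDownload.
hd := backend.sentryControlServer.Hd

// The engine-API server is handed the shared channel directly ...
ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB,
	backend.notifications.Events, blockReader, chainConfig,
	backend.reverseDownloadCh, hd.ExecutionStatusCh, &backend.waitingForBeaconChain,
	hd.SkipCycleHack, assembleBlockPOS, config.Miner.EnabledPOS)

// ... while NewStagedSync no longer takes a status channel at all: the
// headers stage reaches it through cfg.hd instead.
backend.stagedSync, err = stages2.NewStagedSync(backend.sentryCtx, backend.logger,
	backend.chainDB, stack.Config().P2P, *config, chainConfig.TerminalTotalDifficulty,
	backend.sentryControlServer, tmpdir, backend.notifications.Accumulator,
	backend.reverseDownloadCh, &backend.waitingForBeaconChain, backend.downloaderClient)
```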
28 changes: 12 additions & 16 deletions eth/stagedsync/stage_headers.go
@@ -35,7 +35,6 @@ import (
type HeadersCfg struct {
db kv.RwDB
hd *headerdownload.HeaderDownload
statusCh chan privateapi.ExecutionStatus
chainConfig params.ChainConfig
headerReqSend func(context.Context, *headerdownload.HeaderRequest) (enode.ID, bool)
announceNewHashes func(context.Context, []headerdownload.Announce)
@@ -54,7 +53,6 @@ type HeadersCfg struct {
func StageHeadersCfg(
db kv.RwDB,
headerDownload *headerdownload.HeaderDownload,
statusCh chan privateapi.ExecutionStatus,
chainConfig params.ChainConfig,
headerReqSend func(context.Context, *headerdownload.HeaderRequest) (enode.ID, bool),
announceNewHashes func(context.Context, []headerdownload.Announce),
@@ -71,7 +69,6 @@ func StageHeadersCfg(
return HeadersCfg{
db: db,
hd: headerDownload,
statusCh: statusCh,
chainConfig: chainConfig,
headerReqSend: headerReqSend,
announceNewHashes: announceNewHashes,
@@ -148,21 +145,25 @@ func HeadersPOS(
}

atomic.StoreUint32(cfg.waitingPosHeaders, 0)
header := payloadMessage.Header

cfg.hd.ClearPendingExecutionStatus()

header := payloadMessage.Header
headerNumber := header.Number.Uint64()
headerHash := header.Hash()

cfg.hd.UpdateTopSeenHeightPoS(headerNumber)

existingHash, err := rawdb.ReadCanonicalHash(tx, headerNumber)
if err != nil {
cfg.statusCh <- privateapi.ExecutionStatus{Error: err}
cfg.hd.ExecutionStatusCh <- privateapi.ExecutionStatus{Error: err}
return err
}

// TODO(yperbasis): handle re-orgs properly
if s.BlockNumber >= headerNumber && headerHash != existingHash {
u.UnwindTo(headerNumber-1, common.Hash{})
cfg.statusCh <- privateapi.ExecutionStatus{Status: privateapi.Syncing}
cfg.hd.ExecutionStatusCh <- privateapi.ExecutionStatus{Status: privateapi.Syncing}
return nil
}
// Set chain header reader right
@@ -177,25 +178,20 @@ func HeadersPOS(
// If we have the parent then we can move on with the stagedsync
parent, err := rawdb.ReadHeaderByHash(tx, header.ParentHash)
if err != nil {
cfg.statusCh <- privateapi.ExecutionStatus{Error: err}
cfg.hd.ExecutionStatusCh <- privateapi.ExecutionStatus{Error: err}
return err
}
if parent != nil && parent.Hash() == header.ParentHash {
if parent != nil {
if err := cfg.hd.VerifyHeader(header); err != nil {
log.Warn("Verification failed for header", "hash", headerHash, "height", headerNumber, "error", err)
cfg.statusCh <- privateapi.ExecutionStatus{
cfg.hd.ExecutionStatusCh <- privateapi.ExecutionStatus{
Status: privateapi.Invalid,
LatestValidHash: header.ParentHash,
}
return nil
}

// For the sake of simplicity we can just assume it will be valid for now.
// TODO(yperbasis): move to execution stage
cfg.statusCh <- privateapi.ExecutionStatus{
Status: privateapi.Valid,
LatestValidHash: headerHash,
}
cfg.hd.SetPendingExecutionStatus(headerHash)

if err := headerInserter.FeedHeaderPoS(tx, header, headerHash); err != nil {
return err
@@ -222,7 +218,7 @@ func HeadersPOS(
}

// If we don't have the right parent, download the missing ancestors
cfg.statusCh <- privateapi.ExecutionStatus{Status: privateapi.Syncing}
cfg.hd.ExecutionStatusCh <- privateapi.ExecutionStatus{Status: privateapi.Syncing}

cfg.hd.SetPOSSync(true)
if err = cfg.hd.ReadProgressFromDb(tx); err != nil {
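This hunk is the heart of the PR: on the happy path `HeadersPOS` no longer pushes `Valid` onto the status channel the moment the header verifies. It records the header hash via `cfg.hd.SetPendingExecutionStatus(headerHash)` and lets the rest of the stage loop run; `Valid` should only be reported once every stage has completed, from stage-loop code that is not part of this excerpt. A hedged sketch of what that final step presumably looks like — the function name, package and call site are assumptions; only the `HeaderDownload` accessors and the `ExecutionStatus` shape come from the diff:

```go
package stages // hypothetical location, for illustration only

import (
	"github.com/ledgerwatch/erigon/common"
	"github.com/ledgerwatch/erigon/ethdb/privateapi"
	"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
)

// reportValidIfPending sends the promised Valid status once the whole stage
// loop has finished, and only if the chain actually advanced to the header
// that the headers stage recorded earlier in the cycle.
func reportValidIfPending(hd *headerdownload.HeaderDownload, headHash common.Hash) {
	pending := hd.GetPendingExecutionStatus()
	if pending == (common.Hash{}) {
		return // no status was promised this cycle
	}
	if pending == headHash {
		hd.ExecutionStatusCh <- privateapi.ExecutionStatus{
			Status:          privateapi.Valid,
			LatestValidHash: headHash,
		}
		hd.ClearPendingExecutionStatus()
	}
}
```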
5 changes: 1 addition & 4 deletions ethdb/privateapi/ethbackend.go
@@ -237,8 +237,7 @@ func (s *EthBackendServer) EngineExecutePayloadV1(ctx context.Context, req *type
// We are still syncing a commissioned payload
return &remote.EngineExecutePayloadReply{Status: string(Syncing)}, nil
}
// Let's check if we have parent hash, if we have it we can process the payload right now.
// If not, we need to commission it and reverse-download the chain.

var baseFee *big.Int
eip1559 := false

@@ -266,12 +265,10 @@ func (s *EthBackendServer) EngineExecutePayloadV1(ctx context.Context, req *type
ReceiptHash: gointerfaces.ConvertH256ToHash(req.ReceiptRoot),
TxHash: types.DeriveSha(types.RawTransactions(req.Transactions)),
}
// Our execution layer has some problems so we return invalid
if header.Hash() != blockHash {
return nil, fmt.Errorf("invalid hash for payload. got: %s, wanted: %s", common.Bytes2Hex(blockHash[:]), common.Bytes2Hex(header.Hash().Bytes()))
}
// Send the block over

s.reverseDownloadCh <- PayloadMessage{
Header: &header,
Body: &types.RawBody{
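`EngineExecutePayloadV1` still commissions the payload over `reverseDownloadCh`; what changes elsewhere in the PR is when a `Valid` answer can come back on the status channel (now owned by `HeaderDownload`): `Syncing` and `Invalid` are still reported immediately, but `Valid` only after the whole stage loop. The code that waits for the reply is outside this excerpt; below is a minimal sketch of that pattern, written as if it lived in the `privateapi` package and using only the `ExecutionStatus` fields visible in the diff — `awaitExecutionStatus` is an assumed helper, not the actual handler code:

```go
package privateapi

import "context"

// awaitExecutionStatus blocks until the sync loop reports a status for the
// commissioned payload, or until the request context is cancelled.
func awaitExecutionStatus(ctx context.Context, statusCh <-chan ExecutionStatus) (ExecutionStatus, error) {
	select {
	case status := <-statusCh:
		if status.Error != nil {
			return ExecutionStatus{}, status.Error
		}
		return status, nil
	case <-ctx.Done():
		return ExecutionStatus{}, ctx.Err()
	}
}
```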
3 changes: 2 additions & 1 deletion params/config.go
@@ -567,7 +567,7 @@ func (c *ChainConfig) String() string {
)
}

return fmt.Sprintf("{ChainID: %v Homestead: %v DAO: %v DAOSupport: %v EIP150: %v EIP155: %v EIP158: %v Byzantium: %v Constantinople: %v Petersburg: %v Istanbul: %v , Muir Glacier: %v, Berlin: %v, London: %v, Arrow Glacier: %v, Engine: %v}",
return fmt.Sprintf("{ChainID: %v Homestead: %v DAO: %v DAOSupport: %v EIP150: %v EIP155: %v EIP158: %v Byzantium: %v Constantinople: %v Petersburg: %v Istanbul: %v , Muir Glacier: %v, Berlin: %v, London: %v, Arrow Glacier: %v, Terminal Total Difficulty: %v, Engine: %v}",
c.ChainID,
c.HomesteadBlock,
c.DAOForkBlock,
@@ -583,6 +583,7 @@ func (c *ChainConfig) String() string {
c.BerlinBlock,
c.LondonBlock,
c.ArrowGlacierBlock,
c.TerminalTotalDifficulty,
engine,
)
}
45 changes: 37 additions & 8 deletions turbo/stages/headerdownload/header_algos.go
@@ -21,7 +21,6 @@ import (
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/consensus/serenity"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
@@ -596,10 +595,10 @@ func (hd *HeaderDownload) UpdateRetryTime(req *HeaderRequest, currentTime, timeo
func (hd *HeaderDownload) RequestSkeleton() *HeaderRequest {
hd.lock.RLock()
defer hd.lock.RUnlock()
log.Trace("Request skeleton", "anchors", len(hd.anchors), "top seen height", hd.topSeenHeight, "highestInDb", hd.highestInDb)
log.Trace("Request skeleton", "anchors", len(hd.anchors), "top seen height", hd.topSeenHeightPoW, "highestInDb", hd.highestInDb)
stride := uint64(8 * 192)
strideHeight := hd.highestInDb + stride
lowestAnchorHeight := hd.topSeenHeight + 1 // Inclusive upper bound
lowestAnchorHeight := hd.topSeenHeightPoW + 1 // Inclusive upper bound
if lowestAnchorHeight <= strideHeight {
return nil
}
@@ -676,7 +675,7 @@ func (hd *HeaderDownload) InsertHeaders(hf FeedHeaderFunc, terminalTotalDifficul
}
if td != nil {
// Check if transition to proof-of-stake happened and stop forward syncing
if terminalTotalDifficulty != nil && (link.header.Difficulty.Cmp(serenity.SerenityDifficulty) == 0 || td.Cmp(terminalTotalDifficulty) >= 0) {
if terminalTotalDifficulty != nil && td.Cmp(terminalTotalDifficulty) >= 0 {
hd.highestInDb = link.blockHeight
return true, nil
}
@@ -701,7 +700,7 @@ func (hd *HeaderDownload) InsertHeaders(hf FeedHeaderFunc, terminalTotalDifficul
hd.insertList = append(hd.insertList, linksInFuture...)
linksInFuture = nil //nolint
}
return hd.highestInDb >= hd.preverifiedHeight && hd.topSeenHeight > 0 && hd.highestInDb >= hd.topSeenHeight, nil
return hd.highestInDb >= hd.preverifiedHeight && hd.topSeenHeightPoW > 0 && hd.highestInDb >= hd.topSeenHeightPoW, nil
}

func (hd *HeaderDownload) SetExpectedHash(hash common.Hash) {
@@ -991,9 +990,9 @@ func (hd *HeaderDownload) ProcessSegment(segment ChainSegment, newBlock bool, pe
}
return
}
if highestNum > hd.topSeenHeight {
if highestNum > hd.topSeenHeightPoW {
if newBlock || hd.seenAnnounces.Seen(highest.Hash) {
hd.topSeenHeight = highestNum
hd.topSeenHeightPoW = highestNum
}
}

@@ -1037,7 +1036,19 @@ func (hd *HeaderDownload) ProcessSegment(segment ChainSegment, newBlock bool, pe
func (hd *HeaderDownload) TopSeenHeight() uint64 {
hd.lock.RLock()
defer hd.lock.RUnlock()
return hd.topSeenHeight
if hd.topSeenHeightPoW > hd.topSeenHeightPoS {
return hd.topSeenHeightPoW
} else {
return hd.topSeenHeightPoS
}
}

func (hd *HeaderDownload) UpdateTopSeenHeightPoS(blockHeight uint64) {
hd.lock.Lock()
defer hd.lock.Unlock()
if blockHeight > hd.topSeenHeightPoS {
hd.topSeenHeightPoS = blockHeight
}
}

func (hd *HeaderDownload) SetHeaderReader(headerReader consensus.ChainHeaderReader) {
@@ -1106,6 +1117,24 @@ func (hd *HeaderDownload) Fetching() bool {
return hd.fetching
}

func (hd *HeaderDownload) GetPendingExecutionStatus() common.Hash {
hd.lock.RLock()
defer hd.lock.RUnlock()
return hd.pendingExecutionStatus
}

func (hd *HeaderDownload) SetPendingExecutionStatus(header common.Hash) {
hd.lock.Lock()
defer hd.lock.Unlock()
hd.pendingExecutionStatus = header
}

func (hd *HeaderDownload) ClearPendingExecutionStatus() {
hd.lock.Lock()
defer hd.lock.Unlock()
hd.pendingExecutionStatus = common.Hash{}
}

func (hd *HeaderDownload) AddMinedHeader(header *types.Header) error {
buf := bytes.NewBuffer(nil)
if err := header.EncodeRLP(buf); err != nil {
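`topSeenHeight` is now split: the proof-of-work side keeps being fed by `ProcessSegment`, the proof-of-stake side by the new `UpdateTopSeenHeightPoS` (which never moves backwards), and `TopSeenHeight()` returns whichever is larger. A small hypothetical test capturing that contract — it is not part of the PR, relies only on the exported methods shown above, and assumes a zero-value `HeaderDownload` is usable for this purpose:

```go
package headerdownload_test

import (
	"testing"

	"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
)

func TestTopSeenHeightIsMaxOfPoWAndPoS(t *testing.T) {
	hd := &headerdownload.HeaderDownload{} // assumption: zero value is enough here

	hd.UpdateTopSeenHeightPoS(100)
	hd.UpdateTopSeenHeightPoS(50) // lower values must not regress the PoS height

	if got := hd.TopSeenHeight(); got != 100 {
		t.Fatalf("TopSeenHeight() = %d, want 100", got)
	}
}
```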
21 changes: 13 additions & 8 deletions turbo/stages/headerdownload/header_data_struct.go
@@ -11,6 +11,7 @@ import (
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/ethdb/privateapi"
"github.com/ledgerwatch/erigon/p2p/enode"
"github.com/ledgerwatch/erigon/rlp"
)
@@ -217,15 +218,18 @@ type HeaderDownload struct {
persistedLinkLimit int // Maximum allowed number of persisted links
anchorLimit int // Maximum allowed number of anchors
highestInDb uint64 // Height of the highest block header in the database
topSeenHeight uint64
requestChaining bool // Whether the downloader is allowed to issue more requests when previous responses created or moved an anchor
fetching bool // Set when the stage that is actively fetching the headers is in progress
requestChaining bool // Whether the downloader is allowed to issue more requests when previous responses created or moved an anchor
fetching bool // Set when the stage that is actively fetching the headers is in progress
topSeenHeightPoW uint64
// proof-of-stake
lastProcessedPayload uint64 // The last header number inserted when processing the chain backwards
expectedHash common.Hash // Parenthash of the last header inserted, we keep it so that we do not read it from database over and over
synced bool // if we found a canonical hash during backward sync, in this case our sync process is done
posSync bool // True if the chain is syncing backwards or not
headersCollector *etl.Collector // ETL collector for headers
topSeenHeightPoS uint64
lastProcessedPayload uint64 // The last header number inserted when processing the chain backwards
expectedHash common.Hash // Parenthash of the last header inserted, we keep it so that we do not read it from database over and over
synced bool // if we found a canonical hash during backward sync, in this case our sync process is done
posSync bool // True if the chain is syncing backwards or not
headersCollector *etl.Collector // ETL collector for headers
ExecutionStatusCh chan privateapi.ExecutionStatus // Channel to report payload execution status (engine_executePayloadV1 response)
pendingExecutionStatus common.Hash // Header whose status we still should send to the ExecutionStatusCh
}

// HeaderRecord encapsulates two forms of the same header - raw RLP encoding (to avoid duplicated decodings and encodings), and parsed value types.Header
@@ -255,6 +259,7 @@ func NewHeaderDownload(
seenAnnounces: NewSeenAnnounces(),
DeliveryNotify: make(chan struct{}, 1),
SkipCycleHack: make(chan struct{}),
ExecutionStatusCh: make(chan privateapi.ExecutionStatus, 1),
}
heap.Init(hd.persistedLinkQueue)
heap.Init(hd.linkQueue)
3 changes: 1 addition & 2 deletions turbo/stages/mock_sentry.go
@@ -277,7 +277,6 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
stagedsync.DefaultStages(mock.Ctx, prune, stagedsync.StageHeadersCfg(
mock.DB,
mock.downloader.Hd,
make(chan privateapi.ExecutionStatus),
*mock.ChainConfig,
sendHeaderRequest,
propagateNewBlockHashes,
@@ -452,7 +451,7 @@ func (ms *MockSentry) InsertChain(chain *core.ChainPack) error {
if ms.TxPool != nil {
ms.ReceiveWg.Add(1)
}
if err := StageLoopStep(ms.Ctx, ms.DB, ms.Sync, highestSeenHeader, ms.Notifications, initialCycle, ms.UpdateHead, nil); err != nil {
if _, err := StageLoopStep(ms.Ctx, ms.DB, ms.Sync, highestSeenHeader, ms.Notifications, initialCycle, ms.UpdateHead, nil); err != nil {
return err
}
if ms.TxPool != nil {
(Additional changed files were still loading and are not included here.)