Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More Merge logging #3712

Merged
merged 6 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/rpcdaemon/commands/engine_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,14 @@ func (e *EngineImpl) ForkchoiceUpdatedV1(ctx context.Context, forkChoiceState *F
// NewPayloadV1 processes new payloads (blocks) from the beacon chain.
// See https://github.com/ethereum/execution-apis/blob/main/src/engine/specification.md#engine_newpayloadv1
func (e *EngineImpl) NewPayloadV1(ctx context.Context, payload *ExecutionPayload) (map[string]interface{}, error) {
log.Info("Received NewPayload", "height", payload.BlockNumber, "hash", payload.BlockHash)
log.Info("Received NewPayload", "height", uint64(payload.BlockNumber), "hash", payload.BlockHash)

var baseFee *uint256.Int
if payload.BaseFeePerGas != nil {
var overflow bool
baseFee, overflow = uint256.FromBig((*big.Int)(payload.BaseFeePerGas))
if overflow {
log.Warn("NewPayload BaseFeePerGas overflow")
return nil, fmt.Errorf("invalid request")
}
}
Expand All @@ -158,6 +159,7 @@ func (e *EngineImpl) NewPayloadV1(ctx context.Context, payload *ExecutionPayload
Transactions: transactions,
})
if err != nil {
log.Warn("NewPayload", "err", err)
return nil, err
}

Expand Down
14 changes: 13 additions & 1 deletion eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ func handleForkChoice(

currentHeadHash := rawdb.ReadHeadHeaderHash(tx)
if currentHeadHash == headerHash { // no-op
log.Info(fmt.Sprintf("[%s] Fork choice no-op", s.LogPrefix()))
cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{
Status: remote.EngineStatus_VALID,
LatestValidHash: currentHeadHash,
Expand All @@ -255,12 +256,14 @@ func handleForkChoice(

header, err := rawdb.ReadHeaderByHash(tx, headerHash)
if err != nil {
log.Info(fmt.Sprintf("[%s] Fork choice err", s.LogPrefix()), "err", err)
cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err}
return err
}

repliedWithSyncStatus := false
if header == nil {
log.Info(fmt.Sprintf("[%s] Fork choice missing header", s.LogPrefix()))
cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}
repliedWithSyncStatus = true

Expand Down Expand Up @@ -303,7 +306,9 @@ func handleForkChoice(
}
}

log.Info(fmt.Sprintf("[%s] Fork choice beginning unwind", s.LogPrefix()))
u.UnwindTo(forkingPoint, common.Hash{})
log.Info(fmt.Sprintf("[%s] Fork choice unwind finished", s.LogPrefix()))

cfg.hd.SetPendingHeader(headerHash, headerNumber)

Expand All @@ -328,12 +333,13 @@ func handleNewPayload(

existingCanonicalHash, err := rawdb.ReadCanonicalHash(tx, headerNumber)
if err != nil {
log.Info(fmt.Sprintf("[%s] New payload err", s.LogPrefix()), "err", err)
cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err}
return err
}

if existingCanonicalHash != (common.Hash{}) && headerHash == existingCanonicalHash {
// previously received valid header
log.Info(fmt.Sprintf("[%s] New payload: previously received valid header", s.LogPrefix()))
cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{
Status: remote.EngineStatus_VALID,
LatestValidHash: headerHash,
Expand All @@ -356,11 +362,14 @@ func handleNewPayload(
}

if parent != nil {
log.Info(fmt.Sprintf("[%s] New payload begin verification", s.LogPrefix()))
success, err := verifyAndSaveNewPoSHeader(s, tx, cfg, header, headerInserter)
log.Info(fmt.Sprintf("[%s] New payload verification ended", s.LogPrefix()), "success", success, "err", err)
if err != nil || !success {
return err
}
} else {
log.Info(fmt.Sprintf("[%s] New payload missing parent", s.LogPrefix()))
cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}

hashToDownload := header.ParentHash
Expand Down Expand Up @@ -519,6 +528,9 @@ func downloadMissingPoSHeaders(
// Cleanup timer
timer.Stop()
}

log.Info(fmt.Sprintf("[%s] Downloading PoS headers finished", s.LogPrefix()))

// If the user stopped it, we don't update anything
if !cfg.hd.Synced() {
return
Expand Down
23 changes: 21 additions & 2 deletions ethdb/privateapi/ethbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,13 @@ func (s *EthBackendServer) stageLoopIsBusy() bool {

// EngineNewPayloadV1, validates and possibly executes payload
func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.ExecutionPayload) (*remote.EnginePayloadStatus, error) {
log.Info("[NewPayload] acquiring lock")
s.syncCond.L.Lock()
defer s.syncCond.L.Unlock()
log.Info("[NewPayload] lock acquired")

if s.config.TerminalTotalDifficulty == nil {
log.Error("[NewPayload] not a proof-of-stake chain")
return nil, fmt.Errorf("not a proof-of-stake chain")
}

Expand Down Expand Up @@ -294,6 +297,7 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E

blockHash := gointerfaces.ConvertH256ToHash(req.BlockHash)
if header.Hash() != blockHash {
log.Error("[NewPayload] invalid block hash", "stated", common.Hash(blockHash), "actual", header.Hash())
return &remote.EnginePayloadStatus{Status: remote.EngineStatus_INVALID_BLOCK_HASH}, nil
}

Expand All @@ -308,7 +312,7 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E
return &remote.EnginePayloadStatus{Status: remote.EngineStatus_SYNCING}, nil
}

// Send the block over
log.Info("[NewPayload] sending block", "height", header.Number, "hash", blockHash)
s.newPayloadCh <- PayloadMessage{
Header: &header,
Body: &types.RawBody{
Expand All @@ -318,6 +322,8 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E
}

payloadStatus := <-s.statusCh
log.Info("[NewPayload] got reply", "payloadStatus", payloadStatus)

if payloadStatus.CriticalError != nil {
return nil, payloadStatus.CriticalError
}
Expand All @@ -329,8 +335,10 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E
func (s *EthBackendServer) EngineGetPayloadV1(ctx context.Context, req *remote.EngineGetPayloadRequest) (*types2.ExecutionPayload, error) {
// TODO(yperbasis): getPayload should stop block assembly if that's currently in fly

log.Info("[GetPayload] acquiring lock")
s.syncCond.L.Lock()
defer s.syncCond.L.Unlock()
log.Info("[GetPayload] lock acquired")

if !s.proposing {
return nil, fmt.Errorf("execution layer not running as a proposer. enable proposer by taking out the --proposer.disable flag on startup")
Expand Down Expand Up @@ -383,8 +391,10 @@ func (s *EthBackendServer) EngineGetPayloadV1(ctx context.Context, req *remote.E

// EngineForkChoiceUpdatedV1, either states new block head or request the assembling of a new block
func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *remote.EngineForkChoiceUpdatedRequest) (*remote.EngineForkChoiceUpdatedReply, error) {
log.Info("[ForkChoiceUpdated] acquiring lock")
s.syncCond.L.Lock()
defer s.syncCond.L.Unlock()
log.Info("[ForkChoiceUpdated] lock acquired")

if s.config.TerminalTotalDifficulty == nil {
return nil, fmt.Errorf("not a proof-of-stake chain")
Expand All @@ -407,9 +417,13 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r
SafeBlockHash: gointerfaces.ConvertH256ToHash(req.ForkchoiceState.SafeBlockHash),
FinalizedBlockHash: gointerfaces.ConvertH256ToHash(req.ForkchoiceState.FinalizedBlockHash),
}

log.Info("[ForkChoiceUpdated] sending forkChoiceMessage", "head", forkChoiceMessage.HeadBlockHash)
s.forkChoiceCh <- forkChoiceMessage

payloadStatus := <-s.statusCh
log.Info("[ForkChoiceUpdated] got reply", "payloadStatus", payloadStatus)

if payloadStatus.CriticalError != nil {
return nil, payloadStatus.CriticalError
}
Expand Down Expand Up @@ -474,8 +488,10 @@ func (s *EthBackendServer) evictOldPendingPayloads() {
func (s *EthBackendServer) StartProposer() {

go func() {
log.Info("[Proposer] acquiring lock")
s.syncCond.L.Lock()
defer s.syncCond.L.Unlock()
log.Info("[Proposer] lock acquired")

for {
var blockToBuild *types.Block
Expand Down Expand Up @@ -503,8 +519,9 @@ func (s *EthBackendServer) StartProposer() {
}
}

// Wait until we have to process new payloads
log.Info("[Proposer] Wait until we have to process new payloads")
s.syncCond.Wait()
log.Info("[Proposer] Wait finished")
}

// Tell the stage headers to leave space for the write transaction for mining stages
Expand Down Expand Up @@ -535,8 +552,10 @@ func (s *EthBackendServer) StartProposer() {
}

func (s *EthBackendServer) StopProposer() {
log.Info("[StopProposer] acquiring lock")
s.syncCond.L.Lock()
defer s.syncCond.L.Unlock()
log.Info("[StopProposer] lock acquired")

s.shutdown = true
s.syncCond.Broadcast()
Expand Down