Skip to content

Commit

Permalink
New Engine API semantics (Kiln v1) (#3340)
Browse files Browse the repository at this point in the history
* Disable PoS sync temporarily

* Resore PoS sync

* Handle Ctrl^C in HeadersPOS

* Consistent naming

* Extract verifyAndSavePoSHeader & downloadMissingPoSHeaders

* Preparation for EngineForkChoiceUpdated re-orgs

* Extract ForkingPoint

* Comments

* New proto for Engine API

* EngineExecutePayload -> EngineNewPayload

* Return INVALID_BLOCK_HASH if block hash is invalid

* Return EngineStatus_ACCEPTED for side chain blocks

* Update erigon-lib (PR 268)

* Fix payload2Hash

* reverseDownloadCh -> beaconPayloadCh

* Update erigon-lib

* Engine API updated

* ExecutionStatus -> PayloadStatus

* Introduce forkChoiceCh

* Mock ForkChoiceMessage/PayloadStatus

* Add ValidationError to PayloadStatus

* Small clean-ups

* Add INVALID_TERMINAL_BLOCK to EngineStatus

* Add a comment

* Extract handleNewPayload & handleForkChoice

* Partially implement handleForkChoice

* Update erigon-lib

* short vs long re-org

* Move header insertion out of downloadMissingPoSHeaders

* Update erigon-lib

* Refactor ProcessSegmentPOS

* Fix imports

* Wire downloadMissingPoSHeaders into handleForkChoice

* evictOldPendingPayloads

* nolint:unused for assertSegment

* Try nolint instead of nolint:unused

* Small comment improvements

* HeadHeaderHash/StageProgress in handleForkChoice

* TODO: bodyDownloader.AddToPrefetch(block)

* Review call suggestions

* Don't use ReadHeaderNumber in ProcessSegmentPOS

* Don't leave ethbackend waiting when server is stopping

* Update erigon-lib

* More explicit signature of downloadMissingPoSHeaders
  • Loading branch information
yperbasis authored Feb 9, 2022
1 parent ab62377 commit 8466cb1
Show file tree
Hide file tree
Showing 14 changed files with 592 additions and 365 deletions.
2 changes: 1 addition & 1 deletion cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)

sync, err := stages2.NewStagedSync(context.Background(), logger, db, p2p.Config{}, cfg,
chainConfig.TerminalTotalDifficulty, sentryControlServer, tmpdir,
nil, nil, nil, nil,
nil, nil, nil, nil, nil,
)
if err != nil {
panic(err)
Expand Down
84 changes: 40 additions & 44 deletions cmd/rpcdaemon/commands/engine_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type PayloadAttributes struct {
// EngineAPI Beacon chain communication endpoint
type EngineAPI interface {
ForkchoiceUpdatedV1(ctx context.Context, forkChoiceState *ForkChoiceState, payloadAttributes *PayloadAttributes) (map[string]interface{}, error)
ExecutePayloadV1(context.Context, *ExecutionPayload) (map[string]interface{}, error)
NewPayloadV1(context.Context, *ExecutionPayload) (map[string]interface{}, error)
GetPayloadV1(ctx context.Context, payloadID hexutil.Bytes) (*ExecutionPayload, error)
GetPayloadBodiesV1(ctx context.Context, blockHashes []rpc.BlockNumberOrHash) (map[common.Hash]ExecutionPayload, error)
}
Expand All @@ -68,51 +68,56 @@ type EngineImpl struct {
api services.ApiBackend
}

// ForkchoiceUpdatedV1 is executed only if we are running a beacon validator,
// in erigon we do not use this for reorgs like go-ethereum does since we can do that in engine_executePayloadV1
// if the payloadAttributes is different than null, we return
func convertPayloadStatus(x *remote.EnginePayloadStatus) map[string]interface{} {
json := map[string]interface{}{
"status": x.Status.String(),
}
if x.LatestValidHash != nil {
json["latestValidHash"] = gointerfaces.ConvertH256ToHash(x.LatestValidHash)
}
if x.ValidationError != "" {
json["validationError"] = x.ValidationError
}

return json
}

func (e *EngineImpl) ForkchoiceUpdatedV1(ctx context.Context, forkChoiceState *ForkChoiceState, payloadAttributes *PayloadAttributes) (map[string]interface{}, error) {
// Unwinds can be made within engine_excutePayloadV1 so we can return success regardless
if payloadAttributes == nil {
return map[string]interface{}{
"status": "SUCCESS",
}, nil
var prepareParameters *remote.EnginePayloadAttributes
if payloadAttributes != nil {
prepareParameters = &remote.EnginePayloadAttributes{
Timestamp: uint64(payloadAttributes.Timestamp),
Random: gointerfaces.ConvertHashToH256(payloadAttributes.Random),
SuggestedFeeRecipient: gointerfaces.ConvertAddressToH160(payloadAttributes.SuggestedFeeRecipient),
}
}
// Request for assembling payload
reply, err := e.api.EngineForkchoiceUpdateV1(ctx, &remote.EngineForkChoiceUpdatedRequest{
Prepare: &remote.EnginePreparePayload{
Timestamp: uint64(payloadAttributes.Timestamp),
Random: gointerfaces.ConvertHashToH256(payloadAttributes.Random),
FeeRecipient: gointerfaces.ConvertAddressToH160(payloadAttributes.SuggestedFeeRecipient),
},
Forkchoice: &remote.EngineForkChoiceUpdated{
reply, err := e.api.EngineForkchoiceUpdatedV1(ctx, &remote.EngineForkChoiceUpdatedRequest{
ForkchoiceState: &remote.EngineForkChoiceState{
HeadBlockHash: gointerfaces.ConvertHashToH256(forkChoiceState.HeadHash),
FinalizedBlockHash: gointerfaces.ConvertHashToH256(forkChoiceState.FinalizedBlockHash),
SafeBlockHash: gointerfaces.ConvertHashToH256(forkChoiceState.SafeBlockHash),
},
PayloadAttributes: prepareParameters,
})
if err != nil {
return nil, err
}
// Process reply
if reply.Status == "SYNCING" {
return map[string]interface{}{
"status": reply.Status,
}, nil

json := map[string]interface{}{
"payloadStatus": convertPayloadStatus(reply.PayloadStatus),
}
if reply.PayloadId != 0 {
encodedPayloadId := make([]byte, 8)
binary.BigEndian.PutUint64(encodedPayloadId, reply.PayloadId)
json["payloadId"] = hexutil.Bytes(encodedPayloadId)
}
encodedPayloadId := make([]byte, 8)
binary.BigEndian.PutUint64(encodedPayloadId, reply.PayloadId)
// Answer
return map[string]interface{}{
"status": reply.Status,
"payloadId": hexutil.Bytes(encodedPayloadId),
}, nil

return json, nil
}

// ExecutePayloadV1 takes a block from the beacon chain and do either two of the following things
// - Stageloop the block just received if we have the payload's parent hash already
// - Start the reverse sync process otherwise, and return "Syncing"
func (e *EngineImpl) ExecutePayloadV1(ctx context.Context, payload *ExecutionPayload) (map[string]interface{}, error) {
// NewPayloadV1 processes new payloads (blocks) from the beacon chain.
// See https://github.com/ethereum/execution-apis/blob/main/src/engine/specification.md#engine_newpayloadv1
func (e *EngineImpl) NewPayloadV1(ctx context.Context, payload *ExecutionPayload) (map[string]interface{}, error) {
var baseFee *uint256.Int
if payload.BaseFeePerGas != nil {
var overflow bool
Expand All @@ -128,7 +133,7 @@ func (e *EngineImpl) ExecutePayloadV1(ctx context.Context, payload *ExecutionPay
for i, transaction := range payload.Transactions {
transactions[i] = ([]byte)(transaction)
}
res, err := e.api.EngineExecutePayloadV1(ctx, &types2.ExecutionPayload{
res, err := e.api.EngineNewPayloadV1(ctx, &types2.ExecutionPayload{
ParentHash: gointerfaces.ConvertHashToH256(payload.ParentHash),
Coinbase: gointerfaces.ConvertAddressToH160(payload.FeeRecipient),
StateRoot: gointerfaces.ConvertHashToH256(payload.StateRoot),
Expand All @@ -148,16 +153,7 @@ func (e *EngineImpl) ExecutePayloadV1(ctx context.Context, payload *ExecutionPay
return nil, err
}

if res.LatestValidHash != nil {
var latestValidHash common.Hash = gointerfaces.ConvertH256ToHash(res.LatestValidHash)
return map[string]interface{}{
"status": res.Status,
"latestValidHash": latestValidHash,
}, nil
}
return map[string]interface{}{
"status": res.Status,
}, nil
return convertPayloadStatus(res), nil
}

func (e *EngineImpl) GetPayloadV1(ctx context.Context, payloadID hexutil.Bytes) (*ExecutionPayload, error) {
Expand Down
5 changes: 2 additions & 3 deletions cmd/rpcdaemon/rpcdaemontest/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import (
"net"
"testing"

"github.com/ledgerwatch/erigon-lib/gointerfaces/starknet"

"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/gointerfaces/starknet"
"github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/accounts/abi/bind"
Expand Down Expand Up @@ -223,7 +222,7 @@ func CreateTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *g
ethashApi := apis[1].Service.(*ethash.API)
server := grpc.NewServer()

remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, nil, nil, false))
remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, nil, nil, nil, false))
txpool.RegisterTxpoolServer(server, m.TxPoolGrpcServer)
txpool.RegisterMiningServer(server, privateapi.NewMiningServer(ctx, &IsMiningMock{}, ethashApi))
starknet.RegisterCAIROVMServer(server, &starknet.UnimplementedCAIROVMServer{})
Expand Down
10 changes: 5 additions & 5 deletions cmd/rpcdaemon/services/eth_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ type ApiBackend interface {
ClientVersion(ctx context.Context) (string, error)
Subscribe(ctx context.Context, cb func(*remote.SubscribeReply)) error
BlockWithSenders(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (block *types.Block, senders []common.Address, err error)
EngineExecutePayloadV1(ctx context.Context, payload *types2.ExecutionPayload) (*remote.EngineExecutePayloadReply, error)
EngineForkchoiceUpdateV1(ctx context.Context, request *remote.EngineForkChoiceUpdatedRequest) (*remote.EngineForkChoiceUpdatedReply, error)
EngineNewPayloadV1(ctx context.Context, payload *types2.ExecutionPayload) (*remote.EnginePayloadStatus, error)
EngineForkchoiceUpdatedV1(ctx context.Context, request *remote.EngineForkChoiceUpdatedRequest) (*remote.EngineForkChoiceUpdatedReply, error)
EngineGetPayloadV1(ctx context.Context, payloadId uint64) (*types2.ExecutionPayload, error)
NodeInfo(ctx context.Context, limit uint32) ([]p2p.NodeInfo, error)
}
Expand Down Expand Up @@ -164,11 +164,11 @@ func (back *RemoteBackend) BlockWithSenders(ctx context.Context, tx kv.Getter, h
return back.blockReader.BlockWithSenders(ctx, tx, hash, blockHeight)
}

func (back *RemoteBackend) EngineExecutePayloadV1(ctx context.Context, payload *types2.ExecutionPayload) (res *remote.EngineExecutePayloadReply, err error) {
return back.remoteEthBackend.EngineExecutePayloadV1(ctx, payload)
func (back *RemoteBackend) EngineNewPayloadV1(ctx context.Context, payload *types2.ExecutionPayload) (res *remote.EnginePayloadStatus, err error) {
return back.remoteEthBackend.EngineNewPayloadV1(ctx, payload)
}

func (back *RemoteBackend) EngineForkchoiceUpdateV1(ctx context.Context, request *remote.EngineForkChoiceUpdatedRequest) (*remote.EngineForkChoiceUpdatedReply, error) {
func (back *RemoteBackend) EngineForkchoiceUpdatedV1(ctx context.Context, request *remote.EngineForkChoiceUpdatedRequest) (*remote.EngineForkChoiceUpdatedReply, error) {
return back.remoteEthBackend.EngineForkChoiceUpdatedV1(ctx, request)
}

Expand Down
14 changes: 8 additions & 6 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ type Ethereum struct {
txPool2GrpcServer *txpool2.GrpcServer
notifyMiningAboutNewTxs chan struct{}
// When we receive something here, it means that the beacon chain transitioned
// to proof-of-stake so we start reverse syncing from the header
reverseDownloadCh chan privateapi.PayloadMessage
// to proof-of-stake so we start reverse syncing from the block
newPayloadCh chan privateapi.PayloadMessage
forkChoiceCh chan privateapi.ForkChoiceMessage
waitingForBeaconChain uint32 // atomic boolean flag

downloadProtocols *downloader.Protocols
Expand Down Expand Up @@ -372,7 +373,8 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
miner := stagedsync.NewMiningState(&config.Miner)
backend.pendingBlocks = miner.PendingResultCh
backend.minedBlocks = miner.MiningResultCh
backend.reverseDownloadCh = make(chan privateapi.PayloadMessage)
backend.newPayloadCh = make(chan privateapi.PayloadMessage)
backend.forkChoiceCh = make(chan privateapi.ForkChoiceMessage)

// proof-of-work mining
mining := stagedsync.New(
Expand Down Expand Up @@ -413,8 +415,8 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
atomic.StoreUint32(&backend.waitingForBeaconChain, 0)
// Initialize ethbackend
ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events,
blockReader, chainConfig, backend.reverseDownloadCh, backend.sentryControlServer.Hd.ExecutionStatusCh, &backend.waitingForBeaconChain,
backend.sentryControlServer.Hd.SkipCycleHack, assembleBlockPOS, config.Miner.EnabledPOS)
blockReader, chainConfig, backend.newPayloadCh, backend.forkChoiceCh, backend.sentryControlServer.Hd.PayloadStatusCh,
&backend.waitingForBeaconChain, backend.sentryControlServer.Hd.SkipCycleHack, assembleBlockPOS, config.Miner.EnabledPOS)
miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi)
// If we enabled the proposer flag we initiates the block proposing thread
if config.Miner.EnabledPOS && chainConfig.TerminalTotalDifficulty != nil {
Expand Down Expand Up @@ -490,7 +492,7 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
backend.stagedSync, err = stages2.NewStagedSync(backend.sentryCtx, backend.logger, backend.chainDB,
stack.Config().P2P, *config, chainConfig.TerminalTotalDifficulty,
backend.sentryControlServer, tmpdir, backend.notifications.Accumulator,
backend.reverseDownloadCh, &backend.waitingForBeaconChain,
backend.newPayloadCh, backend.forkChoiceCh, &backend.waitingForBeaconChain,
backend.downloaderClient)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 8466cb1

Please sign in to comment.