Skip to content

Commit

Permalink
Support for syncing on mergemock (#3174)
Browse files Browse the repository at this point in the history
* block proposing

* standard finalized

* mergemock execution

* private chain can now be ran yay

* perfectioned

* polished

* more polishing

* resize PR

* resize PR

* resize PR

* simplifications

* fixed tests

* better syncronous communication

* better syncronous once again

* clean

* Re-enabled headers verification

* mining finish

* mining finish

* cleaned hash computation

* fixed evm bug

* go.mod

* Update .gitignore

* Update .gitignore

* Update .gitignore

* removed new line from .gitignore

* added go.mod and go.sum

* feeRecipient into preset

* useExternal

* todo

* fixed comment for forkchoiceUpdateV1

* smaller

* smaller

* Revert changes to miner frequency

* Restore useExternalTx

* Fix headerLoadFunc

* do not reset payloadId

* rename

* extra is empty

Co-authored-by: yperbasis <[email protected]>
  • Loading branch information
Giulio2002 and yperbasis authored Dec 29, 2021
1 parent 2d9fe6c commit 864b744
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 79 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,4 @@ docker-compose.dev.yml
libmdbx/build/*
tests/testdata/*

go.work
go.work
68 changes: 47 additions & 21 deletions cmd/rpcdaemon/commands/engine_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package commands
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math/big"

"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/services"
Expand Down Expand Up @@ -37,6 +39,13 @@ type ExecutionPayload struct {
Transactions []hexutil.Bytes `json:"transactions" gencodec:"required"`
}

// PayloadAttributes represent the attributes required to start assembling a payload
type ForkChoiceState struct {
HeadHash common.Hash `json:"headBlockHash" gencodec:"required"`
SafeBlockHash common.Hash `json:"safeBlockHash" gencodec:"required"`
FinalizedBlockHash common.Hash `json:"finalizedBlockHash" gencodec:"required"`
}

// PayloadAttributes represent the attributes required to start assembling a payload
type PayloadAttributes struct {
Timestamp hexutil.Uint64 `json:"timestamp" gencodec:"required"`
Expand All @@ -46,9 +55,9 @@ type PayloadAttributes struct {

// EngineAPI Beacon chain communication endpoint
type EngineAPI interface {
ForkchoiceUpdatedV1(context.Context, struct{}, *PayloadAttributes) (map[string]interface{}, error)
ForkchoiceUpdatedV1(ctx context.Context, forkChoiceState *ForkChoiceState, payloadAttributes *PayloadAttributes) (map[string]interface{}, error)
ExecutePayloadV1(context.Context, *ExecutionPayload) (map[string]interface{}, error)
GetPayloadV1(ctx context.Context, payloadID hexutil.Uint64) (*ExecutionPayload, error)
GetPayloadV1(ctx context.Context, payloadID hexutil.Bytes) (*ExecutionPayload, error)
GetPayloadBodiesV1(ctx context.Context, blockHashes []rpc.BlockNumberOrHash) (map[common.Hash]ExecutionPayload, error)
}

Expand All @@ -62,16 +71,42 @@ type EngineImpl struct {
// ForkchoiceUpdatedV1 is executed only if we are running a beacon validator,
// in erigon we do not use this for reorgs like go-ethereum does since we can do that in engine_executePayloadV1
// if the payloadAttributes is different than null, we return
func (e *EngineImpl) ForkchoiceUpdatedV1(_ context.Context, _ struct{}, payloadAttributes *PayloadAttributes) (map[string]interface{}, error) {
func (e *EngineImpl) ForkchoiceUpdatedV1(ctx context.Context, forkChoiceState *ForkChoiceState, payloadAttributes *PayloadAttributes) (map[string]interface{}, error) {
// Unwinds can be made within engine_excutePayloadV1 so we can return success regardless
if payloadAttributes == nil {
return map[string]interface{}{
"status": "SUCCESS",
"payloadId": nil,
"status": "SUCCESS",
}, nil
}
// Request for assembling payload
return nil, fmt.Errorf("invalid request")
reply, err := e.api.EngineForkchoiceUpdateV1(ctx, &remote.EngineForkChoiceUpdatedRequest{
Prepare: &remote.EnginePreparePayload{
Timestamp: uint64(payloadAttributes.Timestamp),
Random: gointerfaces.ConvertHashToH256(payloadAttributes.Random),
FeeRecipient: gointerfaces.ConvertAddressToH160(payloadAttributes.SuggestedFeeRecipient),
},
Forkchoice: &remote.EngineForkChoiceUpdated{
HeadBlockHash: gointerfaces.ConvertHashToH256(forkChoiceState.HeadHash),
FinalizedBlockHash: gointerfaces.ConvertHashToH256(forkChoiceState.FinalizedBlockHash),
SafeBlockHash: gointerfaces.ConvertHashToH256(forkChoiceState.SafeBlockHash),
},
})
if err != nil {
return nil, err
}
// Process reply
if reply.Status == "SYNCING" {
return map[string]interface{}{
"status": reply.Status,
}, nil
}
encodedPayloadId := make([]byte, 8)
binary.BigEndian.PutUint64(encodedPayloadId, reply.PayloadId)
// Answer
return map[string]interface{}{
"status": reply.Status,
"payloadId": hexutil.Bytes(encodedPayloadId),
}, nil
}

// ExecutePayloadV1 takes a block from the beacon chain and do either two of the following things
Expand All @@ -86,9 +121,6 @@ func (e *EngineImpl) ExecutePayloadV1(ctx context.Context, payload *ExecutionPay
return nil, fmt.Errorf("invalid request")
}
}
// Maximum length of extra is 32 bytes so we can use the hash datatype
extra := common.BytesToHash(payload.ExtraData)

log.Info("Received Payload from beacon-chain")

// Convert slice of hexutil.Bytes to a slice of slice of bytes
Expand All @@ -107,7 +139,7 @@ func (e *EngineImpl) ExecutePayloadV1(ctx context.Context, payload *ExecutionPay
GasLimit: (uint64)(payload.GasLimit),
GasUsed: (uint64)(payload.GasUsed),
Timestamp: (uint64)(payload.Timestamp),
ExtraData: gointerfaces.ConvertHashToH256(extra),
ExtraData: payload.ExtraData,
BaseFeePerGas: gointerfaces.ConvertUint256IntToH256(baseFee),
BlockHash: gointerfaces.ConvertHashToH256(payload.BlockHash),
Transactions: transactions,
Expand All @@ -120,21 +152,21 @@ func (e *EngineImpl) ExecutePayloadV1(ctx context.Context, payload *ExecutionPay
var latestValidHash common.Hash = gointerfaces.ConvertH256ToHash(res.LatestValidHash)
return map[string]interface{}{
"status": res.Status,
"latestValidHash": common.Bytes2Hex(latestValidHash.Bytes()),
"latestValidHash": latestValidHash,
}, nil
}
return map[string]interface{}{
"status": res.Status,
}, nil
}

func (e *EngineImpl) GetPayloadV1(ctx context.Context, payloadID hexutil.Uint64) (*ExecutionPayload, error) {
payload, err := e.api.EngineGetPayloadV1(ctx, (uint64)(payloadID))
func (e *EngineImpl) GetPayloadV1(ctx context.Context, payloadID hexutil.Bytes) (*ExecutionPayload, error) {
decodedPayloadId := binary.BigEndian.Uint64(payloadID)
payload, err := e.api.EngineGetPayloadV1(ctx, decodedPayloadId)
if err != nil {
return nil, err
}
var bloom types.Bloom = gointerfaces.ConvertH2048ToBloom(payload.LogsBloom)
var extra common.Hash = gointerfaces.ConvertH256ToHash(payload.ExtraData)

var baseFee *big.Int
if payload.BaseFeePerGas != nil {
Expand All @@ -157,7 +189,7 @@ func (e *EngineImpl) GetPayloadV1(ctx context.Context, payloadID hexutil.Uint64)
GasLimit: hexutil.Uint64(payload.GasLimit),
GasUsed: hexutil.Uint64(payload.GasUsed),
Timestamp: hexutil.Uint64(payload.Timestamp),
ExtraData: extra[:],
ExtraData: payload.ExtraData,
BaseFeePerGas: (*hexutil.Big)(baseFee),
BlockHash: gointerfaces.ConvertH256ToHash(payload.BlockHash),
Transactions: transactions,
Expand All @@ -176,20 +208,16 @@ func (e *EngineImpl) GetPayloadBodiesV1(ctx context.Context, blockHashes []rpc.B

for _, blockHash := range blockHashes {
hash := *blockHash.BlockHash

block, err := e.blockByHashWithSenders(tx, hash)
if err != nil {
return nil, err
}

if block == nil {
continue
}

var bloom types.Bloom = block.Bloom()

buf := bytes.NewBuffer(nil)

var encodedTransactions []hexutil.Bytes

for _, tx := range block.Transactions() {
Expand All @@ -199,7 +227,6 @@ func (e *EngineImpl) GetPayloadBodiesV1(ctx context.Context, blockHashes []rpc.B
if err != nil {
return nil, fmt.Errorf("broken tx rlp: %w", err)
}

encodedTransactions = append(encodedTransactions, common.CopyBytes(buf.Bytes()))
}

Expand All @@ -219,7 +246,6 @@ func (e *EngineImpl) GetPayloadBodiesV1(ctx context.Context, blockHashes []rpc.B
BlockHash: block.Hash(),
Transactions: encodedTransactions,
}

}
return blockHashToBody, nil
}
Expand Down
5 changes: 5 additions & 0 deletions cmd/rpcdaemon/services/eth_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ApiBackend interface {
Subscribe(ctx context.Context, cb func(*remote.SubscribeReply)) error
BlockWithSenders(ctx context.Context, tx kv.Tx, hash common.Hash, blockHeight uint64) (block *types.Block, senders []common.Address, err error)
EngineExecutePayloadV1(ctx context.Context, payload *types2.ExecutionPayload) (*remote.EngineExecutePayloadReply, error)
EngineForkchoiceUpdateV1(ctx context.Context, request *remote.EngineForkChoiceUpdatedRequest) (*remote.EngineForkChoiceUpdatedReply, error)
EngineGetPayloadV1(ctx context.Context, payloadId uint64) (*types2.ExecutionPayload, error)
NodeInfo(ctx context.Context, limit uint32) ([]p2p.NodeInfo, error)
}
Expand Down Expand Up @@ -164,6 +165,10 @@ func (back *RemoteBackend) EngineExecutePayloadV1(ctx context.Context, payload *
return back.remoteEthBackend.EngineExecutePayloadV1(ctx, payload)
}

func (back *RemoteBackend) EngineForkchoiceUpdateV1(ctx context.Context, request *remote.EngineForkChoiceUpdatedRequest) (*remote.EngineForkChoiceUpdatedReply, error) {
return back.remoteEthBackend.EngineForkChoiceUpdatedV1(ctx, request)
}

func (back *RemoteBackend) EngineGetPayloadV1(ctx context.Context, payloadId uint64) (res *types2.ExecutionPayload, err error) {
return back.remoteEthBackend.EngineGetPayloadV1(ctx, &remote.EngineGetPayloadRequest{
PayloadId: payloadId,
Expand Down
14 changes: 7 additions & 7 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ type Ethereum struct {
notifyMiningAboutNewTxs chan struct{}
// When we receive something here, it means that the beacon chain transitioned
// to proof-of-stake so we start reverse syncing from the header
reverseDownloadCh chan types.Header
statusCh chan privateapi.ExecutionStatus
waitingForPOSHeaders uint32 // atomic boolean flag
reverseDownloadCh chan privateapi.PayloadMessage
statusCh chan privateapi.ExecutionStatus
waitingForBeaconChain uint32 // atomic boolean flag
}

// New creates a new Ethereum object (including the
Expand Down Expand Up @@ -344,7 +344,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
miner := stagedsync.NewMiningState(&config.Miner)
backend.pendingBlocks = miner.PendingResultCh
backend.minedBlocks = miner.MiningResultCh
backend.reverseDownloadCh = make(chan types.Header)
backend.reverseDownloadCh = make(chan privateapi.PayloadMessage)
backend.statusCh = make(chan privateapi.ExecutionStatus)

var blockReader interfaces.FullBlockReader
Expand Down Expand Up @@ -404,9 +404,9 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
if casted, ok := backend.engine.(*ethash.Ethash); ok {
ethashApi = casted.APIs(nil)[1].Service.(*ethash.API)
}
atomic.StoreUint32(&backend.waitingForPOSHeaders, 0)
atomic.StoreUint32(&backend.waitingForBeaconChain, 0)
ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events,
blockReader, chainConfig, backend.reverseDownloadCh, backend.statusCh, &backend.waitingForPOSHeaders)
blockReader, chainConfig, backend.reverseDownloadCh, backend.statusCh, &backend.waitingForBeaconChain)
miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi)
if stack.Config().PrivateApiAddr != "" {
var creds credentials.TransportCredentials
Expand Down Expand Up @@ -478,7 +478,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
backend.stagedSync, err = stages2.NewStagedSync(backend.sentryCtx, backend.logger, backend.chainDB,
stack.Config().P2P, *config, chainConfig.TerminalTotalDifficulty,
backend.sentryControlServer, tmpdir, backend.notifications.Accumulator,
backend.reverseDownloadCh, backend.statusCh, &backend.waitingForPOSHeaders,
backend.reverseDownloadCh, backend.statusCh, &backend.waitingForBeaconChain,
backend.downloaderClient)
if err != nil {
return nil, err
Expand Down
38 changes: 22 additions & 16 deletions eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type HeadersCfg struct {
batchSize datasize.ByteSize
noP2PDiscovery bool
tmpdir string
reverseDownloadCh chan types.Header
reverseDownloadCh chan privateapi.PayloadMessage
waitingPosHeaders *uint32 // atomic boolean flag

snapshots *snapshotsync.AllSnapshots
Expand All @@ -61,7 +61,7 @@ func StageHeadersCfg(
penalize func(context.Context, []headerdownload.PenaltyItem),
batchSize datasize.ByteSize,
noP2PDiscovery bool,
reverseDownloadCh chan types.Header,
reverseDownloadCh chan privateapi.PayloadMessage,
waitingPosHeaders *uint32, // atomic boolean flag
snapshots *snapshotsync.AllSnapshots,
snapshotDownloader proto_downloader.DownloaderClient,
Expand Down Expand Up @@ -104,7 +104,6 @@ func SpawnStageHeaders(
}
defer tx.Rollback()
}

var blockNumber uint64

if s == nil {
Expand All @@ -114,7 +113,6 @@ func SpawnStageHeaders(
}

isTrans, err := rawdb.Transitioned(tx, blockNumber, cfg.chainConfig.TerminalTotalDifficulty)

if err != nil {
return err
}
Expand All @@ -137,11 +135,12 @@ func HeadersPOS(
test bool, // Set to true in tests, allows the stage to fail rather than wait indefinitely
useExternalTx bool,
) error {
atomic.StoreUint32(cfg.waitingPosHeaders, 1)
// Waiting for the beacon chain
log.Info("Waiting for payloads...")
header := <-cfg.reverseDownloadCh
atomic.StoreUint32(cfg.waitingPosHeaders, 1)
payloadMessage := <-cfg.reverseDownloadCh
atomic.StoreUint32(cfg.waitingPosHeaders, 0)
header := payloadMessage.Header

headerNumber := header.Number.Uint64()
headerHash := header.Hash()
Expand All @@ -158,6 +157,8 @@ func HeadersPOS(
cfg.statusCh <- privateapi.ExecutionStatus{Status: privateapi.Syncing}
return nil
}
// Set chain header reader right
cfg.hd.SetHeaderReader(&chainReader{config: &cfg.chainConfig, tx: tx, blockReader: cfg.blockReader})

logPrefix := s.LogPrefix()
logEvery := time.NewTicker(logInterval)
Expand All @@ -172,7 +173,7 @@ func HeadersPOS(
return err
}
if parent != nil && parent.Hash() == header.ParentHash {
if err := cfg.hd.VerifyHeader(&header); err != nil {
if err := cfg.hd.VerifyHeader(header); err != nil {
log.Warn("Verification failed for header", "hash", headerHash, "height", headerNumber, "error", err)
cfg.statusCh <- privateapi.ExecutionStatus{
Status: privateapi.Invalid,
Expand All @@ -188,18 +189,27 @@ func HeadersPOS(
LatestValidHash: headerHash,
}

if err := headerInserter.FeedHeaderPoS(tx, &header, headerHash); err != nil {
if err := headerInserter.FeedHeaderPoS(tx, header, headerHash); err != nil {
return err
}
// We can insert raw bodies immediately and skip stage 3. (stage 2 will not be skipped)
// TODO(Giulio2002): Fix inconsistency
if err := rawdb.WriteRawBody(tx, headerHash, headerNumber, payloadMessage.Body); err != nil {
return err
}
if err := stages.SaveStageProgress(tx, stages.Bodies, headerNumber); err != nil {
return err
}

if err := fixCanonicalChain(logPrefix, logEvery, headerInserter.GetHighest(), headerInserter.GetHighestHash(), tx, cfg.blockReader); err != nil {
return fmt.Errorf("fix canonical chain: %w", err)
}

if !useExternalTx {
if err := tx.Commit(); err != nil {
return err
}
}

return nil
}

Expand All @@ -216,8 +226,6 @@ func HeadersPOS(

log.Info(fmt.Sprintf("[%s] Waiting for headers...", logPrefix), "from", headerNumber)

cfg.hd.SetHeaderReader(&chainReader{config: &cfg.chainConfig, tx: tx, blockReader: cfg.blockReader})

stopped := false
prevProgress := headerNumber

Expand Down Expand Up @@ -287,11 +295,11 @@ func HeadersPOS(
return err
}

if err := cfg.hd.VerifyHeader(&header); err != nil {
if err := cfg.hd.VerifyHeader(header); err != nil {
log.Warn("Verification failed for header", "hash", headerHash, "height", headerNumber, "error", err)
return nil
}
if err := headerInserter.FeedHeaderPoS(tx, &header, headerHash); err != nil {
if err := headerInserter.FeedHeaderPoS(tx, header, headerHash); err != nil {
return err
}
if err := fixCanonicalChain(logPrefix, logEvery, headerInserter.GetHighest(), headerInserter.GetHighestHash(), tx, cfg.blockReader); err != nil {
Expand All @@ -303,6 +311,7 @@ func HeadersPOS(
return err
}
}

return nil
}

Expand All @@ -324,9 +333,6 @@ func HeadersPOW(
var headerProgress uint64
var err error

if !useExternalTx {
defer tx.Rollback()
}
if err = cfg.hd.ReadProgressFromDb(tx); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 864b744

Please sign in to comment.