Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for syncing on mergemock #3174

Merged
merged 41 commits into from
Dec 29, 2021
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
28287c7
block proposing
Giulio2002 Dec 24, 2021
40d61ea
standard finalized
Giulio2002 Dec 25, 2021
2a33c4c
mergemock execution
Giulio2002 Dec 25, 2021
f31df32
private chain can now be ran yay
Giulio2002 Dec 26, 2021
bdb44eb
perfectioned
Giulio2002 Dec 26, 2021
2a0020c
polished
Giulio2002 Dec 26, 2021
d4eab6f
more polishing
Giulio2002 Dec 26, 2021
5900107
resize PR
Giulio2002 Dec 26, 2021
230efd8
resize PR
Giulio2002 Dec 26, 2021
718e856
resize PR
Giulio2002 Dec 26, 2021
3ad01be
simplifications
Giulio2002 Dec 26, 2021
45cfc7e
fixed tests
Giulio2002 Dec 27, 2021
71e47db
better syncronous communication
Giulio2002 Dec 27, 2021
15b297d
better syncronous once again
Giulio2002 Dec 27, 2021
1c76d35
clean
Giulio2002 Dec 27, 2021
67e7140
Re-enabled headers verification
Giulio2002 Dec 27, 2021
c5ab49f
mining finish
Giulio2002 Dec 27, 2021
499ab83
Merge branch 'mining-pos' of https://www.github.com/ledgerwatch/erigo…
Giulio2002 Dec 27, 2021
edd24ea
mining finish
Giulio2002 Dec 27, 2021
fbe8137
cleaned hash computation
Giulio2002 Dec 27, 2021
20953f6
fixed evm bug
Giulio2002 Dec 27, 2021
b04c820
go.mod
Giulio2002 Dec 27, 2021
2bd9573
Merge branch 'devel' into mining-pos
Giulio2002 Dec 27, 2021
739a5ea
Update .gitignore
Giulio2002 Dec 27, 2021
bc36009
Update .gitignore
Giulio2002 Dec 27, 2021
66b13f2
Update .gitignore
Giulio2002 Dec 27, 2021
88824f2
removed new line from .gitignore
Giulio2002 Dec 27, 2021
b5cc6e7
added go.mod and go.sum
Giulio2002 Dec 28, 2021
107c967
feeRecipient into preset
Giulio2002 Dec 28, 2021
7df70ab
useExternal
Giulio2002 Dec 28, 2021
964cfdc
todo
Giulio2002 Dec 28, 2021
873910a
fixed comment for forkchoiceUpdateV1
Giulio2002 Dec 29, 2021
eb6671d
smaller
Giulio2002 Dec 29, 2021
3ae5c95
Merge branch 'mining-pos' of https://www.github.com/ledgerwatch/erigo…
Giulio2002 Dec 29, 2021
8a55a2d
smaller
Giulio2002 Dec 29, 2021
506736d
Revert changes to miner frequency
yperbasis Dec 29, 2021
e24b375
Restore useExternalTx
yperbasis Dec 29, 2021
0a89c98
Fix headerLoadFunc
yperbasis Dec 29, 2021
f4526f6
do not reset payloadId
Giulio2002 Dec 29, 2021
a7e2c3a
rename
Giulio2002 Dec 29, 2021
08617ba
extra is empty
Giulio2002 Dec 29, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,4 @@ docker-compose.dev.yml
libmdbx/build/*
tests/testdata/*

go.work
go.work
68 changes: 47 additions & 21 deletions cmd/rpcdaemon/commands/engine_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package commands
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math/big"

"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/services"
Expand Down Expand Up @@ -37,6 +39,13 @@ type ExecutionPayload struct {
Transactions []hexutil.Bytes `json:"transactions" gencodec:"required"`
}

// PayloadAttributes represent the attributes required to start assembling a payload
type ForkChoiceState struct {
HeadHash common.Hash `json:"headBlockHash" gencodec:"required"`
SafeBlockHash common.Hash `json:"safeBlockHash" gencodec:"required"`
FinalizedBlockHash common.Hash `json:"finalizedBlockHash" gencodec:"required"`
}

// PayloadAttributes represent the attributes required to start assembling a payload
type PayloadAttributes struct {
Timestamp hexutil.Uint64 `json:"timestamp" gencodec:"required"`
Expand All @@ -46,9 +55,9 @@ type PayloadAttributes struct {

// EngineAPI Beacon chain communication endpoint
type EngineAPI interface {
ForkchoiceUpdatedV1(context.Context, struct{}, *PayloadAttributes) (map[string]interface{}, error)
ForkchoiceUpdatedV1(ctx context.Context, forkChoiceState *ForkChoiceState, payloadAttributes *PayloadAttributes) (map[string]interface{}, error)
ExecutePayloadV1(context.Context, *ExecutionPayload) (map[string]interface{}, error)
GetPayloadV1(ctx context.Context, payloadID hexutil.Uint64) (*ExecutionPayload, error)
GetPayloadV1(ctx context.Context, payloadID hexutil.Bytes) (*ExecutionPayload, error)
GetPayloadBodiesV1(ctx context.Context, blockHashes []rpc.BlockNumberOrHash) (map[common.Hash]ExecutionPayload, error)
}

Expand All @@ -62,16 +71,42 @@ type EngineImpl struct {
// ForkchoiceUpdatedV1 is executed only if we are running a beacon validator,
// in erigon we do not use this for reorgs like go-ethereum does since we can do that in engine_executePayloadV1
// if the payloadAttributes is different than null, we return
func (e *EngineImpl) ForkchoiceUpdatedV1(_ context.Context, _ struct{}, payloadAttributes *PayloadAttributes) (map[string]interface{}, error) {
func (e *EngineImpl) ForkchoiceUpdatedV1(ctx context.Context, forkChoiceState *ForkChoiceState, payloadAttributes *PayloadAttributes) (map[string]interface{}, error) {
// Unwinds can be made within engine_excutePayloadV1 so we can return success regardless
if payloadAttributes == nil {
return map[string]interface{}{
"status": "SUCCESS",
"payloadId": nil,
"status": "SUCCESS",
}, nil
}
// Request for assembling payload
return nil, fmt.Errorf("invalid request")
reply, err := e.api.EngineForkchoiceUpdateV1(ctx, &remote.EngineForkChoiceUpdatedRequest{
Prepare: &remote.EnginePreparePayload{
Timestamp: uint64(payloadAttributes.Timestamp),
Random: gointerfaces.ConvertHashToH256(payloadAttributes.Random),
FeeRecipient: gointerfaces.ConvertAddressToH160(payloadAttributes.SuggestedFeeRecipient),
},
Forkchoice: &remote.EngineForkChoiceUpdated{
HeadBlockHash: gointerfaces.ConvertHashToH256(forkChoiceState.HeadHash),
FinalizedBlockHash: gointerfaces.ConvertHashToH256(forkChoiceState.FinalizedBlockHash),
SafeBlockHash: gointerfaces.ConvertHashToH256(forkChoiceState.SafeBlockHash),
},
})
if err != nil {
return nil, err
}
// Process reply
if reply.Status == "SYNCING" {
return map[string]interface{}{
"status": reply.Status,
}, nil
}
encodedPayloadId := make([]byte, 8)
binary.BigEndian.PutUint64(encodedPayloadId, reply.PayloadId)
// Answer
return map[string]interface{}{
"status": reply.Status,
"payloadId": hexutil.Bytes(encodedPayloadId),
}, nil
}

// ExecutePayloadV1 takes a block from the beacon chain and do either two of the following things
Expand All @@ -86,9 +121,6 @@ func (e *EngineImpl) ExecutePayloadV1(ctx context.Context, payload *ExecutionPay
return nil, fmt.Errorf("invalid request")
}
}
// Maximum length of extra is 32 bytes so we can use the hash datatype
extra := common.BytesToHash(payload.ExtraData)

log.Info("Received Payload from beacon-chain")

// Convert slice of hexutil.Bytes to a slice of slice of bytes
Expand All @@ -107,7 +139,7 @@ func (e *EngineImpl) ExecutePayloadV1(ctx context.Context, payload *ExecutionPay
GasLimit: (uint64)(payload.GasLimit),
GasUsed: (uint64)(payload.GasUsed),
Timestamp: (uint64)(payload.Timestamp),
ExtraData: gointerfaces.ConvertHashToH256(extra),
ExtraData: payload.ExtraData,
BaseFeePerGas: gointerfaces.ConvertUint256IntToH256(baseFee),
BlockHash: gointerfaces.ConvertHashToH256(payload.BlockHash),
Transactions: transactions,
Expand All @@ -120,21 +152,21 @@ func (e *EngineImpl) ExecutePayloadV1(ctx context.Context, payload *ExecutionPay
var latestValidHash common.Hash = gointerfaces.ConvertH256ToHash(res.LatestValidHash)
return map[string]interface{}{
"status": res.Status,
"latestValidHash": common.Bytes2Hex(latestValidHash.Bytes()),
"latestValidHash": latestValidHash,
}, nil
}
return map[string]interface{}{
"status": res.Status,
}, nil
}

func (e *EngineImpl) GetPayloadV1(ctx context.Context, payloadID hexutil.Uint64) (*ExecutionPayload, error) {
payload, err := e.api.EngineGetPayloadV1(ctx, (uint64)(payloadID))
func (e *EngineImpl) GetPayloadV1(ctx context.Context, payloadID hexutil.Bytes) (*ExecutionPayload, error) {
decodedPayloadId := binary.BigEndian.Uint64(payloadID)
payload, err := e.api.EngineGetPayloadV1(ctx, decodedPayloadId)
if err != nil {
return nil, err
}
var bloom types.Bloom = gointerfaces.ConvertH2048ToBloom(payload.LogsBloom)
var extra common.Hash = gointerfaces.ConvertH256ToHash(payload.ExtraData)

var baseFee *big.Int
if payload.BaseFeePerGas != nil {
Expand All @@ -157,7 +189,7 @@ func (e *EngineImpl) GetPayloadV1(ctx context.Context, payloadID hexutil.Uint64)
GasLimit: hexutil.Uint64(payload.GasLimit),
GasUsed: hexutil.Uint64(payload.GasUsed),
Timestamp: hexutil.Uint64(payload.Timestamp),
ExtraData: extra[:],
ExtraData: payload.ExtraData,
BaseFeePerGas: (*hexutil.Big)(baseFee),
BlockHash: gointerfaces.ConvertH256ToHash(payload.BlockHash),
Transactions: transactions,
Expand All @@ -176,20 +208,16 @@ func (e *EngineImpl) GetPayloadBodiesV1(ctx context.Context, blockHashes []rpc.B

for _, blockHash := range blockHashes {
hash := *blockHash.BlockHash

block, err := e.blockByHashWithSenders(tx, hash)
if err != nil {
return nil, err
}

if block == nil {
continue
}

var bloom types.Bloom = block.Bloom()

buf := bytes.NewBuffer(nil)

var encodedTransactions []hexutil.Bytes

for _, tx := range block.Transactions() {
Expand All @@ -199,7 +227,6 @@ func (e *EngineImpl) GetPayloadBodiesV1(ctx context.Context, blockHashes []rpc.B
if err != nil {
return nil, fmt.Errorf("broken tx rlp: %w", err)
}

encodedTransactions = append(encodedTransactions, common.CopyBytes(buf.Bytes()))
}

Expand All @@ -219,7 +246,6 @@ func (e *EngineImpl) GetPayloadBodiesV1(ctx context.Context, blockHashes []rpc.B
BlockHash: block.Hash(),
Transactions: encodedTransactions,
}

}
return blockHashToBody, nil
}
Expand Down
5 changes: 5 additions & 0 deletions cmd/rpcdaemon/services/eth_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ApiBackend interface {
Subscribe(ctx context.Context, cb func(*remote.SubscribeReply)) error
BlockWithSenders(ctx context.Context, tx kv.Tx, hash common.Hash, blockHeight uint64) (block *types.Block, senders []common.Address, err error)
EngineExecutePayloadV1(ctx context.Context, payload *types2.ExecutionPayload) (*remote.EngineExecutePayloadReply, error)
EngineForkchoiceUpdateV1(ctx context.Context, request *remote.EngineForkChoiceUpdatedRequest) (*remote.EngineForkChoiceUpdatedReply, error)
EngineGetPayloadV1(ctx context.Context, payloadId uint64) (*types2.ExecutionPayload, error)
NodeInfo(ctx context.Context, limit uint32) ([]p2p.NodeInfo, error)
}
Expand Down Expand Up @@ -164,6 +165,10 @@ func (back *RemoteBackend) EngineExecutePayloadV1(ctx context.Context, payload *
return back.remoteEthBackend.EngineExecutePayloadV1(ctx, payload)
}

func (back *RemoteBackend) EngineForkchoiceUpdateV1(ctx context.Context, request *remote.EngineForkChoiceUpdatedRequest) (*remote.EngineForkChoiceUpdatedReply, error) {
return back.remoteEthBackend.EngineForkChoiceUpdatedV1(ctx, request)
}

func (back *RemoteBackend) EngineGetPayloadV1(ctx context.Context, payloadId uint64) (res *types2.ExecutionPayload, err error) {
return back.remoteEthBackend.EngineGetPayloadV1(ctx, &remote.EngineGetPayloadRequest{
PayloadId: payloadId,
Expand Down
14 changes: 7 additions & 7 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ type Ethereum struct {
notifyMiningAboutNewTxs chan struct{}
// When we receive something here, it means that the beacon chain transitioned
// to proof-of-stake so we start reverse syncing from the header
reverseDownloadCh chan types.Header
statusCh chan privateapi.ExecutionStatus
waitingForPOSHeaders uint32 // atomic boolean flag
reverseDownloadCh chan privateapi.PayloadMessage
statusCh chan privateapi.ExecutionStatus
waitingForBeaconChain uint32 // atomic boolean flag
}

// New creates a new Ethereum object (including the
Expand Down Expand Up @@ -344,7 +344,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
miner := stagedsync.NewMiningState(&config.Miner)
backend.pendingBlocks = miner.PendingResultCh
backend.minedBlocks = miner.MiningResultCh
backend.reverseDownloadCh = make(chan types.Header)
backend.reverseDownloadCh = make(chan privateapi.PayloadMessage)
backend.statusCh = make(chan privateapi.ExecutionStatus)

var blockReader interfaces.FullBlockReader
Expand Down Expand Up @@ -404,9 +404,9 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
if casted, ok := backend.engine.(*ethash.Ethash); ok {
ethashApi = casted.APIs(nil)[1].Service.(*ethash.API)
}
atomic.StoreUint32(&backend.waitingForPOSHeaders, 0)
atomic.StoreUint32(&backend.waitingForBeaconChain, 0)
ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events,
blockReader, chainConfig, backend.reverseDownloadCh, backend.statusCh, &backend.waitingForPOSHeaders)
blockReader, chainConfig, backend.reverseDownloadCh, backend.statusCh, &backend.waitingForBeaconChain)
miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi)
if stack.Config().PrivateApiAddr != "" {
var creds credentials.TransportCredentials
Expand Down Expand Up @@ -478,7 +478,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
backend.stagedSync, err = stages2.NewStagedSync(backend.sentryCtx, backend.logger, backend.chainDB,
stack.Config().P2P, *config, chainConfig.TerminalTotalDifficulty,
backend.sentryControlServer, tmpdir, backend.notifications.Accumulator,
backend.reverseDownloadCh, backend.statusCh, &backend.waitingForPOSHeaders,
backend.reverseDownloadCh, backend.statusCh, &backend.waitingForBeaconChain,
backend.downloaderClient)
if err != nil {
return nil, err
Expand Down
38 changes: 22 additions & 16 deletions eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type HeadersCfg struct {
batchSize datasize.ByteSize
noP2PDiscovery bool
tmpdir string
reverseDownloadCh chan types.Header
reverseDownloadCh chan privateapi.PayloadMessage
waitingPosHeaders *uint32 // atomic boolean flag

snapshots *snapshotsync.AllSnapshots
Expand All @@ -61,7 +61,7 @@ func StageHeadersCfg(
penalize func(context.Context, []headerdownload.PenaltyItem),
batchSize datasize.ByteSize,
noP2PDiscovery bool,
reverseDownloadCh chan types.Header,
reverseDownloadCh chan privateapi.PayloadMessage,
waitingPosHeaders *uint32, // atomic boolean flag
snapshots *snapshotsync.AllSnapshots,
snapshotDownloader proto_downloader.DownloaderClient,
Expand Down Expand Up @@ -104,7 +104,6 @@ func SpawnStageHeaders(
}
defer tx.Rollback()
}

var blockNumber uint64

if s == nil {
Expand All @@ -114,7 +113,6 @@ func SpawnStageHeaders(
}

isTrans, err := rawdb.Transitioned(tx, blockNumber, cfg.chainConfig.TerminalTotalDifficulty)

if err != nil {
return err
}
Expand All @@ -137,11 +135,12 @@ func HeadersPOS(
test bool, // Set to true in tests, allows the stage to fail rather than wait indefinitely
useExternalTx bool,
) error {
atomic.StoreUint32(cfg.waitingPosHeaders, 1)
// Waiting for the beacon chain
log.Info("Waiting for payloads...")
header := <-cfg.reverseDownloadCh
atomic.StoreUint32(cfg.waitingPosHeaders, 1)
payloadMessage := <-cfg.reverseDownloadCh
atomic.StoreUint32(cfg.waitingPosHeaders, 0)
header := payloadMessage.Header

headerNumber := header.Number.Uint64()
headerHash := header.Hash()
Expand All @@ -158,6 +157,8 @@ func HeadersPOS(
cfg.statusCh <- privateapi.ExecutionStatus{Status: privateapi.Syncing}
return nil
}
// Set chain header reader right
cfg.hd.SetHeaderReader(&chainReader{config: &cfg.chainConfig, tx: tx, blockReader: cfg.blockReader})

logPrefix := s.LogPrefix()
logEvery := time.NewTicker(logInterval)
Expand All @@ -172,7 +173,7 @@ func HeadersPOS(
return err
}
if parent != nil && parent.Hash() == header.ParentHash {
if err := cfg.hd.VerifyHeader(&header); err != nil {
if err := cfg.hd.VerifyHeader(header); err != nil {
log.Warn("Verification failed for header", "hash", headerHash, "height", headerNumber, "error", err)
cfg.statusCh <- privateapi.ExecutionStatus{
Status: privateapi.Invalid,
Expand All @@ -188,18 +189,27 @@ func HeadersPOS(
LatestValidHash: headerHash,
}

if err := headerInserter.FeedHeaderPoS(tx, &header, headerHash); err != nil {
if err := headerInserter.FeedHeaderPoS(tx, header, headerHash); err != nil {
return err
}
// We can insert raw bodies immediately and skip stage 3. (stage 2 will not be skipped)
// TODO(Giulio2002): Fix inconsistency
if err := rawdb.WriteRawBody(tx, headerHash, headerNumber, payloadMessage.Body); err != nil {
return err
}
if err := stages.SaveStageProgress(tx, stages.Bodies, headerNumber); err != nil {
return err
}

if err := fixCanonicalChain(logPrefix, logEvery, headerInserter.GetHighest(), headerInserter.GetHighestHash(), tx, cfg.blockReader); err != nil {
return fmt.Errorf("fix canonical chain: %w", err)
}

if !useExternalTx {
if err := tx.Commit(); err != nil {
return err
}
}

return nil
}

Expand All @@ -216,8 +226,6 @@ func HeadersPOS(

log.Info(fmt.Sprintf("[%s] Waiting for headers...", logPrefix), "from", headerNumber)

cfg.hd.SetHeaderReader(&chainReader{config: &cfg.chainConfig, tx: tx, blockReader: cfg.blockReader})

stopped := false
prevProgress := headerNumber

Expand Down Expand Up @@ -287,11 +295,11 @@ func HeadersPOS(
return err
}

if err := cfg.hd.VerifyHeader(&header); err != nil {
if err := cfg.hd.VerifyHeader(header); err != nil {
log.Warn("Verification failed for header", "hash", headerHash, "height", headerNumber, "error", err)
return nil
}
if err := headerInserter.FeedHeaderPoS(tx, &header, headerHash); err != nil {
if err := headerInserter.FeedHeaderPoS(tx, header, headerHash); err != nil {
return err
}
if err := fixCanonicalChain(logPrefix, logEvery, headerInserter.GetHighest(), headerInserter.GetHighestHash(), tx, cfg.blockReader); err != nil {
Expand All @@ -303,6 +311,7 @@ func HeadersPOS(
return err
}
}

return nil
}

Expand All @@ -324,9 +333,6 @@ func HeadersPOW(
var headerProgress uint64
var err error

if !useExternalTx {
defer tx.Rollback()
}
if err = cfg.hd.ReadProgressFromDb(tx); err != nil {
return err
}
Expand Down
Loading