Sanity check p2p block (#3197)

AskAlexSharov authored Jan 4, 2022
1 parent 8203cdf commit 156287a

Showing 13 changed files with 192 additions and 441 deletions.
cmd/integration/commands/stages.go (2 changes: 1 addition & 1 deletion)

@@ -1103,7 +1103,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
 	must(batchSize.UnmarshalText([]byte(batchSizeStr)))
 
 	blockDownloaderWindow := 65536
-	sentryControlServer, err := sentry.NewControlServer(db, "", chainConfig, genesisBlock.Hash(), engine, 1, nil, blockDownloaderWindow)
+	sentryControlServer, err := sentry.NewControlServer(db, "", chainConfig, genesisBlock.Hash(), engine, 1, nil, blockDownloaderWindow, getBlockReader(chainConfig))
 	if err != nil {
 		panic(err)
 	}
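The new trailing argument threads a block reader into the control server. The getBlockReader helper lives elsewhere in this commit and is not part of the loaded diff; below is a plausible sketch of its shape, mirroring the snapshot-or-plain-DB split that eth/backend.go makes further down. The snapshotsEnabled and snapshotDir variables stand in for the command's real flags, and the import paths are assumptions about the repository layout at this commit.

package commands

import (
	"github.com/ledgerwatch/erigon/cmd/rpcdaemon/interfaces"
	"github.com/ledgerwatch/erigon/params"
	"github.com/ledgerwatch/erigon/turbo/snapshotsync"
	"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes"
)

var (
	snapshotsEnabled bool   // illustrative stand-in for the command's CLI flag
	snapshotDir      string // illustrative stand-in
)

// getBlockReader (sketch, not the actual implementation): serve headers and
// bodies from snapshots when enabled, otherwise straight from the database,
// behind one interface.
func getBlockReader(chainConfig *params.ChainConfig) interfaces.FullBlockReader {
	if snapshotsEnabled {
		snConfig := snapshothashes.KnownConfig(chainConfig.ChainName)
		allSnapshots := snapshotsync.NewAllSnapshots(snapshotDir, snConfig)
		return snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
	}
	return snapshotsync.NewBlockReader()
}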
cmd/rpcdaemon/interfaces/interfaces.go (2 changes: 2 additions & 0 deletions)

@@ -16,6 +16,8 @@ type BlockReader interface {
 type HeaderReader interface {
 	Header(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (*types.Header, error)
 	HeaderByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) (*types.Header, error)
+	HeaderByHash(ctx context.Context, tx kv.Getter, hash common.Hash) (*types.Header, error)
+	CanonicalHash(ctx context.Context, tx kv.Getter, blockHeight uint64) (common.Hash, error)
 }
 
 type BodyReader interface {
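HeaderReader gains two methods so that code paths such as ReadAncestor (changed below in core/rawdb) can resolve headers and canonical hashes through the interface, and therefore through snapshots when they are enabled. A minimal sketch of a plain DB-backed implementation, delegating to the existing rawdb accessors; the dbHeaderReader type is hypothetical (erigon's real implementations live in the snapshotsync package).

package example

import (
	"context"

	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon/common"
	"github.com/ledgerwatch/erigon/core/rawdb"
	"github.com/ledgerwatch/erigon/core/types"
)

// dbHeaderReader is a hypothetical reader that answers the two new
// HeaderReader methods straight from the database.
type dbHeaderReader struct{}

func (dbHeaderReader) HeaderByHash(_ context.Context, tx kv.Getter, hash common.Hash) (*types.Header, error) {
	return rawdb.ReadHeaderByHash(tx, hash)
}

func (dbHeaderReader) CanonicalHash(_ context.Context, tx kv.Getter, blockHeight uint64) (common.Hash, error) {
	return rawdb.ReadCanonicalHash(tx, blockHeight)
}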
cmd/sentry/sentry/downloader.go (25 changes: 16 additions & 9 deletions)

@@ -18,6 +18,7 @@ import (
 	proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
 	proto_types "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
 	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon/cmd/rpcdaemon/interfaces"
 	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/consensus"
 	"github.com/ledgerwatch/erigon/core/forkid"
@@ -338,9 +339,10 @@ type ControlServerImpl struct {
 	networkId   uint64
 	db          kv.RwDB
 	Engine      consensus.Engine
+	blockReader interfaces.FullBlockReader
 }
 
-func NewControlServer(db kv.RwDB, nodeName string, chainConfig *params.ChainConfig, genesisHash common.Hash, engine consensus.Engine, networkID uint64, sentries []direct.SentryClient, window int) (*ControlServerImpl, error) {
+func NewControlServer(db kv.RwDB, nodeName string, chainConfig *params.ChainConfig, genesisHash common.Hash, engine consensus.Engine, networkID uint64, sentries []direct.SentryClient, window int, blockReader interfaces.FullBlockReader) (*ControlServerImpl, error) {
 	hd := headerdownload.NewHeaderDownload(
 		512,       /* anchorLimit */
 		1024*1024, /* linkLimit */
@@ -356,12 +358,13 @@ func NewControlServer(db kv.RwDB, nodeName string, chainConfig *params.ChainConf
 	bd := bodydownload.NewBodyDownload(window /* outstandingLimit */, engine)
 
 	cs := &ControlServerImpl{
-		nodeName: nodeName,
-		Hd:       hd,
-		Bd:       bd,
-		sentries: sentries,
-		db:       db,
-		Engine:   engine,
+		nodeName:    nodeName,
+		Hd:          hd,
+		Bd:          bd,
+		sentries:    sentries,
+		db:          db,
+		Engine:      engine,
+		blockReader: blockReader,
 	}
 	cs.ChainConfig = chainConfig
 	cs.forks = forkid.GatherForks(cs.ChainConfig)
@@ -540,10 +543,14 @@ func (cs *ControlServerImpl) newBlock66(ctx context.Context, inreq *proto_sentry
 		return fmt.Errorf("decode 3 NewBlockMsg: %w", err)
 	}
 	// Parse the entire request from scratch
-	var request eth.NewBlockPacket
+	request := &eth.NewBlockPacket{}
 	if err := rlp.DecodeBytes(inreq.Data, &request); err != nil {
 		return fmt.Errorf("decode 4 NewBlockMsg: %w", err)
 	}
+	if err := request.SanityCheck(); err != nil {
+		return fmt.Errorf("newBlock66: %w", err)
+	}
+
 	if segments, penalty, err := cs.Hd.SingleHeaderAsSegment(headerRaw, request.Block.Header()); err == nil {
 		if penalty == headerdownload.NoPenalty {
 			cs.Hd.ProcessSegment(segments[0], true /* newBlock */, ConvertH256ToPeerID(inreq.PeerId)) // There is only one segment in this case
@@ -598,7 +605,7 @@ func (cs *ControlServerImpl) getBlockHeaders66(ctx context.Context, inreq *proto
 
 	var headers []*types.Header
 	if err := cs.db.View(ctx, func(tx kv.Tx) (err error) {
-		headers, err = eth.AnswerGetBlockHeadersQuery(tx, query.GetBlockHeadersPacket)
+		headers, err = eth.AnswerGetBlockHeadersQuery(tx, query.GetBlockHeadersPacket, cs.blockReader)
 		if err != nil {
 			return err
 		}
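The SanityCheck call added in newBlock66 is the commit's namesake, but its body is not in the loaded portion of the diff. In go-ethereum, from which this protocol code descends, the analogous check validates the block and bounds the announced total difficulty; a hedged sketch of that shape follows. The struct mirrors the standard eth NewBlockPacket, and erigon's actual check at this commit may differ in detail.

package eth

import (
	"fmt"
	"math/big"

	"github.com/ledgerwatch/erigon/core/types"
)

// NewBlockPacket is the network packet for the block propagation message.
type NewBlockPacket struct {
	Block *types.Block
	TD    *big.Int
}

// SanityCheck (sketch, modeled on go-ethereum's equivalent): reject
// structurally absurd announcements before they reach the header downloader.
func (request *NewBlockPacket) SanityCheck() error {
	if err := request.Block.SanityCheck(); err != nil {
		return err
	}
	// TD at mainnet block #7753254 is 76 bits; a 100-bit cap still leaves
	// roughly 100-million-fold headroom.
	if tdlen := request.TD.BitLen(); tdlen > 100 {
		return fmt.Errorf("too large block TD: bitlen %d", tdlen)
	}
	return nil
}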
core/rawdb/accessors_chain.go (20 changes: 14 additions & 6 deletions)

@@ -26,6 +26,7 @@ import (
 	"time"
 
 	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon/cmd/rpcdaemon/interfaces"
 	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/common/dbutils"
 	"github.com/ledgerwatch/erigon/core/types"
@@ -1087,28 +1088,32 @@ func ReadHeaderByHash(db kv.Getter, hash common.Hash) (*types.Header, error) {
 	return ReadHeader(db, hash, *number), nil
 }
 
-func ReadAncestor(db kv.Getter, hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64) {
+func ReadAncestor(db kv.Getter, hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64, blockReader interfaces.FullBlockReader) (common.Hash, uint64) {
 	if ancestor > number {
 		return common.Hash{}, 0
 	}
 	if ancestor == 1 {
+		header, err := blockReader.Header(context.Background(), db, hash, number)
+		if err != nil {
+			panic(err)
+		}
 		// in this case it is cheaper to just read the header
-		if header := ReadHeader(db, hash, number); header != nil {
+		if header != nil {
 			return header.ParentHash, number - 1
 		}
 		return common.Hash{}, 0
 	}
 	for ancestor != 0 {
-		h, err := ReadCanonicalHash(db, number)
+		h, err := blockReader.CanonicalHash(context.Background(), db, number)
 		if err != nil {
 			panic(err)
 		}
 		if h == hash {
-			ancestorHash, err := ReadCanonicalHash(db, number-ancestor)
+			ancestorHash, err := blockReader.CanonicalHash(context.Background(), db, number-ancestor)
 			if err != nil {
 				panic(err)
 			}
-			h, err := ReadCanonicalHash(db, number)
+			h, err := blockReader.CanonicalHash(context.Background(), db, number)
 			if err != nil {
 				panic(err)
 			}
@@ -1122,7 +1127,10 @@ func ReadAncestor(db kv.Getter, hash common.Hash, number, ancestor uint64, maxNo
 		}
 		*maxNonCanonical--
 		ancestor--
-		header := ReadHeader(db, hash, number)
+		header, err := blockReader.Header(context.Background(), db, hash, number)
+		if err != nil {
+			panic(err)
+		}
 		if header == nil {
 			return common.Hash{}, 0
 		}
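With blockReader threaded through, ReadAncestor walks back via canonical-hash and header lookups that can now be served from snapshots as well as from the database; the maxNonCanonical counter is a budget of single-block hops permitted once the walk leaves the canonical chain. An illustrative caller follows; the helper name and the budget value are made up for the example.

package example

import (
	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon/cmd/rpcdaemon/interfaces"
	"github.com/ledgerwatch/erigon/common"
	"github.com/ledgerwatch/erigon/core/rawdb"
)

// ancestorOf resolves the block `distance` generations behind (hash, number).
// A zero hash means the ancestor was not found or the hop budget ran out.
func ancestorOf(tx kv.Getter, hash common.Hash, number, distance uint64, blockReader interfaces.FullBlockReader) (common.Hash, uint64) {
	maxNonCanonical := uint64(100) // illustrative hop budget
	return rawdb.ReadAncestor(tx, hash, number, distance, &maxNonCanonical, blockReader)
}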
eth/backend.go (91 changes: 46 additions & 45 deletions)

@@ -299,7 +299,52 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
 			}
 		}()
 	}
-	backend.sentryControlServer, err = sentry.NewControlServer(chainKv, stack.Config().NodeName(), chainConfig, genesis.Hash(), backend.engine, backend.config.NetworkID, backend.sentries, config.BlockDownloaderWindow)
+
+	var blockReader interfaces.FullBlockReader
+	if config.Snapshot.Enabled {
+		snConfig := snapshothashes.KnownConfig(chainConfig.ChainName)
+		//TODO: incremental snapshot sync
+		if err := chainKv.Update(ctx, func(tx kv.RwTx) error {
+			const SyncedWithSnapshot = "synced_with_snapshot"
+			v, err := tx.GetOne(kv.DatabaseInfo, []byte(SyncedWithSnapshot))
+			if err != nil {
+				return err
+			}
+			if v != nil {
+				valueInDB := binary.BigEndian.Uint64(v)
+				if valueInDB != snConfig.ExpectBlocks {
+					log.Warn(fmt.Sprintf("'incremental snapshots feature' not implemented yet. New snapshots available up to block %d, but this node was synced to snapshot %d and will keep other blocks in db. (it's safe, re-sync may reduce db size)", valueInDB, snConfig.ExpectBlocks))
+					snConfig.ExpectBlocks = valueInDB
+				}
+				return nil
+			}
+
+			num := make([]byte, 8)
+			binary.BigEndian.PutUint64(num, snConfig.ExpectBlocks)
+			if err := tx.Put(kv.DatabaseInfo, []byte(SyncedWithSnapshot), num); err != nil {
+				return err
+			}
+			return nil
+		}); err != nil {
+			return nil, err
+		}
+
+		allSnapshots := snapshotsync.NewAllSnapshots(config.Snapshot.Dir, snConfig)
+		if err != nil {
+			return nil, err
+		}
+		blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
+
+		// connect to Downloader
+		backend.downloaderClient, err = downloadergrpc.NewClient(ctx, stack.Config().DownloaderAddr)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		blockReader = snapshotsync.NewBlockReader()
+	}
+
+	backend.sentryControlServer, err = sentry.NewControlServer(chainKv, stack.Config().NodeName(), chainConfig, genesis.Hash(), backend.engine, backend.config.NetworkID, backend.sentries, config.BlockDownloaderWindow, blockReader)
 	if err != nil {
 		return nil, err
 	}
@@ -347,50 +392,6 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
 	backend.reverseDownloadCh = make(chan privateapi.PayloadMessage)
 	backend.statusCh = make(chan privateapi.ExecutionStatus)
 
-	var blockReader interfaces.FullBlockReader
-	if config.Snapshot.Enabled {
-		snConfig := snapshothashes.KnownConfig(chainConfig.ChainName)
-		//TODO: incremental snapshot sync
-		if err := chainKv.Update(ctx, func(tx kv.RwTx) error {
-			const SyncedWithSnapshot = "synced_with_snapshot"
-			v, err := tx.GetOne(kv.DatabaseInfo, []byte(SyncedWithSnapshot))
-			if err != nil {
-				return err
-			}
-			if v != nil {
-				valueInDB := binary.BigEndian.Uint64(v)
-				if valueInDB != snConfig.ExpectBlocks {
-					log.Warn(fmt.Sprintf("'incremental snapshots feature' not implemented yet. New snapshots available up to block %d, but this node was synced to snapshot %d and will keep other blocks in db. (it's safe, re-sync may reduce db size)", valueInDB, snConfig.ExpectBlocks))
-					snConfig.ExpectBlocks = valueInDB
-				}
-				return nil
-			}
-
-			num := make([]byte, 8)
-			binary.BigEndian.PutUint64(num, snConfig.ExpectBlocks)
-			if err := tx.Put(kv.DatabaseInfo, []byte(SyncedWithSnapshot), num); err != nil {
-				return err
-			}
-			return nil
-		}); err != nil {
-			return nil, err
-		}
-
-		allSnapshots := snapshotsync.NewAllSnapshots(config.Snapshot.Dir, snConfig)
-		if err != nil {
-			return nil, err
-		}
-		blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
-
-		// connect to Downloader
-		backend.downloaderClient, err = downloadergrpc.NewClient(ctx, stack.Config().DownloaderAddr)
-		if err != nil {
-			return nil, err
-		}
-	} else {
-		blockReader = snapshotsync.NewBlockReader()
-	}
-
 	mining := stagedsync.New(
 		stagedsync.MiningStages(backend.sentryCtx,
 			stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miner, *backend.chainConfig, backend.engine, backend.txPool2, backend.txPool2DB, tmpdir),
eth/protocols/eth/handler.go (55 changes: 0 additions & 55 deletions)

@@ -17,7 +17,6 @@
 package eth
 
 import (
-	"fmt"
 	"math/big"
 	"time"
 
@@ -112,61 +111,7 @@ func ReadNodeInfo(getter kv.Getter, config *params.ChainConfig, genesisHash comm
 	}
 }
 
-// Handle is invoked whenever an `eth` connection is made that successfully passes
-// the protocol handshake. This method will keep processing messages until the
-// connection is torn down.
-func Handle(backend Backend, peer *Peer) error {
-	for {
-		if err := handleMessage(backend, peer); err != nil {
-			peer.Log().Debug("Message handling failed in `eth`", "err", err)
-			return err
-		}
-	}
-}
-
-type msgHandler func(backend Backend, msg Decoder, peer *Peer) error
-type Decoder interface {
-	Decode(val interface{}) error
-	Time() time.Time
-}
-
-var eth66 = map[uint64]msgHandler{
-	// eth64 announcement messages (no id)
-	NewBlockHashesMsg: handleNewBlockhashes,
-	NewBlockMsg:       handleNewBlock,
-	TransactionsMsg:   handleTransactions,
-	// eth65 announcement messages (no id)
-	NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
-	// eth66 messages with request-id
-	GetBlockHeadersMsg:       handleGetBlockHeaders66,
-	BlockHeadersMsg:          handleBlockHeaders66,
-	GetBlockBodiesMsg:        handleGetBlockBodies66,
-	BlockBodiesMsg:           handleBlockBodies66,
-	GetNodeDataMsg:           handleGetNodeData66,
-	NodeDataMsg:              handleNodeData66,
-	GetReceiptsMsg:           handleGetReceipts66,
-	ReceiptsMsg:              handleReceipts66,
-	GetPooledTransactionsMsg: handleGetPooledTransactions66,
-	PooledTransactionsMsg:    handlePooledTransactions66,
-}
-
-// handleMessage is invoked whenever an inbound message is received from a remote
-// peer. The remote connection is torn down upon returning any error.
-func handleMessage(backend Backend, peer *Peer) error {
-	// Read the next message from the remote peer, and ensure it's fully consumed
-	msg, err := peer.rw.ReadMsg()
-	if err != nil {
-		return err
-	}
-	if msg.Size > maxMessageSize {
-		return fmt.Errorf("%w: %v > %v", errMsgTooLarge, msg.Size, maxMessageSize)
-	}
-	defer msg.Discard()
-
-	handlers := eth66
-
-	if handler := handlers[msg.Code]; handler != nil {
-		return handler(backend, msg, peer)
-	}
-	return fmt.Errorf("%w: %v", errInvalidMsgCode, msg.Code)
-}
[Diff for the remaining 7 changed files was not loaded.]