Skip to content

Commit

Permalink
eth: implement eth/68 (ethereum#25980)
Browse files Browse the repository at this point in the history
* eth: implement eth/68

* eth/protocols/eth: added tx size to announcement

* eth/protocols/eth: check equal lengths on receiving announcement

* eth/protocols/eth: add +1 to tx size because of the type byte

* eth: happy lint, add eth68 tests, enable eth68

* eth: various nitpick fixes on eth/68

* eth/protocols/eth: fix announced tx size wrt type byte

Co-authored-by: MariusVanDerWijden <[email protected]>
Co-authored-by: Péter Szilágyi <[email protected]>
Conflicts:
	eth/handler_eth_test.go
	eth/protocols/eth/handler.go
	eth/protocols/eth/protocol.go
  • Loading branch information
vdwijden authored and jonastheis committed Jun 6, 2024
1 parent d086022 commit f633a27
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 46 deletions.
2 changes: 1 addition & 1 deletion cmd/devp2p/internal/ethtest/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type NewBlock eth.NewBlockPacket
func (nb NewBlock) Code() int { return 23 }

// NewPooledTransactionHashes is the network packet for the tx hash propagation message.
type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket
type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket66

func (nb NewPooledTransactionHashes) Code() int { return 24 }

Expand Down
5 changes: 4 additions & 1 deletion eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,12 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
case *eth.NewBlockPacket:
return h.handleBlockBroadcast(peer, packet.Block, packet.TD)

case *eth.NewPooledTransactionHashesPacket:
case *eth.NewPooledTransactionHashesPacket66:
return h.txFetcher.Notify(peer.ID(), *packet)

case *eth.NewPooledTransactionHashesPacket68:
return h.txFetcher.Notify(peer.ID(), packet.Hashes)

case *eth.TransactionsPacket:
return h.txFetcher.Enqueue(peer.ID(), *packet, false)

Expand Down
26 changes: 20 additions & 6 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2014 The go-ethereum Authors
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
Expand Down Expand Up @@ -61,10 +61,14 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
h.blockBroadcasts.Send(packet.Block)
return nil

case *eth.NewPooledTransactionHashesPacket:
case *eth.NewPooledTransactionHashesPacket66:
h.txAnnounces.Send(([]common.Hash)(*packet))
return nil

case *eth.NewPooledTransactionHashesPacket68:
h.txAnnounces.Send(packet.Hashes)
return nil

case *eth.TransactionsPacket:
h.txBroadcasts.Send(([]*types.Transaction)(*packet))
return nil
Expand All @@ -81,6 +85,8 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
// Tests that peers are correctly accepted (or rejected) based on the advertised
// fork IDs in the protocol handshake.
func TestForkIDSplit66(t *testing.T) { testForkIDSplit(t, eth.ETH66) }
func TestForkIDSplit67(t *testing.T) { testForkIDSplit(t, eth.ETH67) }
func TestForkIDSplit68(t *testing.T) { testForkIDSplit(t, eth.ETH68) }

func testForkIDSplit(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -236,6 +242,8 @@ func testForkIDSplit(t *testing.T, protocol uint) {

// Tests that received transactions are added to the local pool.
func TestRecvTransactions66(t *testing.T) { testRecvTransactions(t, eth.ETH66) }
func TestRecvTransactions67(t *testing.T) { testRecvTransactions(t, eth.ETH67) }
func TestRecvTransactions68(t *testing.T) { testRecvTransactions(t, eth.ETH68) }

func testRecvTransactions(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -293,6 +301,8 @@ func testRecvTransactions(t *testing.T, protocol uint) {

// This test checks that pending transactions are sent.
func TestSendTransactions66(t *testing.T) { testSendTransactions(t, eth.ETH66) }
func TestSendTransactions67(t *testing.T) { testSendTransactions(t, eth.ETH67) }
func TestSendTransactions68(t *testing.T) { testSendTransactions(t, eth.ETH68) }

func testSendTransactions(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -351,7 +361,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
seen := make(map[common.Hash]struct{})
for len(seen) < len(insert) {
switch protocol {
case 65, 66:
case 65, 66, 67, 68:
select {
case hashes := <-anns:
for _, hash := range hashes {
Expand All @@ -378,6 +388,8 @@ func testSendTransactions(t *testing.T, protocol uint) {
// Tests that transactions get propagated to all attached peers, either via direct
// broadcasts or via announcements/retrievals.
func TestTransactionPropagation66(t *testing.T) { testTransactionPropagation(t, eth.ETH66) }
func TestTransactionPropagation67(t *testing.T) { testTransactionPropagation(t, eth.ETH67) }
func TestTransactionPropagation68(t *testing.T) { testTransactionPropagation(t, eth.ETH68) }

func testTransactionPropagation(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -435,12 +447,13 @@ func testTransactionPropagation(t *testing.T, protocol uint) {

// Iterate through all the sinks and ensure they all got the transactions
for i := range sinks {
for arrived := 0; arrived < len(txs); {
for arrived, timeout := 0, false; arrived < len(txs) && !timeout; {
select {
case event := <-txChs[i]:
arrived += len(event.Txs)
case <-time.NewTimer(time.Second).C:
case <-time.After(time.Second):
t.Errorf("sink %d: transaction propagation timed out: have %d, want %d", i, arrived, len(txs))
timeout = true
}
}
}
Expand Down Expand Up @@ -486,7 +499,6 @@ func TestCheckpointChallenge(t *testing.T) {
}

func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpoint bool, timeout bool, empty bool, match bool, drop bool) {

// Reduce the checkpoint handshake challenge timeout
defer func(old time.Duration) { syncChallengeTimeout = old }(syncChallengeTimeout)
syncChallengeTimeout = 250 * time.Millisecond
Expand Down Expand Up @@ -676,6 +688,8 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
// Tests that a propagated malformed block (uncles or transactions don't match
// with the hashes in the header) gets discarded and not broadcast forward.
func TestBroadcastMalformedBlock66(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH66) }
func TestBroadcastMalformedBlock67(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH67) }
func TestBroadcastMalformedBlock68(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH68) }

func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
t.Parallel()
Expand Down
25 changes: 18 additions & 7 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,17 @@ func (p *Peer) announceTransactions() {
if done == nil && len(queue) > 0 {
// Pile transaction hashes until we reach our allowed network limit
var (
count int
pending []common.Hash
size common.StorageSize
count int
pending []common.Hash
pendingTypes []byte
pendingSizes []uint32
size common.StorageSize
)
for count = 0; count < len(queue) && size < maxTxPacketSize; count++ {
if p.txpool.Get(queue[count]) != nil {
if tx := p.txpool.Get(queue[count]); tx != nil {
pending = append(pending, queue[count])
pendingTypes = append(pendingTypes, tx.Type())
pendingSizes = append(pendingSizes, uint32(tx.Size()))
size += common.HashLength
}
}
Expand All @@ -159,9 +163,16 @@ func (p *Peer) announceTransactions() {
if len(pending) > 0 {
done = make(chan struct{})
go func() {
if err := p.sendPooledTransactionHashes(pending); err != nil {
fail <- err
return
if p.version >= ETH68 {
if err := p.sendPooledTransactionHashes68(pending, pendingTypes, pendingSizes); err != nil {
fail <- err
return
}
} else {
if err := p.sendPooledTransactionHashes66(pending); err != nil {
fail <- err
return
}
}
close(done)
p.Log().Trace("Sent transaction announcements", "count", len(pending))
Expand Down
43 changes: 38 additions & 5 deletions eth/protocols/eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type Backend interface {

// TxPool defines the methods needed by the protocol handler to serve transactions.
type TxPool interface {
// Get retrieves the the transaction from the local txpool with the given hash.
// Get retrieves the transaction from the local txpool with the given hash.
Get(hash common.Hash) *types.Transaction
}

Expand Down Expand Up @@ -175,7 +175,7 @@ var eth66 = map[uint64]msgHandler{
NewBlockHashesMsg: handleNewBlockhashes,
NewBlockMsg: handleNewBlock,
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes66,
GetBlockHeadersMsg: handleGetBlockHeaders66,
BlockHeadersMsg: handleBlockHeaders66,
GetBlockBodiesMsg: handleGetBlockBodies66,
Expand All @@ -188,6 +188,36 @@ var eth66 = map[uint64]msgHandler{
PooledTransactionsMsg: handlePooledTransactions66,
}

var eth67 = map[uint64]msgHandler{
NewBlockHashesMsg: handleNewBlockhashes,
NewBlockMsg: handleNewBlock,
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes66,
GetBlockHeadersMsg: handleGetBlockHeaders66,
BlockHeadersMsg: handleBlockHeaders66,
GetBlockBodiesMsg: handleGetBlockBodies66,
BlockBodiesMsg: handleBlockBodies66,
GetReceiptsMsg: handleGetReceipts66,
ReceiptsMsg: handleReceipts66,
GetPooledTransactionsMsg: handleGetPooledTransactions66,
PooledTransactionsMsg: handlePooledTransactions66,
}

var eth68 = map[uint64]msgHandler{
NewBlockHashesMsg: handleNewBlockhashes,
NewBlockMsg: handleNewBlock,
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes68,
GetBlockHeadersMsg: handleGetBlockHeaders66,
BlockHeadersMsg: handleBlockHeaders66,
GetBlockBodiesMsg: handleGetBlockBodies66,
BlockBodiesMsg: handleBlockBodies66,
GetReceiptsMsg: handleGetReceipts66,
ReceiptsMsg: handleReceipts66,
GetPooledTransactionsMsg: handleGetPooledTransactions66,
PooledTransactionsMsg: handlePooledTransactions66,
}

// handleMessage is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
func handleMessage(backend Backend, peer *Peer) error {
Expand All @@ -202,9 +232,12 @@ func handleMessage(backend Backend, peer *Peer) error {
defer msg.Discard()

var handlers = eth66
//if peer.Version() >= ETH67 { // Left in as a sample when new protocol is added
// handlers = eth67
//}
if peer.Version() == ETH67 {
handlers = eth67
}
if peer.Version() >= ETH68 {
handlers = eth68
}

// Track the amount of time it takes to serve the request and run the handler
if metrics.Enabled {
Expand Down
23 changes: 19 additions & 4 deletions eth/protocols/eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ func (b *testBackend) Handle(*Peer, Packet) error {

// Tests that block headers can be retrieved from a remote chain based on user queries.
func TestGetBlockHeaders66(t *testing.T) { testGetBlockHeaders(t, ETH66) }
func TestGetBlockHeaders67(t *testing.T) { testGetBlockHeaders(t, ETH67) }
func TestGetBlockHeaders68(t *testing.T) { testGetBlockHeaders(t, ETH68) }

func testGetBlockHeaders(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -291,6 +293,8 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {

// Tests that block contents can be retrieved from a remote chain based on their hashes.
func TestGetBlockBodies66(t *testing.T) { testGetBlockBodies(t, ETH66) }
func TestGetBlockBodies67(t *testing.T) { testGetBlockBodies(t, ETH67) }
func TestGetBlockBodies68(t *testing.T) { testGetBlockBodies(t, ETH68) }

func testGetBlockBodies(t *testing.T, protocol uint) {
t.Parallel()
Expand Down Expand Up @@ -373,9 +377,11 @@ func testGetBlockBodies(t *testing.T, protocol uint) {
}

// Tests that the state trie nodes can be retrieved based on hashes.
func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66) }
func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66, false) }
func TestGetNodeData67(t *testing.T) { testGetNodeData(t, ETH67, true) }
func TestGetNodeData68(t *testing.T) { testGetNodeData(t, ETH68, true) }

func testGetNodeData(t *testing.T, protocol uint) {
func testGetNodeData(t *testing.T, protocol uint, drop bool) {
t.Parallel()

// Define three accounts to simulate transactions with
Expand Down Expand Up @@ -436,8 +442,15 @@ func testGetNodeData(t *testing.T, protocol uint) {
GetNodeDataPacket: hashes,
})
msg, err := peer.app.ReadMsg()
if err != nil {
t.Fatalf("failed to read node data response: %v", err)
if !drop {
if err != nil {
t.Fatalf("failed to read node data response: %v", err)
}
} else {
if err != nil {
return
}
t.Fatalf("succeeded to read node data response on non-supporting protocol: %v", msg)
}
if msg.Code != NodeDataMsg {
t.Fatalf("response packet code mismatch: have %x, want %x", msg.Code, NodeDataMsg)
Expand Down Expand Up @@ -483,6 +496,8 @@ func testGetNodeData(t *testing.T, protocol uint) {

// Tests that the transaction receipts can be retrieved based on hashes.
func TestGetBlockReceipts66(t *testing.T) { testGetBlockReceipts(t, ETH66) }
func TestGetBlockReceipts67(t *testing.T) { testGetBlockReceipts(t, ETH67) }
func TestGetBlockReceipts68(t *testing.T) { testGetBlockReceipts(t, ETH68) }

func testGetBlockReceipts(t *testing.T, protocol uint) {
t.Parallel()
Expand Down
24 changes: 22 additions & 2 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,13 @@ func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error {
return backend.Handle(peer, &res.ReceiptsPacket)
}

func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error {
func handleNewPooledTransactionHashes66(backend Backend, msg Decoder, peer *Peer) error {
// New transaction announcement arrived, make sure we have
// a valid and fresh chain to handle them
if !backend.AcceptTxs() {
return nil
}
ann := new(NewPooledTransactionHashesPacket)
ann := new(NewPooledTransactionHashesPacket66)
if err := msg.Decode(ann); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
Expand All @@ -332,6 +332,26 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer)
return backend.Handle(peer, ann)
}

func handleNewPooledTransactionHashes68(backend Backend, msg Decoder, peer *Peer) error {
// New transaction announcement arrived, make sure we have
// a valid and fresh chain to handle them
if !backend.AcceptTxs() {
return nil
}
ann := new(NewPooledTransactionHashesPacket68)
if err := msg.Decode(ann); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
if len(ann.Hashes) != len(ann.Types) || len(ann.Hashes) != len(ann.Sizes) {
return fmt.Errorf("%w: message %v: invalid len of fields: %v %v %v", errDecode, msg, len(ann.Hashes), len(ann.Types), len(ann.Sizes))
}
// Schedule all the unknown hashes for retrieval
for _, hash := range ann.Hashes {
peer.markTransaction(hash)
}
return backend.Handle(peer, ann)
}

func handleGetPooledTransactions66(backend Backend, msg Decoder, peer *Peer) error {
// Decode the pooled transactions retrieval message
var query GetPooledTransactionsPacket66
Expand Down
19 changes: 16 additions & 3 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,29 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) {
}
}

// sendPooledTransactionHashes sends transaction hashes to the peer and includes
// sendPooledTransactionHashes66 sends transaction hashes to the peer and includes
// them in its transaction hash set for future reference.
//
// This method is a helper used by the async transaction announcer. Don't call it
// directly as the queueing (memory) and transmission (bandwidth) costs should
// not be managed directly.
func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error {
func (p *Peer) sendPooledTransactionHashes66(hashes []common.Hash) error {
// Mark all the transactions as known, but ensure we don't overflow our limits
p.knownTxs.Add(hashes...)
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket(hashes))
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket66(hashes))
}

// sendPooledTransactionHashes68 sends transaction hashes (tagged with their type
// and size) to the peer and includes them in its transaction hash set for future
// reference.
//
// This method is a helper used by the async transaction announcer. Don't call it
// directly as the queueing (memory) and transmission (bandwidth) costs should
// not be managed directly.
func (p *Peer) sendPooledTransactionHashes68(hashes []common.Hash, types []byte, sizes []uint32) error {
// Mark all the transactions as known, but ensure we don't overflow our limits
p.knownTxs.Add(hashes...)
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket68{Types: types, Sizes: sizes, Hashes: hashes})
}

// AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually
Expand Down
2 changes: 2 additions & 0 deletions eth/protocols/eth/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func newTestPeer(name string, version uint, backend Backend) (*testPeer, <-chan
peer := NewPeer(version, p2p.NewPeer(id, name, nil), net, backend.TxPool())
errc := make(chan error, 1)
go func() {
defer app.Close()

errc <- backend.RunPeer(peer, func(peer *Peer) error {
return Handle(backend, peer)
})
Expand Down
Loading

0 comments on commit f633a27

Please sign in to comment.