From 81798d8080e7162176bceb26523e546f6c4ba6f2 Mon Sep 17 00:00:00 2001 From: Roberto Bayardo Date: Tue, 6 Aug 2024 06:19:21 -0700 Subject: [PATCH] address review feedback --- cmd/devp2p/internal/ethtest/suite.go | 2 +- eth/fetcher/tx_fetcher.go | 72 +++++++++------------------- 2 files changed, 23 insertions(+), 51 deletions(-) diff --git a/cmd/devp2p/internal/ethtest/suite.go b/cmd/devp2p/internal/ethtest/suite.go index 876861f9d881..5cb9fa0297d9 100644 --- a/cmd/devp2p/internal/ethtest/suite.go +++ b/cmd/devp2p/internal/ethtest/suite.go @@ -849,7 +849,7 @@ func (s *Suite) TestBlobViolations(t *utesting.T) { if code, _, err := conn.Read(); err != nil { t.Fatalf("expected disconnect on blob violation, got err: %v", err) } else if code != discMsg { - if code == 24 { + if code == protoOffset(ethProto)+eth.NewPooledTransactionHashesMsg { // sometimes we'll get a blob transaction hashes announcement before the disconnect // because blob transactions are scheduled to be fetched right away. if code, _, err = conn.Read(); err != nil { diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 6b5409d8809d..fdc1d6b675c2 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -113,11 +113,7 @@ var errTerminated = errors.New("terminated") type txAnnounce struct { origin string // Identifier of the peer originating the notification hashes []common.Hash // Batch of transaction hashes being announced -<<<<<<< HEAD - metas []*txMetadata // Batch of metadata associated with the hashes -======= - metas []txMetadata // Batch of metadatas associated with the hashes ->>>>>>> ddf2fd4314 (- fetch transactions from a peer in the order they were announced to minimize nonce-gaps (which cause blob txs to be rejected)) + metas []txMetadata // Batch of metadata associated with the hashes } // txMetadata provides the extra data transmitted along with the announcement @@ -181,14 +177,14 @@ type TxFetcher struct { // Stage 1: Waiting lists for newly discovered transactions that might be // broadcast without needing explicit request/reply round trips. - waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast - waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist - waitslots map[string]map[common.Hash]txMetadata // Waiting announcements grouped by peer (DoS protection) + waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast + waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist + waitslots map[string]map[common.Hash]*txMetadata // Waiting announcements grouped by peer (DoS protection) // Stage 2: Queue of transactions that waiting to be allocated to some peer // to be retrieved directly. - announces map[string]map[common.Hash]txMetadata // Set of announced transactions, grouped by origin peer - announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash + announces map[string]map[common.Hash]*txMetadata // Set of announced transactions, grouped by origin peer + announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash // Stage 3: Set of transactions currently being retrieved, some which may be // fulfilled and some rescheduled. Note, this step shares 'announces' from the @@ -226,8 +222,8 @@ func NewTxFetcherForTests( quit: make(chan struct{}), waitlist: make(map[common.Hash]map[string]struct{}), waittime: make(map[common.Hash]mclock.AbsTime), - waitslots: make(map[string]map[common.Hash]txMetadata), - announces: make(map[string]map[common.Hash]txMetadata), + waitslots: make(map[string]map[common.Hash]*txMetadata), + announces: make(map[string]map[common.Hash]*txMetadata), announced: make(map[common.Hash]map[string]struct{}), fetching: make(map[common.Hash]string), requests: make(map[string]*txRequest), @@ -274,20 +270,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c underpriced++ default: unknownHashes = append(unknownHashes, hash) -<<<<<<< HEAD - unknownMetas = append(unknownMetas, &txMetadata{kind: types[i], size: sizes[i]}) -======= - if types == nil { - unknownMetas = append(unknownMetas, txMetadata{arrival: f.counter}) - } else { - if sizes[i] == 0 { - // invalid size parameter, return error - return fmt.Errorf("announcement from tx %x had an invalid 0 size metadata", hash) - } - unknownMetas = append(unknownMetas, txMetadata{kind: types[i], size: sizes[i], arrival: f.counter}) - } - f.counter++ ->>>>>>> ddf2fd4314 (- fetch transactions from a peer in the order they were announced to minimize nonce-gaps (which cause blob txs to be rejected)) + unknownMetas = append(unknownMetas, txMetadata{kind: types[i], size: sizes[i]}) } } txAnnounceKnownMeter.Mark(duplicate) @@ -298,7 +281,6 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c return nil } announce := &txAnnounce{origin: peer, hashes: unknownHashes, metas: unknownMetas} - f.counter++ select { case f.notify <- announce: return nil @@ -467,9 +449,9 @@ func (f *TxFetcher) loop() { // Stage 2 and 3 share the set of origins per tx if announces := f.announces[ann.origin]; announces != nil { - announces[hash] = ann.metas[i] + announces[hash] = &ann.metas[i] } else { - f.announces[ann.origin] = map[common.Hash]txMetadata{hash: ann.metas[i]} + f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]} } continue } @@ -480,9 +462,9 @@ func (f *TxFetcher) loop() { // Stage 2 and 3 share the set of origins per tx if announces := f.announces[ann.origin]; announces != nil { - announces[hash] = ann.metas[i] + announces[hash] = &ann.metas[i] } else { - f.announces[ann.origin] = map[common.Hash]txMetadata{hash: ann.metas[i]} + f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]} } continue } @@ -495,9 +477,9 @@ func (f *TxFetcher) loop() { if ann.metas[i].kind == types.BlobTxType && f.waitlist[hash] == nil { f.announced[hash] = map[string]struct{}{ann.origin: {}} if announces := f.announces[ann.origin]; announces != nil { - announces[hash] = ann.metas[i] + announces[hash] = &ann.metas[i] } else { - f.announces[ann.origin] = map[common.Hash]txMetadata{hash: ann.metas[i]} + f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]} } f.scheduleFetches(timeoutTimer, timeoutTrigger, map[string]struct{}{ann.origin: {}}) continue @@ -515,9 +497,9 @@ func (f *TxFetcher) loop() { f.waitlist[hash][ann.origin] = struct{}{} if waitslots := f.waitslots[ann.origin]; waitslots != nil { - waitslots[hash] = ann.metas[i] + waitslots[hash] = &ann.metas[i] } else { - f.waitslots[ann.origin] = map[common.Hash]txMetadata{hash: ann.metas[i]} + f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]} } continue } @@ -526,9 +508,9 @@ func (f *TxFetcher) loop() { f.waittime[hash] = f.clock.Now() if waitslots := f.waitslots[ann.origin]; waitslots != nil { - waitslots[hash] = ann.metas[i] + waitslots[hash] = &ann.metas[i] } else { - f.waitslots[ann.origin] = map[common.Hash]txMetadata{hash: ann.metas[i]} + f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: &ann.metas[i]} } } // If a new item was added to the waitlist, schedule it into the fetcher @@ -556,7 +538,7 @@ func (f *TxFetcher) loop() { if announces := f.announces[peer]; announces != nil { announces[hash] = f.waitslots[peer][hash] } else { - f.announces[peer] = map[common.Hash]txMetadata{hash: f.waitslots[peer][hash]} + f.announces[peer] = map[common.Hash]*txMetadata{hash: f.waitslots[peer][hash]} } delete(f.waitslots[peer], hash) if len(f.waitslots[peer]) == 0 { @@ -932,18 +914,8 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, if len(hashes) >= maxTxRetrievals { return false // break in the for-each } -<<<<<<< HEAD bytes += uint64(meta.size) return bytes < maxTxRetrievalSize -======= - if meta.size != 0 { // Only set eth/68 and upwards - bytes += uint64(meta.size) - if bytes >= maxTxRetrievalSize { - return false - } - } - return true // scheduled, try to add more ->>>>>>> ddf2fd4314 (- fetch transactions from a peer in the order they were announced to minimize nonce-gaps (which cause blob txs to be rejected)) }) // If any hashes were allocated, request them from the peer if len(hashes) > 0 { @@ -992,7 +964,7 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string)) // the do function for each until it returns false. We enforce an arrival // ordering to minimize the chances of mempool nonce-gaps, which result in blob // transactions being rejected by the mempool. -func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]txMetadata, do func(hash common.Hash, meta txMetadata) bool) { +func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadata, do func(hash common.Hash, meta txMetadata) bool) { type announcement struct { hash common.Hash meta txMetadata @@ -1000,7 +972,7 @@ func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]txMetadata, do fun // process announcements by their arrival order list := make([]announcement, 0, len(announces)) for hash, metadata := range announces { - list = append(list, announcement{hash: hash, meta: metadata}) + list = append(list, announcement{hash: hash, meta: *metadata}) } sort.Slice(list, func(i, j int) bool { return list[i].meta.arrival < list[j].meta.arrival