Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

polygon/p2p: fix p2p fetcher peer errors on chain tip #11927

Merged
merged 11 commits into from
Sep 10, 2024
42 changes: 21 additions & 21 deletions polygon/p2p/fetcher_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,27 +543,27 @@ func newFetcherTest(t *testing.T, requestIdGenerator RequestIdGenerator) *fetche
messageSender := NewMessageSender(sentryClient)
fetcher := newFetcher(fetcherConfig, messageListener, messageSender, requestIdGenerator)
return &fetcherTest{
ctx: ctx,
ctxCancel: cancel,
t: t,
fetcher: fetcher,
logger: logger,
sentryClient: sentryClient,
messageListener: messageListener,
headersRequestResponseMocks: map[uint64]requestResponseMock{},
ctx: ctx,
ctxCancel: cancel,
t: t,
fetcher: fetcher,
logger: logger,
sentryClient: sentryClient,
messageListener: messageListener,
requestResponseMocks: map[uint64]requestResponseMock{},
}
}

type fetcherTest struct {
ctx context.Context
ctxCancel context.CancelFunc
t *testing.T
fetcher *fetcher
logger log.Logger
sentryClient *direct.MockSentryClient
messageListener MessageListener
headersRequestResponseMocks map[uint64]requestResponseMock
peerEvents chan *delayedMessage[*sentryproto.PeerEvent]
ctx context.Context
ctxCancel context.CancelFunc
t *testing.T
fetcher *fetcher
logger log.Logger
sentryClient *direct.MockSentryClient
messageListener MessageListener
requestResponseMocks map[uint64]requestResponseMock
peerEvents chan *delayedMessage[*sentryproto.PeerEvent]
}

func (ft *fetcherTest) run(f func(ctx context.Context, t *testing.T)) {
Expand Down Expand Up @@ -611,7 +611,7 @@ func (ft *fetcherTest) mockSentryInboundMessagesStream(mocks ...requestResponseM
var numInboundMessages int
for _, mock := range mocks {
numInboundMessages += len(mock.mockResponseInboundMessages)
ft.headersRequestResponseMocks[mock.requestId] = mock
ft.requestResponseMocks[mock.requestId] = mock
}

inboundMessageStreamChan := make(chan *delayedMessage[*sentryproto.InboundMessage], numInboundMessages)
Expand Down Expand Up @@ -643,7 +643,7 @@ func (ft *fetcherTest) mockSentryInboundMessagesStream(mocks ...requestResponseM
return nil, err
}

delete(ft.headersRequestResponseMocks, mock.requestId)
delete(ft.requestResponseMocks, mock.requestId)
for _, inboundMessage := range mock.mockResponseInboundMessages {
inboundMessageStreamChan <- &delayedMessage[*sentryproto.InboundMessage]{
message: inboundMessage,
Expand All @@ -668,7 +668,7 @@ func (ft *fetcherTest) mockSendMessageByIdForHeaders(req *sentryproto.SendMessag
return requestResponseMock{}, err
}

mock, ok := ft.headersRequestResponseMocks[pkt.RequestId]
mock, ok := ft.requestResponseMocks[pkt.RequestId]
if !ok {
return requestResponseMock{}, fmt.Errorf("unexpected request id %d", pkt.RequestId)
}
Expand Down Expand Up @@ -699,7 +699,7 @@ func (ft *fetcherTest) mockSendMessageByIdForBodies(req *sentryproto.SendMessage
return requestResponseMock{}, err
}

mock, ok := ft.headersRequestResponseMocks[pkt.RequestId]
mock, ok := ft.requestResponseMocks[pkt.RequestId]
if !ok {
return requestResponseMock{}, fmt.Errorf("unexpected request id %d", pkt.RequestId)
}
Expand Down
42 changes: 35 additions & 7 deletions polygon/p2p/fetcher_penalizing.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,42 @@ func NewPenalizingFetcher(logger log.Logger, fetcher Fetcher, peerPenalizer Peer
}

func newPenalizingFetcher(logger log.Logger, fetcher Fetcher, peerPenalizer PeerPenalizer) *penalizingFetcher {
fetchHeadersPenalizeErrs := []error{
&ErrTooManyHeaders{},
&ErrNonSequentialHeaderNumbers{},
}

fetchBodiesPenalizeErrs := []error{
&ErrTooManyBodies{},
}

fetchBlocksPenalizeErrs := make([]error, 0, len(fetchHeadersPenalizeErrs)+len(fetchBodiesPenalizeErrs))
fetchBlocksPenalizeErrs = append(fetchBlocksPenalizeErrs, fetchHeadersPenalizeErrs...)
fetchBlocksPenalizeErrs = append(fetchBlocksPenalizeErrs, fetchBodiesPenalizeErrs...)

return &penalizingFetcher{
Fetcher: fetcher,
logger: logger,
peerPenalizer: peerPenalizer,
Fetcher: fetcher,
logger: logger,
peerPenalizer: peerPenalizer,
fetchHeadersPenalizeErrs: fetchHeadersPenalizeErrs,
fetchBodiesPenalizeErrs: fetchBodiesPenalizeErrs,
fetchBlocksPenalizeErrs: fetchBlocksPenalizeErrs,
}
}

type penalizingFetcher struct {
Fetcher
logger log.Logger
peerPenalizer PeerPenalizer
logger log.Logger
peerPenalizer PeerPenalizer
fetchHeadersPenalizeErrs []error
fetchBodiesPenalizeErrs []error
fetchBlocksPenalizeErrs []error
}

func (pf *penalizingFetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) {
headers, err := pf.Fetcher.FetchHeaders(ctx, start, end, peerId)
if err != nil {
return FetcherResponse[[]*types.Header]{}, pf.maybePenalize(ctx, peerId, err, &ErrTooManyHeaders{}, &ErrNonSequentialHeaderNumbers{})
return FetcherResponse[[]*types.Header]{}, pf.maybePenalize(ctx, peerId, err, pf.fetchHeadersPenalizeErrs...)
}

return headers, nil
Expand All @@ -56,12 +75,21 @@ func (pf *penalizingFetcher) FetchHeaders(ctx context.Context, start uint64, end
func (pf *penalizingFetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error) {
bodies, err := pf.Fetcher.FetchBodies(ctx, headers, peerId)
if err != nil {
return FetcherResponse[[]*types.Body]{}, pf.maybePenalize(ctx, peerId, err, &ErrTooManyBodies{})
return FetcherResponse[[]*types.Body]{}, pf.maybePenalize(ctx, peerId, err, pf.fetchBodiesPenalizeErrs...)
}

return bodies, nil
}

func (pf *penalizingFetcher) FetchBlocks(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Block], error) {
blocks, err := pf.Fetcher.FetchBlocks(ctx, start, end, peerId)
if err != nil {
return FetcherResponse[[]*types.Block]{}, pf.maybePenalize(ctx, peerId, err, pf.fetchBlocksPenalizeErrs...)
}

return blocks, nil
}

func (pf *penalizingFetcher) maybePenalize(ctx context.Context, peerId *PeerId, err error, penalizeErrs ...error) error {
var shouldPenalize bool
for _, penalizeErr := range penalizeErrs {
Expand Down
Loading
Loading