Skip to content

Commit

Permalink
[BCF-3250]: Fix FilterLog error handling in LogPoller (#11654)
Browse files Browse the repository at this point in the history
* Fix error handling of results from FilterLogs()

This was logging a critical error when any error other than an rpc error happened (eg networking issue, or context timeout) when batch_size was = 1.
Should have only been logging at that level for rpc error "Limit Exceeded"

More specifically, there are 4 interrelated issues addressed in this PR:

1. The logic for whether to retry or reduce batch size was wrong--so it was retrying with reduced batch size unnecessariy for transient errors
2. The error was being matched against a concrete JsonError type which is very fragile due to the number of types the type gets wrapped and re-formatted
   while it's propagated up the stack from geth through different layers.  (It may have matched only in simulated geth but not with a live rpc server).
   Now it's matched against rpc.Error interface defined in geth for this purpose, which should work more generally.
3. There was a bug in the test for this feature (related to pointer indirection) which caused a false positive PASS
4. In addition to the rate limiting error returned by infura, alchemy, etc. I've added a similar error code geth can return when the request size is too large

Also:

A new subtest has been added to make sure that unrelated errors do not log this error message

* pnpm changeset

* Fix whitespace for lint

* Add ErrorData() implementation to satisfy rpc.DataError

This will make our JsonError fully parallel with jsonError, so they can
both be treated in the same way.

* Add more sophisticated classificaiton of FilterLogs error codes

There are a lot of different error codes and messages which can be
returned depending on what type of rpc server it is. Classification has been
expanded to include both the codes and regex matching of the messages,
and moved into client/errors.go where a similar process happens to disambiguate
SendTx error

* Add TooManyResults to docs example & full-config.toml

* Add tests for IsTooManyResults()
  • Loading branch information
reductionista authored Aug 27, 2024
1 parent 7a41ae7 commit bf2b72d
Show file tree
Hide file tree
Showing 17 changed files with 331 additions and 74 deletions.
5 changes: 5 additions & 0 deletions .changeset/poor-pumas-occur.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#bugfix More robust error handling in LogPoller, including no more misleading CRITICAL errors emitted under non-critical conditions
105 changes: 104 additions & 1 deletion core/chains/evm/client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
pkgerrors "github.com/pkg/errors"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
Expand Down Expand Up @@ -62,6 +63,7 @@ const (
Fatal
ServiceUnavailable
TerminallyStuck
TooManyResults
)

type ClientErrors map[int]*regexp.Regexp
Expand Down Expand Up @@ -298,6 +300,7 @@ func ClientErrorRegexes(errsRegex config.ClientErrors) *ClientErrors {
TransactionAlreadyMined: regexp.MustCompile(errsRegex.TransactionAlreadyMined()),
Fatal: regexp.MustCompile(errsRegex.Fatal()),
ServiceUnavailable: regexp.MustCompile(errsRegex.ServiceUnavailable()),
TooManyResults: regexp.MustCompile(errsRegex.TooManyResults()),
}
}

Expand Down Expand Up @@ -457,6 +460,11 @@ func isFatalSendError(err error) bool {
return false
}

var (
_ rpc.Error = JsonError{}
_ rpc.DataError = JsonError{}
)

// [email protected]/rpc/json.go
type JsonError struct {
Code int `json:"code"`
Expand All @@ -471,7 +479,17 @@ func (err JsonError) Error() string {
return err.Message
}

func (err *JsonError) String() string {
// To satisfy rpc.Error interface
func (err JsonError) ErrorCode() int {
return err.Code
}

// To satisfy rpc.DataError
func (err JsonError) ErrorData() interface{} {
return err.Data
}

func (err JsonError) String() string {
return fmt.Sprintf("json-rpc error { Code = %d, Message = '%s', Data = '%v' }", err.Code, err.Message, err.Data)
}

Expand Down Expand Up @@ -610,3 +628,88 @@ func ClassifySendError(err error, clientErrors config.ClientErrors, lggr logger.
lggr.Criticalw("Unknown error encountered when sending transaction", "err", err, "etx", tx)
return commonclient.Unknown
}

var infura = ClientErrors{
TooManyResults: regexp.MustCompile(`(: |^)query returned more than [0-9]+ results. Try with this block range \[0x[0-9A-F]+, 0x[0-9A-F]+\].$`),
}

var alchemy = ClientErrors{
TooManyResults: regexp.MustCompile(`(: |^)Log response size exceeded. You can make eth_getLogs requests with up to a [0-9A-Z]+ block range and no limit on the response size, or you can request any block range with a cap of [0-9A-Z]+ logs in the response. Based on your parameters and the response size limit, this block range should work: \[0x[0-9a-f]+, 0x[0-9a-f]+\]$`),
}

var quicknode = ClientErrors{
TooManyResults: regexp.MustCompile(`(: |^)eth_getLogs is limited to a [0-9,]+ range$`),
}

var simplyvc = ClientErrors{
TooManyResults: regexp.MustCompile(`too wide blocks range, the limit is [0-9,]+$`),
}

var drpc = ClientErrors{
TooManyResults: regexp.MustCompile(`(: |^)requested too many blocks from [0-9]+ to [0-9]+, maximum is set to [0-9,]+$`),
}

// Linkpool, Blockdaemon, and Chainstack all return "request timed out" if the log results are too large for them to process
var defaultClient = ClientErrors{
TooManyResults: regexp.MustCompile(`request timed out`),
}

// JSON-RPC error codes which can indicate a refusal of the server to process an eth_getLogs request because the result set is too large
const (
jsonRpcServerError = -32000 // Server error. SimplyVC uses this error code when too many results are returned

// Server timeout. When the rpc server has its own limit on how long it can take to compile the results
// Examples: Linkpool, Chainstack, Block Daemon
jsonRpcTimedOut = -32002

// See: https://github.com/ethereum/go-ethereum/blob/master/rpc/errors.go#L63
// Can occur if the rpc server is configured with a maximum byte limit on the response size of batch requests
jsonRpcResponseTooLarge = -32003

// Not implemented in geth by default, but is defined in EIP 1474 and implemented by infura and some other 3rd party rpc servers
// See: https://community.infura.io/t/getlogs-error-query-returned-more-than-1000-results/358/5
jsonRpcLimitExceeded = -32005 // See also: https://github.com/ethereum/EIPs/blob/master/EIPS/eip-1474.md

jsonRpcInvalidParams = -32602 // Invalid method params. Returned by alchemy if the block range is too large or there are too many results to return

jsonRpcQuicknodeTooManyResults = -32614 // Undocumented error code used by Quicknode for too many results error
)

func IsTooManyResults(err error, clientErrors config.ClientErrors) bool {
var rpcErr rpc.Error

if !pkgerrors.As(err, &rpcErr) {
return false
}
configErrors := ClientErrorRegexes(clientErrors)
if configErrors.ErrIs(rpcErr, TooManyResults) {
return true
}

switch rpcErr.ErrorCode() {
case jsonRpcResponseTooLarge:
return true
case jsonRpcLimitExceeded:
if infura.ErrIs(rpcErr, TooManyResults) {
return true
}
case jsonRpcInvalidParams:
if alchemy.ErrIs(rpcErr, TooManyResults) {
return true
}
case jsonRpcQuicknodeTooManyResults:
if quicknode.ErrIs(rpcErr, TooManyResults) {
return true
}
case jsonRpcTimedOut:
if defaultClient.ErrIs(rpcErr, TooManyResults) {
return true
}
case jsonRpcServerError:
if simplyvc.ErrIs(rpcErr, TooManyResults) ||
drpc.ErrIs(rpcErr, TooManyResults) {
return true
}
}
return false
}
85 changes: 85 additions & 0 deletions core/chains/evm/client/errors_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package client_test

import (
"encoding/json"
"errors"
"fmt"
"testing"

pkgerrors "github.com/pkg/errors"
Expand Down Expand Up @@ -439,3 +441,86 @@ func Test_Config_Errors(t *testing.T) {
assert.False(t, clientErrors.ErrIs(errors.New("some old bollocks"), evmclient.NonceTooLow))
})
}

func Test_IsTooManyResultsError(t *testing.T) {
customErrors := evmclient.NewTestClientErrors()

tests := []errorCase{
{`{
"code":-32602,
"message":"Log response size exceeded. You can make eth_getLogs requests with up to a 2K block range and no limit on the response size, or you can request any block range with a cap of 10K logs in the response. Based on your parameters and the response size limit, this block range should work: [0x0, 0x133e71]"}`,
true,
"alchemy",
}, {`{
"code":-32005,
"data":{"from":"0xCB3D","limit":10000,"to":"0x7B737"},
"message":"query returned more than 10000 results. Try with this block range [0xCB3D, 0x7B737]."}`,
true,
"infura",
}, {`{
"code":-32002,
"message":"request timed out"}`,
true,
"LinkPool-Blockdaemon-Chainstack",
}, {`{
"code":-32614,
"message":"eth_getLogs is limited to a 10,000 range"}`,
true,
"Quicknode",
}, {`{
"code":-32000,
"message":"too wide blocks range, the limit is 100"}`,
true,
"SimplyVC",
}, {`{
"message":"requested too many blocks from 0 to 16777216, maximum is set to 2048",
"code":-32000}`,
true,
"Drpc",
}, {`
<!DOCTYPE html>
<html>
<head>
<title>503 Backend fetch failed</title>
</head>
<body>
<h1>Error 503 Backend fetch failed</h1>
<p>Backend fetch failed</p>
<h3>Guru Meditation:</h3>
<p>XID: 343710611</p>
<hr>
<p>Varnish cache server</p>
</body>
</html>`,
false,
"Nirvana Labs"}, // This isn't an error response we can handle, but including for completeness. },

{`{
"code":-32000",
"message":"unrelated server error"}`,
false,
"any",
}, {`{
"code":-32500,
"message":"unrelated error code"}`,
false,
"any2",
}, {fmt.Sprintf(`{
"code" : -43106,
"message" : "%s"}`, customErrors.TooManyResults()),
true,
"custom chain with error specified in toml config",
},
}

for _, test := range tests {
t.Run(test.network, func(t *testing.T) {
jsonRpcErr := evmclient.JsonError{}
err := json.Unmarshal([]byte(test.message), &jsonRpcErr)
if err == nil {
err = jsonRpcErr
}
assert.Equal(t, test.expect, evmclient.IsTooManyResults(err, &customErrors))
})
}
}
3 changes: 3 additions & 0 deletions core/chains/evm/client/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type TestClientErrors struct {
transactionAlreadyMined string
fatal string
serviceUnavailable string
tooManyResults string
}

func NewTestClientErrors() TestClientErrors {
Expand All @@ -52,6 +53,7 @@ func NewTestClientErrors() TestClientErrors {
transactionAlreadyMined: "client error transaction already mined",
fatal: "client error fatal",
serviceUnavailable: "client error service unavailable",
tooManyResults: "client error too many results",
}
}

Expand All @@ -77,6 +79,7 @@ func (c *TestClientErrors) L2Full() string { return c.l2Full }
func (c *TestClientErrors) TransactionAlreadyMined() string { return c.transactionAlreadyMined }
func (c *TestClientErrors) Fatal() string { return c.fatal }
func (c *TestClientErrors) ServiceUnavailable() string { return c.serviceUnavailable }
func (c *TestClientErrors) TooManyResults() string { return c.serviceUnavailable }

type TestNodePoolConfig struct {
NodePollFailureThreshold uint32
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/client/simulated_backend_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (c *SimulatedBackendClient) CallContract(ctx context.Context, msg ethereum.
res, err := c.b.CallContract(ctx, msg, blockNumber)
if err != nil {
dataErr := revertError{}
if errors.Is(err, &dataErr) {
if errors.As(err, &dataErr) {
return nil, &JsonError{Data: dataErr.ErrorData(), Message: dataErr.Error(), Code: 3}
}
// Generic revert, no data
Expand All @@ -434,7 +434,7 @@ func (c *SimulatedBackendClient) PendingCallContract(ctx context.Context, msg et
res, err := c.b.PendingCallContract(ctx, msg)
if err != nil {
dataErr := revertError{}
if errors.Is(err, &dataErr) {
if errors.As(err, &dataErr) {
return nil, &JsonError{Data: dataErr.ErrorData(), Message: dataErr.Error(), Code: 3}
}
// Generic revert, no data
Expand Down
1 change: 1 addition & 0 deletions core/chains/evm/config/chain_scoped_client_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ func (c *clientErrorsConfig) Fatal() string { return derefOrDefault(c.c.Fatal) }
func (c *clientErrorsConfig) ServiceUnavailable() string {
return derefOrDefault(c.c.ServiceUnavailable)
}
func (c *clientErrorsConfig) TooManyResults() string { return derefOrDefault(c.c.TooManyResults) }
1 change: 1 addition & 0 deletions core/chains/evm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type ClientErrors interface {
TransactionAlreadyMined() string
Fatal() string
ServiceUnavailable() string
TooManyResults() string
}

type Transactions interface {
Expand Down
2 changes: 2 additions & 0 deletions core/chains/evm/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ func TestClientErrorsConfig(t *testing.T) {
TransactionAlreadyMined: ptr("client error transaction already mined"),
Fatal: ptr("client error fatal"),
ServiceUnavailable: ptr("client error service unavailable"),
TooManyResults: ptr("client error too many results"),
},
}
})
Expand All @@ -372,6 +373,7 @@ func TestClientErrorsConfig(t *testing.T) {
assert.Equal(t, "client error transaction already mined", errors.TransactionAlreadyMined())
assert.Equal(t, "client error fatal", errors.Fatal())
assert.Equal(t, "client error service unavailable", errors.ServiceUnavailable())
assert.Equal(t, "client error too many results", errors.TooManyResults())
})
}

Expand Down
4 changes: 4 additions & 0 deletions core/chains/evm/config/toml/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,7 @@ type ClientErrors struct {
TransactionAlreadyMined *string `toml:",omitempty"`
Fatal *string `toml:",omitempty"`
ServiceUnavailable *string `toml:",omitempty"`
TooManyResults *string `toml:",omitempty"`
}

func (r *ClientErrors) setFrom(f *ClientErrors) bool {
Expand Down Expand Up @@ -852,6 +853,9 @@ func (r *ClientErrors) setFrom(f *ClientErrors) bool {
if v := f.ServiceUnavailable; v != nil {
r.ServiceUnavailable = v
}
if v := f.TooManyResults; v != nil {
r.TooManyResults = v
}
return true
}

Expand Down
16 changes: 8 additions & 8 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
)
Expand Down Expand Up @@ -113,6 +114,7 @@ type logPoller struct {
backfillBatchSize int64 // batch size to use when backfilling finalized logs
rpcBatchSize int64 // batch size to use for fallback RPC calls made in GetBlocks
logPrunePageSize int64
clientErrors config.ClientErrors
backupPollerNextBlock int64 // next block to be processed by Backup LogPoller
backupPollerBlockDelay int64 // how far behind regular LogPoller should BackupLogPoller run. 0 = disabled

Expand Down Expand Up @@ -143,6 +145,7 @@ type Opts struct {
KeepFinalizedBlocksDepth int64
BackupPollerBlockDelay int64
LogPrunePageSize int64
ClientErrors config.ClientErrors
}

// NewLogPoller creates a log poller. Note there is an assumption
Expand Down Expand Up @@ -172,6 +175,7 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracke
rpcBatchSize: opts.RpcBatchSize,
keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth,
logPrunePageSize: opts.LogPrunePageSize,
clientErrors: opts.ClientErrors,
filters: make(map[string]Filter),
filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
finalityViolated: new(atomic.Bool),
Expand Down Expand Up @@ -794,8 +798,6 @@ func (lp *logPoller) blocksFromLogs(ctx context.Context, logs []types.Log, endBl
return lp.GetBlocksRange(ctx, numbers)
}

const jsonRpcLimitExceeded = -32005 // See https://github.com/ethereum/EIPs/blob/master/EIPS/eip-1474.md

// backfill will query FilterLogs in batches for logs in the
// block range [start, end] and save them to the db.
// Retries until ctx cancelled. Will return an error if cancelled
Expand All @@ -807,13 +809,11 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {

gethLogs, err := lp.ec.FilterLogs(ctx, lp.Filter(big.NewInt(from), big.NewInt(to), nil))
if err != nil {
var rpcErr client.JsonError
if pkgerrors.As(err, &rpcErr) {
if rpcErr.Code != jsonRpcLimitExceeded {
lp.lggr.Errorw("Unable to query for logs", "err", err, "from", from, "to", to)
return err
}
if !client.IsTooManyResults(err, lp.clientErrors) {
lp.lggr.Errorw("Unable to query for logs", "err", err, "from", from, "to", to)
return err
}

if batchSize == 1 {
lp.lggr.Criticalw("Too many log results in a single block, failed to retrieve logs! Node may be running in a degraded state.", "err", err, "from", from, "to", to, "LogBackfillBatchSize", lp.backfillBatchSize)
return err
Expand Down
Loading

0 comments on commit bf2b72d

Please sign in to comment.