Skip to content

Commit

Permalink
[BCF-3269] - Contract Reader Batch Call (#13635)
Browse files Browse the repository at this point in the history
* Chain Reader Batch Call inital commit

* Add batchCaller tests

* Minor code improvement for ChainReader addDecoderDef and addEncoderDef

* Use common types for Chain Reader BatchGetLatestValue

* CR BatchGetLatestValue fixes and setup evm chain reader tester

* Run lint, generate and update common ref

* Add changeset

* Update common ref, rename BatchGetLatestValue to BatchGetLatestValues

* Import new common changes

* Return an error if BatchGetLatestValues received an event in the req

* Bump common

* fix minor merge issues

* Add common codec mock for batch_caller_test

* Bump feeds

* Bump Solana
  • Loading branch information
ilija42 authored Jul 12, 2024
1 parent 51225f8 commit 055a9d2
Show file tree
Hide file tree
Showing 18 changed files with 912 additions and 98 deletions.
5 changes: 5 additions & 0 deletions .changeset/pink-papayas-swim.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal Add BatchGetLatestValues to ChainReader
30 changes: 30 additions & 0 deletions core/capabilities/targets/mocks/chain_reader.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.4
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240710165532-ade916a95858
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240712101200-5b11e6cc6e86
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240702141926-063ceef8c42e
github.com/spf13/cobra v1.8.0
Expand Down Expand Up @@ -273,7 +273,7 @@ require (
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45 // indirect
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240702145022-37a2c3a742d1 // indirect
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240710170203-5b41615da827 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240710170818-eccca28888e5 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240712132946-267a37c5ac6e // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240709043547-03612098f799 // indirect
github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230906073235-9e478e5e19f1 // indirect
github.com/smartcontractkit/tdh2/go/tdh2 v0.0.0-20230906073235-9e478e5e19f1 // indirect
Expand Down
8 changes: 4 additions & 4 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1178,16 +1178,16 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq
github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE=
github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8=
github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240710165532-ade916a95858 h1:nwAe0iA4JN7/oEFz/N2lkTpNh6rxlzbK7g8Els/dDew=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240710165532-ade916a95858/go.mod h1:fh9eBbrReCmv31bfz52ENCAMa7nTKQbdhb2B3+S2VGo=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240712101200-5b11e6cc6e86 h1:TYALsn6Jue7xCIcXMel+Ow0SuudVfOUAz6iups946Yw=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240712101200-5b11e6cc6e86/go.mod h1:fh9eBbrReCmv31bfz52ENCAMa7nTKQbdhb2B3+S2VGo=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45 h1:NBQLtqk8zsyY4qTJs+NElI3aDFTcAo83JHvqD04EvB0=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45/go.mod h1:LV0h7QBQUpoC2UUi6TcUvcIFm1xjP/DtEcqV8+qeLUs=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240702145022-37a2c3a742d1 h1:dsTmitRaVizHxoYFoGz4+y/zVa8XnvKUiTaZdx+6t9M=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240702145022-37a2c3a742d1/go.mod h1:6DgCnHMGdBaIh0bLs1dK0MtdeMZfeNhc/nvBUN6KIUg=
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240710170203-5b41615da827 h1:BCHu4pNP6arrcHLEWx61XjLaonOd2coQNyL0NTUcaMc=
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240710170203-5b41615da827/go.mod h1:OPX+wC2TWQsyLNpR7daMt2vMpmsNcoBxbZyGTHr6tiA=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240710170818-eccca28888e5 h1:gktRCdvNp0tczyqb79JaQOloa/elDS6t33qjAS9SrEU=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240710170818-eccca28888e5/go.mod h1:aJUY4hdo1g942mhlPX9Z4FWe5ldEyWvsWSNf7frh7yU=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240712132946-267a37c5ac6e h1:PzwzlHNv1YbJ6ZIdl/pIFRoOuOS4V4WLvjZvFUnZFL4=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240712132946-267a37c5ac6e/go.mod h1:hsFhop+SlQHKD+DEFjZrMJmbauT1A/wvtZIeeo4PxFU=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240709043547-03612098f799 h1:HyLTySm7BR+oNfZqDTkVJ25wnmcTtxBBD31UkFL+kEM=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240709043547-03612098f799/go.mod h1:UVFRacRkP7O7TQAzFmR52v5mUlxf+G1ovMlCQAB/cHU=
github.com/smartcontractkit/go-plugin v0.0.0-20240208201424-b3b91517de16 h1:TFe+FvzxClblt6qRfqEhUfa4kFQx5UobuoFGO2W4mMo=
Expand Down
312 changes: 312 additions & 0 deletions core/services/relay/evm/batch_caller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
package evm

import (
"context"
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

var errEmptyOutput = errors.New("rpc call output is empty (make sure that the contract method exists and rpc is healthy)")

const (
// DefaultRpcBatchSizeLimit defines the maximum number of rpc requests to be included in a batch.
DefaultRpcBatchSizeLimit = 100

// DefaultRpcBatchBackOffMultiplier defines the rate of reducing the batch size limit for retried calls.
// For example if limit is 20 and multiplier is 4:
// 1. 20
// 2. 20/4 = 5
// 3. 5/4 = 1
DefaultRpcBatchBackOffMultiplier = 5

// DefaultMaxParallelRpcCalls defines the default maximum number of individual in-parallel rpc calls.
DefaultMaxParallelRpcCalls = 10
)

// BatchResult is organised by contracts names, key is contract name.
type BatchResult map[string]ContractResults
type ContractResults []MethodCallResult
type MethodCallResult struct {
MethodName string
ReturnValue any
Err error
}

type BatchCall []Call
type Call struct {
ContractAddress common.Address
ContractName, MethodName string
Params, ReturnVal any
}

func (c BatchCall) String() string {
callString := ""
for _, call := range c {
callString += fmt.Sprintf("%s\n", call.String())
}
return callString
}

// Implement the String method for the Call struct
func (c Call) String() string {
return fmt.Sprintf("contractAddress: %s, contractName: %s, method: %s, params: %+v returnValType: %T",
c.ContractAddress.Hex(), c.ContractName, c.MethodName, c.Params, c.ReturnVal)
}

//go:generate mockery --quiet --name BatchCaller --output ./rpclibmocks --outpkg rpclibmocks --filename batch_caller.go --case=underscore
type BatchCaller interface {
// BatchCall executes all the provided BatchRequest and returns the results in the same order
// of the calls. Pass blockNumber=0 to use the latest block.
BatchCall(ctx context.Context, blockNumber uint64, batchRequests BatchCall) (BatchResult, error)
}

// dynamicLimitedBatchCaller makes batched rpc calls and perform retries by reducing the batch size on each retry.
type dynamicLimitedBatchCaller struct {
bc *defaultEvmBatchCaller
}

func NewDynamicLimitedBatchCaller(lggr logger.Logger, codec types.Codec, evmClient client.Client, batchSizeLimit, backOffMultiplier, parallelRpcCallsLimit uint) BatchCaller {
return &dynamicLimitedBatchCaller{
bc: newDefaultEvmBatchCaller(lggr, evmClient, codec, batchSizeLimit, backOffMultiplier, parallelRpcCallsLimit),
}
}

func (c *dynamicLimitedBatchCaller) BatchCall(ctx context.Context, blockNumber uint64, reqs BatchCall) (BatchResult, error) {
return c.bc.batchCallDynamicLimitRetries(ctx, blockNumber, reqs)
}

type defaultEvmBatchCaller struct {
lggr logger.Logger
evmClient client.Client
codec types.Codec
batchSizeLimit uint
parallelRpcCallsLimit uint
backOffMultiplier uint
}

// NewDefaultEvmBatchCaller returns a new batch caller instance.
// batchCallLimit defines the maximum number of calls for BatchCallLimit method, pass 0 to keep the default.
// backOffMultiplier defines the back-off strategy for retries on BatchCallDynamicLimitRetries method, pass 0 to keep the default.
func newDefaultEvmBatchCaller(
lggr logger.Logger, evmClient client.Client, codec types.Codec, batchSizeLimit, backOffMultiplier, parallelRpcCallsLimit uint,
) *defaultEvmBatchCaller {
batchSize := uint(DefaultRpcBatchSizeLimit)
if batchSizeLimit > 0 {
batchSize = batchSizeLimit
}

multiplier := uint(DefaultRpcBatchBackOffMultiplier)
if backOffMultiplier > 0 {
multiplier = backOffMultiplier
}

parallelRpcCalls := uint(DefaultMaxParallelRpcCalls)
if parallelRpcCallsLimit > 0 {
parallelRpcCalls = parallelRpcCallsLimit
}

return &defaultEvmBatchCaller{
lggr: lggr,
evmClient: evmClient,
codec: codec,
batchSizeLimit: batchSize,
parallelRpcCallsLimit: parallelRpcCalls,
backOffMultiplier: multiplier,
}
}

func (c *defaultEvmBatchCaller) batchCall(ctx context.Context, blockNumber uint64, batchCall BatchCall) ([]dataAndErr, error) {
if len(batchCall) == 0 {
return nil, nil
}

packedOutputs := make([]string, len(batchCall))
rpcBatchCalls := make([]rpc.BatchElem, len(batchCall))
for i, call := range batchCall {
data, err := c.codec.Encode(ctx, call.Params, WrapItemType(call.ContractName, call.MethodName, true))
if err != nil {
return nil, err
}

blockNumStr := "latest"
if blockNumber > 0 {
blockNumStr = hexutil.EncodeBig(big.NewInt(0).SetUint64(blockNumber))
}

rpcBatchCalls[i] = rpc.BatchElem{
Method: "eth_call",
Args: []any{
map[string]interface{}{
"from": common.Address{},
"to": call.ContractAddress,
"data": data,
},
blockNumStr,
},
Result: &packedOutputs[i],
}
}

if err := c.evmClient.BatchCallContext(ctx, rpcBatchCalls); err != nil {
return nil, fmt.Errorf("batch call context: %w", err)
}

results := make([]dataAndErr, len(batchCall))
for i, call := range batchCall {
results[i] = dataAndErr{
contractName: call.ContractName,
methodName: call.MethodName,
returnVal: call.ReturnVal,
}

if rpcBatchCalls[i].Error != nil {
results[i].err = rpcBatchCalls[i].Error
continue
}

if packedOutputs[i] == "" {
// Some RPCs instead of returning "0x" are returning an empty string.
// We are overriding this behaviour for consistent handling of this scenario.
packedOutputs[i] = "0x"
}

b, err := hexutil.Decode(packedOutputs[i])
if err != nil {
return nil, fmt.Errorf("decode result %s: packedOutputs %s: %w", call, packedOutputs[i], err)
}

if err = c.codec.Decode(ctx, b, call.ReturnVal, WrapItemType(call.ContractName, call.MethodName, false)); err != nil {
if len(b) == 0 {
results[i].err = fmt.Errorf("unpack result %s: %s: %w", call, err.Error(), errEmptyOutput)
} else {
results[i].err = fmt.Errorf("unpack result %s: %w", call, err)
}
continue
}
results[i].returnVal = call.ReturnVal
}

return results, nil
}

func (c *defaultEvmBatchCaller) batchCallDynamicLimitRetries(ctx context.Context, blockNumber uint64, calls BatchCall) (BatchResult, error) {
lim := c.batchSizeLimit
// Limit the batch size to the number of calls
if uint(len(calls)) < lim {
lim = uint(len(calls))
}
for {
results, err := c.batchCallLimit(ctx, blockNumber, calls, lim)
if err == nil {
return results, nil
}

if lim <= 1 {
return nil, errors.Wrapf(err, "calls %+v", calls)
}

newLim := lim / c.backOffMultiplier
if newLim == 0 || newLim == lim {
newLim = 1
}
lim = newLim
c.lggr.Errorf("retrying batch call with %d calls and %d limit that failed with error=%s",
len(calls), lim, err)
}
}

type dataAndErr struct {
contractName, methodName string
returnVal any
err error
}

func (c *defaultEvmBatchCaller) batchCallLimit(ctx context.Context, blockNumber uint64, calls BatchCall, batchSizeLimit uint) (BatchResult, error) {
if batchSizeLimit <= 0 {
res, err := c.batchCall(ctx, blockNumber, calls)
return convertToBatchResult(res), err
}

type job struct {
blockNumber uint64
calls BatchCall
results []dataAndErr
}

jobs := make([]job, 0)
for i := 0; i < len(calls); i += int(batchSizeLimit) {
idxFrom := i
idxTo := idxFrom + int(batchSizeLimit)
if idxTo > len(calls) {
idxTo = len(calls)
}
jobs = append(jobs, job{blockNumber: blockNumber, calls: calls[idxFrom:idxTo], results: nil})
}

if c.parallelRpcCallsLimit > 1 {
eg := new(errgroup.Group)
eg.SetLimit(int(c.parallelRpcCallsLimit))
for jobIdx := range jobs {
jobIdx := jobIdx
eg.Go(func() error {
res, err := c.batchCall(ctx, jobs[jobIdx].blockNumber, jobs[jobIdx].calls)
if err != nil {
return err
}
jobs[jobIdx].results = res
return nil
})
}
if err := eg.Wait(); err != nil {
return nil, err
}
} else {
var err error
for jobIdx := range jobs {
jobs[jobIdx].results, err = c.batchCall(ctx, jobs[jobIdx].blockNumber, jobs[jobIdx].calls)
if err != nil {
return nil, err
}
}
}

var results []dataAndErr
for _, jb := range jobs {
results = append(results, jb.results...)
}

return convertToBatchResult(results), nil
}

func convertToBatchResult(data []dataAndErr) BatchResult {
if data == nil {
return nil
}

batchResult := make(BatchResult)
for _, d := range data {
methodCall := MethodCallResult{
MethodName: d.methodName,
ReturnValue: d.returnVal,
Err: d.err,
}

if _, exists := batchResult[d.contractName]; !exists {
batchResult[d.contractName] = ContractResults{}
}

batchResult[d.contractName] = append(batchResult[d.contractName], methodCall)
}

return batchResult
}
Loading

0 comments on commit 055a9d2

Please sign in to comment.