Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable Mercury transmitter parameters #12680

Merged
merged 5 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .changeset/sour-jars-cross.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
"chainlink": patch
---

#added

Add configurability to mercury transmitter

```toml
[Mercury.Transmitter]
TransmitQueueMaxSize = 10_000 # Default
TransmitTimeout = "5s" # Default
```
5 changes: 3 additions & 2 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,9 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
}

evmFactoryCfg := chainlink.EVMFactoryConfig{
CSAETHKeystore: keyStore,
ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, MailMon: mailMon, DS: ds},
CSAETHKeystore: keyStore,
ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, MailMon: mailMon, DS: ds},
MercuryTransmitter: cfg.Mercury().Transmitter(),
}
// evm always enabled for backward compatibility
// TODO BCF-2510 this needs to change in order to clear the path for EVM extraction
Expand Down
14 changes: 14 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -622,3 +622,17 @@ LatestReportDeadline = "5s" # Default
[Mercury.TLS]
# CertFile is the path to a PEM file of trusted root certificate authority certificates
CertFile = "/path/to/client/certs.pem" # Example

# Mercury.Transmitter controls settings for the mercury transmitter
[Mercury.Transmitter]
# TransmitQueueMaxSize controls the size of the transmit queue. This is scoped
# per OCR instance. If the queue is full, the transmitter will start dropping
# the oldest messages in order to make space.
#
# This is useful if mercury server goes offline and the nop needs to buffer
# transmissions.
TransmitQueueMaxSize = 10_000 # Default
# TransmitTimeout controls how long the transmitter will wait for a response
# when sending a message to the mercury server, before aborting and considering
# the transmission to be failed.
TransmitTimeout = "5s" # Default
7 changes: 7 additions & 0 deletions core/config/mercury_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"time"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/types"
)

Expand All @@ -16,8 +17,14 @@ type MercuryTLS interface {
CertFile() string
}

type MercuryTransmitter interface {
TransmitQueueMaxSize() uint32
TransmitTimeout() commonconfig.Duration
}

type Mercury interface {
Credentials(credName string) *types.MercuryCredentials
Cache() MercuryCache
TLS() MercuryTLS
Transmitter() MercuryTransmitter
}
20 changes: 18 additions & 2 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,14 +1312,30 @@ func (m *MercuryTLS) ValidateConfig() (err error) {
return
}

type MercuryTransmitter struct {
TransmitQueueMaxSize *uint32
TransmitTimeout *commonconfig.Duration
}

func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) {
if v := f.TransmitQueueMaxSize; v != nil {
m.TransmitQueueMaxSize = v
}
if v := f.TransmitTimeout; v != nil {
m.TransmitTimeout = v
}
}

type Mercury struct {
Cache MercuryCache `toml:",omitempty"`
TLS MercuryTLS `toml:",omitempty"`
Cache MercuryCache `toml:",omitempty"`
TLS MercuryTLS `toml:",omitempty"`
Transmitter MercuryTransmitter `toml:",omitempty"`
}

func (m *Mercury) setFrom(f *Mercury) {
m.Cache.setFrom(&f.Cache)
m.TLS.setFrom(&f.TLS)
m.Transmitter.setFrom(&f.Transmitter)
}

func (m *Mercury) ValidateConfig() (err error) {
Expand Down
3 changes: 2 additions & 1 deletion core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,8 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
MailMon: mailMon,
DS: ds,
},
CSAETHKeystore: keyStore,
CSAETHKeystore: keyStore,
MercuryTransmitter: cfg.Mercury().Transmitter(),
}

if cfg.EVMEnabled() {
Expand Down
21 changes: 21 additions & 0 deletions core/services/chainlink/config_mercury.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package chainlink
import (
"time"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/smartcontractkit/chainlink/v2/core/config"
Expand All @@ -25,6 +26,8 @@ func (m *mercuryCacheConfig) LatestReportDeadline() time.Duration {
return m.c.LatestReportDeadline.Duration()
}

var _ config.MercuryTLS = (*mercuryTLSConfig)(nil)

type mercuryTLSConfig struct {
c toml.MercuryTLS
}
Expand All @@ -33,6 +36,20 @@ func (m *mercuryTLSConfig) CertFile() string {
return *m.c.CertFile
}

var _ config.MercuryTransmitter = (*mercuryTransmitterConfig)(nil)

type mercuryTransmitterConfig struct {
c toml.MercuryTransmitter
}

func (m *mercuryTransmitterConfig) TransmitQueueMaxSize() uint32 {
return *m.c.TransmitQueueMaxSize
}

func (m *mercuryTransmitterConfig) TransmitTimeout() commonconfig.Duration {
return *m.c.TransmitTimeout
}

type mercuryConfig struct {
c toml.Mercury
s toml.MercurySecrets
Expand Down Expand Up @@ -60,3 +77,7 @@ func (m *mercuryConfig) Cache() config.MercuryCache {
func (m *mercuryConfig) TLS() config.MercuryTLS {
return &mercuryTLSConfig{c: m.c.TLS}
}

func (m *mercuryConfig) Transmitter() config.MercuryTransmitter {
return &mercuryTransmitterConfig{c: m.c.Transmitter}
}
8 changes: 8 additions & 0 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,10 @@ func TestConfig_Marshal(t *testing.T) {
TLS: toml.MercuryTLS{
CertFile: ptr("/path/to/cert.pem"),
},
Transmitter: toml.MercuryTransmitter{
TransmitQueueMaxSize: ptr(uint32(123)),
TransmitTimeout: commoncfg.MustNewDuration(234 * time.Second),
},
}

for _, tt := range []struct {
Expand Down Expand Up @@ -1165,6 +1169,10 @@ LatestReportDeadline = '1m42s'

[Mercury.TLS]
CertFile = '/path/to/cert.pem'

[Mercury.Transmitter]
TransmitQueueMaxSize = 123
TransmitTimeout = '3m54s'
`},
{"full", full, fullTOML},
{"multi-chain", multiChain, multiChainTOML},
Expand Down
9 changes: 6 additions & 3 deletions core/services/chainlink/relayer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/smartcontractkit/chainlink-starknet/relayer/pkg/chainlink/config"

"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
coreconfig "github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/config/env"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
Expand All @@ -38,6 +39,7 @@ type RelayerFactory struct {
type EVMFactoryConfig struct {
legacyevm.ChainOpts
evmrelay.CSAETHKeystore
coreconfig.MercuryTransmitter
}

func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (map[types.RelayID]evmrelay.LoopRelayAdapter, error) {
Expand Down Expand Up @@ -67,9 +69,10 @@ func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (m
}

relayerOpts := evmrelay.RelayerOpts{
DS: ccOpts.DS,
CSAETHKeystore: config.CSAETHKeystore,
MercuryPool: r.MercuryPool,
DS: ccOpts.DS,
CSAETHKeystore: config.CSAETHKeystore,
MercuryPool: r.MercuryPool,
TransmitterConfig: config.MercuryTransmitter,
}
relayer, err2 := evmrelay.NewRelayer(lggr.Named(relayID.ChainID), chain, relayerOpts)
if err2 != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ LatestReportDeadline = '5s'
[Mercury.TLS]
CertFile = ''

[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'

[Capabilities]
[Capabilities.Peering]
IncomingMessageBufferSize = 10
Expand Down
4 changes: 4 additions & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ LatestReportDeadline = '1m42s'
[Mercury.TLS]
CertFile = '/path/to/cert.pem'

[Mercury.Transmitter]
TransmitQueueMaxSize = 123
TransmitTimeout = '3m54s'

[Capabilities]
[Capabilities.Peering]
IncomingMessageBufferSize = 13
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ LatestReportDeadline = '5s'
[Mercury.TLS]
CertFile = ''

[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'

[Capabilities]
[Capabilities.Peering]
IncomingMessageBufferSize = 10
Expand Down
1 change: 1 addition & 0 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ type mercuryConfig interface {
Credentials(credName string) *types.MercuryCredentials
Cache() coreconfig.MercuryCache
TLS() coreconfig.MercuryTLS
Transmitter() coreconfig.MercuryTransmitter
}

type thresholdConfig interface {
Expand Down
25 changes: 14 additions & 11 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ type Relayer struct {
codec commontypes.Codec

// Mercury
mercuryORM mercury.ORM
mercuryORM mercury.ORM
transmitterCfg mercury.TransmitterConfig

// LLO/data streams
cdcFactory llo.ChannelDefinitionCacheFactory
Expand All @@ -93,7 +94,8 @@ type CSAETHKeystore interface {
type RelayerOpts struct {
DS sqlutil.DataSource
CSAETHKeystore
MercuryPool wsrpc.Pool
MercuryPool wsrpc.Pool
TransmitterConfig mercury.TransmitterConfig
}

func (c RelayerOpts) Validate() error {
Expand Down Expand Up @@ -122,14 +124,15 @@ func NewRelayer(lggr logger.Logger, chain legacyevm.Chain, opts RelayerOpts) (*R
lloORM := llo.NewORM(opts.DS, chain.ID())
cdcFactory := llo.NewChannelDefinitionCacheFactory(lggr, lloORM, chain.LogPoller())
return &Relayer{
ds: opts.DS,
chain: chain,
lggr: lggr,
ks: opts.CSAETHKeystore,
mercuryPool: opts.MercuryPool,
cdcFactory: cdcFactory,
lloORM: lloORM,
mercuryORM: mercuryORM,
ds: opts.DS,
chain: chain,
lggr: lggr,
ks: opts.CSAETHKeystore,
mercuryPool: opts.MercuryPool,
cdcFactory: cdcFactory,
lloORM: lloORM,
mercuryORM: mercuryORM,
transmitterCfg: opts.TransmitterConfig,
}, nil
}

Expand Down Expand Up @@ -246,7 +249,7 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty
default:
return nil, fmt.Errorf("invalid feed version %d", feedID.Version())
}
transmitter := mercury.NewTransmitter(lggr, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec)
transmitter := mercury.NewTransmitter(lggr, r.transmitterCfg, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec)

return NewMercuryProvider(cp, r.chainReader, r.codec, NewMercuryChainReader(r.chain.HeadTracker()), transmitter, reportCodecV1, reportCodecV2, reportCodecV3, lggr), nil
}
Expand Down
27 changes: 16 additions & 11 deletions core/services/relay/evm/mercury/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/mercury"

Expand All @@ -33,12 +34,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var (
maxTransmitQueueSize = 10_000
maxDeleteQueueSize = 10_000
transmitTimeout = 5 * time.Second
)

const (
// Mercury server error codes
DuplicateReport = 2
Expand Down Expand Up @@ -104,9 +99,15 @@ type TransmitterReportDecoder interface {

var _ Transmitter = (*mercuryTransmitter)(nil)

type TransmitterConfig interface {
TransmitQueueMaxSize() uint32
TransmitTimeout() commonconfig.Duration
}

type mercuryTransmitter struct {
services.StateMachine
lggr logger.Logger
cfg TransmitterConfig

servers map[string]*server

Expand Down Expand Up @@ -142,6 +143,8 @@ func getPayloadTypes() abi.Arguments {
type server struct {
lggr logger.Logger

transmitTimeout time.Duration

c wsrpc.Client
pm *PersistenceManager
q *TransmitQueue
Expand Down Expand Up @@ -221,7 +224,7 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed
// queue was closed
return
}
ctx, cancel := context.WithTimeout(runloopCtx, utils.WithJitter(transmitTimeout))
ctx, cancel := context.WithTimeout(runloopCtx, utils.WithJitter(s.transmitTimeout))
res, err := s.c.Transmit(ctx, t.Req)
cancel()
if runloopCtx.Err() != nil {
Expand Down Expand Up @@ -272,18 +275,19 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed
}
}

func NewTransmitter(lggr logger.Logger, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder) *mercuryTransmitter {
func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder) *mercuryTransmitter {
feedIDHex := fmt.Sprintf("0x%x", feedID[:])
servers := make(map[string]*server, len(clients))
for serverURL, client := range clients {
cLggr := lggr.Named(serverURL).With("serverURL", serverURL)
pm := NewPersistenceManager(cLggr, serverURL, orm, jobID, maxTransmitQueueSize, flushDeletesFrequency, pruneFrequency)
pm := NewPersistenceManager(cLggr, serverURL, orm, jobID, int(cfg.TransmitQueueMaxSize()), flushDeletesFrequency, pruneFrequency)
servers[serverURL] = &server{
cLggr,
cfg.TransmitTimeout().Duration(),
client,
pm,
NewTransmitQueue(cLggr, serverURL, feedIDHex, maxTransmitQueueSize, pm),
make(chan *pb.TransmitRequest, maxDeleteQueueSize),
NewTransmitQueue(cLggr, serverURL, feedIDHex, int(cfg.TransmitQueueMaxSize()), pm),
make(chan *pb.TransmitRequest, int(cfg.TransmitQueueMaxSize())),
transmitSuccessCount.WithLabelValues(feedIDHex, serverURL),
transmitDuplicateCount.WithLabelValues(feedIDHex, serverURL),
transmitConnectionErrorCount.WithLabelValues(feedIDHex, serverURL),
Expand All @@ -295,6 +299,7 @@ func NewTransmitter(lggr logger.Logger, clients map[string]wsrpc.Client, fromAcc
return &mercuryTransmitter{
services.StateMachine{},
lggr.Named("MercuryTransmitter").With("feedID", feedIDHex),
cfg,
servers,
codec,
feedID,
Expand Down
Loading
Loading