From 5885454e9a7eaa8f8c180ac3708afbdf5bdb08cd Mon Sep 17 00:00:00 2001 From: Austin Born Date: Wed, 18 Sep 2024 09:28:46 -0700 Subject: [PATCH] DF-20481: Add new OCR3DataFeeds telemetry type for Mercury jobs (#14470) * Add new OCR3DataFeeds telemetry type for Mercury jobs * Update plugin_test.go * Changeset * Update curvy-beans-scream.md * Update .changeset/curvy-beans-scream.md * Update curvy-beans-scream.md --- .changeset/curvy-beans-scream.md | 5 +++++ core/services/ocr2/delegate.go | 19 +++++++++++++++++-- core/services/ocr2/plugins/mercury/plugin.go | 15 +++++---------- .../ocr2/plugins/mercury/plugin_test.go | 2 +- core/services/synchronization/common.go | 1 + core/services/telemetry/manager.go | 2 +- 6 files changed, 30 insertions(+), 14 deletions(-) create mode 100644 .changeset/curvy-beans-scream.md diff --git a/.changeset/curvy-beans-scream.md b/.changeset/curvy-beans-scream.md new file mode 100644 index 00000000000..4252f965e32 --- /dev/null +++ b/.changeset/curvy-beans-scream.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#changed: Add new OCR3DataFeeds telemetry type for Mercury jobs diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 18f4d6224e7..9d4dbb85982 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -882,6 +882,21 @@ func (d *Delegate) newServicesMercury( lggr.ErrorIf(d.jobORM.RecordError(ctx, jb.ID, msg), "unable to record error") }) + var relayConfig evmrelaytypes.RelayConfig + err = json.Unmarshal(jb.OCR2OracleSpec.RelayConfig.Bytes(), &relayConfig) + if err != nil { + return nil, fmt.Errorf("error while unmarshalling relay config: %w", err) + } + + var telemetryType synchronization.TelemetryType + if relayConfig.EnableTriggerCapability && jb.OCR2OracleSpec.PluginConfig == nil { + telemetryType = synchronization.OCR3DataFeeds + // First use case for TriggerCapability transmission is Data Feeds, so telemetry should be routed accordingly. + // This is only true if TriggerCapability is the *only* transmission method (PluginConfig == nil). + } else { + telemetryType = synchronization.OCR3Mercury + } + oracleArgsNoPlugin := libocr2.MercuryOracleArgs{ BinaryNetworkEndpointFactory: d.peerWrapper.Peer2, V2Bootstrappers: bootstrapPeers, @@ -890,7 +905,7 @@ func (d *Delegate) newServicesMercury( Database: ocrDB, LocalConfig: lc, Logger: ocrLogger, - MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), synchronization.OCR3Mercury), + MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), telemetryType), OffchainConfigDigester: mercuryProvider.OffchainConfigDigester(), OffchainKeyring: kb, OnchainKeyring: kb, @@ -901,7 +916,7 @@ func (d *Delegate) newServicesMercury( mCfg := mercury.NewMercuryConfig(d.cfg.JobPipeline().MaxSuccessfulRuns(), d.cfg.JobPipeline().ResultWriteQueueDepth(), d.cfg) - mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, lggr, oracleArgsNoPlugin, mCfg, chEnhancedTelem, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID)) + mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, lggr, oracleArgsNoPlugin, mCfg, chEnhancedTelem, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID), relayConfig.EnableTriggerCapability) if ocrcommon.ShouldCollectEnhancedTelemetryMercury(jb) { enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), synchronization.EnhancedEAMercury), lggr.Named("EnhancedTelemetryMercury")) diff --git a/core/services/ocr2/plugins/mercury/plugin.go b/core/services/ocr2/plugins/mercury/plugin.go index 13793570dec..f78531d6b07 100644 --- a/core/services/ocr2/plugins/mercury/plugin.go +++ b/core/services/ocr2/plugins/mercury/plugin.go @@ -31,7 +31,6 @@ import ( mercuryv2 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v2" mercuryv3 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3" mercuryv4 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v4" - evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" "github.com/smartcontractkit/chainlink/v2/plugins" ) @@ -74,20 +73,16 @@ func NewServices( chEnhancedTelem chan ocrcommon.EnhancedTelemetryMercuryData, orm types.DataSourceORM, feedID utils.FeedID, + enableTriggerCapability bool, ) ([]job.ServiceCtx, error) { if jb.PipelineSpec == nil { return nil, errors.New("expected job to have a non-nil PipelineSpec") } - var relayConfig evmtypes.RelayConfig - err := json.Unmarshal(jb.OCR2OracleSpec.RelayConfig.Bytes(), &relayConfig) - if err != nil { - return nil, fmt.Errorf("error while unmarshalling relay config: %w", err) - } - + var err error var pluginConfig config.PluginConfig if jb.OCR2OracleSpec.PluginConfig == nil { - if !relayConfig.EnableTriggerCapability { + if !enableTriggerCapability { return nil, fmt.Errorf("at least one transmission option must be configured") } } else { @@ -106,8 +101,8 @@ func NewServices( // encapsulate all the subservices and ensure we close them all if any fail to start srvs := []job.ServiceCtx{ocr2Provider} abort := func() { - if cerr := services.MultiCloser(srvs).Close(); err != nil { - lggr.Errorw("Error closing unused services", "err", cerr) + if err = services.MultiCloser(srvs).Close(); err != nil { + lggr.Errorw("Error closing unused services", "err", err) } } saver := ocrcommon.NewResultRunSaver(pipelineRunner, lggr, cfg.MaxSuccessfulRuns(), cfg.ResultWriteQueueDepth()) diff --git a/core/services/ocr2/plugins/mercury/plugin_test.go b/core/services/ocr2/plugins/mercury/plugin_test.go index 50ff5489358..22aaf7522de 100644 --- a/core/services/ocr2/plugins/mercury/plugin_test.go +++ b/core/services/ocr2/plugins/mercury/plugin_test.go @@ -230,7 +230,7 @@ func newServicesTestWrapper(t *testing.T, pluginConfig job.JSONConfig, feedID ut t.Helper() jb := testJob jb.OCR2OracleSpec.PluginConfig = pluginConfig - return mercuryocr2.NewServices(jb, &testProvider{}, nil, logger.TestLogger(t), testArgsNoPlugin, testCfg, nil, &testDataSourceORM{}, feedID) + return mercuryocr2.NewServices(jb, &testProvider{}, nil, logger.TestLogger(t), testArgsNoPlugin, testCfg, nil, &testDataSourceORM{}, feedID, false) } type testProvider struct{} diff --git a/core/services/synchronization/common.go b/core/services/synchronization/common.go index a6c0191e3a7..9145a3c9ace 100644 --- a/core/services/synchronization/common.go +++ b/core/services/synchronization/common.go @@ -22,6 +22,7 @@ const ( OCR2S4 TelemetryType = "ocr2-s4" OCR2Median TelemetryType = "ocr2-median" OCR3Mercury TelemetryType = "ocr3-mercury" + OCR3DataFeeds TelemetryType = "ocr3-data-feeds" AutomationCustom TelemetryType = "automation-custom" OCR3Automation TelemetryType = "ocr3-automation" OCR3Rebalancer TelemetryType = "ocr3-rebalancer" diff --git a/core/services/telemetry/manager.go b/core/services/telemetry/manager.go index 73a94b4b127..7b788c4806c 100644 --- a/core/services/telemetry/manager.go +++ b/core/services/telemetry/manager.go @@ -75,7 +75,7 @@ func (m *Manager) GenMonitoringEndpoint(network string, chainID string, contract e, found := m.getEndpoint(network, chainID) if !found { - m.eng.Warnf("no telemetry endpoint found for network %q chainID %q, telemetry %q for contactID %q will NOT be sent", network, chainID, telemType, contractID) + m.eng.Warnf("no telemetry endpoint found for network %q chainID %q, telemetry %q for contractID %q will NOT be sent", network, chainID, telemType, contractID) return &NoopAgent{} }