Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MERC-6304 ea telem support for view function #14426

Closed
wants to merge 40 commits into from
Closed
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
8f4d73d
checkin
akuzni2 Sep 11, 2024
37af716
checkin for changes to consume new telemetry
akuzni2 Sep 13, 2024
3e12c67
[TT-1546] change the parameter of chains' Confirm() function (#14388)
Tofel Sep 12, 2024
25af746
Add doc (#14359)
dimriou Sep 12, 2024
9a50696
Chore/devsvcs 499 (#14408)
iljapavlovs Sep 12, 2024
776ab2b
Change TelemetryIngress.UniConn default to false (#14401)
george-dorin Sep 12, 2024
8473452
Remove PriceMin check from attempt builder (#14370)
dimriou Sep 12, 2024
8b60a05
DEVSVCS-134: adding readme on how to run VRF CTF tests (#14409)
iljapavlovs Sep 12, 2024
45d23a4
DEVSVCS-134: minor updates to README and testconfig (#14419)
iljapavlovs Sep 13, 2024
ff5c448
AddChain inbound CCIP integration test (#14377)
connorwstein Sep 13, 2024
6a18c0f
Update the code owner of `feeds` service from FMS group to OPCORE (#…
ro-tex Sep 13, 2024
a2c4af7
added unimplemented contract reader (#14413)
EasterTheBunny Sep 13, 2024
b6d680b
changeset
akuzni2 Sep 13, 2024
cba084b
Merge branch 'develop' into feature/MERC-6304-ea-telem-view-function
akuzni2 Sep 13, 2024
35f4165
fix test
akuzni2 Sep 13, 2024
7db21f0
add runner test for ethdecode task and other telemetry parsing change…
akuzni2 Sep 13, 2024
6782c2e
Parse task results via task tags + adding tests
akuzni2 Sep 16, 2024
d83c69f
Change bridge request proto field name
akuzni2 Sep 16, 2024
e990dc8
Merge branch 'develop' into feature/MERC-6304-ea-telem-view-function
akuzni2 Sep 16, 2024
8283990
Merge branch 'feature/MERC-6304-ea-telem-view-function' of github.com…
akuzni2 Sep 16, 2024
5bbc1de
update changeset
akuzni2 Sep 16, 2024
41056ce
add back whitespace
akuzni2 Sep 16, 2024
a0294a5
add task descendent search into base task
akuzni2 Sep 16, 2024
da1ce99
update changeset
akuzni2 Sep 16, 2024
77a993f
update changeset
akuzni2 Sep 16, 2024
d12b6ad
add task descendent search into base task
akuzni2 Sep 16, 2024
6ca3560
linting fix
akuzni2 Sep 16, 2024
8f76298
Merge branch 'develop' into feature/MERC-6304-ea-telem-view-function
akuzni2 Sep 16, 2024
9be88fa
Merge branch 'develop' into feature/MERC-6304-ea-telem-view-function
akuzni2 Sep 16, 2024
ef091b7
Merge branch 'develop' into feature/MERC-6304-ea-telem-view-function
akuzni2 Sep 16, 2024
281e911
Merge branch 'develop' into feature/MERC-6304-ea-telem-view-function
akuzni2 Sep 16, 2024
b9e2d84
Merge branch 'develop' into feature/MERC-6304-ea-telem-view-function
akuzni2 Sep 16, 2024
80c3d42
Branch was auto-updated.
github-actions[bot] Sep 17, 2024
d0afc13
remove TODO
akuzni2 Sep 17, 2024
384b2ce
line back
akuzni2 Sep 17, 2024
ed7a62a
Update graph.go
akuzni2 Sep 17, 2024
0e4248f
Branch was auto-updated.
github-actions[bot] Sep 17, 2024
50ddc2e
refactor
akuzni2 Sep 17, 2024
b143754
Merge remote-tracking branch 'origin/feature/MERC-6304-ea-telem-view-…
akuzni2 Sep 17, 2024
b46af32
add debug logging
akuzni2 Sep 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/violet-pandas-juggle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"chainlink": patch
---
#added and internal changes
* Adds support for "tags" to Tasks that can be used generically.
* Adds a descendent task search method
* Added support in Mercury EA telemetry to utilize tags for telemetry extraction
150 changes: 117 additions & 33 deletions core/services/ocrcommon/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced

assetSymbol := e.getAssetSymbolFromRequestData(bridgeTask.RequestData)

benchmarkPrice, bidPrice, askPrice := e.getPricesFromResults(trr, d.TaskRunResults, d.FeedVersion)
benchmarkPrice, bidPrice, askPrice := e.getPricesFromBridgeTask(trr, d.TaskRunResults, d.FeedVersion)

t := &telem.EnhancedEAMercury{
DataSource: eaTelem.DataSource,
Expand Down Expand Up @@ -431,6 +431,7 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced
ConfigDigest: d.RepTimestamp.ConfigDigest.Hex(),
Round: int64(d.RepTimestamp.Round),
Epoch: int64(d.RepTimestamp.Epoch),
BridgeRequestData: bridgeTask.RequestData,
AssetSymbol: assetSymbol,
Version: uint32(d.FeedVersion),
}
Expand All @@ -445,11 +446,25 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced
}
}

type telemetryAttributes struct {
PriceType *string `json:"priceType"`
}

func (e *EnhancedTelemetryService[T]) parseTelemetryAttributes(a string) telemetryAttributes {
attrs := &telemetryAttributes{}
err := json.Unmarshal([]byte(a), attrs)
if err != nil {
return telemetryAttributes{}
}
return *attrs
}

// getAssetSymbolFromRequestData parses the requestData of the bridge to generate an asset symbol pair
func (e *EnhancedTelemetryService[T]) getAssetSymbolFromRequestData(requestData string) string {
type reqDataPayload struct {
To string `json:"to"`
From string `json:"from"`
To *string `json:"to"`
From *string `json:"from"`
Address *string `json:"address"` // used for view function ea only
}
type reqData struct {
Data reqDataPayload `json:"data"`
Expand All @@ -461,7 +476,15 @@ func (e *EnhancedTelemetryService[T]) getAssetSymbolFromRequestData(requestData
return ""
}

return rd.Data.From + "/" + rd.Data.To
if rd.Data.From != nil && rd.Data.To != nil {
return *rd.Data.From + "/" + *rd.Data.To
}

if rd.Data.Address != nil {
return *rd.Data.Address
}

return ""
}

// ShouldCollectEnhancedTelemetryMercury checks if enhanced telemetry should be collected and sent
Expand All @@ -472,26 +495,101 @@ func ShouldCollectEnhancedTelemetryMercury(jb job.Job) bool {
return false
}

// getPricesFromResults parses the pipeline.TaskRunResults for pipeline.TaskTypeJSONParse and gets the benchmarkPrice,
const (
bid = "bid"
ask = "ask"
benchmark = "benchmark"
exchangeRate = "exchangeRate"
)

func (e *EnhancedTelemetryService[T]) getPricesFromBridgeTask(bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) {
var benchmarkPrice, bidPrice, askPrice float64

// This will assume that all fields we care about are tagged with the correct priceType
benchmarkPrice, bidPrice, askPrice = e.getPricesFromBridgeTaskByTelemetryField(bridgeTask, allTasks, mercuryVersion)

// If prices weren't parsed by telemetry fields - attempt to get prices using the legacy method
// This is for backwards compatibility with job specs that don't have the telemetry attributes set
if benchmarkPrice == 0 && bidPrice == 0 && askPrice == 0 {
benchmarkP, bidP, askP := e.getPricesFromResultsByOrder(bridgeTask, allTasks, mercuryVersion)
bidPrice = bidP
askPrice = askP
benchmarkPrice = benchmarkP
}

return benchmarkPrice, bidPrice, askPrice
}

// CollectTaskRunResultsWithTags collects TaskRunResults for descendent tasks with non-empty TaskTags.
func (e *EnhancedTelemetryService[T]) collectTaskRunResultsWithTags(bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults) []pipeline.TaskRunResult {
startTask := bridgeTask.Task
descendants := startTask.GetDescendantTasks()
var taskRunResultsWithTags []pipeline.TaskRunResult
for _, task := range descendants {
trr := allTasks.GetTaskRunResultOf(task)
if trr != nil {
if trr.Task.TaskTags() != "" {
taskRunResultsWithTags = append(taskRunResultsWithTags, *trr)
}
}
}
return taskRunResultsWithTags
}

// Start task should be a bridge task
func (e *EnhancedTelemetryService[T]) getPricesFromBridgeTaskByTelemetryField(bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) {
var benchmarkPrice, bidPrice, askPrice float64

// Outputs are the mapped tasks from this task.
var tasksWithTags = e.collectTaskRunResultsWithTags(bridgeTask, allTasks)

for _, trr := range tasksWithTags {
attributes := e.parseTelemetryAttributes(trr.Task.TaskTags())
if attributes.PriceType != nil {
switch *attributes.PriceType {
case bid:
bidPrice = e.parsePriceFromTask(trr)
case ask:
askPrice = e.parsePriceFromTask(trr)
case benchmark:
benchmarkPrice = e.parsePriceFromTask(trr)
case exchangeRate:
price := e.parsePriceFromTask(trr)
benchmarkPrice, bidPrice, askPrice = price, price, price
case "":
}
}
}

return benchmarkPrice, bidPrice, askPrice
}

func (e *EnhancedTelemetryService[T]) parsePriceFromTask(trr pipeline.TaskRunResult) float64 {
var val float64
if trr.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("got error on EA telemetry price task, job %d, id %s: %s", e.job.ID, trr.Task.DotID(), trr.Result.Error), "err", trr.Result.Error)
return 0
}
val, err := getResultFloat64(&trr)
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry price to float64, DOT id %s", trr.Task.DotID()), "job", e.job.ID, "task_type", trr.Task.Type(), "task_tags", trr.Task.TaskTags(), "err", err)
}
return val
}

// getPricesFromResultsByOrder parses the pipeline.TaskRunResults for pipeline.TaskTypeJSONParse and gets the benchmarkPrice,
// bid and ask. This functions expects the pipeline.TaskRunResults to be correctly ordered
func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) {
func (e *EnhancedTelemetryService[T]) getPricesFromResultsByOrder(startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) {
var benchmarkPrice, askPrice, bidPrice float64
var err error

// We rely on task results to be sorted in the correct order
benchmarkPriceTask := allTasks.GetNextTaskOf(startTask)
if benchmarkPriceTask == nil {
e.lggr.Warnf("cannot parse enhanced EA telemetry benchmark price, task is nil, job %d", e.job.ID)
return 0, 0, 0
}
if benchmarkPriceTask.Task.Type() == pipeline.TaskTypeJSONParse {
if benchmarkPriceTask.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry benchmark price, job %d, id %s: %s", e.job.ID, benchmarkPriceTask.Task.DotID(), benchmarkPriceTask.Result.Error), "err", benchmarkPriceTask.Result.Error)
} else {
benchmarkPrice, err = getResultFloat64(benchmarkPriceTask)
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry benchmark price, job %d, id %s", e.job.ID, benchmarkPriceTask.Task.DotID()), "err", err)
}
}
benchmarkPrice = e.parsePriceFromTask(*benchmarkPriceTask)
}

// mercury version 2 only supports benchmarkPrice
Expand All @@ -505,31 +603,17 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.Ta
return benchmarkPrice, 0, 0
}

if bidTask != nil && bidTask.Task.Type() == pipeline.TaskTypeJSONParse {
if bidTask.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry bid price, job %d, id %s: %s", e.job.ID, bidTask.Task.DotID(), bidTask.Result.Error), "err", bidTask.Result.Error)
} else {
bidPrice, err = getResultFloat64(bidTask)
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry bid price, job %d, id %s", e.job.ID, bidTask.Task.DotID()), "err", err)
}
}
if bidTask.Task.Type() == pipeline.TaskTypeJSONParse {
bidPrice = e.parsePriceFromTask(*bidTask)
}

askTask := allTasks.GetNextTaskOf(*bidTask)
if askTask == nil {
e.lggr.Warnf("cannot parse enhanced EA telemetry ask price, task is nil, job %d, id %s", e.job.ID, benchmarkPriceTask.Task.DotID())
return benchmarkPrice, bidPrice, 0
}
if askTask != nil && askTask.Task.Type() == pipeline.TaskTypeJSONParse {
if bidTask.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry ask price, job %d, id %s: %s", e.job.ID, askTask.Task.DotID(), askTask.Result.Error), "err", askTask.Result.Error)
} else {
askPrice, err = getResultFloat64(askTask)
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry ask price, job %d, id %s", e.job.ID, askTask.Task.DotID()), "err", err)
}
}
if askTask.Task.Type() == pipeline.TaskTypeJSONParse {
askPrice = e.parsePriceFromTask(*askTask)
}

return benchmarkPrice, bidPrice, askPrice
Expand Down
Loading