Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth,eth/watcher: Create Chainlink price feed watcher #2972

Merged
merged 10 commits into from
Mar 27, 2024
Merged
18 changes: 15 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
SHELL=/bin/bash
GO_BUILD_DIR?="./"

all: net/lp_rpc.pb.go net/redeemer.pb.go net/redeemer_mock.pb.go core/test_segment.go livepeer livepeer_cli livepeer_router livepeer_bench
MOCKGEN=go run github.com/golang/mock/mockgen
ABIGEN=go run github.com/ethereum/go-ethereum/cmd/abigen

all: net/lp_rpc.pb.go net/redeemer.pb.go net/redeemer_mock.pb.go core/test_segment.go eth/contracts/chainlink/AggregatorV3Interface.go livepeer livepeer_cli livepeer_router livepeer_bench

net/lp_rpc.pb.go: net/lp_rpc.proto
protoc -I=. --go_out=. --go-grpc_out=. $^
Expand All @@ -10,12 +13,21 @@ net/redeemer.pb.go: net/redeemer.proto
protoc -I=. --go_out=. --go-grpc_out=. $^

net/redeemer_mock.pb.go net/redeemer_grpc_mock.pb.go: net/redeemer.pb.go net/redeemer_grpc.pb.go
@mockgen -source net/redeemer.pb.go -destination net/redeemer_mock.pb.go -package net
@mockgen -source net/redeemer_grpc.pb.go -destination net/redeemer_grpc_mock.pb.go -package net
@$(MOCKGEN) -source net/redeemer.pb.go -destination net/redeemer_mock.pb.go -package net
@$(MOCKGEN) -source net/redeemer_grpc.pb.go -destination net/redeemer_grpc_mock.pb.go -package net

core/test_segment.go:
core/test_segment.sh core/test_segment.go

eth/contracts/chainlink/AggregatorV3Interface.go:
solc --version | grep 0.7.6+commit.7338295f
@set -ex; \
for sol_file in eth/contracts/chainlink/*.sol; do \
contract_name=$$(basename "$$sol_file" .sol); \
solc --abi --optimize --overwrite -o $$(dirname "$$sol_file") $$sol_file; \
$(ABIGEN) --abi=$${sol_file%.sol}.abi --pkg=chainlink --type=$$contract_name --out=$${sol_file%.sol}.go; \
done

version=$(shell cat VERSION)

ldflags := -X github.com/livepeer/go-livepeer/core.LivepeerVersion=$(shell ./print_version.sh)
Expand Down
1 change: 1 addition & 0 deletions eth/contracts/chainlink/AggregatorV3Interface.abi
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[{"inputs":[],"name":"decimals","outputs":[{"internalType":"uint8","name":"","type":"uint8"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"description","outputs":[{"internalType":"string","name":"","type":"string"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint80","name":"_roundId","type":"uint80"}],"name":"getRoundData","outputs":[{"internalType":"uint80","name":"roundId","type":"uint80"},{"internalType":"int256","name":"answer","type":"int256"},{"internalType":"uint256","name":"startedAt","type":"uint256"},{"internalType":"uint256","name":"updatedAt","type":"uint256"},{"internalType":"uint80","name":"answeredInRound","type":"uint80"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"latestRoundData","outputs":[{"internalType":"uint80","name":"roundId","type":"uint80"},{"internalType":"int256","name":"answer","type":"int256"},{"internalType":"uint256","name":"startedAt","type":"uint256"},{"internalType":"uint256","name":"updatedAt","type":"uint256"},{"internalType":"uint80","name":"answeredInRound","type":"uint80"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"version","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"}]
394 changes: 394 additions & 0 deletions eth/contracts/chainlink/AggregatorV3Interface.go

Large diffs are not rendered by default.

36 changes: 36 additions & 0 deletions eth/contracts/chainlink/AggregatorV3Interface.sol
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// SPDX-License-Identifier: MIT
// https://github.com/smartcontractkit/chainlink/blob/v2.9.1/contracts/src/v0.7/interfaces/AggregatorV3Interface.sol
pragma solidity ^0.7.0;

interface AggregatorV3Interface {
function decimals() external view returns (uint8);

function description() external view returns (string memory);

function version() external view returns (uint256);

// getRoundData and latestRoundData should both raise "No data present"
// if they do not have data to report, instead of returning unset values
// which could be misinterpreted as actual reported values.
function getRoundData(uint80 _roundId)
external
view
returns (
uint80 roundId,
int256 answer,
uint256 startedAt,
uint256 updatedAt,
uint80 answeredInRound
);

function latestRoundData()
external
view
returns (
uint80 roundId,
int256 answer,
uint256 startedAt,
uint256 updatedAt,
uint80 answeredInRound
);
}
78 changes: 78 additions & 0 deletions eth/pricefeed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package eth

import (
"errors"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/livepeer/go-livepeer/eth/contracts/chainlink"
)

type PriceData struct {
RoundID int64
Price *big.Rat
UpdatedAt time.Time
}

// PriceFeedEthClient is an interface for fetching price data from a Chainlink
// PriceFeed contract.
type PriceFeedEthClient interface {
Description() (string, error)
FetchPriceData() (PriceData, error)
}

func NewPriceFeedEthClient(ethClient *ethclient.Client, priceFeedAddr string) (PriceFeedEthClient, error) {
addr := common.HexToAddress(priceFeedAddr)
priceFeed, err := chainlink.NewAggregatorV3Interface(addr, ethClient)
if err != nil {
return nil, fmt.Errorf("failed to create aggregator proxy: %w", err)

Check warning on line 32 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L28-L32

Added lines #L28 - L32 were not covered by tests
}

return &priceFeedClient{
client: ethClient,
priceFeed: priceFeed,
}, nil

Check warning on line 38 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L35-L38

Added lines #L35 - L38 were not covered by tests
}

type priceFeedClient struct {
client *ethclient.Client
priceFeed *chainlink.AggregatorV3Interface
}

func (c *priceFeedClient) Description() (string, error) {
return c.priceFeed.Description(&bind.CallOpts{})

Check warning on line 47 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L46-L47

Added lines #L46 - L47 were not covered by tests
}

func (c *priceFeedClient) FetchPriceData() (PriceData, error) {
data, err := c.priceFeed.LatestRoundData(&bind.CallOpts{})
if err != nil {
return PriceData{}, errors.New("failed to get latest round data: " + err.Error())

Check warning on line 53 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L50-L53

Added lines #L50 - L53 were not covered by tests
}

decimals, err := c.priceFeed.Decimals(&bind.CallOpts{})
if err != nil {
return PriceData{}, errors.New("failed to get decimals: " + err.Error())

Check warning on line 58 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L56-L58

Added lines #L56 - L58 were not covered by tests
}

return computePriceData(data.RoundId, data.UpdatedAt, data.Answer, decimals), nil

Check warning on line 61 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L61

Added line #L61 was not covered by tests
}

// computePriceData transforms the raw data from the PriceFeed into the higher
// level PriceData struct, more easily usable by the rest of the system.
func computePriceData(roundID, updatedAt, answer *big.Int, decimals uint8) PriceData {
// Compute a big.int which is 10^decimals.
divisor := new(big.Int).Exp(
big.NewInt(10),
big.NewInt(int64(decimals)),
nil)

return PriceData{
RoundID: roundID.Int64(),
Price: new(big.Rat).SetFrac(answer, divisor),
UpdatedAt: time.Unix(updatedAt.Int64(), 0),
}
}
51 changes: 51 additions & 0 deletions eth/pricefeed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package eth

import (
"math/big"
"testing"

"github.com/stretchr/testify/assert"
)

func TestComputePriceData(t *testing.T) {
assert := assert.New(t)

t.Run("valid data", func(t *testing.T) {
roundID := big.NewInt(1)
updatedAt := big.NewInt(1626192000)
answer := big.NewInt(420666000)
decimals := uint8(6)

data := computePriceData(roundID, updatedAt, answer, decimals)

assert.EqualValues(int64(1), data.RoundID, "Round ID didn't match")
assert.Equal("210333/500", data.Price.RatString(), "The Price Rat didn't match")
assert.Equal("2021-07-13 16:00:00 +0000 UTC", data.UpdatedAt.UTC().String(), "The updated at time did not match")
})

t.Run("zero answer", func(t *testing.T) {
roundID := big.NewInt(2)
updatedAt := big.NewInt(1626192000)
answer := big.NewInt(0)
decimals := uint8(18)

data := computePriceData(roundID, updatedAt, answer, decimals)

assert.EqualValues(int64(2), data.RoundID, "Round ID didn't match")
assert.Equal("0", data.Price.RatString(), "The Price Rat didn't match")
assert.Equal("2021-07-13 16:00:00 +0000 UTC", data.UpdatedAt.UTC().String(), "The updated at time did not match")
})

t.Run("zero decimals", func(t *testing.T) {
roundID := big.NewInt(3)
updatedAt := big.NewInt(1626192000)
answer := big.NewInt(13)
decimals := uint8(0)

data := computePriceData(roundID, updatedAt, answer, decimals)

assert.EqualValues(int64(3), data.RoundID, "Round ID didn't match")
assert.Equal("13", data.Price.RatString(), "The Price Rat didn't match")
assert.Equal("2021-07-13 16:00:00 +0000 UTC", data.UpdatedAt.UTC().String(), "The updated at time did not match")
})
}
182 changes: 182 additions & 0 deletions eth/watchers/pricefeedwatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package watchers

import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
"github.com/livepeer/go-livepeer/clog"
"github.com/livepeer/go-livepeer/eth"
)

const (
priceUpdateMaxRetries = 5
priceUpdateBaseRetryDelay = 30 * time.Second
priceUpdatePeriod = 1 * time.Hour
)

// PriceFeedWatcher monitors a Chainlink PriceFeed for updated pricing info. It
// allows fetching the current price as well as listening for updates on the
// PriceUpdated channel.
type PriceFeedWatcher struct {
baseRetryDelay time.Duration

priceFeed eth.PriceFeedEthClient
currencyBase, currencyQuote string

mu sync.RWMutex
current eth.PriceData
priceEventFeed event.Feed
}

// NewPriceFeedWatcher creates a new PriceFeedWatcher instance. It will already
// fetch the current price and start a goroutine to watch for updates.
func NewPriceFeedWatcher(ethClient *ethclient.Client, priceFeedAddr string) (*PriceFeedWatcher, error) {
priceFeed, err := eth.NewPriceFeedEthClient(ethClient, priceFeedAddr)
if err != nil {
return nil, fmt.Errorf("failed to create price feed client: %w", err)

Check warning on line 41 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L38-L41

Added lines #L38 - L41 were not covered by tests
}

description, err := priceFeed.Description()
if err != nil {
return nil, fmt.Errorf("failed to get description: %w", err)

Check warning on line 46 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L44-L46

Added lines #L44 - L46 were not covered by tests
}

currencyFrom, currencyTo, err := parseCurrencies(description)
if err != nil {
return nil, err

Check warning on line 51 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L49-L51

Added lines #L49 - L51 were not covered by tests
}

w := &PriceFeedWatcher{
baseRetryDelay: priceUpdateBaseRetryDelay,
priceFeed: priceFeed,
currencyBase: currencyFrom,
currencyQuote: currencyTo,

Check warning on line 58 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L54-L58

Added lines #L54 - L58 were not covered by tests
}

err = w.updatePrice()
if err != nil {
return nil, fmt.Errorf("failed to update price: %w", err)

Check warning on line 63 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L61-L63

Added lines #L61 - L63 were not covered by tests
}

return w, nil

Check warning on line 66 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L66

Added line #L66 was not covered by tests
}

// Currencies returns the base and quote currencies of the price feed.
// i.e. base = CurrentPrice() * quote
func (w *PriceFeedWatcher) Currencies() (base string, quote string) {
return w.currencyBase, w.currencyQuote

Check warning on line 72 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L71-L72

Added lines #L71 - L72 were not covered by tests
}

// Current returns the latest fetched price data.
func (w *PriceFeedWatcher) Current() eth.PriceData {
w.mu.RLock()
defer w.mu.RUnlock()
return w.current

Check warning on line 79 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L76-L79

Added lines #L76 - L79 were not covered by tests
}

// Subscribe allows one to subscribe to price updates emitted by the Watcher.
// To unsubscribe, simply call `Unsubscribe` on the returned subscription.
// The sink channel should have ample buffer space to avoid blocking other
// subscribers. Slow subscribers are not dropped.
func (w *PriceFeedWatcher) Subscribe(sub chan<- eth.PriceData) event.Subscription {
return w.priceEventFeed.Subscribe(sub)
}

func (w *PriceFeedWatcher) updatePrice() error {
newPrice, err := w.priceFeed.FetchPriceData()
if err != nil {
return fmt.Errorf("failed to fetch price data: %w", err)
}

if newPrice.UpdatedAt.After(w.current.UpdatedAt) {
w.mu.Lock()
w.current = newPrice
w.mu.Unlock()
w.priceEventFeed.Send(newPrice)
}

return nil
}

// Watch starts the watch process. It will periodically poll the price feed for
// price updates until the given context is canceled. Typically, you want to
// call Watch inside a goroutine.
func (w *PriceFeedWatcher) Watch(ctx context.Context) {
ticker := newTruncatedTicker(ctx, priceUpdatePeriod)
w.watchTicker(ctx, ticker)

Check warning on line 111 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L109-L111

Added lines #L109 - L111 were not covered by tests
}

func (w *PriceFeedWatcher) watchTicker(ctx context.Context, ticker <-chan time.Time) {
for {
select {
case <-ctx.Done():
return
case <-ticker:
attempt, retryDelay := 1, w.baseRetryDelay
for {
err := w.updatePrice()
if err == nil {
break
} else if attempt >= priceUpdateMaxRetries {
clog.Errorf(ctx, "Failed to fetch updated price from PriceFeed attempts=%d err=%q", attempt, err)
break

Check warning on line 127 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L126-L127

Added lines #L126 - L127 were not covered by tests
}

clog.Warningf(ctx, "Failed to fetch updated price from PriceFeed, retrying after retryDelay=%d attempt=%d err=%q", retryDelay, attempt, err)
select {
case <-ctx.Done():
return

Check warning on line 133 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L132-L133

Added lines #L132 - L133 were not covered by tests
case <-time.After(retryDelay):
}
attempt, retryDelay = attempt+1, retryDelay*2
}
}
}
}

// parseCurrencies parses the base and quote currencies from a price feed based
// on Chainlink PriceFeed description pattern "FROM / TO".
func parseCurrencies(description string) (currencyBase string, currencyQuote string, err error) {
currencies := strings.Split(description, "/")
if len(currencies) != 2 {
return "", "", fmt.Errorf("aggregator description must be in the format 'FROM / TO' but got: %s", description)
}

currencyBase = strings.TrimSpace(currencies[0])
currencyQuote = strings.TrimSpace(currencies[1])
return
}

// newTruncatedTicker creates a ticker that ticks at the next time that is a
victorges marked this conversation as resolved.
Show resolved Hide resolved
// multiple of d, starting from the current time. This is a best-effort approach
// to ensure that nodes update their prices around the same time to avoid too
// big price discrepancies.
func newTruncatedTicker(ctx context.Context, d time.Duration) <-chan time.Time {
ch := make(chan time.Time, 1)
go func() {
defer close(ch)

Check warning on line 162 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L159-L162

Added lines #L159 - L162 were not covered by tests

nextTick := time.Now().UTC().Truncate(d)
for {
nextTick = nextTick.Add(d)
untilNextTick := nextTick.Sub(time.Now().UTC())
if untilNextTick <= 0 {
continue

Check warning on line 169 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L164-L169

Added lines #L164 - L169 were not covered by tests
}

select {
case <-ctx.Done():
return
case t := <-time.After(untilNextTick):
ch <- t

Check warning on line 176 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L172-L176

Added lines #L172 - L176 were not covered by tests
}
}
}()

return ch

Check warning on line 181 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L181

Added line #L181 was not covered by tests
}
Loading
Loading