Skip to content

Commit

Permalink
eth/watchers: Create PriceFeed watcher
Browse files Browse the repository at this point in the history
Makefile: Use mockgen binary from tool dependencies

eth/contracts: Add chainlink interfaces source

Makefile: Generate Chainlink contracts ABI

tools: Add abigen tool to repo

eth/contracts: Generate chainlink bindings

Makefile: Fix abigen bindings generation

Revert everything abigen

Turns out there's already bindings exported from the Chainlink lib.

go.mod: Add chainlink library

eth/watchers: Add pricefeed watcher

eth/watchers: Clean-up event watching code

eth/watchers: Improve price tracking

Revert "go.mod: Add chainlink library"

This reverts commit ac415bd.

Revert "Revert everything abigen"

This reverts commit b7c40b1.

eth/contracts: Gen bindings for proxy iface

eth/watchers: Use local bindings for contracts

eth/watchers: Simplify event subs logic

eth/watchers: Simplify&optimize truncated ticker

eth/watchers: Update decimals on fetch

eth/watchers: Improve handling of decimals

eth/watchers: Fix price rat creation

eth/watchers: Make sure we use UTC on truncated timer

eth/contracts/chainlink: Generate only V3 contract bindings

eth/watchers: Watch PriceFeed only with polling

eth/watchers: Add a retry logic on price update

eth/watchers: Use clog instead of fmt.Printf
  • Loading branch information
victorges committed Mar 13, 2024
1 parent cad6713 commit 67286d8
Show file tree
Hide file tree
Showing 8 changed files with 932 additions and 6 deletions.
18 changes: 15 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
SHELL=/bin/bash
GO_BUILD_DIR?="./"

all: net/lp_rpc.pb.go net/redeemer.pb.go net/redeemer_mock.pb.go core/test_segment.go livepeer livepeer_cli livepeer_router livepeer_bench
MOCKGEN=go run github.com/golang/mock/mockgen
ABIGEN=go run github.com/ethereum/go-ethereum/cmd/abigen

all: net/lp_rpc.pb.go net/redeemer.pb.go net/redeemer_mock.pb.go core/test_segment.go eth/contracts/chainlink/AggregatorV3Interface.go livepeer livepeer_cli livepeer_router livepeer_bench

net/lp_rpc.pb.go: net/lp_rpc.proto
protoc -I=. --go_out=. --go-grpc_out=. $^
Expand All @@ -10,12 +13,21 @@ net/redeemer.pb.go: net/redeemer.proto
protoc -I=. --go_out=. --go-grpc_out=. $^

net/redeemer_mock.pb.go net/redeemer_grpc_mock.pb.go: net/redeemer.pb.go net/redeemer_grpc.pb.go
@mockgen -source net/redeemer.pb.go -destination net/redeemer_mock.pb.go -package net
@mockgen -source net/redeemer_grpc.pb.go -destination net/redeemer_grpc_mock.pb.go -package net
@$(MOCKGEN) -source net/redeemer.pb.go -destination net/redeemer_mock.pb.go -package net
@$(MOCKGEN) -source net/redeemer_grpc.pb.go -destination net/redeemer_grpc_mock.pb.go -package net

core/test_segment.go:
core/test_segment.sh core/test_segment.go

eth/contracts/chainlink/AggregatorV3Interface.go:
solc --version | grep 0.7.6+commit.7338295f
@set -ex; \
for sol_file in eth/contracts/chainlink/*.sol; do \
contract_name=$$(basename "$$sol_file" .sol); \
solc --abi --optimize --overwrite -o $$(dirname "$$sol_file") $$sol_file; \
$(ABIGEN) --abi=$${sol_file%.sol}.abi --pkg=chainlink --type=$$contract_name --out=$${sol_file%.sol}.go; \
done

version=$(shell cat VERSION)

ldflags := -X github.com/livepeer/go-livepeer/core.LivepeerVersion=$(shell ./print_version.sh)
Expand Down
1 change: 1 addition & 0 deletions eth/contracts/chainlink/AggregatorV3Interface.abi
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[{"inputs":[],"name":"decimals","outputs":[{"internalType":"uint8","name":"","type":"uint8"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"description","outputs":[{"internalType":"string","name":"","type":"string"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint80","name":"_roundId","type":"uint80"}],"name":"getRoundData","outputs":[{"internalType":"uint80","name":"roundId","type":"uint80"},{"internalType":"int256","name":"answer","type":"int256"},{"internalType":"uint256","name":"startedAt","type":"uint256"},{"internalType":"uint256","name":"updatedAt","type":"uint256"},{"internalType":"uint80","name":"answeredInRound","type":"uint80"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"latestRoundData","outputs":[{"internalType":"uint80","name":"roundId","type":"uint80"},{"internalType":"int256","name":"answer","type":"int256"},{"internalType":"uint256","name":"startedAt","type":"uint256"},{"internalType":"uint256","name":"updatedAt","type":"uint256"},{"internalType":"uint80","name":"answeredInRound","type":"uint80"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"version","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"}]
394 changes: 394 additions & 0 deletions eth/contracts/chainlink/AggregatorV3Interface.go

Large diffs are not rendered by default.

36 changes: 36 additions & 0 deletions eth/contracts/chainlink/AggregatorV3Interface.sol
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// SPDX-License-Identifier: MIT
// https://github.com/smartcontractkit/chainlink/blob/v2.9.1/contracts/src/v0.7/interfaces/AggregatorV3Interface.sol
pragma solidity ^0.7.0;

interface AggregatorV3Interface {
function decimals() external view returns (uint8);

function description() external view returns (string memory);

function version() external view returns (uint256);

// getRoundData and latestRoundData should both raise "No data present"
// if they do not have data to report, instead of returning unset values
// which could be misinterpreted as actual reported values.
function getRoundData(uint80 _roundId)
external
view
returns (
uint80 roundId,
int256 answer,
uint256 startedAt,
uint256 updatedAt,
uint80 answeredInRound
);

function latestRoundData()
external
view
returns (
uint80 roundId,
int256 answer,
uint256 startedAt,
uint256 updatedAt,
uint80 answeredInRound
);
}
219 changes: 219 additions & 0 deletions eth/watchers/pricefeedwatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
package watchers

import (
"context"
"fmt"
"math/big"
"regexp"
"strings"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/livepeer/go-livepeer/clog"
"github.com/livepeer/go-livepeer/eth/contracts/chainlink"
)

const (
priceUpdateMaxRetries = 5
priceUpdateBaseRetryDelay = 30 * time.Second
)

type PriceFeedWatcher struct {
ctx context.Context

client *ethclient.Client
proxy *chainlink.AggregatorV3Interface

currencyBase, currencyQuote string

current PriceData
priceUpdated chan PriceData
}

type PriceData struct {
RoundId int64
Price *big.Rat
UpdatedAt time.Time
}

func NewPriceFeedWatcher(ctx context.Context, rpcUrl, proxyAddrStr string) (*PriceFeedWatcher, error) {
// Initialize client instance using the rpcUrl.
client, err := ethclient.DialContext(ctx, rpcUrl)
if err != nil {
return nil, fmt.Errorf("failed to initialize client: %w", err)
}

// Test if it is a contract address.
ok := isContractAddress(proxyAddrStr, client)
if !ok {
return nil, fmt.Errorf("not a contract address: %s", proxyAddrStr)
}

proxyAddr := common.HexToAddress(proxyAddrStr)
proxy, err := chainlink.NewAggregatorV3Interface(proxyAddr, client)
if err != nil {
return nil, fmt.Errorf("failed to create mock aggregator proxy: %w", err)
}

description, err := proxy.Description(&bind.CallOpts{})
if err != nil {
return nil, fmt.Errorf("failed to get description: %w", err)
}

currencyFrom, currencyTo, err := parseCurrencies(description)
if err != nil {
return nil, err
}

w := &PriceFeedWatcher{
ctx: ctx,
client: client,
proxy: proxy,
currencyBase: currencyFrom,
currencyQuote: currencyTo,
priceUpdated: make(chan PriceData, 1),
}

err = w.fetchPrice()
if err != nil {
return nil, fmt.Errorf("failed to update price: %w", err)
}
go w.watch()

return w, nil
}

// Currencies returns the base and quote currencies of the price feed.
// i.e. base = CurrentPrice() * quote
func (w *PriceFeedWatcher) Currencies() (base string, quote string) {
return w.currencyBase, w.currencyQuote
}

func (w *PriceFeedWatcher) Current() PriceData {
return w.current
}

func (w *PriceFeedWatcher) PriceUpdated() <-chan PriceData {
return w.priceUpdated
}

func (w *PriceFeedWatcher) fetchPrice() error {
roundData, err := w.proxy.LatestRoundData(&bind.CallOpts{})
if err != nil {
return fmt.Errorf("failed to get latest round data: %w", err)
}

decimals, err := w.proxy.Decimals(&bind.CallOpts{})
if err != nil {
return fmt.Errorf("failed to get decimals: %w", err)
}

w.updatePrice(roundData.Answer, decimals, roundData.RoundId, roundData.UpdatedAt)
return nil
}

func (w *PriceFeedWatcher) updatePrice(current *big.Int, decimals uint8, roundId, updatedAt *big.Int) {
// Compute a big.int which is 10^decimals.
divisor := new(big.Int).Exp(
big.NewInt(10),
big.NewInt(int64(decimals)),
nil)

newPrice := PriceData{
RoundId: roundId.Int64(),
Price: new(big.Rat).SetFrac(current, divisor),
UpdatedAt: time.Unix(updatedAt.Int64(), 0),
}

if newPrice.UpdatedAt.After(w.current.UpdatedAt) {
w.current = newPrice
select {
case w.priceUpdated <- newPrice:
default:
}
}
}

func (w *PriceFeedWatcher) watch() {
ctx, cancel := context.WithCancel(w.ctx)
defer cancel()
ticker := newTruncatedTicker(ctx, 1*time.Hour)

for {
select {
case <-w.ctx.Done():
return
case <-ticker:
retryDelay := priceUpdateBaseRetryDelay
for attempt := 1; attempt <= priceUpdateMaxRetries; attempt++ {
err := w.fetchPrice()
if err == nil {
break
}

clog.Warningf(ctx, "Failed to fetch updated price from PriceFeed, retrying after retryDelay=%d attempt=%d err=%v", retryDelay, attempt, err)
time.Sleep(retryDelay)
retryDelay *= 2
}
}
}
}

func isContractAddress(addr string, client *ethclient.Client) bool {
if len(addr) == 0 {
return false
}

// Ensure it is an Ethereum address: 0x followed by 40 hexadecimal characters.
re := regexp.MustCompile("^0x[0-9a-fA-F]{40}$")
if !re.MatchString(addr) {
return false
}

// Ensure it is a contract address.
address := common.HexToAddress(addr)
bytecode, err := client.CodeAt(context.Background(), address, nil) // nil is latest block
if err != nil {
return false
}
isContract := len(bytecode) > 0
return isContract
}

func parseCurrencies(description string) (currencyBase string, currencyQuote string, err error) {
currencies := strings.Split(description, "/")
if len(currencies) != 2 {
return "", "", fmt.Errorf("aggregator description must be in the format 'FROM / TO' but got: %s", description)
}

currencyBase = strings.TrimSpace(currencies[0])
currencyQuote = strings.TrimSpace(currencies[1])
return
}

func newTruncatedTicker(ctx context.Context, d time.Duration) <-chan time.Time {
ch := make(chan time.Time, 1)
go func() {
defer close(ch)

nextTick := time.Now().UTC().Truncate(d)
for {
nextTick = nextTick.Add(d)
untilNextTick := nextTick.Sub(time.Now().UTC())
if untilNextTick <= 0 {
continue
}

select {
case <-ctx.Done():
return
case t := <-time.After(untilNextTick):
ch <- t
}
}
}()

return ch
}
Loading

0 comments on commit 67286d8

Please sign in to comment.