Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(feeder): add price provider #1032

Merged
merged 18 commits into from
Oct 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Features

* [#1032](https://github.com/NibiruChain/nibiru/pull/1032) - feeder: add price provide API and bitfinex price source
* [#1019](https://github.com/NibiruChain/nibiru/pull/1019) - add fields to the snapshot reserve event
* [#1010](https://github.com/NibiruChain/nibiru/pull/1010) - feeder: initialize oracle feeder core logic
* [#966](https://github.com/NibiruChain/nibiru/pull/966) - collections: add indexed map
Expand Down
128 changes: 128 additions & 0 deletions feeder/priceprovider/priceprovider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package priceprovider

import (
"github.com/NibiruChain/nibiru/feeder/types"
"github.com/NibiruChain/nibiru/x/common"
"github.com/rs/zerolog"
"sync"
"time"
)

// NewPriceProvider returns a types.PriceProvider given the price source we want to gather prices from,
// the mapping between nibiru common.AssetPair and the source's symbols, and a zerolog.Logger instance.
func NewPriceProvider(priceSourceName string, pairToSymbolMap map[common.AssetPair]string, log zerolog.Logger) types.PriceProvider {
var source Source
switch priceSourceName {
case Bitfinex:
source = NewTickSource(symbolsFromPairsToSymbolsMap(pairToSymbolMap), BitfinexPriceUpdate, log)
default:
panic("unknown price provider: " + priceSourceName)
}
return newPriceProvider(source, priceSourceName, pairToSymbolMap, log)
}

// newPriceProvider returns a raw *PriceProvider given a Source implementer, the source name and the
// map of nibiru common.AssetPair to Source's symbols, plus the zerolog.Logger instance.
// Exists for testing purposes.
func newPriceProvider(source Source, sourceName string, pairToSymbolsMap map[common.AssetPair]string, log zerolog.Logger) *PriceProvider {
log = log.With().
Str("component", "price-provider").
Str("price-source", sourceName).
Logger()

pp := &PriceProvider{
log: log,
stop: make(chan struct{}),
done: make(chan struct{}),
source: source,
sourceName: sourceName,
pairToSymbol: pairToSymbolsMap,
mu: sync.Mutex{},
lastPrices: map[string]SourcePriceUpdate{},
}
go pp.loop()
return pp
}

// PriceProvider implements the types.PriceProvider interface.
// it wraps a Source and handles conversions between
// nibiru asset pair to exchange symbols.
type PriceProvider struct {
log zerolog.Logger

stop, done chan struct{}

source Source
sourceName string
pairToSymbol map[common.AssetPair]string

mu sync.Mutex
lastPrices map[string]SourcePriceUpdate
}

// GetPrice returns the types.Price for the given common.AssetPair
// in case price has expired, or for some reason it's impossible to
// get the last available price, then an invalid types.Price is returned.
func (p *PriceProvider) GetPrice(pair common.AssetPair) types.Price {
symbol, ok := p.pairToSymbol[pair]
// in case this is an unknown symbol, which might happen
// when for example we have a param update, then we return
// an abstain vote on the provided asset pair.
if !ok {
p.log.Warn().Str("nibiru-pair", pair.String()).Msg("unknown nibiru pair")
return types.Price{
Pair: pair,
Price: 0,
Source: p.sourceName,
Valid: false,
}
}
p.mu.Lock()
price, ok := p.lastPrices[symbol]
p.mu.Unlock()
return types.Price{
Pair: pair,
Price: price.Price,
Source: p.sourceName,
Valid: isValid(price, ok),
}
}

func (p *PriceProvider) loop() {
defer close(p.done)
defer p.source.Close()

for {
select {
case <-p.stop:
return
case updates := <-p.source.PricesUpdate():
p.mu.Lock()
for symbol, price := range updates {
p.lastPrices[symbol] = price
}
p.mu.Unlock()
}
}
}

func (p *PriceProvider) Close() {
close(p.stop)
<-p.done
}

// symbolsFromPairsToSymbolsMap returns the symbols list
// given the map which maps nibiru chain pairs to exchange symbols.
func symbolsFromPairsToSymbolsMap(m map[common.AssetPair]string) []string {
symbols := make([]string, 0, len(m))
for _, v := range m {
symbols = append(symbols, v)
}
return symbols
}

// isValid is a helper function which asserts if a price is valid given
// if it was found and the time at which it was last updated.
func isValid(price SourcePriceUpdate, found bool) bool {
return found && time.Now().Sub(price.UpdateTime) < PriceTimeout
}
80 changes: 80 additions & 0 deletions feeder/priceprovider/priceprovider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package priceprovider

import (
"github.com/NibiruChain/nibiru/x/common"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
"io"
"testing"
"time"
)

type testAsyncPriceProvider struct {
closeFn func()
priceUpdatesC chan map[string]SourcePriceUpdate
}

func (t testAsyncPriceProvider) Close() { t.closeFn() }
func (t testAsyncPriceProvider) PricesUpdate() <-chan map[string]SourcePriceUpdate {
return t.priceUpdatesC
}

func TestPriceProvider(t *testing.T) {
t.Run("success", func(t *testing.T) {
pp := NewPriceProvider(Bitfinex, map[common.AssetPair]string{common.Pair_BTC_NUSD: "tBTCUSD"}, zerolog.New(io.Discard))
defer pp.Close()
<-time.After(UpdateTick + 2*time.Second)

price := pp.GetPrice(common.Pair_BTC_NUSD)
require.True(t, price.Valid)
})

t.Run("panics on unknown price source", func(t *testing.T) {
require.Panics(t, func() {
NewPriceProvider("unknown", nil, zerolog.New(io.Discard))
})
})

t.Run("returns invalid price on unknown AssetPair", func(t *testing.T) {
pp := newPriceProvider(testAsyncPriceProvider{}, "test", map[common.AssetPair]string{}, zerolog.New(io.Discard))
price := pp.GetPrice(common.Pair_BTC_NUSD)
require.False(t, price.Valid)
require.Zero(t, price.Price)
require.Equal(t, common.Pair_BTC_NUSD, price.Pair)
})

t.Run("Close assertions", func(t *testing.T) {
var closed bool
pp := newPriceProvider(testAsyncPriceProvider{
closeFn: func() {
closed = true
},
}, "test", map[common.AssetPair]string{}, zerolog.New(io.Discard))

pp.Close()
require.True(t, closed)
})
}

func Test_isValid(t *testing.T) {
t.Run("ok", func(t *testing.T) {
require.True(t, isValid(SourcePriceUpdate{
Price: 10,
UpdateTime: time.Now(),
}, true))
})

t.Run("price not found", func(t *testing.T) {
require.False(t, isValid(SourcePriceUpdate{
Price: 10,
UpdateTime: time.Now(),
}, false))
})

t.Run("price expired", func(t *testing.T) {
require.False(t, isValid(SourcePriceUpdate{
Price: 20,
UpdateTime: time.Now().Add(-1 - 1*PriceTimeout),
}, true))
})
}
116 changes: 116 additions & 0 deletions feeder/priceprovider/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package priceprovider

import (
"github.com/rs/zerolog"
"time"
)

var (
// PriceTimeout defines after how much time a price is considered expired.
PriceTimeout = 15 * time.Second
// UpdateTick defines the wait time between price updates.
UpdateTick = 3 * time.Second
)

const (
Bitfinex = "bitfinex"
)

// Source defines a source for price provision.
// This source has no knowledge of nibiru internals
// and mappings across common.AssetPair and the Source
// symbols.
type Source interface {
// PricesUpdate is a readonly channel which provides
// the latest prices update. Updates can be provided
// for one asset only or in batches, hence the map.
PricesUpdate() <-chan map[string]SourcePriceUpdate
// Close closes the Source.
Close()
}

// SourcePriceUpdate defines an update for a symbol for Source implementers.
type SourcePriceUpdate struct {
Price float64
UpdateTime time.Time
}

// TickSourcePriceUpdateFunc is the function used by TickSource to update prices.
// The symbols passed are the symbols we require prices for.
// The returned map must map symbol to its float64 price, or an error.
// If there's a failure in updating only one price then the map can be returned
// without the provided symbol.
type TickSourcePriceUpdateFunc func(symbols []string) (map[string]float64, error)

// NewTickSource instantiates a new TickSource instance, given the symbols and a price updater function
// which returns the latest prices for the provided symbols.
func NewTickSource(symbols []string, updatePricesFunc TickSourcePriceUpdateFunc, log zerolog.Logger) *TickSource {
ts := &TickSource{
log: log,
stop: make(chan struct{}),
done: make(chan struct{}),
tick: time.NewTicker(UpdateTick),
symbols: symbols,
updatePrices: updatePricesFunc,
sendPriceUpdate: make(chan map[string]SourcePriceUpdate),
}
go ts.loop()
return ts
}

// TickSource is a Source which updates prices
// every x time.Duration.
type TickSource struct {
log zerolog.Logger
stop, done chan struct{}
tick *time.Ticker

symbols []string

updatePrices func(symbols []string) (map[string]float64, error)

sendPriceUpdate chan map[string]SourcePriceUpdate
}

func (s *TickSource) loop() {
defer s.tick.Stop()
defer close(s.done)
for {
select {
case <-s.stop:
return
case <-s.tick.C:
now := time.Now()
s.log.Debug().Time("time", now).Msg("received tick, updating prices")
prices, err := s.updatePrices(s.symbols)
if err != nil {
s.log.Err(err).Msg("failed to update prices")
break // breaks the current select case, not the for cycle
}
update := make(map[string]SourcePriceUpdate, len(prices))
for symbol, price := range prices {
update[symbol] = SourcePriceUpdate{
Price: price,
UpdateTime: now,
}
}
s.log.Debug().Msg("sending price update")
select {
case s.sendPriceUpdate <- update:
s.log.Debug().Msg("sent price update")
case <-s.stop:
s.log.Warn().Msg("dropped price update due to shutdown")
return
}
}
}
}

func (s *TickSource) PricesUpdate() <-chan map[string]SourcePriceUpdate {
return s.sendPriceUpdate
}

func (s *TickSource) Close() {
close(s.stop)
<-s.done
}
47 changes: 47 additions & 0 deletions feeder/priceprovider/source_bitfinex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package priceprovider

import (
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
)

// BitfinexPriceUpdate returns the prices given the symbols or an error.
func BitfinexPriceUpdate(symbols []string) (prices map[string]float64, err error) {
type ticker []interface{}
const size = 11
const lastPriceIndex = 7
const symbolNameIndex = 0

resp, err := http.Get("https://api-pub.bitfinex.com/v2/tickers?symbols=" + strings.Join(symbols, ","))
if err != nil {
return nil, err
}
defer resp.Body.Close()

b, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var tickers []ticker

err = json.Unmarshal(b, &tickers)
if err != nil {
return nil, err
}

prices = make(map[string]float64)
for _, ticker := range tickers {
if len(ticker) != size {
return nil, fmt.Errorf("impossible to parse ticker size %d, %#v", len(ticker), ticker) // TODO(mercilex): return or log and continue?
}
tickerName := ticker[symbolNameIndex].(string)
lastPrice := ticker[lastPriceIndex].(float64)

prices[tickerName] = lastPrice
}

return prices, nil
}
Loading