Skip to content

Commit

Permalink
feat: astroport provider (#316)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamewozniak authored Nov 28, 2023
1 parent 4d8d0a8 commit 671b429
Show file tree
Hide file tree
Showing 5 changed files with 346 additions and 1 deletion.
3 changes: 3 additions & 0 deletions config/supported_assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
provider.ProviderPolygon: true,
provider.ProviderEthUniswap: false,
provider.ProviderKujira: false,
provider.ProviderAstroport: false,
provider.ProviderMock: false,
}

Expand All @@ -40,11 +41,13 @@ var (
{Base: "ETH", Quote: "USD"}: {},
{Base: "ATOM", Quote: "USD"}: {},
{Base: "OSMO", Quote: "USD"}: {},
{Base: "INJ", Quote: "USD"}: {},

{Base: "OSMO", Quote: "USDT"}: {},
{Base: "JUNO", Quote: "USDT"}: {},
{Base: "WETH", Quote: "USDC"}: {},
{Base: "WBTC", Quote: "WETH"}: {},
{Base: "INJ", Quote: "USDT"}: {},
}

SupportedUniswapCurrencies = map[string]struct{}{
Expand Down
16 changes: 16 additions & 0 deletions ojo-provider-config/currency-pairs.toml
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,19 @@ providers = [
"gate"
]
quote = "USDT"

[[currency_pairs]]
base = "INJ"
providers = [
"gate",
"binance",
"mexc",
]
quote = "USDT"

[[currency_pairs]]
base = "STINJ"
providers = [
"astroport",
]
quote = "INJ"
4 changes: 3 additions & 1 deletion oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,6 @@ func (o *Oracle) GetComputedPrices(
providerCandles types.AggregatedProviderCandles,
providerPrices types.AggregatedProviderPrices,
) (types.CurrencyPairDec, error) {

conversionRates, err := CalcCurrencyPairRates(
providerCandles,
providerPrices,
Expand Down Expand Up @@ -474,6 +473,9 @@ func NewProvider(

case provider.ProviderEthUniswap:
return provider.NewUniswapProvider(ctx, logger, endpoint, providerPairs...)

case provider.ProviderAstroport:
return provider.NewAstroportProvider(ctx, logger, endpoint, providerPairs...)
}

return nil, fmt.Errorf("provider %s not found", providerName)
Expand Down
283 changes: 283 additions & 0 deletions oracle/provider/astroport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
package provider

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"

"github.com/ojo-network/ojo/util/decmath"
"github.com/ojo-network/price-feeder/oracle/types"
"github.com/rs/zerolog"
)

var _ Provider = (*AstroportProvider)(nil)

const (
ProviderAstroport = "astroport"
restURL = "https://markets-api.astroport.fi"
tickersURL = "/markets/cg/tickers"
assetsURL = "/markets/cmc/v1/assets"
pollInterval = 3 * time.Second
)

type (
AstroportProvider struct {
logger zerolog.Logger
mtx sync.RWMutex
endpoints Endpoint

client *http.Client
priceStore
ctx context.Context
}

// AstroportAssetResponse is the response from the Astroport assets endpoint.
AstroportAssetResponse struct {
BaseID string `json:"base_id"`
BaseName string `json:"base_name"`
BaseSymbol string `json:"base_symbol"`
QuoteID string `json:"quote_id"`
QuoteName string `json:"quote_name"`
QuoteSymbol string `json:"quote_symbol"`
LastPrice float64 `json:"last_price"`
BaseVolume float64 `json:"base_volume"`
QuoteVolume float64 `json:"quote_volume"`
USDVolume float64 `json:"USD_volume"`
}
// AstroportTickersResponse is the response from the Astroport tickers endpoint.
AstroportTickersResponse struct {
TickerID string `json:"ticker_id"`
BaseCurrency string `json:"base_currency"`
TargetCurrency string `json:"target_currency"`
LastPrice float64 `json:"last_price"`
LiquidityInUSD float64 `json:"liquidity_in_usd"`
BaseVolume float64 `json:"base_volume"`
TargetVolume float64 `json:"target_volume"`
PoolID string `json:"pool_id"`
}
)

// NewAstroportProvider returns a new AstroportProvider.
// It also starts a go routine to poll for new data.
func NewAstroportProvider(
ctx context.Context,
logger zerolog.Logger,
endpoints Endpoint,
pairs ...types.CurrencyPair,
) (*AstroportProvider, error) {
if (endpoints.Name) != ProviderAstroport {
endpoints = Endpoint{
Name: ProviderAstroport,
Rest: restURL,
}
}

astroLogger := logger.With().Str("provider", string(ProviderAstroport)).Logger()

provider := &AstroportProvider{
logger: astroLogger,
endpoints: endpoints,
priceStore: newPriceStore(astroLogger),
client: &http.Client{},
ctx: ctx,
}

confirmedPairs, err := ConfirmPairAvailability(
provider,
provider.endpoints.Name,
provider.logger,
pairs...,
)
if err != nil {
return nil, err
}

provider.setSubscribedPairs(confirmedPairs...)

return provider, nil
}

// GetAvailablePairs return all available pair symbols.
func (p *AstroportProvider) GetAvailablePairs() (map[string]struct{}, error) {
availablePairs, err := p.getAvailableAssets()
if err != nil {
return nil, err
}

availableSymbols := map[string]struct{}{}
for _, pair := range availablePairs {
availableSymbols[pair.String()] = struct{}{}
}

return availableSymbols, nil
}

// SubscribeCurrencyPairs sends the new subscription messages to the websocket
// and adds them to the providers subscribedPairs array.
func (p *AstroportProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) {
p.mtx.Lock()
defer p.mtx.Unlock()

newPairs := []types.CurrencyPair{}
for _, cp := range cps {
if _, ok := p.subscribedPairs[cp.String()]; !ok {
newPairs = append(newPairs, cp)
}
}

confirmedPairs, err := ConfirmPairAvailability(
p,
p.endpoints.Name,
p.logger,
newPairs...,
)
if err != nil {
return
}

p.setSubscribedPairs(confirmedPairs...)
}

// StartConnections begins the polling process for
// the astroport provider.
func (p *AstroportProvider) StartConnections() {
go func() {
p.logger.Debug().Msg("starting astroport polling...")
err := p.poll()
if err != nil {
p.logger.Err(err).Msg("astroport provider unable to poll new data")
}
}()
}

// AstroportTickerPairs is a struct to hold the AstroportTickersResponse and the
// corresponding pair. It satisfies the TickerPrice interface.
type AstroportTickerPairs struct {
ticker AstroportTickersResponse
pair types.CurrencyPair
}

// toTickerPrice converts the AstroportTickerPairs to a TickerPrice.
// It satisfies the TickerPrice interface.
func (atr AstroportTickersResponse) toTickerPrice() (types.TickerPrice, error) {
lp, err := decmath.NewDecFromFloat(atr.LastPrice)
if err != nil {
return types.TickerPrice{}, err
}
volume, err := decmath.NewDecFromFloat(atr.BaseVolume)
if err != nil {
return types.TickerPrice{}, err
}
return types.TickerPrice{
Price: lp,
Volume: volume,
}, nil
}

// setTickers queries the Astroport API for the latest tickers and updates the
// priceStore.
func (p *AstroportProvider) setTickers() error {
tickers, err := p.queryTickers()
if err != nil {
return err
}
for _, v := range tickers {
p.setTickerPair(v.ticker, v.pair.String())
}
return nil
}

// getAvailableAssets returns all available assets from the api.
// It returns a map of ticker IDs -> pairs.
func (p *AstroportProvider) getAvailableAssets() (map[string]types.CurrencyPair, error) {
res, err := p.client.Get(p.endpoints.Rest + assetsURL)
if err != nil {
return nil, err
}
defer res.Body.Close()

bz, err := io.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response: %w", err)
}

astroportAssets := []map[string]AstroportAssetResponse{}
if err := json.Unmarshal(bz, &astroportAssets); err != nil {
return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
}

// convert the astroport assets to a map of ticker IDs -> pairs
availablePairs := map[string]types.CurrencyPair{}
for _, assetMap := range astroportAssets {
for tickerID, asset := range assetMap {
availablePairs[tickerID] = types.CurrencyPair{
Base: strings.ToUpper(asset.BaseSymbol),
Quote: strings.ToUpper(asset.QuoteSymbol),
}
}
}
return availablePairs, nil
}

// queryTickers returns the AstroportTickerPairs available from the API.
func (p *AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) {
res, err := p.client.Get(p.endpoints.Rest + tickersURL)
if err != nil {
return nil, err
}
defer res.Body.Close()

bz, err := io.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response: %w", err)
}

astroportTickers := []AstroportTickersResponse{}
if err := json.Unmarshal(bz, &astroportTickers); err != nil {
return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
}

availableAssets, err := p.getAvailableAssets()
if err != nil {
return nil, err
}

// filter out tickers that we are not subscribed to
tickers := []AstroportTickerPairs{}
for tickerID, v := range availableAssets {
for _, ticker := range astroportTickers {
if ticker.TickerID == tickerID {
tickers = append(tickers, AstroportTickerPairs{
ticker: ticker,
pair: v,
})
}
}
}
return tickers, nil
}

// This function periodically calls setTickers to update the priceStore.
func (p *AstroportProvider) poll() error {
for {
select {
case <-p.ctx.Done():
return nil

default:
p.logger.Debug().Msg("querying astroport api")

err := p.setTickers()
if err != nil {
return err
}

time.Sleep(pollInterval)
}
}
}
41 changes: 41 additions & 0 deletions oracle/provider/astroport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package provider

import (
"context"
"os"
"testing"
"time"

"github.com/ojo-network/price-feeder/oracle/types"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
)

// TestAstroportProvider_GetTickers tests the polling process.
// TODO: Make this more comprehensive.
//
// Ref: https://github.com/ojo-network/price-feeder/issues/317
func TestAstroportProvider_GetTickers(t *testing.T) {
ctx := context.Background()
pairs := []types.CurrencyPair{{
Base: "STINJ",
Quote: "INJ",
}}
p, err := NewAstroportProvider(
ctx,
zerolog.New(os.Stdout).With().Timestamp().Logger(),
Endpoint{},
pairs...,
)
require.NoError(t, err)
availPairs, err := p.GetAvailablePairs()
require.NoError(t, err)
require.NotEmpty(t, availPairs)

p.StartConnections()
time.Sleep(2 * time.Second)

res, err := p.GetTickerPrices(pairs...)
require.NoError(t, err)
require.NotEmpty(t, res)
}

0 comments on commit 671b429

Please sign in to comment.