Skip to content

Commit

Permalink
fix(polling): poll per rollapp, only non-finalized orders (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
zale144 authored Jan 2, 2025
1 parent f0ec486 commit 2a16d1b
Show file tree
Hide file tree
Showing 6 changed files with 677 additions and 26 deletions.
2 changes: 1 addition & 1 deletion eibc/lp.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (or *orderTracker) loadLPs(ctx context.Context) error {
if len(or.lps) == 0 {
or.logger.Info("no LPs found")
} else {
or.logger.Info("loaded LPs", zap.Int("count", len(or.lps)))
or.logger.Debug("loaded LPs", zap.Int("count", len(or.lps)))
}

if lpsUpdated || (currentLPCount > 0 && len(or.lps) > currentLPCount) {
Expand Down
7 changes: 6 additions & 1 deletion eibc/order_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,15 @@ func NewOrderClient(cfg config.Config, logger *zap.Logger) (*orderClient, error)
}

if cfg.OrderPolling.Enabled {
var rollapps []string
for r, _ := range cfg.Rollapps {
rollapps = append(rollapps, r)
}
oc.orderPoller = newOrderPoller(
hubClient.Context().ChainID,
hubClient.Context(),
oc.orderTracker,
cfg.OrderPolling,
rollapps,
logger,
)
oc.orderTracker.resetPoller = oc.orderPoller.resetOrderPolling
Expand Down
15 changes: 9 additions & 6 deletions eibc/order_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func TestOrderClient(t *testing.T) {

func setupTestOrderClient(
cfg config.Config,
pollOrders func() ([]Order, error),
pollOrders func(ctx context.Context) ([]Order, error),
hubClient mockNodeClient,
fullNodeClient *nodeClient,
grantsFn getLPGrantsFn,
Expand Down Expand Up @@ -373,8 +373,6 @@ func setupTestOrderClient(
logger,
)

chainID := "test-chain-id"

for i := range cfg.Fulfillers.Scale {
fulfillerName := fmt.Sprintf("fulfiller-%d", i+1)

Expand Down Expand Up @@ -403,10 +401,15 @@ func setupTestOrderClient(
// poller
var poller *orderPoller
if cfg.OrderPolling.Enabled {
var rollapps []string
for r, _ := range cfg.Rollapps {
rollapps = append(rollapps, r)
}
poller = newOrderPoller(
chainID,
hubClient.Context(),
ordTracker,
cfg.OrderPolling,
rollapps,
logger,
)
poller.getOrders = pollOrders
Expand All @@ -425,8 +428,8 @@ func setupTestOrderClient(
return oc, nil
}

func mockGetPollerOrders(orders []Order) func() ([]Order, error) {
return func() ([]Order, error) {
func mockGetPollerOrders(orders []Order) func(ctx context.Context) ([]Order, error) {
return func(ctx context.Context) ([]Order, error) {
// after polling once, remove orders
defer func() {
orders = nil
Expand Down
53 changes: 43 additions & 10 deletions eibc/order_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,44 +11,51 @@ import (
"sync/atomic"
"time"

"github.com/cosmos/cosmos-sdk/client"
sdk "github.com/cosmos/cosmos-sdk/types"
"go.uber.org/zap"

"github.com/dymensionxyz/eibc-client/config"
"github.com/dymensionxyz/eibc-client/types"
)

type orderPoller struct {
chainID string
indexerURL string
rollapps []string
interval time.Duration
indexerClient *http.Client
rollappClient types.QueryClient
logger *zap.Logger

getOrders func() ([]Order, error)
getOrders func(ctx context.Context) ([]Order, error)
orderTracker *orderTracker
lastBlockHeight atomic.Uint64
}

func newOrderPoller(
chainID string,
clientCtx client.Context,
orderTracker *orderTracker,
pollingCfg config.OrderPollingConfig,
rollapps []string,
logger *zap.Logger,
) *orderPoller {
o := &orderPoller{
chainID: chainID,
chainID: clientCtx.ChainID,
rollapps: rollapps,
indexerURL: pollingCfg.IndexerURL,
interval: pollingCfg.Interval,
logger: logger.With(zap.String("module", "order-poller")),
orderTracker: orderTracker,
rollappClient: types.NewQueryClient(clientCtx),
indexerClient: &http.Client{Timeout: 25 * time.Second},
}
o.getOrders = o.getDemandOrdersFromIndexer
return o
}

const (
ordersQuery = `{"query": "{ibcTransferDetails(filter: {network: {equalTo: \"%s\"} status: { in: [EibcPending, Refunding] }, blockHeight: { greaterThan: \"%s\" }}) {nodes { eibcOrderId amount proofHeight blockHeight price rollappId eibcFee }}}"}`
rollappOrdersQuery = `{"query": "{ibcTransferDetails(filter: {network: {equalTo: \"%s\"} status: { in: [EibcPending, Refunding] }, blockHeight: { greaterThan: \"%s\" }, rollappId: { equalTo: \"%s\"}, proofHeight: {greaterThan: \"%s\"}}) {nodes { eibcOrderId amount proofHeight blockHeight price rollappId eibcFee }}}"}`
)

type Order struct {
Expand All @@ -70,7 +77,7 @@ type ordersResponse struct {
}

func (p *orderPoller) start(ctx context.Context) error {
if err := p.pollPendingDemandOrders(); err != nil {
if err := p.pollPendingDemandOrders(ctx); err != nil {
return fmt.Errorf("failed to refresh demand orders: %w", err)
}

Expand All @@ -80,7 +87,7 @@ func (p *orderPoller) start(ctx context.Context) error {
case <-ctx.Done():
return
default:
if err := p.pollPendingDemandOrders(); err != nil {
if err := p.pollPendingDemandOrders(ctx); err != nil {
p.logger.Error("failed to refresh demand orders", zap.Error(err))
}
}
Expand All @@ -89,8 +96,8 @@ func (p *orderPoller) start(ctx context.Context) error {
return nil
}

func (p *orderPoller) pollPendingDemandOrders() error {
newDemandOrders, err := p.getOrders()
func (p *orderPoller) pollPendingDemandOrders(ctx context.Context) error {
newDemandOrders, err := p.getOrders(ctx)
if err != nil {
return fmt.Errorf("failed to get demand orders: %w", err)
}
Expand Down Expand Up @@ -209,8 +216,34 @@ func (p *orderPoller) convertOrders(demandOrders []Order) (orders []*demandOrder
return orders
}

func (p *orderPoller) getDemandOrdersFromIndexer() ([]Order, error) {
queryStr := fmt.Sprintf(ordersQuery, p.chainID, fmt.Sprint(p.lastBlockHeight.Load()))
func (p *orderPoller) getDemandOrdersFromIndexer(ctx context.Context) ([]Order, error) {
var demandOrders []Order
for _, rollapp := range p.rollapps {
orders, err := p.getRollappDemandOrdersFromIndexer(ctx, rollapp)
if err != nil {
return nil, fmt.Errorf("failed to get demand orders: %w", err)
}
demandOrders = append(demandOrders, orders...)
}

p.logger.Debug("got demand orders", zap.Int("count", len(demandOrders)))

return demandOrders, nil
}

func (p *orderPoller) getRollappDemandOrdersFromIndexer(ctx context.Context, rollappId string) ([]Order, error) {
lastFinalizedHeight := "0"
lastHeightResp, err := p.rollappClient.LatestHeight(ctx, &types.QueryGetLatestHeightRequest{
RollappId: rollappId,
Finalized: true,
})
if err != nil {
p.logger.Warn("failed to get latest height, using 0", zap.Error(err))
} else {
lastFinalizedHeight = fmt.Sprint(lastHeightResp.Height)
}

queryStr := fmt.Sprintf(rollappOrdersQuery, p.chainID, fmt.Sprint(p.lastBlockHeight.Load()), rollappId, lastFinalizedHeight)
body := strings.NewReader(queryStr)

resp, err := p.indexerClient.Post(p.indexerURL, "application/json", body)
Expand Down
8 changes: 0 additions & 8 deletions eibc/order_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,11 +352,3 @@ func (or *orderTracker) isRollappSupported(rollappID string) bool {
_, ok := or.fullNodeClient.rollapps[rollappID]
return ok
}

func (or *orderTracker) isOrderFulfilled(id string) bool {
or.fomu.Lock()
defer or.fomu.Unlock()

_, ok := or.fulfilledOrders[id]
return ok
}
Loading

0 comments on commit 2a16d1b

Please sign in to comment.