Skip to content

Commit

Permalink
Optimize rebalance process and logs
Browse files Browse the repository at this point in the history
  • Loading branch information
perrornet committed Jun 15, 2024
1 parent 3f3e8f7 commit 5a68f49
Show file tree
Hide file tree
Showing 28 changed files with 896 additions and 372 deletions.
7 changes: 4 additions & 3 deletions cmd/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"omni-balance/internal/daemons"
"omni-balance/internal/db"
"omni-balance/internal/models"
"omni-balance/utils"
"omni-balance/utils/configs"
"omni-balance/utils/constant"
"os"
Expand Down Expand Up @@ -70,11 +71,11 @@ func startHttpServer(_ context.Context, port string) error {
Addr: port,
Handler: http.DefaultServeMux,
}
go func() {
utils.Go(func() {
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
logrus.Panic(err)
logrus.Fatalf("http server error: %s", err)
}
}()
})
logrus.Infof("http server started on %s", port)
return nil
}
Expand Down
31 changes: 13 additions & 18 deletions internal/daemons/cross_chain/cross_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"omni-balance/internal/daemons"
"omni-balance/internal/db"
"omni-balance/internal/models"
"omni-balance/utils"
"omni-balance/utils/configs"
"omni-balance/utils/provider"
"sync"
Expand Down Expand Up @@ -43,10 +44,12 @@ func processOrders(ctx context.Context, conf configs.Config, orders []*models.Or
var w sync.WaitGroup
for _, order := range orders {
w.Add(1)

go func(order *models.Order) {
defer utils.Recover()
defer w.Done()
log := order.GetLogs()
if err := start(ctx, order, conf, log.WithField("order_id", order.ID)); err != nil {
if err := start(utils.SetLogToCtx(ctx, log), order, conf, log.WithField("order_id", order.ID)); err != nil {
log.Errorf("start cross chain error: %s", err)
}
}(order)
Expand All @@ -71,7 +74,10 @@ func start(ctx context.Context, order *models.Order, conf configs.Config, log *l
}
orderProcess := models.GetLastOrderProcess(ctx, db.DB(), order.ID)
swapParams := daemons.CreateSwapParams(*order, orderProcess, log, wallet)
result, err := bridge.Swap(ctx, swapParams)
if order.CurrentChainName != "" && order.CurrentChainName != swapParams.SourceChain {
swapParams.SourceToken = order.CurrentChainName
}
result, err := bridge.Swap(utils.SetLogToCtx(ctx, order.GetLogs()), swapParams)
if err != nil {
return errors.Wrap(err, "swap error")
}
Expand All @@ -83,25 +89,14 @@ func createUpdateLog(order *models.Order, result provider.SwapResult, log *logru
update := map[string]interface{}{
"error": result.Error,
"current_chain_name": result.CurrentChain,
"status": result.Status,
}
switch result.Status {
case provider.TxStatusSuccess:
log.Infof("order #%d token %s cross from %s to %s success", order.ID, order.TokenOutName,
order.CurrentChainName, order.TargetChainName)
update["status"] = models.OrderStatusSuccess
default:
update["status"] = evaluateStatus(result)
log.Infof("order #%d token %s cross from %s to %s status is %s", order.ID, order.TokenOutName,
order.CurrentChainName, order.TargetChainName, result.Status)
}
return update
}

func evaluateStatus(result provider.SwapResult) models.OrderStatus {
if result.Status == "" {
return models.OrderStatusUnknown
update["status"] = provider.TxStatusFailed
}
return models.OrderStatus(result.Status)
log.Infof("order #%d token %s cross from %s to %s status is %s", order.ID, order.TokenOutName,
order.CurrentChainName, order.TargetChainName, result.Status)
return update
}

func getBridge(ctx context.Context, order *models.Order, conf configs.Config) (provider.Provider, error) {
Expand Down
5 changes: 3 additions & 2 deletions internal/daemons/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"omni-balance/internal/db"
"omni-balance/internal/models"
"omni-balance/utils/configs"
"omni-balance/utils/provider"
"omni-balance/utils/wallet_monitor"
"time"
)
Expand Down Expand Up @@ -41,7 +42,7 @@ func Run(ctx context.Context, conf configs.Config) error {

func getExistingBuyTokens() ([]*models.Order, error) {
var existBuyTokens []*models.Order
err := db.DB().Where("status != ? ", models.OrderStatusSuccess).Find(&existBuyTokens).Error
err := db.DB().Where("status != ? ", provider.TxStatusSuccess).Find(&existBuyTokens).Error
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -87,7 +88,7 @@ func createOrder(result []wallet_monitor.Result, conf configs.Config) error {
TargetChainName: v.ChainName,
CurrentBalance: v.TokenBalance,
Amount: v.Amount,
Status: models.OrderStatusWait,
Status: provider.TxStatusPending,
})
}
}
Expand Down
96 changes: 58 additions & 38 deletions internal/daemons/rebalance/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,21 @@ func Run(ctx context.Context, conf configs.Config) error {
}
var w sync.WaitGroup
for index := range orders {
if utils.InArray(orders[index].Status.String(), []string{models.OrderStatusWaitCrossChain.String()}) {
continue
}
if orders[index].HasLocked() {
logrus.Debugf("order #%d has locked, skip", orders[index].ID)
continue
}
w.Add(1)
go func(order *models.Order) {
defer utils.Recover()
defer w.Done()
log := order.GetLogs()
subCtx, cancel := context.WithCancel(utils.SetLogToCtx(ctx, log))
defer cancel()

go func() {
utils.Go(func() {
defer cancel()
var t = time.NewTicker(time.Second * 5)
defer t.Stop()
Expand All @@ -67,40 +74,50 @@ func Run(ctx context.Context, conf configs.Config) error {
}
}
}
}()
})

if err := reBalance(subCtx, order, conf); err != nil {
log.Errorf("reBalance order #%d error: %s", order.ID, err)
return
}
err = notice.Send(ctx,
fmt.Sprintf("rebalance order #%d success", order.ID),
fmt.Sprintf("rebalance %s %s from %s to %s use %s %s success",
order.TokenInName, order.Amount, order.SourceChainName, order.TargetChainName,
order = models.GetOrder(ctx, db.DB(), order.ID)

err = notice.Send(
provider.WithNotify(ctx, provider.WithNotifyParams{
OrderId: order.ID,
Receiver: common.HexToAddress(order.Wallet),
CurrentBalance: order.CurrentBalance,
}),
fmt.Sprintf("rebalance %s on %s success", order.TokenOutName, order.TargetChainName),
fmt.Sprintf("rebalance %s %s from %s to %s use %s %s",
order.TokenOutName, order.Amount, order.SourceChainName, order.TargetChainName,
order.ProviderName, order.ProviderType),
logrus.InfoLevel,
)
if err != nil {
log.Debugf("notice error: %s", err)
}
log.Debugf("reBalance order #%d success", order.ID)
log.Infof("reBalance order #%d success", order.ID)
}(orders[index])
}
w.Wait()
return nil
}

func transfer(ctx context.Context, order *models.Order, args provider.SwapParams,
conf configs.Config, setWaitTransfer bool, client simulated.Client) (bool, error) {
conf configs.Config, client simulated.Client) (bool, error) {
ctx = context.WithValue(ctx, constant.ChainNameKeyInCtx, order.TargetChainName)

if order.Status != models.OrderStatusWaitTransferFromOperator {
return false, errors.Errorf("order #%d status is %s, not wait transfer from operator", order.ID, order.Status)
}
result, err := provider.Transfer(ctx, conf, args, client)
if errors.Is(err, error_types.ErrNativeTokenInsufficient) ||
errors.Is(err, error_types.ErrWalletLocked) ||
errors.Is(err, context.Canceled) {
return true, errors.Wrap(err, "transfer error")
}
if err == nil {
return true, createUpdateLog(ctx, order, result, conf, setWaitTransfer, client)
return true, createUpdateLog(ctx, order, result, conf, client)
}
if !errors.Is(errors.Unwrap(err), error_types.ErrInsufficientBalance) &&
!errors.Is(errors.Unwrap(err), error_types.ErrInsufficientLiquidity) && err != nil {
Expand Down Expand Up @@ -128,15 +145,15 @@ func reBalance(ctx context.Context, order *models.Order, conf configs.Config) er
return errors.Wrap(err, "new evm client error")
}
defer client.Close()
if wallet.IsDifferentAddress() {
ok, err := transfer(ctx, order, args, conf, false, client)
if wallet.IsDifferentAddress() || order.Status == models.OrderStatusWaitTransferFromOperator {
ok, err := transfer(ctx, order, args, conf, client)
if err != nil && ok {
return errors.Wrap(err, "transfer error")
}
if ok {
return nil
}
log.Debugf("cannot use transfer, try other providers.")
log.Infof("cannot use transfer, try other providers.")
}

balance, err := wallet.GetExternalBalance(ctx, common.HexToAddress(token.ContractAddress), token.Decimals, client)
Expand Down Expand Up @@ -172,35 +189,43 @@ func reBalance(ctx context.Context, order *models.Order, conf configs.Config) er

log.Infof("start reBalance #%d %s on %s use %s provider", order.ID, order.TokenOutName,
order.TargetChainName, providerObj.Name())
result, err := providerObj.Swap(ctx, args)
if err != nil {
return errors.Wrapf(err, "reBalance %s on %s error", order.TokenOutName, providerObj.Name())
}
result, providerErr := providerObj.Swap(ctx, args)
if result.Status == "" {
return errors.New("the result status is empty")
}
if err := createUpdateLog(ctx, order, result, conf, true, client); err != nil {
if result.CurrentChain != args.TargetChain {
result.Status = models.OrderStatusWaitCrossChain
}
if err := createUpdateLog(ctx, order, result, conf, client); err != nil {
return errors.Wrap(err, "create update log error")
}

_, err = transfer(ctx, order, args, conf, false, client)
if err != nil {
return errors.Wrap(err, "transfer error")
if providerErr != nil {
return errors.Wrap(providerErr, "provider error")
}
if args.Receiver != result.Receiver && result.Receiver != "" {
order = models.GetOrder(ctx, db.DB(), order.ID)
if order == nil {
return errors.New("order not found")
}
_, err = transfer(ctx, order, daemons.CreateSwapParams(*order, orderProcess, log, conf.GetWallet(order.Wallet)), conf, client)
if err != nil {
return errors.Wrap(err, "transfer error")
}
}
return nil
}

func listOrders(_ context.Context) ([]*models.Order, error) {
var orders []*models.Order
err := db.DB().Where("status != ?", models.OrderStatusSuccess).Find(&orders).Error
err := db.DB().Where("status != ?", provider.TxStatusSuccess).Find(&orders).Error
if err != nil {
return nil, errors.Wrap(err, "find buy tokens error")
}
return orders, nil
}

func createUpdateLog(ctx context.Context, order *models.Order, result provider.SwapResult, conf configs.Config,
setWaitTransfer bool, client simulated.Client) error {
client simulated.Client) error {

wallet := conf.GetWallet(order.Wallet)
walletBalance := getWalletTokenBalance(ctx, wallet, order.TokenOutName, order.TargetChainName, conf, client)
Expand All @@ -214,23 +239,17 @@ func createUpdateLog(ctx context.Context, order *models.Order, result provider.S
Tx: result.Tx,
Order: result.MarshalOrder(),
Error: result.Error,
Status: result.Status,
}
log := utils.GetLogFromCtx(ctx).WithFields(logrus.Fields{
"order_id": order.ID,
"status": result.Status,
"result": utils.ToMap(result),
})
switch result.Status {
case provider.TxStatusSuccess:
updateOrder.Status = models.OrderStatusSuccess
if setWaitTransfer && wallet.IsDifferentAddress() {
updateOrder.Status = models.OrderStatusWaitTransferFromOperator
}
default:
updateOrder.Status = models.OrderStatus(result.Status)
if result.Status == "" {
updateOrder.Status = models.OrderStatusUnknown
}

if result.Status == provider.TxStatusSuccess &&
wallet.IsDifferentAddress() &&
result.Receiver != order.Wallet &&
result.Receiver != "" {
updateOrder.Status = provider.TxStatus(models.OrderStatusWaitTransferFromOperator)
}
log.Debugf("order status is %v", updateOrder.Status)
return db.DB().Model(&models.Order{}).Where("id = ?", order.ID).Limit(1).Updates(updateOrder).Error
Expand All @@ -255,6 +274,7 @@ func getWalletTokenBalance(ctx context.Context, wallet wallets.Wallets, tokenNam
func getReBalanceProvider(ctx context.Context, order models.Order, conf configs.Config) (provider.Provider, error) {
log := order.GetLogs()
if order.ProviderType != "" && order.ProviderName != "" {
log.Debugf("provider type is %s, provider name is %s", order.ProviderType, order.ProviderName)
fn, err := provider.GetProvider(order.ProviderType, order.ProviderName)
if err != nil {
return nil, errors.Wrap(err, "get provider error")
Expand Down
2 changes: 2 additions & 0 deletions internal/daemons/token_price/token_price.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"omni-balance/internal/daemons"
"omni-balance/internal/db"
"omni-balance/internal/models"
"omni-balance/utils"
"omni-balance/utils/configs"
"omni-balance/utils/token_price"
"sync"
Expand All @@ -33,6 +34,7 @@ func Run(ctx context.Context, conf configs.Config) error {
for _, provider := range providers {
w.Add(1)
go func(provider token_price.TokenPrice) {
defer utils.Recover()
defer w.Done()
result, err := provider.GetTokenPriceInUSDT(ctx, conf.SourceToken...)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions internal/daemons/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

func CreateSwapParams(order models.Order, orderProcess models.OrderProcess, log *logrus.Entry, wallet wallets.Wallets) provider.SwapParams {
return provider.SwapParams{
OrderId: order.ID,
SourceChain: order.CurrentChainName,
Sender: wallet,
Receiver: order.Wallet,
Expand Down
Loading

0 comments on commit 5a68f49

Please sign in to comment.