Skip to content

Commit

Permalink
Merge branch 'main' into dan/batch-tx
Browse files Browse the repository at this point in the history
  • Loading branch information
boojamya committed Dec 12, 2023
2 parents fb1cbf6 + bd7b450 commit e732647
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 45 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.idea
noble-cctp-relayer
.ignore
14 changes: 14 additions & 0 deletions cmd/ethereum/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/strangelove-ventures/noble-cctp-relayer/config"
"github.com/strangelove-ventures/noble-cctp-relayer/types"
"math/big"
"regexp"
"strconv"
"time"
)

Expand Down Expand Up @@ -69,7 +71,19 @@ func Broadcast(
}
logger.Debug(fmt.Sprintf("retrying with new nonce: %d", nonce))
sequenceMap.Put(cfg.Networks.Destination.Ethereum.DomainId, nonce)
}

match, _ := regexp.MatchString("nonce too low: next nonce [0-9]+, tx nonce [0-9]+", parsedErr.Error())
if match {
numberRegex := regexp.MustCompile("[0-9]+")
nextNonce, err := strconv.ParseInt(numberRegex.FindAllString(parsedErr.Error(), 1)[0], 10, 0)
if err != nil {
nonce, err = GetEthereumAccountNonce(cfg.Networks.Destination.Ethereum.RPC, ethereumAddress)
if err != nil {
logger.Error("unable to retrieve account number")
}
}
sequenceMap.Put(cfg.Networks.Destination.Ethereum.DomainId, nextNonce)
}
}

Expand Down
9 changes: 5 additions & 4 deletions cmd/ethereum/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@ package ethereum
import (
"bytes"
"context"
"cosmossdk.io/log"
"embed"
"fmt"
"math/big"
"os"

"cosmossdk.io/log"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/pascaldekloe/etherstream"
"github.com/strangelove-ventures/noble-cctp-relayer/config"
"github.com/strangelove-ventures/noble-cctp-relayer/types"
"math/big"
"os"
)

//go:embed abi/MessageTransmitter.json
Expand Down Expand Up @@ -66,7 +67,7 @@ func StartListener(cfg config.Config, logger log.Logger, processingQueue chan *t
for _, historicalLog := range history {
parsedMsg, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &historicalLog)
if err != nil {
logger.Error("Unable to parse history log into MessageState, skipping")
logger.Error("Unable to parse history log into MessageState, skipping", "err", err)
continue
}
logger.Info(fmt.Sprintf("New historical msg from source domain %d with tx hash %s", parsedMsg.SourceDomain, parsedMsg.SourceTxHash))
Expand Down
91 changes: 60 additions & 31 deletions cmd/noble/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net/http"
"regexp"
"strconv"
Expand Down Expand Up @@ -131,50 +132,78 @@ func Broadcast(
}

rpcResponse, err := rpcClient.BroadcastTxSync(context.Background(), txBytes)
if err == nil && rpcResponse.Code == 0 {
msg.Status = types.Complete
return rpcResponse, nil
}
if err != nil || (rpcResponse != nil && rpcResponse.Code != 0) {
// Log the error
logger.Error(fmt.Sprintf("error during broadcast: %s", getErrorString(err, rpcResponse)))

if err != nil || rpcResponse == nil {
// Log retry information
logger.Info(fmt.Sprintf("Retrying in %d seconds", cfg.Networks.Destination.Noble.BroadcastRetryInterval))
time.Sleep(time.Duration(cfg.Networks.Destination.Noble.BroadcastRetryInterval) * time.Second)
// wait a random amount of time to lower probability of concurrent message nonce collision
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
continue
}

// Log details for non-zero response code
logger.Error(fmt.Sprintf("received non-zero: %d - %s", rpcResponse.Code, rpcResponse.Log))

// check tx response code
logger.Error(fmt.Sprintf("received non zero : %d - %s", rpcResponse.Code, rpcResponse.Log))

if err == nil && rpcResponse.Code == 32 {
// on account sequence mismatch, extract correct account sequence and retry
pattern := `expected (\d+), got (\d+)`
re := regexp.MustCompile(pattern)
match := re.FindStringSubmatch(rpcResponse.Log)

var newAccountSequence int64
if len(match) == 3 {
// Extract the numbers from the match.
newAccountSequence, _ = strconv.ParseInt(match[1], 10, 0)
} else {
// otherwise, just request the account sequence
_, newAccountSequence, err = GetNobleAccountNumberSequence(cfg.Networks.Destination.Noble.API, nobleAddress)
if err != nil {
logger.Error("unable to retrieve account number")
}
// Handle specific error code (32)
if rpcResponse.Code == 32 {
newAccountSequence := extractAccountSequence(logger, rpcResponse.Log, nobleAddress, cfg.Networks.Destination.Noble.API)
logger.Debug(fmt.Sprintf("retrying with new account sequence: %d", newAccountSequence))
sequenceMap.Put(cfg.Networks.Destination.Noble.DomainId, newAccountSequence)
}
logger.Debug(fmt.Sprintf("error during broadcast: %s", rpcResponse.Log))
logger.Debug(fmt.Sprintf("retrying with new account sequence: %d", newAccountSequence))
sequenceMap.Put(cfg.Networks.Destination.Noble.DomainId, newAccountSequence)
}
if err != nil {
logger.Error(fmt.Sprintf("error during broadcast: %s", err.Error()))

// Log retry information
logger.Info(fmt.Sprintf("Retrying in %d seconds", cfg.Networks.Destination.Noble.BroadcastRetryInterval))
time.Sleep(time.Duration(cfg.Networks.Destination.Noble.BroadcastRetryInterval) * time.Second)
// wait a random amount of time to lower probability of concurrent message nonce collision
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
continue
}

logger.Info(fmt.Sprintf("Retrying in %d seconds", cfg.Networks.Destination.Noble.BroadcastRetryInterval))
time.Sleep(time.Duration(cfg.Networks.Destination.Noble.BroadcastRetryInterval) * time.Second)
// Tx was successfully broadcast
msg.Status = types.Complete
return rpcResponse, nil
}

msg.Status = types.Failed

return nil, errors.New("reached max number of broadcast attempts")
}

// getErrorString returns the appropriate value to log when tx broadcast errors are encountered.
func getErrorString(err error, rpcResponse *ctypes.ResultBroadcastTx) string {
if rpcResponse != nil {
return rpcResponse.Log
}
return err.Error()
}

// extractAccountSequence attempts to extract the account sequence number from the RPC response logs when
// account sequence mismatch errors are encountered. If the account sequence number cannot be extracted from the logs,
// it is retrieved by making a request to the API endpoint.
func extractAccountSequence(logger log.Logger, rpcResponseLog, nobleAddress, nobleAPI string) int64 {
pattern := `expected (\d+), got (\d+)`
re := regexp.MustCompile(pattern)
match := re.FindStringSubmatch(rpcResponseLog)

if len(match) == 3 {
// Extract the numbers from the match.
newAccountSequence, _ := strconv.ParseInt(match[1], 10, 0)
return newAccountSequence
}

// Otherwise, just request the account sequence
_, newAccountSequence, err := GetNobleAccountNumberSequence(nobleAPI, nobleAddress)
if err != nil {
logger.Error("unable to retrieve account number")
}

return newAccountSequence
}

// NewRPCClient initializes a new tendermint RPC client connected to the specified address.
func NewRPCClient(addr string, timeout time.Duration) (*rpchttp.HTTP, error) {
httpClient, err := libclient.DefaultHTTPClient(addr)
Expand Down
15 changes: 10 additions & 5 deletions cmd/noble/listener.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,37 @@
package noble

import (
"cosmossdk.io/log"
"encoding/json"
"fmt"
"github.com/strangelove-ventures/noble-cctp-relayer/config"
"github.com/strangelove-ventures/noble-cctp-relayer/types"
"io"
"net/http"
"strconv"
"sync"
"time"

"cosmossdk.io/log"
"github.com/strangelove-ventures/noble-cctp-relayer/config"
"github.com/strangelove-ventures/noble-cctp-relayer/types"
)

func StartListener(cfg config.Config, logger log.Logger, processingQueue chan *types.MessageState) {
// set up client

logger.Info(fmt.Sprintf("Starting Noble listener at block %d", cfg.Networks.Source.Noble.StartBlock))
logger.Info(fmt.Sprintf("Starting Noble listener at block %d looking back %d blocks",
cfg.Networks.Source.Noble.StartBlock,
cfg.Networks.Source.Noble.LookbackPeriod))

var wg sync.WaitGroup
wg.Add(1)

// enqueue block heights
currentBlock := cfg.Networks.Source.Noble.StartBlock
lookback := cfg.Networks.Source.Noble.LookbackPeriod
chainTip := GetNobleChainTip(cfg)
blockQueue := make(chan uint64, 1000000)

// history
currentBlock = currentBlock - lookback
for currentBlock <= chainTip {
blockQueue <- currentBlock
currentBlock++
Expand All @@ -51,7 +56,7 @@ func StartListener(cfg config.Config, logger log.Logger, processingQueue chan *t
go func() {
for {
block := <-blockQueue
rawResponse, err := http.Get(fmt.Sprintf("https://rpc.testnet.noble.strange.love/tx_search?query=\"tx.height=%d\"", block))
rawResponse, err := http.Get(fmt.Sprintf("%s/tx_search?query=\"tx.height=%d\"", cfg.Networks.Source.Noble.RPC, block))
if err != nil {
logger.Debug(fmt.Sprintf("unable to query Noble block %d", block))
continue
Expand Down
16 changes: 15 additions & 1 deletion cmd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ func (p *Processor) StartProcessor(cfg config.Config, logger log.Logger, process
// if a filter's condition is met, mark as filtered
if filterDisabledCCTPRoutes(cfg, logger, msg[batchPosition]) ||
filterInvalidDestinationCallers(cfg, logger, msg[batchPosition]) ||
filterNonWhitelistedChannels(cfg, logger, msg[batchPosition]) {
filterNonWhitelistedChannels(cfg, logger, msg[batchPosition]) ||
filterMessages(cfg, logger, msg[batchPosition]) {
p.Mu.Lock()
msg[batchPosition].Status = types.Filtered
p.Mu.Unlock()
Expand Down Expand Up @@ -281,6 +282,19 @@ func filterNonWhitelistedChannels(cfg config.Config, logger log.Logger, msg *typ
return true
}

// filterMessages filters out non-burn messages. It returns true if the message is not a burn.
func filterMessages(_ config.Config, logger log.Logger, msg *types.MessageState) bool {
result := msg.Type != types.Mint
if result {
logger.Info(fmt.Sprintf("Filtered tx %s because it's a not a burn", msg.SourceTxHash))
}
return result
}

func LookupKey(sourceTxHash string, messageType string) string {
return fmt.Sprintf("%s-%s", sourceTxHash, messageType)
}

func init() {
cobra.OnInitialize(func() {})
}
27 changes: 27 additions & 0 deletions cmd/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,30 @@ func TestBatchTx(t *testing.T) {
require.Equal(t, 2, len(actualState))
p.Mu.RUnlock()
}

// created message -> not \ -> filtered
func TestProcessNonBurnMessageWhenDisabled(t *testing.T) {
setupTest()

go cmd.StartProcessor(cfg, logger, processingQueue, sequenceMap)

emptyBz := make([]byte, 32)
expectedState := &types.MessageState{
SourceTxHash: "123",
Type: "",
IrisLookupId: "a404f4155166a1fc7ffee145b5cac6d0f798333745289ab1db171344e226ef0c",
Status: types.Created,
SourceDomain: 0,
DestDomain: 4,
DestinationCaller: emptyBz,
}

processingQueue <- expectedState

time.Sleep(2 * time.Second)

actualState, ok := cmd.State.Load(cmd.LookupKey(expectedState.SourceTxHash, expectedState.Type))
require.True(t, ok)
require.Equal(t, types.Filtered, actualState.Status)

}
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Config struct {
RPC string `yaml:"rpc"`
RequestQueueSize uint32 `yaml:"request-queue-size"`
StartBlock uint64 `yaml:"start-block"`
LookbackPeriod uint64 `yaml:"lookback-period"`
Workers uint32 `yaml:"workers"`
Enabled bool `yaml:"enabled"`
} `yaml:"noble"`
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
)

require (
cosmossdk.io/math v1.1.2
github.com/circlefin/noble-cctp v0.0.0-20230911222715-829029fbba29
github.com/cometbft/cometbft v0.38.0
github.com/gin-gonic/gin v1.8.1
Expand All @@ -30,7 +31,6 @@ require (
cosmossdk.io/api v0.3.1 // indirect
cosmossdk.io/core v0.5.1 // indirect
cosmossdk.io/depinject v1.0.0-alpha.4 // indirect
cosmossdk.io/math v1.1.2 // indirect
filippo.io/edwards25519 v1.0.0 // indirect
github.com/4meepo/tagalign v1.3.2 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
Expand Down
4 changes: 1 addition & 3 deletions types/message_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type MessageState struct {

// EvmLogToMessageState transforms an evm log into a messageState given an ABI
func EvmLogToMessageState(abi abi.ABI, messageSent abi.Event, log *ethtypes.Log) (messageState *MessageState, err error) {

event := make(map[string]interface{})
_ = abi.UnpackIntoMap(event, messageSent.Name, log.Data)

Expand All @@ -59,7 +58,6 @@ func EvmLogToMessageState(abi abi.ABI, messageSent abi.Event, log *ethtypes.Log)

messageState = &MessageState{
IrisLookupId: hashedHexStr,
Type: Mint,
Status: Created,
SourceDomain: message.SourceDomain,
DestDomain: message.DestinationDomain,
Expand All @@ -82,7 +80,7 @@ func EvmLogToMessageState(abi abi.ABI, messageSent abi.Event, log *ethtypes.Log)
return messageState, nil
}

return nil, errors.New(fmt.Sprintf("unable to parse txn into message. tx hash %s", log.TxHash.Hex()))
return nil, errors.New(fmt.Sprintf("unable to parse tx into message, tx hash %s", log.TxHash.Hex()))
}

// NobleLogToMessageState transforms a Noble log into a messageState
Expand Down

0 comments on commit e732647

Please sign in to comment.